1
import motor.motor_asyncio
2
from bson.objectid import ObjectId
3
from passlib.context import CryptContext
4
from cryptography.fernet import Fernet
7
# Connection string of the MongoDB server used by this module.
MONGO_DETAILS = "mongodb://mongo-db:27017"

# Asynchronous Motor client and the "parolus" database handle.
client = motor.motor_asyncio.AsyncIOMotorClient(MONGO_DETAILS)
database = client.parolus

# Collections accessed by the functions in this module.
paroluses_collection = database.get_collection("paroluses_collection")
users_collection = database.get_collection("users_collection")
app_collection = database.get_collection("app_collection")

# Password hashing context; bcrypt is the only configured scheme.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
19
def key_helper(user) -> dict:
    """Reduce a user document to its id and share-receiving flag.

    Args:
        user: Mapping with at least the keys ``_id`` and ``recievingShares``.

    Returns:
        A dict with ``id`` (the ``_id`` converted to ``str``) and
        ``recievingShares`` (copied unchanged).

    Raises:
        KeyError: If either required key is absent from ``user``.
    """
    # NOTE(review): the visible source looks like it lost a line between the
    # two fields below; confirm that no further field belongs here.
    return {
        "id": str(user["_id"]),
        "recievingShares": user["recievingShares"],
    }
27
def user_helper(user) -> dict:
    """Convert a user document into a plain dict with a string id.

    Args:
        user: Mapping with the keys ``_id``, ``username``, ``password`` and
            ``recievingShares``.

    Returns:
        A dict with ``id`` (``_id`` converted to ``str``) plus ``username``,
        ``password`` and ``recievingShares`` copied unchanged. The password
        value is passed through exactly as stored in ``user``.

    Raises:
        KeyError: If a required key is absent from ``user``.
    """
    return {
        "id": str(user["_id"]),
        "username": user["username"],
        "password": user["password"],
        "recievingShares": user["recievingShares"],
    }
36
def recieving_helper(user) -> dict:
    """Convert a user document into a dict without the password field.

    Args:
        user: Mapping with the keys ``_id``, ``username`` and
            ``recievingShares``.

    Returns:
        A dict with ``id`` (``_id`` converted to ``str``), ``username`` and
        ``recievingShares``.

    Raises:
        KeyError: If a required key is absent from ``user``.
    """
    return {
        "id": str(user["_id"]),
        "username": user["username"],
        "recievingShares": user["recievingShares"],
    }
44
def parolus_helper(parolus) -> dict:
    """Convert a parolus document into a plain dict with a string id.

    Args:
        parolus: Mapping with the keys ``_id``, ``title``, ``login``,
            ``password``, ``user_id`` and ``shared``.

    Returns:
        A dict with ``id`` (``_id`` converted to ``str``) and the remaining
        five fields copied unchanged; no encryption or decryption happens
        here.

    Raises:
        KeyError: If a required key is absent from ``parolus``.
    """
    return {
        "id": str(parolus["_id"]),
        "title": parolus["title"],
        "login": parolus["login"],
        "password": parolus["password"],
        "user_id": parolus["user_id"],
        "shared": parolus["shared"],
    }
55
def config_helper(config) -> dict:
    """Extract the registration flag from an application config document.

    Args:
        config: Mapping with the key ``enabled_registration``.

    Returns:
        A dict holding only ``enabled_registration``.

    Raises:
        KeyError: If ``enabled_registration`` is absent from ``config``.
    """
    return {
        "enabled_registration": config["enabled_registration"],
    }
61
async def add_user(user_data: dict) -> dict:
    """Insert a new user after hashing its password.

    Args:
        user_data: User fields; must contain ``username`` and ``password``.
            The dict is modified in place: ``password`` is replaced by its
            hash before insertion.

    Returns:
        The stored user document, converted by ``user_helper``.

    Raises:
        ValueError: If a user with the same ``username`` already exists.
    """
    existing_user = await users_collection.find_one(
        {"username": user_data["username"]}
    )
    if existing_user:
        raise ValueError("Username already exists")

    # Only the hash is ever written to the collection.
    user_data["password"] = pwd_context.hash(user_data["password"])

    insert_result = await users_collection.insert_one(user_data)
    # Read the document back so the helper sees exactly what was stored.
    new_user = await users_collection.find_one({"_id": insert_result.inserted_id})
    return user_helper(new_user)
72
async def update_user_password(user_id: str, new_password: str):
    """Hash ``new_password`` and store it on the user with the given id.

    Args:
        user_id: String form of the user's ObjectId.
        new_password: Plain-text password; only its hash is stored.

    Returns:
        ``True`` if a user document matched ``user_id``, otherwise ``False``.
    """
    hashed_password = pwd_context.hash(new_password)
    result = await users_collection.update_one(
        {"_id": ObjectId(user_id)},
        {"$set": {"password": hashed_password}},
    )
    # NOTE(review): no return statement is visible in the source; reporting
    # the match keeps truthiness-based callers working. Confirm with callers.
    return result.matched_count > 0
81
async def update_user_status(user_id: str, new_status: bool):
    """Set the ``recievingShares`` flag of the user with the given id.

    Args:
        user_id: String form of the user's ObjectId.
        new_status: New value for the ``recievingShares`` field.

    Returns:
        ``True`` if a user document matched ``user_id``, otherwise ``False``.
    """
    result = await users_collection.update_one(
        {"_id": ObjectId(user_id)},
        {"$set": {"recievingShares": new_status}},
    )
    # NOTE(review): no return statement is visible in the source; reporting
    # the match keeps truthiness-based callers working. Confirm with callers.
    return result.matched_count > 0
89
async def retrieve_users():
    """Return every user in the users collection.

    Returns:
        A list with one ``user_helper`` dict per stored user document; empty
        when the collection has no documents.
    """
    return [user_helper(user) async for user in users_collection.find()]
96
async def retrieve_user(id: str) -> dict:
    """Look up a single user by id.

    Args:
        id: String form of the user's ObjectId.

    Returns:
        The user converted by ``user_helper``, or ``None`` when no document
        has that id.
    """
    user = await users_collection.find_one({"_id": ObjectId(id)})
    if user is None:
        return None
    return user_helper(user)
102
async def retrieve_user_full(id: str) -> dict:
    """Look up a single user by id and return its ``key_helper`` view.

    Args:
        id: String form of the user's ObjectId.

    Returns:
        The user converted by ``key_helper``, or ``None`` when no document
        has that id.
    """
    user = await users_collection.find_one({"_id": ObjectId(id)})
    if user is None:
        return None
    return key_helper(user)
108
async def get_user(username: str):
    """Fetch the raw user document for ``username``.

    Args:
        username: Value matched against the ``username`` field.

    Returns:
        The document exactly as returned by ``find_one`` (not passed through
        any helper), or ``None`` when no user has that username.
    """
    return await users_collection.find_one({"username": username})
112
async def retrieve_parolus(id: str) -> dict:
    """Look up a single parolus by id.

    The stored field values are returned as-is; this function performs no
    decryption.

    Args:
        id: String form of the parolus' ObjectId.

    Returns:
        The document converted by ``parolus_helper``, or ``None`` when no
        document has that id.
    """
    parolus = await paroluses_collection.find_one({"_id": ObjectId(id)})
    if parolus is None:
        return None
    return parolus_helper(parolus)
118
async def add_parolus(parolus_data: dict, hash_key: str) -> dict:
    """Encrypt the secret fields of a parolus and insert it.

    Args:
        parolus_data: Parolus fields; must contain string values for
            ``title``, ``login`` and ``password``. The dict is modified in
            place: those three values are replaced by their encrypted form.
        hash_key: Key passed to ``Fernet`` to build the cipher.

    Returns:
        The stored document converted by ``parolus_helper``; ``title``,
        ``login`` and ``password`` are the encrypted values.
    """
    cipher_suite = Fernet(hash_key)

    # Encrypt all three fields before touching the input, so a missing key
    # raises without leaving ``parolus_data`` half-updated.
    encrypted_fields = {
        field: cipher_suite.encrypt(parolus_data[field].encode()).decode()
        for field in ("title", "login", "password")
    }
    parolus_data.update(encrypted_fields)

    insert_result = await paroluses_collection.insert_one(parolus_data)
    new_parolus = await paroluses_collection.find_one(
        {"_id": insert_result.inserted_id}
    )
    return parolus_helper(new_parolus)
132
async def retrieve_paroluses(user_id: str, hash_key: str):
    """Return all paroluses of a user with their secret fields decrypted.

    Args:
        user_id: Value matched against the stored ``user_id`` field as-is
            (no ObjectId conversion is applied).
        hash_key: Key passed to ``Fernet`` to build the cipher.

    Returns:
        A list of ``parolus_helper`` dicts whose ``title``, ``login`` and
        ``password`` hold the decrypted text; empty when the user has no
        paroluses.
    """
    # The cipher does not depend on the document, so build it once.
    cipher_suite = Fernet(hash_key)

    paroluses = []
    async for parolus in paroluses_collection.find({"user_id": user_id}):
        for field in ("title", "login", "password"):
            parolus[field] = cipher_suite.decrypt(parolus[field].encode()).decode()
        paroluses.append(parolus_helper(parolus))
    return paroluses
146
async def update_parolus(id: str, data: dict, hash_key: str):
    """Encrypt the secret fields in ``data`` and apply it to a parolus.

    Args:
        id: String form of the parolus' ObjectId.
        data: Fields to ``$set`` on the document; must contain string values
            for ``title``, ``login`` and ``password``. The dict is modified
            in place: those three values are replaced by their encrypted form.
        hash_key: Key passed to ``Fernet`` to build the cipher.

    Returns:
        ``True`` if the parolus exists and the update was issued; ``False``
        if ``data`` is empty or no document has that id.
    """
    # An empty payload has nothing to encrypt or store.
    if not data:
        return False

    cipher_suite = Fernet(hash_key)
    # Encrypt everything first so a missing key leaves ``data`` untouched.
    encrypted_fields = {
        field: cipher_suite.encrypt(data[field].encode()).decode()
        for field in ("title", "login", "password")
    }
    data.update(encrypted_fields)

    object_id = ObjectId(id)
    parolus = await paroluses_collection.find_one({"_id": object_id})
    if not parolus:
        return False

    await paroluses_collection.update_one({"_id": object_id}, {"$set": data})
    return True
168
async def delete_parolus(id: str):
    """Delete the parolus with the given id if it exists.

    Args:
        id: String form of the parolus' ObjectId.

    Returns:
        ``True`` when a matching document was found and a delete was issued,
        ``False`` otherwise.
    """
    object_id = ObjectId(id)
    parolus = await paroluses_collection.find_one({"_id": object_id})
    if not parolus:
        # NOTE(review): the not-found return value is not visible in the
        # source; a falsy result is assumed. Confirm against callers.
        return False
    await paroluses_collection.delete_one({"_id": object_id})
    return True
175
async def delete_user(user_id: str):
    """Delete the user with the given id if it exists.

    Only the user document is removed here; paroluses that reference the
    user are not touched by this function.

    Args:
        user_id: String form of the user's ObjectId.

    Returns:
        ``True`` when a matching user was found and a delete was issued,
        ``False`` otherwise.
    """
    object_id = ObjectId(user_id)
    user = await users_collection.find_one({"_id": object_id})
    if not user:
        # NOTE(review): the not-found return value is not visible in the
        # source; a falsy result is assumed. Confirm against callers.
        return False
    await users_collection.delete_one({"_id": object_id})
    return True
183
async def delete_all_paroluses(user_id: str):
    """Delete every parolus whose ``user_id`` field equals ``user_id``.

    Args:
        user_id: Value matched against the stored ``user_id`` field as-is
            (no ObjectId conversion is applied).

    Returns:
        ``None``; the result of ``delete_many`` is discarded.
    """
    await paroluses_collection.delete_many({"user_id": user_id})
187
async def check_app_config() -> bool:
    """Return the application config, creating the default one if absent.

    The config is the document flagged ``primary: True`` in the app
    collection. When none exists, a default with registration disabled is
    inserted.

    Returns:
        The ``config_helper`` dict of the existing or newly created config.
        (The ``-> bool`` annotation is kept from the original signature even
        though a dict is returned.)
    """
    config = await app_collection.find_one({'primary': True})
    if config:
        return config_helper(config)

    default_config = {'primary': True, 'enabled_registration': False}
    await app_collection.insert_one(default_config)
    # NOTE(review): the value returned after seeding the default is not
    # visible in the source; the same shape as the found-config branch is
    # returned so callers can read ``enabled_registration`` either way.
    # Confirm against callers.
    return config_helper(default_config)
196
async def change_app_config(new_config) -> bool:
197
config = await app_collection.update_one({'primary': True}, {"$set": {"enabled_registration": new_config}})