parolus

Форк
0
/
db_conn.py 
201 строка · 6.2 Кб
1
import motor.motor_asyncio
2
from bson.objectid import ObjectId
3
from passlib.context import CryptContext
4
from cryptography.fernet import Fernet
5

6

7
MONGO_DETAILS = "mongodb://mongo-db:27017"
8

9
client = motor.motor_asyncio.AsyncIOMotorClient(MONGO_DETAILS)
10
database = client.parolus
11

12
paroluses_collection = database.get_collection("paroluses_collection")
13
users_collection = database.get_collection("users_collection")
14
app_collection = database.get_collection("app_collection")
15

16
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
17

18

19
def key_helper(user) -> dict:
20
    return {
21
        "id": str(user["_id"]),
22
        "key": user["key"],
23
        "recievingShares": user["recievingShares"]
24
    }
25

26

27
def user_helper(user) -> dict:
28
    return {
29
        "id": str(user["_id"]),
30
        "username": user["username"],
31
        "password": user["password"],
32
        "recievingShares": user["recievingShares"]
33
    }
34

35

36
def recieving_helper(user) -> dict:
37
    return {
38
        "id": str(user["_id"]),
39
        "username": user["username"],
40
        "recievingShares": user["recievingShares"]
41
    }
42

43

44
def parolus_helper(parolus) -> dict:
45
    return {
46
        "id": str(parolus["_id"]),
47
        "title": parolus["title"],
48
        "login": parolus["login"],
49
        "password": parolus["password"],
50
        "user_id": parolus["user_id"],
51
        "shared": parolus["shared"]
52
    }
53

54

55
def config_helper(config) -> dict:
56
    return {
57
        "enabled_registration": config["enabled_registration"]
58
    }
59

60

61
async def add_user(user_data: dict) -> dict:
62
    existing_user = await users_collection.find_one({"username": user_data['username']})
63
    if existing_user:
64
        raise ValueError("Username already exists")
65
    hashed_password = pwd_context.hash(user_data['password'])
66
    user_data['password'] = hashed_password
67
    user = await users_collection.insert_one(user_data)
68
    new_user = await users_collection.find_one({"_id": user.inserted_id})
69
    return user_helper(new_user)
70

71

72
async def update_user_password(user_id: str, new_password: str):
73
    hashed_password = pwd_context.hash(new_password)
74
    await users_collection.update_one(
75
        {"_id": ObjectId(user_id)},
76
        {"$set": {"password": hashed_password}}
77
    )
78
    return True
79

80

81
async def update_user_status(user_id: str, new_status: bool):
82
    await users_collection.update_one(
83
        {"_id": ObjectId(user_id)},
84
        {"$set": {"recievingShares": new_status}}
85
    )
86
    return True
87

88

89
async def retrieve_users():
90
    users = []
91
    async for user in users_collection.find():
92
        users.append(user_helper(user))
93
    return users
94

95

96
async def retrieve_user(id: str) -> dict:
97
    user = await users_collection.find_one({"_id": ObjectId(id)})
98
    if user:
99
        return user_helper(user)
100

101

102
async def retrieve_user_full(id: str) -> dict:
103
    user = await users_collection.find_one({"_id": ObjectId(id)})
104
    if user:
105
        return key_helper(user)
106

107

108
async def get_user(username: str):
109
    return await users_collection.find_one({"username": username})
110

111

112
async def retrieve_parolus(id: str) -> dict:
113
    parolus = await paroluses_collection.find_one({"_id": ObjectId(id)})
114
    if parolus:
115
        return parolus_helper(parolus)
116

117

118
async def add_parolus(parolus_data: dict, hash_key: str) -> dict:
119
    cipher_suite = Fernet(hash_key)
120
    encrypted_title = cipher_suite.encrypt(parolus_data['title'].encode())
121
    encrypted_login = cipher_suite.encrypt(parolus_data['login'].encode())
122
    encrypted_password = cipher_suite.encrypt(parolus_data['password'].encode())
123
    parolus_data['title'] = encrypted_title.decode()
124
    parolus_data['login'] = encrypted_login.decode()
125
    parolus_data['password'] = encrypted_password.decode()
126

127
    parolus = await paroluses_collection.insert_one(parolus_data)
128
    new_parolus = await paroluses_collection.find_one({"_id": parolus.inserted_id})
129
    return parolus_helper(new_parolus)
130

131

132
async def retrieve_paroluses(user_id: str, hash_key: str):
133
    paroluses = []
134
    async for parolus in paroluses_collection.find({"user_id": user_id}):
135
        cipher_suite = Fernet(hash_key)
136
        decrypted_title = cipher_suite.decrypt(parolus['title'].encode()).decode()
137
        decrypted_login = cipher_suite.decrypt(parolus['login'].encode()).decode()
138
        decrypted_password = cipher_suite.decrypt(parolus['password'].encode()).decode()
139
        parolus['title'] = decrypted_title
140
        parolus['login'] = decrypted_login
141
        parolus['password'] = decrypted_password
142
        paroluses.append(parolus_helper(parolus))
143
    return paroluses
144

145

146
async def update_parolus(id: str, data: dict, hash_key: str):
147
    if len(data) < 1:
148
        return False
149

150
    cipher_suite = Fernet(hash_key)
151
    encrypted_title = cipher_suite.encrypt(data['title'].encode())
152
    encrypted_login = cipher_suite.encrypt(data['login'].encode())
153
    encrypted_password = cipher_suite.encrypt(data['password'].encode())
154
    data['title'] = encrypted_title.decode()
155
    data['login'] = encrypted_login.decode()
156
    data['password'] = encrypted_password.decode()
157

158
    parolus = await paroluses_collection.find_one({"_id": ObjectId(id)})
159
    if parolus:
160
        updated_parolus = await paroluses_collection.update_one(
161
            {"_id": ObjectId(id)}, {"$set": data}
162
        )
163
        if updated_parolus:
164
            return True
165
        return False
166

167

168
async def delete_parolus(id: str):
169
    parolus = await paroluses_collection.find_one({"_id": ObjectId(id)})
170
    if parolus:
171
        await paroluses_collection.delete_one({"_id": ObjectId(id)})
172
        return True
173

174

175
async def delete_user(user_id: str):
176
    user = await users_collection.find_one({"_id": ObjectId(user_id)})
177
    if user:
178
        await users_collection.delete_one({"_id": ObjectId(user_id)})
179
        return True
180
    return False
181

182

183
async def delete_all_paroluses(user_id: str):
184
    await paroluses_collection.delete_many({"user_id": user_id})
185

186

187
async def check_app_config() -> bool:
188
    config = await app_collection.find_one({'primary': True})
189
    if config:
190
        return config_helper(config)
191
    else:
192
        await app_collection.insert_one({'primary': True, 'enabled_registration': False})
193
        return True
194

195

196
async def change_app_config(new_config) -> bool:
197
    config = await app_collection.update_one({'primary': True}, {"$set": {"enabled_registration": new_config}})
198
    if config:
199
        return True
200
    else:
201
        return False
202

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.