1
from fastapi import FastAPI, HTTPException, Body, Depends
2
from fastapi.encoders import jsonable_encoder
3
from passlib.context import CryptContext
4
from fastapi.security import OAuth2PasswordBearer
5
from starlette.middleware import Middleware
6
from starlette.middleware.cors import CORSMiddleware
7
from cryptography.fernet import Fernet
30
from db_models import (
38
PasswordChangeRequestModel,
39
PasswordNewRequestModel,
40
StatusChangeRequestModel,
41
ConfigChangeRequestModel,
45
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
47
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login")
49
SECRET_KEY = "f7383b6c-63a0-4aa8-9072-8772826ea9c8"
51
ACCESS_TOKEN_EXPIRE_MINUTES = 60
58
Middleware(CORSMiddleware, allow_origins=origins)
62
middleware=middleware,
63
title="Пар*лус - FastAPI",
65
openapi_url="/api/openapi.json",
66
summary="https://gitverse.ru/iamintfriend/parolus",
69
"name": "Сreator: iamintfriend",
70
"url": "https://gitverse.ru/iamintfriend/parolus",
71
"email": "vanuko1679@yandex.ru",
75
# ---------------------------------------------------------------------------------------------->>>>>>>>>
76
# ---------------------------------------------------------------------------------------------->>>>>>>>>
77
# ---------------------------------------------------------------------------------------------->>>>>>>>>
78
def verify_password(plain_password, hashed_password) -> bool:
79
"""Сравнивает хэш пароля"""
80
return pwd_context.verify(plain_password, hashed_password)
83
def create_access_token(data: dict, expires_delta: datetime.timedelta):
84
"""Создает JWT-токен для аутентификации"""
85
to_encode = data.copy()
86
expire = datetime.datetime.utcnow() + expires_delta
87
to_encode.update({"exp": expire})
88
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
92
def generate_fernet_key() -> str:
93
"""Генерирует уникальный ключ шифрования паролей для пользователя"""
94
return Fernet.generate_key().decode()
97
async def get_current_user(token: str = Depends(oauth2_scheme)) -> dict:
98
"""Проверяет пользователя для допуска к эндпоинтам"""
99
credentials_exception = HTTPException(
101
detail="Could not validate credentials",
102
headers={"WWW-Authenticate": "Bearer"},
105
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
106
id: str = payload.get("sub")
108
raise credentials_exception
109
token_data = TokenData(id=id)
110
except jwt.PyJWTError:
111
raise credentials_exception
112
user = await retrieve_user(token_data.id)
114
raise credentials_exception
118
# ---------------------------------------------------------------------------------------------->>>>>>>>>
119
# ---------------------------------------------------------------------------------------------->>>>>>>>>
120
# ---------------------------------------------------------------------------------------------->>>>>>>>>
121
@app.get("/api/", tags=["Root"])
122
async def read_root():
123
"""Корневая страница - пустая :( Можете добавить сюда что угодно!"""
125
return {"message": "Welcome to this fantastic app!"}
128
@app.post("/api/login", response_description="Login successful")
129
async def login(username: str = Body(...), password: str = Body(...)):
130
"""Основной эндпоинт для аутентификации"""
132
user = await get_user(username)
134
if not user or not verify_password(password, user["password"]):
137
detail="Incorrect username or password"
139
access_token_expires = datetime.timedelta(
140
minutes=ACCESS_TOKEN_EXPIRE_MINUTES
142
access_token = create_access_token(
143
data={"sub": str(user["_id"]), "sts": bool(user["recievingShares"])},
144
expires_delta=access_token_expires
147
return {"access_token": access_token, "token_type": "bearer"}
150
# ---------------------------------------------------------------------------------------------->>>>>>>>>
151
# ---------------------------------------------------------------------------------------------->>>>>>>>>
152
# ---------------------------------------------------------------------------------------------->>>>>>>>>
153
@app.post("/api/users/add", response_description="User data added into the database")
154
async def add_user_data(user: UserSchema = Body(...)):
155
"""Добавить нового пользователя"""
157
user = jsonable_encoder(user)
158
user['key'] = generate_fernet_key()
159
user['recievingShares'] = True
160
new_user = await add_user(user)
162
return ResponseModel(new_user, "User added successfully.")
165
@app.get("/api/users/list", response_description="Users retrieved")
166
async def get_users():
167
"""Получить список всех пользователей"""
169
users = await retrieve_users()
172
return ResponseModel(users, "Users data retrieved successfully")
173
return ResponseModel(users, "Empty list returned")
176
@app.put("/api/users/change-password", response_description="Password changed successfully")
177
async def change_user_password(
178
password_change_request: PasswordChangeRequestModel,
179
current_user: TokenData = Depends(get_current_user)
181
"""Сменить пароль (только уже аутентифицированному пользователю)"""
183
user_id = str(current_user['id'])
184
user = await retrieve_user(user_id)
187
updated_user = await update_user_password(user_id, password_change_request.new_password)
189
return ResponseModel(
190
"Password changed successfully",
191
"Password updated for user with ID: {}".format(user_id)
193
raise HTTPException(status_code=500, detail="Failed to update password")
194
raise HTTPException(status_code=404, detail="User not found")
197
@app.put("/api/users/new-password", response_description="Password updated successfully")
198
async def new_user_password(
199
password_change_request: PasswordNewRequestModel,
201
"""Сменить пароль любому пользователю (для администраторов)"""
203
updating_user_id = password_change_request.updating_user_id
204
user = await retrieve_user_full(updating_user_id)
207
updated_user = await update_user_password(updating_user_id, password_change_request.new_password)
209
return ResponseModel(
210
"New password set successfully",
211
"New password set for user with ID: {}".format(updating_user_id)
213
raise HTTPException(status_code=500, detail="Failed to set new password")
214
raise HTTPException(status_code=404, detail="User not found")
217
@app.put("/api/users/change-status", response_description="Status changed successfully")
218
async def change_user_status(
219
status_change_request: StatusChangeRequestModel,
220
current_user: TokenData = Depends(get_current_user)
222
"""Сменить статус recievingShares (для получения паролусов)"""
224
user_id = str(current_user['id'])
225
user = await retrieve_user(user_id)
228
updated_user = await update_user_status(user_id, status_change_request.new_status)
230
return ResponseModel(
231
"Status changed successfully",
232
"Status updated for user with ID: {}".format(user_id)
234
raise HTTPException(status_code=500, detail="Failed to update status")
235
raise HTTPException(status_code=404, detail="User not found")
238
@app.delete("/api/users/delete-self", response_description="User account deleted along with associated paroluses")
239
async def delete_self_user_account(
240
current_user: TokenData = Depends(get_current_user)
242
"""Удаление пользователя (только для уже аутентифицированного пользователя)"""
244
deleted_user = await delete_user(str(current_user['id']))
247
await delete_all_paroluses(str(current_user['id']))
249
return ResponseModel(
250
"User account with ID: {} and associated paroluses deleted".format(current_user['id']),
251
"User account deleted successfully"
253
raise HTTPException(status_code=404, detail="User account doesn't exist")
256
@app.post("/api/users/delete-one", response_description="User account deleted along with associated paroluses")
257
async def delete_one_user(deleting_uid: uidSchema = Body(...)):
258
"""Удаление пользователя (для панели администратора)"""
260
deleting_uid = deleting_uid.dict()
261
deleted_user = await delete_user(str(deleting_uid['uid']))
264
await delete_all_paroluses(str(deleting_uid['uid']))
266
return ResponseModel(
267
"User account with ID: {} and associated paroluses deleted".format(str(deleting_uid['uid'])),
268
"User account deleted successfully"
270
raise HTTPException(status_code=404, detail="User account doesn't exist")
273
@app.post("/api/paroluses/add", response_description="Parolus added successfully")
274
async def add_parolus_data(
275
parolus: ParolusSchema = Body(...),
276
current_user: TokenData = Depends(get_current_user)
278
"""Создать новый паролус"""
280
parolus_dict = parolus.dict()
281
parolus_dict["user_id"] = str(current_user['id'])
282
parolus_dict["shared"] = bool(False)
283
user = await retrieve_user_full(str(current_user['id']))
286
hash_key = user['key']
287
new_parolus = await add_parolus(parolus_dict, hash_key)
289
return ResponseModel(new_parolus, "Parolus added successfully.")
292
@app.get("/api/paroluses/list", response_description="Paroluses retrieved")
293
async def get_paroluses(current_user: TokenData = Depends(get_current_user)):
294
"""Получить список всех паролусов (только для уже аутентифицированного пользователя)"""
296
user = await retrieve_user_full(str(current_user['id']))
299
hash_key = user['key']
300
paroluses = await retrieve_paroluses(current_user['id'], hash_key)
302
return ResponseModel(paroluses, "Paroluses data retrieved successfully")
304
return ResponseModel(paroluses, "Empty list returned")
307
@app.get("/api/paroluses/{id}", response_description="Parolus data retrieved")
308
async def get_parolus_data(id):
309
"""Получить паролус по ID"""
311
parolus = await retrieve_parolus(id)
314
return ResponseModel(parolus, "Parolus data retrieved successfully")
316
return ErrorResponseModel("An error occurred.", 404, "Parolus doesn't exist.")
319
@app.put("/api/paroluses/{id}")
320
async def update_parolus_data(
322
req: UpdateParolusModel = Body(...),
323
current_user: TokenData = Depends(get_current_user)
325
"""Внести изменения в паролус"""
327
parolus_dict = req.dict()
328
parolus_dict["user_id"] = str(current_user['id'])
330
req = {k: v for k, v in parolus_dict.items() if v is not None}
332
user = await retrieve_user_full(str(current_user['id']))
334
hash_key = user['key']
336
updated_parolus = await update_parolus(id, req, hash_key)
339
return ResponseModel(
340
"Parolus with ID: {} field update is successful".format(id),
341
"Parolus fields updated successfully",
343
return ErrorResponseModel(
346
"There was an error updating the parolus data.",
350
@app.delete("/api/paroluses/{id}", response_description="Parolus data deleted from the database")
351
async def delete_parolus_data(
353
current_user: TokenData = Depends(get_current_user)
355
"""Удалить паролус"""
357
deleted_parolus = await delete_parolus(id)
360
return ResponseModel(
361
"Parolus with ID: {} removed".format(id), "Parolus deleted successfully"
363
return ErrorResponseModel(
364
"An error occurred", 404, "Parolus with id {0} doesn't exist".format(id)
368
@app.post("/api/paroluses/share/{parolus_id}/{user_id}", response_description="Parolus shared successfully")
369
async def share_parolus(
372
current_user: TokenData = Depends(get_current_user)
374
"""Поделиться паролусом"""
376
parolus = await retrieve_parolus(parolus_id)
379
raise HTTPException(status_code=404, detail="Parolus not found")
381
if parolus["user_id"] != str(current_user['id']):
382
raise HTTPException(status_code=403, detail="Forbidden: You are not the owner of this parolus")
384
reciever = await retrieve_user_full(user_id)
385
reciever_key = reciever['key']
388
raise HTTPException(status_code=404, detail="Reciever user not found")
390
if not reciever["recievingShares"] == True:
391
raise HTTPException(status_code=403, detail="Forbidden: User has blocked recieving function.")
393
user = await retrieve_user_full(str(current_user['id']))
394
user_key = user['key']
397
raise HTTPException(status_code=500, detail="Failed to retrieve user key")
399
cipher_suite = Fernet(user_key)
400
decrypted_title = cipher_suite.decrypt(parolus['title'].encode()).decode()
401
decrypted_login = cipher_suite.decrypt(parolus['login'].encode()).decode()
402
decrypted_password = cipher_suite.decrypt(parolus['password'].encode()).decode()
405
"title": decrypted_title,
406
"login": decrypted_login,
407
"password": decrypted_password,
412
shared_parolus_doc = await add_parolus(shared_parolus, reciever_key)
414
return ResponseModel(shared_parolus_doc, "Parolus shared successfully.")
417
@app.get("/api/app-config", response_description="App config retrieved")
418
async def get_app_config():
419
"""Получить конфигурацию приложения"""
421
config = await check_app_config()
424
return ResponseModel(config, "App config retrieved successfully")
426
return ErrorResponseModel("An error occurred.", 404, "Parolus doesn't exist.")
429
@app.put("/api/app-config/change", response_description="Config changed successfully")
430
async def change_config(
431
config_change_request: ConfigChangeRequestModel,
433
"""Сменить статус enabled_registration (для панели администратора)"""
435
updated_config = await change_app_config(config_change_request.new_config)
437
return ResponseModel(updated_config, "New status updated")
439
raise HTTPException(status_code=500, detail="Failed to update app config")