parolus

Форк
0
/
main.py 
439 строк · 15.5 Кб
1
from fastapi import FastAPI, HTTPException, Body, Depends
2
from fastapi.encoders import jsonable_encoder
3
from passlib.context import CryptContext
4
from fastapi.security import OAuth2PasswordBearer
5
from starlette.middleware import Middleware
6
from starlette.middleware.cors import CORSMiddleware
7
from cryptography.fernet import Fernet
8

9
import datetime
10
import jwt
11

12
from db_conn import (
13
    add_user,
14
    update_user_password,
15
    retrieve_user,
16
    retrieve_user_full,
17
    get_user,
18
    delete_user,
19
    add_parolus,
20
    delete_parolus,
21
    delete_all_paroluses,
22
    retrieve_parolus,
23
    retrieve_paroluses,
24
    update_parolus,
25
    update_user_status,
26
    retrieve_users,
27
    check_app_config,
28
    change_app_config
29
)
30
from db_models import (
31
    UserSchema,
32
    TokenData,
33
    ErrorResponseModel,
34
    ResponseModel,
35
    ParolusSchema,
36
    uidSchema,
37
    UpdateParolusModel,
38
    PasswordChangeRequestModel,
39
    PasswordNewRequestModel,
40
    StatusChangeRequestModel,
41
    ConfigChangeRequestModel,
42
)
43

44

45
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
46

47
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login")
48

49
SECRET_KEY = "f7383b6c-63a0-4aa8-9072-8772826ea9c8"
50
ALGORITHM = "HS256"
51
ACCESS_TOKEN_EXPIRE_MINUTES = 60
52

53
origins = [
54
    "*",
55
]
56

57
middleware = [
58
    Middleware(CORSMiddleware, allow_origins=origins)
59
]
60

61
app = FastAPI(
62
    middleware=middleware,
63
    title="Пар*лус - FastAPI",
64
    docs_url="/api/docs",
65
    openapi_url="/api/openapi.json",
66
    summary="https://gitverse.ru/iamintfriend/parolus",
67
    version="1.0.0",
68
    contact={
69
        "name": "Сreator: iamintfriend",
70
        "url": "https://gitverse.ru/iamintfriend/parolus",
71
        "email": "vanuko1679@yandex.ru",
72
    }
73
)
74

75
# ---------------------------------------------------------------------------------------------->>>>>>>>>
76
# ---------------------------------------------------------------------------------------------->>>>>>>>>
77
# ---------------------------------------------------------------------------------------------->>>>>>>>>
78
def verify_password(plain_password, hashed_password) -> bool:
79
    """Сравнивает хэш пароля"""
80
    return pwd_context.verify(plain_password, hashed_password)
81

82

83
def create_access_token(data: dict, expires_delta: datetime.timedelta):
84
    """Создает JWT-токен для аутентификации"""
85
    to_encode = data.copy()
86
    expire = datetime.datetime.utcnow() + expires_delta
87
    to_encode.update({"exp": expire})
88
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
89
    return encoded_jwt
90

91

92
def generate_fernet_key() -> str:
93
    """Генерирует уникальный ключ шифрования паролей для пользователя"""
94
    return Fernet.generate_key().decode()
95

96

97
async def get_current_user(token: str = Depends(oauth2_scheme)) -> dict:
98
    """Проверяет пользователя для допуска к эндпоинтам"""
99
    credentials_exception = HTTPException(
100
        status_code=401,
101
        detail="Could not validate credentials",
102
        headers={"WWW-Authenticate": "Bearer"},
103
    )
104
    try:
105
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
106
        id: str = payload.get("sub")
107
        if id is None:
108
            raise credentials_exception
109
        token_data = TokenData(id=id)
110
    except jwt.PyJWTError:
111
        raise credentials_exception
112
    user = await retrieve_user(token_data.id)
113
    if user is None:
114
        raise credentials_exception
115
    return user
116

117

118
# ---------------------------------------------------------------------------------------------->>>>>>>>>
119
# ---------------------------------------------------------------------------------------------->>>>>>>>>
120
# ---------------------------------------------------------------------------------------------->>>>>>>>>
121
@app.get("/api/", tags=["Root"])
122
async def read_root():
123
    """Корневая страница - пустая :( Можете добавить сюда что угодно!"""
124

125
    return {"message": "Welcome to this fantastic app!"}
126

127

128
@app.post("/api/login", response_description="Login successful")
129
async def login(username: str = Body(...), password: str = Body(...)):
130
    """Основной эндпоинт для аутентификации"""
131

132
    user = await get_user(username)
133

134
    if not user or not verify_password(password, user["password"]):
135
        raise HTTPException(
136
            status_code=401,
137
            detail="Incorrect username or password"
138
        )
139
    access_token_expires = datetime.timedelta(
140
        minutes=ACCESS_TOKEN_EXPIRE_MINUTES
141
    )
142
    access_token = create_access_token(
143
        data={"sub": str(user["_id"]), "sts": bool(user["recievingShares"])},
144
        expires_delta=access_token_expires
145
    )
146

147
    return {"access_token": access_token, "token_type": "bearer"}
148

149

150
# ---------------------------------------------------------------------------------------------->>>>>>>>>
151
# ---------------------------------------------------------------------------------------------->>>>>>>>>
152
# ---------------------------------------------------------------------------------------------->>>>>>>>>
153
@app.post("/api/users/add", response_description="User data added into the database")
154
async def add_user_data(user: UserSchema = Body(...)):
155
    """Добавить нового пользователя"""
156

157
    user = jsonable_encoder(user)
158
    user['key'] = generate_fernet_key()
159
    user['recievingShares'] = True
160
    new_user = await add_user(user)
161

162
    return ResponseModel(new_user, "User added successfully.")
163

164

165
@app.get("/api/users/list", response_description="Users retrieved")
166
async def get_users():
167
    """Получить список всех пользователей"""
168

169
    users = await retrieve_users()
170

171
    if users:
172
        return ResponseModel(users, "Users data retrieved successfully")
173
    return ResponseModel(users, "Empty list returned")
174

175

176
@app.put("/api/users/change-password", response_description="Password changed successfully")
177
async def change_user_password(
178
    password_change_request: PasswordChangeRequestModel,
179
    current_user: TokenData = Depends(get_current_user)
180
    ):
181
    """Сменить пароль (только уже аутентифицированному пользователю)"""
182

183
    user_id = str(current_user['id'])
184
    user = await retrieve_user(user_id)
185

186
    if user:
187
        updated_user = await update_user_password(user_id, password_change_request.new_password)
188
        if updated_user:
189
            return ResponseModel(
190
                "Password changed successfully",
191
                "Password updated for user with ID: {}".format(user_id)
192
            )
193
        raise HTTPException(status_code=500, detail="Failed to update password")
194
    raise HTTPException(status_code=404, detail="User not found")
195

196

197
@app.put("/api/users/new-password", response_description="Password updated successfully")
198
async def new_user_password(
199
    password_change_request: PasswordNewRequestModel,
200
    ):
201
    """Сменить пароль любому пользователю (для администраторов)"""
202

203
    updating_user_id = password_change_request.updating_user_id
204
    user = await retrieve_user_full(updating_user_id)
205

206
    if user:
207
        updated_user = await update_user_password(updating_user_id, password_change_request.new_password)
208
        if updated_user:
209
            return ResponseModel(
210
                "New password set successfully",
211
                "New password set for user with ID: {}".format(updating_user_id)
212
            )
213
        raise HTTPException(status_code=500, detail="Failed to set new password")
214
    raise HTTPException(status_code=404, detail="User not found")
215

216

217
@app.put("/api/users/change-status", response_description="Status changed successfully")
218
async def change_user_status(
219
    status_change_request: StatusChangeRequestModel,
220
    current_user: TokenData = Depends(get_current_user)
221
    ):
222
    """Сменить статус recievingShares (для получения паролусов)"""
223

224
    user_id = str(current_user['id'])
225
    user = await retrieve_user(user_id)
226

227
    if user:
228
        updated_user = await update_user_status(user_id, status_change_request.new_status)
229
        if updated_user:
230
            return ResponseModel(
231
                "Status changed successfully",
232
                "Status updated for user with ID: {}".format(user_id)
233
            )
234
        raise HTTPException(status_code=500, detail="Failed to update status")
235
    raise HTTPException(status_code=404, detail="User not found")
236

237

238
@app.delete("/api/users/delete-self", response_description="User account deleted along with associated paroluses")
239
async def delete_self_user_account(
240
    current_user: TokenData = Depends(get_current_user)
241
    ):
242
    """Удаление пользователя (только для уже аутентифицированного пользователя)"""
243

244
    deleted_user = await delete_user(str(current_user['id']))
245

246
    if deleted_user:
247
        await delete_all_paroluses(str(current_user['id']))
248

249
        return ResponseModel(
250
            "User account with ID: {} and associated paroluses deleted".format(current_user['id']), 
251
            "User account deleted successfully"
252
        )
253
    raise HTTPException(status_code=404, detail="User account doesn't exist")
254

255

256
@app.post("/api/users/delete-one", response_description="User account deleted along with associated paroluses")
257
async def delete_one_user(deleting_uid: uidSchema = Body(...)):
258
    """Удаление пользователя (для панели администратора)"""
259

260
    deleting_uid = deleting_uid.dict()
261
    deleted_user = await delete_user(str(deleting_uid['uid']))
262

263
    if deleted_user:
264
        await delete_all_paroluses(str(deleting_uid['uid']))
265

266
        return ResponseModel(
267
            "User account with ID: {} and associated paroluses deleted".format(str(deleting_uid['uid'])), 
268
            "User account deleted successfully"
269
        )
270
    raise HTTPException(status_code=404, detail="User account doesn't exist")
271

272

273
@app.post("/api/paroluses/add", response_description="Parolus added successfully")
274
async def add_parolus_data(
275
    parolus: ParolusSchema = Body(...),
276
    current_user: TokenData = Depends(get_current_user)
277
    ):
278
    """Создать новый паролус"""
279

280
    parolus_dict = parolus.dict()
281
    parolus_dict["user_id"] = str(current_user['id'])
282
    parolus_dict["shared"] = bool(False)
283
    user = await retrieve_user_full(str(current_user['id']))
284

285
    if user:
286
        hash_key = user['key']
287
    new_parolus = await add_parolus(parolus_dict, hash_key)
288

289
    return ResponseModel(new_parolus, "Parolus added successfully.")
290

291

292
@app.get("/api/paroluses/list", response_description="Paroluses retrieved")
293
async def get_paroluses(current_user: TokenData = Depends(get_current_user)):
294
    """Получить список всех паролусов (только для уже аутентифицированного пользователя)"""
295

296
    user = await retrieve_user_full(str(current_user['id']))
297

298
    if user:
299
        hash_key = user['key']
300
    paroluses = await retrieve_paroluses(current_user['id'], hash_key)
301
    if paroluses:
302
        return ResponseModel(paroluses, "Paroluses data retrieved successfully")
303

304
    return ResponseModel(paroluses, "Empty list returned")
305

306

307
@app.get("/api/paroluses/{id}", response_description="Parolus data retrieved")
308
async def get_parolus_data(id):
309
    """Получить паролус по ID"""
310

311
    parolus = await retrieve_parolus(id)
312

313
    if parolus:
314
        return ResponseModel(parolus, "Parolus data retrieved successfully")
315

316
    return ErrorResponseModel("An error occurred.", 404, "Parolus doesn't exist.")
317

318

319
@app.put("/api/paroluses/{id}")
320
async def update_parolus_data(
321
    id: str,
322
    req: UpdateParolusModel = Body(...),
323
    current_user: TokenData = Depends(get_current_user)
324
    ):
325
    """Внести изменения в паролус"""
326

327
    parolus_dict = req.dict()
328
    parolus_dict["user_id"] = str(current_user['id'])
329

330
    req = {k: v for k, v in parolus_dict.items() if v is not None}
331

332
    user = await retrieve_user_full(str(current_user['id']))
333
    if user:
334
        hash_key = user['key']
335

336
    updated_parolus = await update_parolus(id, req, hash_key)
337

338
    if updated_parolus:
339
        return ResponseModel(
340
            "Parolus with ID: {} field update is successful".format(id),
341
            "Parolus fields updated successfully",
342
        )
343
    return ErrorResponseModel(
344
        "An error occurred",
345
        404,
346
        "There was an error updating the parolus data.",
347
    )
348

349

350
@app.delete("/api/paroluses/{id}", response_description="Parolus data deleted from the database")
351
async def delete_parolus_data(
352
    id: str,
353
    current_user: TokenData = Depends(get_current_user)
354
    ):
355
    """Удалить паролус"""
356

357
    deleted_parolus = await delete_parolus(id)
358

359
    if deleted_parolus:
360
        return ResponseModel(
361
            "Parolus with ID: {} removed".format(id), "Parolus deleted successfully"
362
        )
363
    return ErrorResponseModel(
364
        "An error occurred", 404, "Parolus with id {0} doesn't exist".format(id)
365
    )
366

367

368
@app.post("/api/paroluses/share/{parolus_id}/{user_id}", response_description="Parolus shared successfully")
369
async def share_parolus(
370
    parolus_id: str,
371
    user_id: str,
372
    current_user: TokenData = Depends(get_current_user)
373
    ):
374
    """Поделиться паролусом"""
375

376
    parolus = await retrieve_parolus(parolus_id)
377

378
    if not parolus:
379
        raise HTTPException(status_code=404, detail="Parolus not found")
380

381
    if parolus["user_id"] != str(current_user['id']):
382
        raise HTTPException(status_code=403, detail="Forbidden: You are not the owner of this parolus")
383

384
    reciever = await retrieve_user_full(user_id)
385
    reciever_key = reciever['key']
386

387
    if not reciever:
388
        raise HTTPException(status_code=404, detail="Reciever user not found")
389

390
    if not reciever["recievingShares"] == True:
391
        raise HTTPException(status_code=403, detail="Forbidden: User has blocked recieving function.")
392

393
    user = await retrieve_user_full(str(current_user['id']))
394
    user_key = user['key']
395

396
    if not user_key:
397
        raise HTTPException(status_code=500, detail="Failed to retrieve user key")
398

399
    cipher_suite = Fernet(user_key)
400
    decrypted_title = cipher_suite.decrypt(parolus['title'].encode()).decode()
401
    decrypted_login = cipher_suite.decrypt(parolus['login'].encode()).decode()
402
    decrypted_password = cipher_suite.decrypt(parolus['password'].encode()).decode()
403

404
    shared_parolus = {
405
        "title": decrypted_title,
406
        "login": decrypted_login,
407
        "password": decrypted_password,
408
        "user_id": user_id,
409
        "shared": True
410
    }
411

412
    shared_parolus_doc = await add_parolus(shared_parolus, reciever_key)
413

414
    return ResponseModel(shared_parolus_doc, "Parolus shared successfully.")
415

416

417
@app.get("/api/app-config", response_description="App config retrieved")
418
async def get_app_config():
419
    """Получить конфигурацию приложения"""
420

421
    config = await check_app_config()
422

423
    if config:
424
        return ResponseModel(config, "App config retrieved successfully")
425

426
    return ErrorResponseModel("An error occurred.", 404, "Parolus doesn't exist.")
427

428

429
@app.put("/api/app-config/change", response_description="Config changed successfully")
430
async def change_config(
431
    config_change_request: ConfigChangeRequestModel,
432
    ):
433
    """Сменить статус enabled_registration (для панели администратора)"""
434

435
    updated_config = await change_app_config(config_change_request.new_config)
436
    if updated_config:
437
        return ResponseModel(updated_config, "New status updated")
438
    else:
439
        raise HTTPException(status_code=500, detail="Failed to update app config")
440

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.