fastapi

Форк
0
53 строки · 1.3 Кб
1
from typing import Union
2

3
from couchbase import LOCKMODE_WAIT
4
from couchbase.bucket import Bucket
5
from couchbase.cluster import Cluster, PasswordAuthenticator
6
from fastapi import FastAPI
7
from pydantic import BaseModel
8

9
USERPROFILE_DOC_TYPE = "userprofile"
10

11

12
def get_bucket():
13
    cluster = Cluster(
14
        "couchbase://couchbasehost:8091?fetch_mutation_tokens=1&operation_timeout=30&n1ql_timeout=300"
15
    )
16
    authenticator = PasswordAuthenticator("username", "password")
17
    cluster.authenticate(authenticator)
18
    bucket: Bucket = cluster.open_bucket("bucket_name", lockmode=LOCKMODE_WAIT)
19
    bucket.timeout = 30
20
    bucket.n1ql_timeout = 300
21
    return bucket
22

23

24
class User(BaseModel):
25
    username: str
26
    email: Union[str, None] = None
27
    full_name: Union[str, None] = None
28
    disabled: Union[bool, None] = None
29

30

31
class UserInDB(User):
32
    type: str = USERPROFILE_DOC_TYPE
33
    hashed_password: str
34

35

36
def get_user(bucket: Bucket, username: str):
37
    doc_id = f"userprofile::{username}"
38
    result = bucket.get(doc_id, quiet=True)
39
    if not result.value:
40
        return None
41
    user = UserInDB(**result.value)
42
    return user
43

44

45
# FastAPI specific code
46
app = FastAPI()
47

48

49
@app.get("/users/{username}", response_model=User)
50
def read_user(username: str):
51
    bucket = get_bucket()
52
    user = get_user(bucket=bucket, username=username)
53
    return user
54

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.