fastapi
53 строки · 1.3 Кб
1from typing import Union2
3from couchbase import LOCKMODE_WAIT4from couchbase.bucket import Bucket5from couchbase.cluster import Cluster, PasswordAuthenticator6from fastapi import FastAPI7from pydantic import BaseModel8
# Document-type tag for user profiles; used as the default value of
# ``UserInDB.type``.
USERPROFILE_DOC_TYPE = "userprofile"
11
def get_bucket():
    """Connect to the Couchbase cluster and return an opened bucket.

    Builds a ``Cluster`` from a fixed connection string, authenticates with
    a ``PasswordAuthenticator``, opens the bucket named ``"bucket_name"``
    with ``lockmode=LOCKMODE_WAIT`` and sets its ``timeout`` and
    ``n1ql_timeout`` attributes before returning it.

    NOTE(review): host, credentials and bucket name are hard-coded
    placeholders; the timeout values are presumably seconds -- confirm
    against the Couchbase SDK documentation.
    """
    connection_string = (
        "couchbase://couchbasehost:8091?fetch_mutation_tokens=1&operation_timeout=30&n1ql_timeout=300"
    )
    cb_cluster = Cluster(connection_string)
    cb_cluster.authenticate(PasswordAuthenticator("username", "password"))

    opened: Bucket = cb_cluster.open_bucket("bucket_name", lockmode=LOCKMODE_WAIT)
    # The same limits also appear in the connection string above; they are
    # set again explicitly on the bucket object.
    opened.timeout = 30
    opened.n1ql_timeout = 300
    return opened
23
# Pydantic model for a user; also used as the ``response_model`` of the
# ``/users/{username}`` route in this file.
# (Documented with comments rather than a class docstring so the generated
# schema is not altered.)
class User(BaseModel):
    # Required field.
    username: str
    # Optional fields: each may be omitted or None, and defaults to None.
    email: Union[str, None] = None
    full_name: Union[str, None] = None
    disabled: Union[bool, None] = None
30
# Extension of ``User`` that ``get_user`` builds from a fetched document's
# value. Adds a ``type`` tag and the hashed password.
class UserInDB(User):
    # Defaults to USERPROFILE_DOC_TYPE when the input does not supply it.
    type: str = USERPROFILE_DOC_TYPE
    # Required field with no default.
    hashed_password: str
35
def get_user(bucket: Bucket, username: str):
    """Fetch the profile document for *username* and build a ``UserInDB``.

    The document is looked up under the key ``userprofile::<username>``
    with ``quiet=True``.

    Returns:
        A ``UserInDB`` built from the document's value, or ``None`` when the
        lookup result carries a falsy value.
    """
    lookup = bucket.get(f"userprofile::{username}", quiet=True)
    payload = lookup.value
    if payload:
        return UserInDB(**payload)
    return None
44
# FastAPI specific code
# Application instance; the route decorator below registers its handler on it.
app = FastAPI()
48
@app.get("/users/{username}", response_model=User)
def read_user(username: str):
    """Return the stored user whose username is *username*.

    Opens a bucket via ``get_bucket()`` and looks the user up with
    ``get_user()``. The result is returned through ``response_model=User``.

    Raises:
        HTTPException: with status 404 when ``get_user`` returns ``None``,
            so a missing user is reported as "not found" instead of
            passing ``None`` to the response model.
    """
    # fastapi is already a dependency of this file; the import is local so
    # the file-level import list does not have to change.
    from fastapi import HTTPException

    bucket = get_bucket()
    user = get_user(bucket=bucket, username=username)
    if user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return user