gigacook
/
db.py
73 строки · 2.0 Кб
1from os import environ
2
3from sqlalchemy import delete, insert, inspect, select, update
4from sqlalchemy.ext.asyncio import AsyncAttrs, AsyncSession, async_sessionmaker, create_async_engine
5from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
6
# Session factory used by the model helpers in this module. It starts out as
# the plain AsyncSession class and is rebound to a configured
# async_sessionmaker by run_async_session().
async_session = AsyncSession
8
9
class Base(AsyncAttrs, DeclarativeBase):
    """Declarative base that adds small async CRUD helpers to every model.

    Each helper opens its own session from the module-level
    ``async_session`` factory; the writing helpers commit before returning.
    """

    @classmethod
    def __get_pk(cls):
        """Return the mapped class attribute of the first primary-key column.

        Only the first primary-key column is considered, so ``update`` and
        ``delete`` are meant for models with a single-column primary key.
        """
        mapper = inspect(cls)
        pk_column = mapper.primary_key[0]
        # Resolve the attribute through its mapped key instead of the column
        # name: the two differ when a column is declared with an explicit
        # name, e.g. ``ident = mapped_column("id", primary_key=True)``.
        return getattr(cls, mapper.get_property_by_column(pk_column).key)

    @classmethod
    async def create(cls, **kwargs):
        """Insert one row built from ``kwargs`` (column name -> value)."""
        async with async_session() as session:
            await session.execute(insert(cls).values(**kwargs))
            await session.commit()

    @classmethod
    async def read(cls, user):
        """Return a list of all rows of this model whose ``user`` equals ``user``.

        The model is expected to define a ``user`` attribute.
        """
        async with async_session() as session:
            return list(await session.scalars(select(cls).where(cls.user == user)))

    @classmethod
    async def update(cls, pk, **kwargs):
        """Set the columns given in ``kwargs`` on the row whose primary key is ``pk``."""
        async with async_session() as session:
            await session.execute(update(cls).where(cls.__get_pk() == pk).values(**kwargs))
            await session.commit()

    @classmethod
    async def delete(cls, pk):
        """Delete the row whose primary key is ``pk``."""
        async with async_session() as session:
            await session.execute(delete(cls).where(cls.__get_pk() == pk))
            await session.commit()
37
38
class Stage(Base):
    """Row of the ``stage`` table: one ``name`` per ``user`` (the primary key)."""

    __tablename__ = "stage"

    user: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str]

    @classmethod
    async def set(cls, user: int, name: str):
        """Store ``name`` for ``user``, inserting the row or updating it.

        The lookup and the write happen in one session and one transaction:
        ``merge`` loads the row by primary key and the following commit
        emits either an INSERT (no row yet) or an UPDATE (row exists).
        """
        async with async_session() as session:
            await session.merge(cls(user=user, name=name))
            await session.commit()
51
52
class Product(Base):
    """Row of the ``product`` table: a named entry belonging to a user."""

    __tablename__ = "product"

    # Column types and NOT NULL are derived from the Mapped[...] annotations.
    id: Mapped[int] = mapped_column(primary_key=True)
    user: Mapped[int] = mapped_column()
    name: Mapped[str] = mapped_column()
59
60
class Bookmark(Base):
    """Row of the ``bookmark`` table: a piece of text belonging to a user."""

    __tablename__ = "bookmark"

    # Column types and NOT NULL are derived from the Mapped[...] annotations.
    id: Mapped[int] = mapped_column(primary_key=True)
    user: Mapped[int] = mapped_column()
    text: Mapped[str] = mapped_column()
67
68
def run_async_session(url: "str | None" = None, *, echo: bool = True) -> None:
    """Create the async engine and rebind the module-level ``async_session``.

    Args:
        url: Database URL for the engine. When omitted, the ``DB_URL``
            environment variable is used.
        echo: Passed to ``create_async_engine``; the default ``True`` keeps
            the previous behaviour of logging emitted SQL.

    Raises:
        KeyError: ``url`` is omitted and ``DB_URL`` is not set.
    """
    global async_session

    if url is None:
        url = environ["DB_URL"]
    engine = create_async_engine(url, echo=echo)
    # expire_on_commit=False keeps loaded attributes usable after commit.
    async_session = async_sessionmaker(engine, expire_on_commit=False)
74