from asyncio import current_task
from typing import Callable

from sqlalchemy import MetaData
from sqlalchemy.ext.asyncio import (
    AsyncEngine,
    AsyncSession,
    async_scoped_session,
    async_sessionmaker,
    create_async_engine,
)
from sqlalchemy.orm import declarative_base
from sqlalchemy.pool.impl import AsyncAdaptedQueuePool

from app.settings import settings
19
# Type alias for any callable that returns an ``AsyncSession``; used as the
# return annotation of ``build_db_session_factory``.
AsyncSessionFactory = Callable[..., AsyncSession]
22
def make_url_async(url: str) -> str:
    """Return *url* with its scheme replaced by ``postgresql+asyncpg``.

    Everything after the first ``:`` (credentials, host, port, database,
    query string) is kept verbatim; whatever scheme preceded it is discarded.

    Args:
        url: Database URL of the form ``scheme:rest``.

    Returns:
        The same URL using the ``postgresql+asyncpg`` scheme.

    Raises:
        ValueError: If *url* contains no ``:`` and so has no scheme to replace.
    """
    _, separator, remainder = url.partition(":")
    if not separator:
        # Slicing from str.find()'s -1 would keep only the last character and
        # silently yield a malformed URL. The value is left out of the message
        # because DSNs may carry credentials.
        raise ValueError("database URL must contain a ':' scheme separator")
    return f"postgresql+asyncpg:{remainder}"
27
def make_url_sync(url: str) -> str:
    """Return *url* with its scheme replaced by plain ``postgresql``.

    Everything after the first ``:`` (credentials, host, port, database,
    query string) is kept verbatim; whatever scheme preceded it, including a
    ``+asyncpg`` driver suffix, is discarded.

    Args:
        url: Database URL of the form ``scheme:rest``.

    Returns:
        The same URL using the ``postgresql`` scheme.

    Raises:
        ValueError: If *url* contains no ``:`` and so has no scheme to replace.
    """
    _, separator, remainder = url.partition(":")
    if not separator:
        # Slicing from str.find()'s -1 would keep only the last character and
        # silently yield a malformed URL. The value is left out of the message
        # because DSNs may carry credentials.
        raise ValueError("database URL must contain a ':' scheme separator")
    return f"postgresql:{remainder}"
32
# Name templates passed to MetaData(naming_convention=...) so that indexes
# and constraints get deterministic names instead of backend-generated ones.
# Keys follow SQLAlchemy's naming-convention scheme: index, unique, check,
# foreign key and primary key.
convention = {
    "ix": "ix_%(column_0_label)s",
    "uq": "uq_%(table_name)s_%(column_0_name)s",
    "ck": "ck_%(table_name)s_%(constraint_name)s",
    "fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
    "pk": "pk_%(table_name)s",
}
39
# Declarative base class whose MetaData applies the `convention` name
# templates to generated index and constraint names.
Base = declarative_base(
    metadata=MetaData(naming_convention=convention),
)
43
# Module-level async engine, created at import time from
# settings.POSTGRES_DSN after rewriting its scheme to the asyncpg driver.
engine: AsyncEngine = create_async_engine(
    make_url_async(settings.POSTGRES_DSN),
    poolclass=AsyncAdaptedQueuePool,
)
48
async def build_db_session_factory() -> AsyncSessionFactory:
    """Check that the database is reachable, then build the session factory.

    Returns:
        An ``async_scoped_session`` registry bound to the module-level
        ``engine``. Sessions are scoped by ``asyncio.current_task`` and are
        created with ``expire_on_commit=False``.

    Raises:
        Any exception raised by ``verify_db_connection`` when the connection
        attempt fails.
    """
    # Fail before handing out a factory if the database cannot be reached.
    await verify_db_connection(engine)

    return async_scoped_session(
        async_sessionmaker(bind=engine, expire_on_commit=False),
        scopefunc=current_task,
    )
57
async def verify_db_connection(engine: AsyncEngine) -> None:
    """Open a connection on *engine* and close it again straight away.

    No statement is executed; the function only proves that a connection can
    be established.

    Args:
        engine: The async engine to probe.

    Raises:
        Any exception raised by the engine while connecting or closing.
    """
    # The context manager opens the connection on entry and closes it on exit.
    async with engine.connect():
        pass
62
async def close_db_connections() -> None:
    """Dispose of the module-level ``engine``.

    NOTE(review): presumably called once at application shutdown — confirm
    against the callers.
    """
    await engine.dispose()