1
from asyncio import current_task
2
from typing import Callable
4
from sqlalchemy import MetaData
5
from sqlalchemy.ext.asyncio import (
12
from sqlalchemy.orm import declarative_base
13
from sqlalchemy.pool.impl import AsyncAdaptedQueuePool
15
from settings import settings
17
AsyncSessionFactory = Callable[..., AsyncSession]
20
def make_url_async(url: str) -> str:
21
"""Add +asyncpg to url scheme."""
22
return "postgresql+asyncpg" + url[url.find(":") :] # noqa: WPS336
25
def make_url_sync(url: str) -> str:
26
"""Remove +asyncpg from url scheme."""
27
return "postgresql" + url[url.find(":") :] # noqa: WPS336
31
"ix": "ix_%(column_0_label)s",
32
"uq": "uq_%(table_name)s_%(column_0_name)s",
33
"ck": "ck_%(table_name)s_%(constraint_name)s",
34
"fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
35
"pk": "pk_%(table_name)s",
38
Base = declarative_base(metadata=MetaData(naming_convention=convention))
40
engine: AsyncEngine = create_async_engine(
41
make_url_async(settings.POSTGRES_DSN), poolclass=AsyncAdaptedQueuePool
45
async def build_db_session_factory() -> AsyncSessionFactory:
46
await verify_db_connection(engine)
48
return async_scoped_session(
49
async_sessionmaker(bind=engine, expire_on_commit=False),
50
scopefunc=current_task,
54
async def verify_db_connection(engine: AsyncEngine) -> None:
55
connection = await engine.connect()
56
await connection.close()
59
async def close_db_connections() -> None:
60
await engine.dispose()