2
from contextvars import ContextVar, Token
3
from idlelib.pyparse import trans
4
from typing import Union
6
from sqlalchemy.ext.asyncio import (
11
from sqlalchemy.orm import registry
12
from sqlalchemy.orm import sessionmaker, Session
13
from sqlalchemy.orm.decl_api import DeclarativeMeta
14
from sqlalchemy.sql.expression import Update, Delete, Insert
15
from core.helpers.broker import list_brocker
16
from core.db_config import config
17
from httpx import AsyncClient as asyncclient
19
from core.helpers.cache import CacheTag
21
# Configure the root logger at INFO level.
logging.basicConfig(level=logging.INFO)

# Logger named after this module.
logger = logging.getLogger(__name__)
23
# Context-local holder for the current session id. It is created without a
# default, so reading it before any ``set`` raises ``LookupError``.
session_context: ContextVar[str] = ContextVar("session_context")


def get_session_context() -> str:
    """Return the session id bound to the current context.

    Raises:
        LookupError: If no session id has been set in the current context.
    """
    return session_context.get()


def set_session_context(session_id: str) -> Token:
    """Bind ``session_id`` to the current context.

    Returns:
        The token that ``reset_session_context`` needs to undo this binding.
    """
    return session_context.set(session_id)


def reset_session_context(context: Token) -> None:
    """Restore the value held before the ``set`` call that produced ``context``."""
    session_context.reset(context)
39
"writer": create_async_engine(config.WRITER_DB_URL, echo=False, pool_recycle=3600, max_overflow=30),
40
"reader": create_async_engine(config.READER_DB_URL, echo=False, pool_recycle=3600, max_overflow=30),
44
class RoutingSession(Session):
    """Session that chooses between the writer and reader engines per call."""

    def get_bind(self, mapper=None, clause=None, **kw):
        """Return the sync engine to use for the current operation.

        The writer engine is used while the session is flushing or when
        ``clause`` is an ``Update``, ``Delete`` or ``Insert`` construct;
        every other call gets the reader engine. ``mapper`` and ``**kw`` are
        accepted for signature compatibility and are not consulted.
        """
        is_write = self._flushing or isinstance(clause, (Update, Delete, Insert))
        role = "writer" if is_write else "reader"
        return engines[role].sync_engine
52
# Session factory whose sync session class is RoutingSession.
# NOTE(review): the remaining arguments and the closing parenthesis of this
# call are not visible in this view — confirm against the full file.
async_session_factory = sessionmaker(
54
sync_session_class=RoutingSession,
56
# Scoped async session: `get_session_context` is the scope function, so the
# session in use is keyed by the id stored in the current context.
session: Union[AsyncSession, async_scoped_session] = async_scoped_session(
    session_factory=async_session_factory,
    scopefunc=get_session_context,
)
61
# Shared registry; the Base class takes its `registry`, `metadata` and
# `__init__` from it.
mapper_registry = registry()
64
# Declarative base class wired to `mapper_registry`. Besides the mapping
# attributes it carries helpers that build a message and POST it to the bus
# endpoint.
# NOTE(review): several lines of this class are not visible in this view
# (dict delimiters, request arguments, try/loop headers) — confirm details
# against the full file before changing any logic.
class Base(metaclass=DeclarativeMeta):
67
# Mapping plumbing taken from the shared registry.
registry = mapper_registry
68
metadata = mapper_registry.metadata
70
__init__ = mapper_registry.constructor
72
# Builds the message fields for a change on `entity`; the keys visible here
# are cache_tag, message, company_id, model and updated_fields.
async def prepare_bus(self, entity: object, method: str, updated_fields:list = None):
74
'cache_tag': CacheTag.MODEL,
75
'message': f'{self.__tablename__.capitalize()} is {method.capitalize()}',
76
# Falls back to the entity's own id when it has no `company_id` attribute.
'company_id': entity.company_id if hasattr(entity, 'company_id') else entity.id,
80
'model': self.__tablename__,
82
'updated_fields': updated_fields
86
# Registered as a task on list_brocker's 'model' queue; note it takes no `self`.
@list_brocker.task(queue_name='model')
87
async def update_notify(model: str, data: dict):
89
# NOTE(review): no close / `async with` for this client is visible here —
# confirm the client is released after the request.
client = asyncclient()
90
responce = await client.post(
91
url=f'http://{config.BUS_HOST}:{config.BUS_PORT}/api/bus/bus',
93
headers={'Authorization': config.INTERCO_TOKEN}
95
# A 200 response is logged at INFO; the response text is logged as a warning
# (presumably under an `else:` that is not visible here — TODO confirm).
if responce.status_code == 200:
96
logger.info(f'Message sended to bus.bus')
98
logger.warning(responce.text)
101
# Builds the message via prepare_bus and enqueues update_notify with `.kiq(...)`.
async def notify(self, method, updated_fields: list = None, message=None):
103
# Passes this instance itself as `entity`.
# NOTE(review): as visible here the `message` argument is overwritten —
# confirm whether a guarding condition sits on a line missing from this view.
message = await self.prepare_bus(self, method=method, updated_fields=updated_fields)
107
task = await self.update_notify.kiq(self.__tablename__, message)
108
logger.info(f'Message sended to bus.bus with id: {task.task_id}')
110
# On any exception: sleep 5 seconds, then log the error and `i`
# (presumably a retry-loop index defined on a line not visible here).
except Exception as ex:
112
await asyncio.sleep(5)
113
logger.error(f'Error while sending message to bus.bus: {ex}')
114
logger.error(f'Try to send message again to bus.bus: {i}..')