lavkach3

Форк
0
/
session.py 
116 строк · 3.7 Кб
1
import asyncio
2
from contextvars import ContextVar, Token
3
from idlelib.pyparse import trans
4
from typing import Union
5
import logging
6
from sqlalchemy.ext.asyncio import (
7
    AsyncSession,
8
    create_async_engine,
9
    async_scoped_session,
10
)
11
from sqlalchemy.orm import registry
12
from sqlalchemy.orm import sessionmaker, Session
13
from sqlalchemy.orm.decl_api import DeclarativeMeta
14
from sqlalchemy.sql.expression import Update, Delete, Insert
15
from core.helpers.broker import list_brocker
16
from core.db_config import config
17
from httpx import AsyncClient as asyncclient
18

19
from core.helpers.cache import CacheTag
20

21
logging.basicConfig(level=logging.INFO)
22
logger = logging.getLogger(__name__)
23
session_context: ContextVar[str] = ContextVar("session_context")
24

25

26
def get_session_context() -> str:
27
    return session_context.get()
28

29

30
def set_session_context(session_id: str) -> Token:
31
    return session_context.set(session_id)
32

33

34
def reset_session_context(context: Token) -> None:
35
    session_context.reset(context)
36

37

38
engines = {
39
    "writer": create_async_engine(config.WRITER_DB_URL, echo=False, pool_recycle=3600, max_overflow=30),
40
    "reader": create_async_engine(config.READER_DB_URL, echo=False, pool_recycle=3600, max_overflow=30),
41
}
42

43

44
class RoutingSession(Session):
45
    def get_bind(self, mapper=None, clause=None, **kw):
46
        if self._flushing or isinstance(clause, (Update, Delete, Insert)):
47
            return engines["writer"].sync_engine
48
        else:
49
            return engines["reader"].sync_engine
50

51

52
async_session_factory = sessionmaker(
53
    class_=AsyncSession,
54
    sync_session_class=RoutingSession,
55
)
56
session: Union[AsyncSession, async_scoped_session] = async_scoped_session(
57
    session_factory=async_session_factory,
58
    scopefunc=get_session_context,
59
)
60

61
mapper_registry = registry()
62

63

64
class Base(metaclass=DeclarativeMeta):
65
    __abstract__ = True
66

67
    registry = mapper_registry
68
    metadata = mapper_registry.metadata
69

70
    __init__ = mapper_registry.constructor
71

72
    async def prepare_bus(self, entity: object, method: str, updated_fields:list = None):
73
        return {
74
            'cache_tag': CacheTag.MODEL,
75
            'message': f'{self.__tablename__.capitalize()} is {method.capitalize()}',
76
            'company_id': entity.company_id if hasattr(entity,  'company_id') else entity.id,
77
            'vars': {
78
                'id': entity.id,
79
                'lsn': entity.lsn,
80
                'model': self.__tablename__,
81
                'method': method,
82
                'updated_fields': updated_fields
83
            }
84
        }
85

86
    @list_brocker.task(queue_name='model')
87
    async def update_notify(model: str, data: dict):
88
        print('alalala')
89
        client = asyncclient()
90
        responce = await client.post(
91
            url=f'http://{config.BUS_HOST}:{config.BUS_PORT}/api/bus/bus',
92
            json=data,
93
            headers={'Authorization': config.INTERCO_TOKEN}
94
        )
95
        if responce.status_code == 200:
96
            logger.info(f'Message sended to bus.bus')
97
        else:
98
            logger.warning(responce.text)
99

100

101
    async def notify(self, method, updated_fields: list = None, message=None):
102
        if not message:
103
            message = await self.prepare_bus(self, method=method, updated_fields=updated_fields)
104
        i: int = 1
105
        while True:
106
            try:
107
                task = await self.update_notify.kiq(self.__tablename__, message)
108
                logger.info(f'Message sended to bus.bus with id: {task.task_id}')
109
                break
110
            except Exception as ex:
111
                i += 1
112
                await asyncio.sleep(5)
113
                logger.error(f'Error while sending message to bus.bus: {ex}')
114
                logger.error(f'Try to send message again to bus.bus: {i}..')
115
            if i > 20:
116
                break
117

118

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.