lavkach3

Форк
0
177 строк · 5.4 Кб
1
import uuid
2
from dataclasses import dataclass
3
from enum import Enum
4
from typing import TYPE_CHECKING, Any
5

6
from httpx import AsyncClient
7
from starlette.requests import HTTPConnection, Request
8
from taskiq import AsyncBroker
9

10

11
from core.db.session import set_session_context
12
from core.db_config import config
13
from core.fastapi.adapters.action_decorator import actions
14
from httpx import AsyncClient as asyncclient, request
15

16
from core.fastapi.schemas import CurrentUser
17
from core.helpers.cache import CacheStrategy
18
from pydantic import BaseModel
19

20
if TYPE_CHECKING:
21
    from core.fastapi.adapters import BaseAdapter
22
    from core.service.base import Model, BaseService
23
    from core.db import Base
24
from .core_apps.base import __domain__ as base_domain
25
from .core_apps.bus import __domain__ as bus_domain
26

27
class DeleteSchema(BaseModel):
28
    delete_id: uuid.UUID
29

30

31
@dataclass
32
class Schemas:
33
    create: Any = None
34
    get: Any = None
35
    filter: Any = None
36
    update: Any = None
37
    delete: Any = None
38

39

40
class Model:  # type: ignore
41
    name: str
42
    _adapter: 'BaseAdapter'
43
    _service: 'BaseService'
44
    domain: 'Domain'
45
    schemas: Schemas
46
    model: Any
47
    sort: list = []
48
    cache_strategy: 'CacheStrategy' = CacheStrategy.NONE
49
    actions: dict = {}
50

51
    def __init__(self, name, _adapter, _service, domain, schemas, model, sort=[], cache_strategy=CacheStrategy.NONE):
52
        self.name = name
53
        self._adapter = _adapter
54
        self._service = _service
55
        self.domain = domain
56
        self.schemas = Schemas(**schemas)
57
        self.model = model
58
        self.sort = sort if sort else ['id', 'created_at', 'updated_at', 'lsn']
59
        self.cache_strategy = cache_strategy
60
        self.actions = actions.get(name, {})
61

62
    def __copy__(self):
63
        return self
64

65
    @property
66
    def adapter(self):
67
        return self._adapter(
68
            conn=self.domain._env.request,
69
            domain=self.domain,
70
            model=self,
71
            env=self.domain._env
72
        )
73

74
    @property
75
    def service(self):
76
        return self._service(
77
            self.domain._env.request,
78
        )
79

80

81
class Domain:
82
    name: str
83
    models: dict[str, Model]
84
    _env: 'Env'
85
    domain_type: str
86
    _adapter: 'BaseAdapter' = None
87

88

89
    def __init__(self, domain: dict, domain_type='EXTERNAL'):
90
        self.name = domain['name']
91
        self.domain_type = domain_type
92
        self._adapter = domain.get('adapter', None)
93
        models = {}
94
        for name, value in domain.items():
95
            if name not in ('adapter', 'name'):
96
                if shemas := value.get('schemas', {}):
97
                    shemas.update({'delete': DeleteSchema})
98
                models.update({name: Model(
99
                    name=name,
100
                    _adapter=self._adapter,
101
                    _service=value.get('service'),
102
                    schemas=shemas,
103
                    model=value.get('model'),
104
                    cache_strategy=value.get('cache_strategy'),
105
                    domain=self
106
                )})
107
        self.models = models
108

109
    def __getitem__(self, item: str):
110
        return self.models[item]
111

112
core_domains = [base_domain, bus_domain]
113

114
class Env:
115
    domains: dict[str, Domain]
116
    request: HTTPConnection
117
    broker: AsyncBroker
118

119
    def __init__(self, domains: list | dict, conn: HTTPConnection | AsyncClient | Request, broker: AsyncBroker | None = None):
120
        _domains: dict = {}
121
        if isinstance(domains, dict):
122
            domains = [domains]
123
        for d in domains+core_domains:
124
            domain_type: str = 'EXTERNAL'
125
            if isinstance(d, tuple):
126
                domain_type = d[1]
127
                d = d[0]
128
            if isinstance(d, dict):
129
                d = Domain(d, domain_type)
130
            d._env = self
131
            _domains.update({d.name: d})
132
        self.request = conn  # type: ignore
133
        self.domains = _domains
134
        self.broker = broker  # type: ignore
135

136
    def __getitem__(self, item: str):
137
        for k, v in self.domains.items():
138
            exist_model = v.models.get(item)
139
            if exist_model:
140
                return exist_model
141
        raise KeyError
142

143
    @classmethod
144
    async def get_sudo_env(self):
145
        """
146
            Создает env путем авторизации суперюзера с походом в basic сервис
147
        """
148
        client = asyncclient()
149
        body = {
150
            "email": config.SUPERUSER_EMAIL,
151
            "password": config.SUPERUSER_PASSWORD
152
        }
153
        responce = await client.post(
154
            url=f'http://{config.BASE_HOST}:{config.BASE_PORT}/api/base/user/login',
155
            json=body
156
        )
157
        data = responce.json()
158
        client = asyncclient(headers={'Authorization': data['token']})
159
        env = Env(core_domains, client)
160
        user = CurrentUser(id=uuid.uuid4(), is_admin=True)
161
        setattr(client, 'user', user)
162
        setattr(client, 'scope', {'env': env})
163
        return env
164

165
    @classmethod
166
    def get_env(cls):
167
        """
168
            Создает env путем ,без авторизации суперюзера
169
        """
170
        session_id = str(uuid.uuid4())
171
        set_session_context(session_id=session_id) # Делаем сессию для env, если она без реквеста
172
        client = asyncclient(headers={'Authorization': f'Bearer {config.INTERCO_TOKEN}'})
173
        env = Env(core_domains, client)
174
        user = CurrentUser(id=uuid.uuid4(), is_admin=True)
175
        setattr(client, 'user', user)
176
        setattr(client, 'scope', {'env': env})
177
        return env
178

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.