2
from dataclasses import dataclass
4
from typing import TYPE_CHECKING, Any
6
from httpx import AsyncClient
7
from starlette.requests import HTTPConnection, Request
8
from taskiq import AsyncBroker
11
from core.db.session import set_session_context
12
from core.db_config import config
13
from core.fastapi.adapters.action_decorator import actions
14
from httpx import AsyncClient as asyncclient, request
16
from core.fastapi.schemas import CurrentUser
17
from core.helpers.cache import CacheStrategy
18
from pydantic import BaseModel
21
from core.fastapi.adapters import BaseAdapter
22
from core.service.base import Model, BaseService
23
from core.db import Base
24
from .core_apps.base import __domain__ as base_domain
25
from .core_apps.bus import __domain__ as bus_domain
27
class DeleteSchema(BaseModel):
42
_adapter: 'BaseAdapter'
43
_service: 'BaseService'
48
cache_strategy: 'CacheStrategy' = CacheStrategy.NONE
51
def __init__(self, name, _adapter, _service, domain, schemas, model, sort=[], cache_strategy=CacheStrategy.NONE):
53
self._adapter = _adapter
54
self._service = _service
56
self.schemas = Schemas(**schemas)
58
self.sort = sort if sort else ['id', 'created_at', 'updated_at', 'lsn']
59
self.cache_strategy = cache_strategy
60
self.actions = actions.get(name, {})
68
conn=self.domain._env.request,
77
self.domain._env.request,
83
models: dict[str, Model]
86
_adapter: 'BaseAdapter' = None
89
def __init__(self, domain: dict, domain_type='EXTERNAL'):
90
self.name = domain['name']
91
self.domain_type = domain_type
92
self._adapter = domain.get('adapter', None)
94
for name, value in domain.items():
95
if name not in ('adapter', 'name'):
96
if shemas := value.get('schemas', {}):
97
shemas.update({'delete': DeleteSchema})
98
models.update({name: Model(
100
_adapter=self._adapter,
101
_service=value.get('service'),
103
model=value.get('model'),
104
cache_strategy=value.get('cache_strategy'),
109
def __getitem__(self, item: str):
110
return self.models[item]
112
core_domains = [base_domain, bus_domain]
115
domains: dict[str, Domain]
116
request: HTTPConnection
119
def __init__(self, domains: list | dict, conn: HTTPConnection | AsyncClient | Request, broker: AsyncBroker | None = None):
121
if isinstance(domains, dict):
123
for d in domains+core_domains:
124
domain_type: str = 'EXTERNAL'
125
if isinstance(d, tuple):
128
if isinstance(d, dict):
129
d = Domain(d, domain_type)
131
_domains.update({d.name: d})
133
self.domains = _domains
136
def __getitem__(self, item: str):
137
for k, v in self.domains.items():
138
exist_model = v.models.get(item)
144
async def get_sudo_env(self):
146
Создает env путем авторизации суперюзера с походом в basic сервис
148
client = asyncclient()
150
"email": config.SUPERUSER_EMAIL,
151
"password": config.SUPERUSER_PASSWORD
153
responce = await client.post(
154
url=f'http://{config.BASE_HOST}:{config.BASE_PORT}/api/base/user/login',
157
data = responce.json()
158
client = asyncclient(headers={'Authorization': data['token']})
159
env = Env(core_domains, client)
160
user = CurrentUser(id=uuid.uuid4(), is_admin=True)
161
setattr(client, 'user', user)
162
setattr(client, 'scope', {'env': env})
168
Создает env путем ,без авторизации суперюзера
170
session_id = str(uuid.uuid4())
171
set_session_context(session_id=session_id)
172
client = asyncclient(headers={'Authorization': f'Bearer {config.INTERCO_TOKEN}'})
173
env = Env(core_domains, client)
174
user = CurrentUser(id=uuid.uuid4(), is_admin=True)
175
setattr(client, 'user', user)
176
setattr(client, 'scope', {'env': env})