3
from typing import TYPE_CHECKING
7
from fastapi import HTTPException
8
from httpx import AsyncClient
9
from starlette.datastructures import QueryParams
10
from starlette.requests import Request, HTTPConnection
13
from core.fastapi.adapters.action_decorator import actions
14
from core.helpers.cache import Cache, CacheStrategy
15
from core.utils.timeit import timed
18
from core.env import Env, Model, Domain
20
# Configure the root logger at INFO level; this runs as a side effect of importing the module.
logging.basicConfig(level=logging.INFO)
21
# Module-level logger, named after this module, used by the client and adapter code below.
logger = logging.getLogger(__name__)
24
class Client(httpx.AsyncClient):
    """``httpx.AsyncClient`` whose verbs raise ``HTTPException`` instead of returning failures.

    ``request`` is the single choke point: it normalises the payload and the
    query string, turns any error raised while sending into a 500, and turns
    any reply whose status is not exactly 200 into an ``HTTPException``
    carrying the upstream status code.  ``get``/``post``/``put``/``delete``
    are thin wrappers around it.
    """

    async def request(self, method: str, url: str, json=None, params=None, timeout=None):
        """Send one request and return the response when its status is 200.

        Args:
            method: HTTP verb, e.g. ``'GET'``.
            url: URL to call.
            json: Request body. A ``str`` is decoded with ``json.loads`` first.
            params: Mapping of query parameters, or ``None`` for no query string.
            timeout: Forwarded to ``httpx`` unchanged.

        Returns:
            The ``httpx`` response (status 200 only).

        Raises:
            HTTPException: 500 when sending fails; otherwise the upstream
                status code for any non-200 reply.
        """
        if isinstance(json, str):
            json = _json.loads(json)

        # ``params`` defaults to None and the adapter passes None for
        # create/update/delete, so it must not be iterated unguarded.
        # NOTE(review): None values are dropped so they are not sent as the
        # literal string 'None'; confirm this matches the intended cleaning rule.
        query_param_cleaned = {
            name: val
            for name, val in (params or {}).items()
            if val is not None
        }
        qp = QueryParams(query_param_cleaned)

        # NOTE(review): httpx treats an explicit ``timeout=None`` as "no
        # timeout at all"; callers that omit ``timeout`` therefore never time
        # out.  Left unchanged here because switching to the client default
        # would alter behavior for slow upstream calls.
        try:
            response = await super().request(method=method, url=url, json=json, params=qp, timeout=timeout)
        except Exception as ex:
            # logger.exception keeps the traceback next to the request details.
            logger.exception(f'URL: {url}\n JSON: {json}\n PARAMS: {str(qp)}')
            raise HTTPException(500, detail=str(ex)) from ex

        if response.status_code != 200:
            try:
                detail = response.json().get('detail')
            except (ValueError, AttributeError):
                # Body is not JSON, or is JSON but not an object: fall back to
                # the raw text so the upstream status code is still reported.
                detail = response.text
            raise HTTPException(response.status_code, detail=detail)
        return response

    async def get(self, url, *, params, kwargs=None):
        """Issue a GET through ``request``.

        ``kwargs`` is accepted for call-site compatibility and is not used.
        Failures are logged with the request details and then re-raised.
        """
        logger.info('Adapter %s %s', url, params)
        try:
            return await self.request('GET', url=url, params=params)
        except Exception:
            logger.error(f'URL: {url}\n PARAMS: {str(params)}')
            logger.error(f'REQUEST OBJ: {self.request}\n SELF: {self}')
            # NOTE(review): the end of this handler was not visible when this
            # was written; re-raising is assumed because callers use the
            # returned response.
            raise

    async def post(self, url, json, *, params, kwargs=None):
        """Issue a POST with a JSON body through ``request``; ``kwargs`` is not used."""
        return await self.request('POST', url=url, json=json, params=params)

    async def put(self, url, json, *, params, kwargs=None):
        """Issue a PUT with a JSON body through ``request``; ``kwargs`` is not used."""
        return await self.request('PUT', url=url, json=json, params=params)

    async def delete(self, url, *, params, kwargs=None):
        """Issue a DELETE through ``request``; ``kwargs`` is not used."""
        return await self.request('DELETE', url=url, params=params)
70
async def common_exception_handler(responce):
    """Return the decoded JSON body of a 200 response; raise ``HTTPException`` otherwise.

    Any status other than 200 is re-raised with the same status code and the
    response text as the detail.
    """
    if responce.status_code == 200:
        return responce.json()
    raise HTTPException(
        status_code=responce.status_code,
        detail=f"{str(responce.text)}"
    )
81
Generic adapter for calling other services.
Specify the module when creating the adapter, or subclass the adapter with the module specified.
Both the module and the model can also be given at creation time, which is convenient when many calls are made.
96
# Build the adapter from the incoming connection so that outgoing calls carry the caller's credentials.
# NOTE(review): `domain`, `model` and `env` are not used on any line visible here; the lines that
# presumably store them (and set protocol/host/port) are missing from this excerpt.
def __init__(self, conn: HTTPConnection, domain: 'Domain', model: 'Model', env: 'Env'):
101
# Overwrite self.host with a base URL built from the current protocol, host and port values.
self.host = f"{self.protocol}://{self.host}:{self.port}"
103
# Authorization value: the incoming Authorization header, else the 'token' cookie, else an empty string.
self.headers = {'Authorization': conn.headers.get("Authorization") or conn.cookies.get('token') or ''}
105
# HTTP client created with those headers.
self.client = Client(headers=self.headers)
107
def get_actions(self):
    """Return the entry of ``actions`` stored under this adapter's model name, or ``{}`` if there is none."""
    model_name = self.model.name
    registered = actions.get(model_name, {})
    return registered
110
async def __aenter__(self):
    """Open a fresh ``Client`` for the ``async with`` block and return the adapter.

    A client that is already attached and still open is closed first, so
    replacing it does not leave an unclosed client behind.
    """
    previous = getattr(self, 'client', None)
    if previous is not None and not previous.is_closed:
        await previous.aclose()
    self.client = Client(headers=self.headers)
    return self
114
async def __aexit__(self, *args, **kwargs):
    """Close the adapter's HTTP client when the ``async with`` block ends.

    The exception details passed by the context-manager protocol are ignored
    and nothing is returned, so an exception raised in the block propagates.
    """
    active_client = self.client
    await active_client.aclose()
117
# Try to answer an `id__in` lookup from the cache.
# Returns the tuple (is_cached, cached_data, missed). The lines that initialise these three names
# are not visible in this excerpt, so their starting values cannot be stated here.
# NOTE(review): `params` is mutated in place by the two pop() calls below; confirm callers expect that.
async def check_in_cache(self, params, model):
121
# Early exit for three named models, or whenever the adapter model's cache strategy is not FULL.
# NOTE(review): list() passes `model or self.model`, which may be a model object rather than a str;
# confirm this membership test still behaves as intended in that case.
if model in ('locale', 'currency', 'country') or self.model.cache_strategy != CacheStrategy.FULL:
122
return is_cached, cached_data, missed
124
# Remove the 'module' and 'model' keys, presumably so only filter keys are counted by len() below.
params.pop('module', None)
125
params.pop('model', None)
126
id__in = params.get('id__in')
127
# The lines below only handle the case where `id__in` is the single remaining, non-empty key.
if len(params) == 1 and id__in:
129
# A single UUID becomes a one-element list holding its string form.
if isinstance(id__in, uuid.UUID):
130
ids = [id__in.__str__(), ]
132
# Otherwise (the branch keyword is not visible here) a comma-separated string is split into ids.
ids = id__in.split(',')
133
cached_courutins = []
135
# One entry per id (the enclosing loop header is not visible here); 'promise' holds the
# result of Cache.get_model(...), which is awaited further down.
cached_courutins.append({
136
'promise': Cache.get_model(self.model.domain.name, model or self.model, id),
139
# Await the lookups one at a time, in order.
for cou in cached_courutins:
141
cached = await cou['promise']
142
# Redis connection and timeout errors are logged together with the model and the id.
except redis.exceptions.ConnectionError as ex:
143
logger.error(f'Cache error with {model or self.model}:{cou["id"]}')
145
except redis.exceptions.TimeoutError as ex:
146
logger.error(f'Cache error with {model or self.model}:{cou["id"]}')
149
# Values are collected in cached_data and ids in missed; the conditions that choose between
# the two are not visible in this excerpt.
cached_data.append(cached)
151
missed.append(cou['id'])
152
return is_cached, cached_data, missed
154
# Fetch records for the model, consulting check_in_cache() before calling the remote service.
# NOTE(review): `params={}` is a mutable default shared between calls, and the names `list` and
# `filter` shadow builtins; both are left untouched here.
async def list(self, model: str | None = None, params={}, **kwargs):
156
# Build the model's filter schema from the given params, then convert it back with as_params().
filter = self.model.schemas.filter(**params)
157
params = filter.as_params()
158
is_cached, cached_data, missed = await self.check_in_cache(params, model or self.model)
159
# Call the remote service when nothing was served from the cache, or when some ids were missed.
if (is_cached and missed) or not is_cached:
161
# Query by the missed ids.
# NOTE(review): the line before this one is not visible here, so it is unclear whether this
# reassignment is conditional.
params = QueryParams({'id__in': ','.join(missed)})
162
path = f'/api/{self.model.domain.name}/{model or self.model.name}'
163
responce = await self.client.get(self.host + path, params=params)
164
# NOTE(review): the remote payload is returned as-is; on the visible lines cached_data is not merged into it.
return responce.json()
166
# The remaining lines belong to a result built from cached_data (its opening line is not visible here).
'size': len(cached_data),
167
# NOTE(review): max() raises ValueError on an empty sequence; confirm cached_data cannot be empty here.
'cursor': max([i['lsn'] for i in cached_data]),
171
async def create(self, json: dict, model: str | None = None, params=None):
    """POST ``json`` to the collection endpoint of ``model`` (default: the adapter's own model).

    The response is handed to ``common_exception_handler`` and its result is returned.
    """
    endpoint = f'/api/{self.model.domain.name}/{model or self.model.name}'
    reply = await self.client.post(
        self.host + endpoint,
        json=json,
        params=params,
    )
    outcome = await common_exception_handler(reply)
    return outcome
176
async def update(self, id: uuid.UUID, json: dict, model: str | None = None, params=None, **kwargs):
    """PUT ``json`` to the endpoint of record ``id`` of ``model`` (default: the adapter's own model).

    Extra keyword arguments are accepted and not used. The response is handed
    to ``common_exception_handler`` and its result is returned.
    """
    endpoint = f'/api/{self.model.domain.name}/{model or self.model.name}/{id}'
    reply = await self.client.put(
        self.host + endpoint,
        json=json,
        params=params,
    )
    outcome = await common_exception_handler(reply)
    return outcome
181
async def get(self, id: uuid.UUID, model: str | None = None, params=None, **kwargs):
    """GET record ``id`` of ``model`` (default: the adapter's own model).

    Extra keyword arguments are passed to the client as a single ``kwargs``
    argument. The response is handed to ``common_exception_handler`` and its
    result is returned.
    """
    endpoint = f'/api/{self.model.domain.name}/{model or self.model.name}/{id}'
    reply = await self.client.get(
        self.host + endpoint,
        params=params,
        kwargs=kwargs,
    )
    outcome = await common_exception_handler(reply)
    return outcome
186
async def view(self, id: uuid.UUID, model: str | None = None, params=None, **kwargs):
187
return await self.get(id, model, params, kwargs=kwargs)
189
async def delete(self, id: uuid.UUID, model: str | None = None, params=None, **kwargs):
    """DELETE record ``id`` of ``model`` (default: the adapter's own model).

    Extra keyword arguments are accepted and not used. The response is handed
    to ``common_exception_handler`` and its result is returned.
    """
    endpoint = f'/api/{self.model.domain.name}/{model or self.model.name}/{id}'
    reply = await self.client.delete(
        self.host + endpoint,
        params=params,
    )
    outcome = await common_exception_handler(reply)
    return outcome