lavkach3

Форк
0
192 строки · 7.6 Кб
1
import logging
2
import uuid
3
from typing import TYPE_CHECKING
4

5
import httpx
6
import redis.exceptions
7
from fastapi import HTTPException
8
from httpx import AsyncClient
9
from starlette.datastructures import QueryParams
10
from starlette.requests import Request, HTTPConnection
11
import json as _json
12

13
from core.fastapi.adapters.action_decorator import actions
14
from core.helpers.cache import Cache, CacheStrategy
15
from core.utils.timeit import timed
16

17
if TYPE_CHECKING:
18
    from core.env import Env, Model, Domain
19

20
logging.basicConfig(level=logging.INFO)
21
logger = logging.getLogger(__name__)
22

23

24
class Client(httpx.AsyncClient):
25

26
    async def request(self, method: str, url: str, json=None, params=None, timeout=None):
27
        if isinstance(json, str):
28
            json = _json.loads(json)
29
        query_param_cleaned = {}
30
        if params:
31
            for name, val in params.items():
32
                if val:
33
                    query_param_cleaned.update({name: val})
34
        qp = QueryParams(query_param_cleaned)
35
        try:
36
            responce = await super().request(method=method, url=url, json=json, params=qp, timeout=timeout)
37
        except Exception as ex:
38
            logger.error(f'URL: {url}\n JSON: {json}\n PARAMS: {str(qp)}')
39
            logger.error(str(ex))
40
            raise HTTPException(500, detail=str(ex))
41
        if responce.status_code != 200:
42
            raise HTTPException(responce.status_code, detail=responce.json().get('detail'))
43
        return responce
44

45
    async def get(self, url, *, params, kwargs=None):
46
        logger.info('Adapter %s %s', url, params)
47
        try:
48
            responce = await self.request('GET', url=url, params=params)
49
        except Exception as ex:
50
            logger.error(f'URL: {url}\n PARAMS: {str(params)}')
51
            logger.error(f'REQUEST OBJ: {self.request}\n SELF: {self}')
52
            logger.error(str(ex))
53
            print(f'URL: {url}\n PARAMS: {str(params)}')
54
            raise ex
55
        return responce
56

57
    async def post(self, url, json, *, params, kwargs=None):
58
        responce = await self.request('POST', url=url, json=json, params=params)
59
        return responce
60

61
    async def put(self, url, json, *, params, kwargs=None):
62
        responce = await self.request('PUT', url=url, json=json, params=params)
63
        return responce
64

65
    async def delete(self, url, *, params, kwargs=None):
66
        responce = await self.request('DELETE', url=url, params=params)
67
        return responce
68

69

70
async def common_exception_handler(responce):
71
    if responce.status_code != 200:
72
        raise HTTPException(
73
            status_code=responce.status_code,
74
            detail=f"{str(responce.text)}"
75
        )
76
    return responce.json()
77

78

79
class BaseAdapter:
80
    """
81
    Универсальный адаптер, что бы ходить в другие сервисы,
82
    При создании нужно указать модуль, или отнаследоваться с указанием модуля
83
    Так же при создании можно сразу указать и модуль и модель, если нужно много раз ходить
84
    """
85
    model: 'Model'
86
    client: Client
87
    domain: 'Domain'
88
    request: Request
89
    env: 'Env'
90
    cache: str = Cache
91
    headers: dict
92
    protocol: str
93
    host: str
94
    port: str
95

96
    def __init__(self, conn: HTTPConnection, domain: 'Domain', model: 'Model', env: 'Env'):
97

98

99
        self.model = model
100
        self.domain = domain
101
        self.host = f"{self.protocol}://{self.host}:{self.port}"
102
        self.env = env
103
        self.headers = {'Authorization': conn.headers.get("Authorization") or conn.cookies.get('token') or ''}
104
        # if self.headers.get('Authorization'):
105
        self.client = Client(headers=self.headers)
106

107
    def get_actions(self):
108
        return actions.get(self.model.name, {})
109

110
    async def __aenter__(self):
111
        self.client = Client(headers=self.headers)
112
        return self
113

114
    async def __aexit__(self, *args, **kwargs):
115
        await self.client.aclose()
116

117
    async def check_in_cache(self, params, model):
118
        is_cached = False
119
        cached_data = []
120
        missed = []
121
        if model in ('locale', 'currency', 'country') or self.model.cache_strategy != CacheStrategy.FULL:
122
            return is_cached, cached_data, missed
123
        if params:
124
            params.pop('module', None)
125
            params.pop('model', None)
126
            id__in = params.get('id__in')
127
            if len(params) == 1 and id__in:
128
                is_cached = True
129
                if isinstance(id__in, uuid.UUID):
130
                    ids = [id__in.__str__(), ]
131
                else:
132
                    ids = id__in.split(',')
133
                cached_courutins = []
134
                for id in set(ids):
135
                    cached_courutins.append({
136
                        'promise': Cache.get_model(self.model.domain.name, model or self.model, id),
137
                        'id': id
138
                    })
139
                for cou in cached_courutins:
140
                    try:
141
                        cached = await cou['promise']
142
                    except redis.exceptions.ConnectionError as ex:
143
                        logger.error(f'Cache error with {model or self.model}:{cou["id"]}')
144
                        cached = False
145
                    except redis.exceptions.TimeoutError as ex:
146
                        logger.error(f'Cache error with {model or self.model}:{cou["id"]}')
147
                        cached = False
148
                    if cached:
149
                        cached_data.append(cached)
150
                    else:
151
                        missed.append(cou['id'])
152
        return is_cached, cached_data, missed
153

154
    async def list(self, model: str | None = None, params={}, **kwargs):
155

156
        filter = self.model.schemas.filter(**params)
157
        params = filter.as_params()
158
        is_cached, cached_data, missed = await self.check_in_cache(params, model or self.model)
159
        if (is_cached and missed) or not is_cached:
160
            if missed:
161
                params = QueryParams({'id__in': ','.join(missed)})
162
            path = f'/api/{self.model.domain.name}/{model or self.model.name}'
163
            responce = await self.client.get(self.host + path, params=params)
164
            return responce.json()
165
        return {
166
            'size': len(cached_data),
167
            'cursor': max([i['lsn'] for i in cached_data]),
168
            'data': cached_data
169
        }
170

171
    async def create(self, json: dict, model: str | None = None, params=None):
172
        path = f'/api/{self.model.domain.name}/{model or self.model.name}'
173
        responce = await self.client.post(self.host + path, json=json, params=params)
174
        return await common_exception_handler(responce)
175

176
    async def update(self, id: uuid.UUID, json: dict, model: str | None = None, params=None, **kwargs):
177
        path = f'/api/{self.model.domain.name}/{model or self.model.name}/{id}'
178
        responce = await self.client.put(self.host + path, json=json, params=params)
179
        return await common_exception_handler(responce)
180

181
    async def get(self, id: uuid.UUID, model: str | None = None, params=None, **kwargs):
182
        path = f'/api/{self.model.domain.name}/{model or self.model.name}/{id}'
183
        responce = await self.client.get(self.host + path, params=params, kwargs=kwargs)
184
        return await common_exception_handler(responce)
185

186
    async def view(self, id: uuid.UUID, model: str | None = None, params=None, **kwargs):
187
        return await self.get(id, model, params, kwargs=kwargs)
188

189
    async def delete(self, id: uuid.UUID, model: str | None = None, params=None, **kwargs):
190
        path = f'/api/{self.model.domain.name}/{model or self.model.name}/{id}'
191
        responce = await self.client.delete(self.host + path, params=params)
192
        return await common_exception_handler(responce)
193

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.