lavkach3

Форк
0
282 строки · 9.9 Кб
1
from enum import Enum
2
from typing import Optional, Any, Callable
3
import io
4

5
import openpyxl
6
from fastapi import APIRouter, Depends
7
from fastapi import Request
8
from fastapi.responses import HTMLResponse
9
from pydantic import BaseModel, UUID4, model_validator
10
from starlette.responses import JSONResponse
11

12
from app.front.front_tasks import import_prepare_data, import_save
13
from app.front.utills import BaseClass
14
from core.frontend.constructor import ClassView, get_view, BaseSchema, Method
15
from core.frontend.utils import clean_filter
16
from fastapi import FastAPI, File, UploadFile
17

18

19

20
class ExceptionResponseSchema(BaseModel):
21
    error: str
22

23

24

25
router = APIRouter(
26
    responses={"400": {"model": ExceptionResponseSchema}},
27
)
28

29
@router.post("/filter", response_class=HTMLResponse)
30
async def _filter(cls: ClassView = Depends(get_view)):
31
    """
32
     Универсальный запрос, который отдает фильтр обьекта по его модулю и модели
33
    """
34
    return cls.h.as_filter
35

36

37
class SearchSchema(BaseModel):
38
    model: str
39
    search: str = ''
40
    filter: Optional[Any] = None
41
    key: Optional[str] = None
42

43
    @model_validator(mode='before')
44
    def _filter(cls, value):
45
        """
46
            Так же убираем все пустые params
47
        """
48
        if f := value.get('filter'):
49
            if isinstance(f, str):
50
                try:
51
                    value['filter'] = eval(f)
52
                except TypeError as ex:
53
                    raise 'Type Error'
54
        return value
55

56

57
@router.get("/search", response_class=JSONResponse)
58
async def search(request: Request, schema: SearchSchema = Depends()):
59
    """
60
     Универсальный запрос поиска
61
    """
62
    params = {'search': schema.search}
63
    if schema.filter:
64
        params.update(schema.filter)
65
    async with request.scope['env'][schema.model].adapter as a:
66
        data = await a.list(params=params, model=schema.model)
67
    return [
68
        {
69
            'value': i['id'],
70
            'label': i.get('title') or i.get('name') or i.get('english_name') or i.get('nickname')
71
        }
72
        for i in data['data']
73
    ]
74

75

76
class SearchIds(BaseModel):
77
    model: str
78
    id__in: str
79

80

81
@router.get("/get_by_ids", response_class=JSONResponse)
82
async def get_by_ids(request: Request, schema: SearchIds = Depends()):
83
    """
84
     Универсальный запрос поиска
85
    """
86
    if not schema.id__in:
87
        return []
88
    params = {'id__in': schema.id__in}
89
    async with request.scope['env'][schema.model].adapter as a:
90
        data = await a.list(params=params, model=schema.model)
91
    return [
92
        {
93
            'value': i['id'],
94
            'label': i.get('title') or i.get('name') or i.get('english_name') or i.get('nickname')
95
        }
96
        for i in data['data']
97
    ]
98

99

100
class TableSchema(BaseSchema):
101
    model: str
102
    cursor: Optional[int] = 0
103
    key: str
104

105

106

107

108
@router.post("/table", response_class=HTMLResponse)
109
async def table(view: ClassView = Depends(get_view)):
110
    """
111
     Универсальный запрос, который отдает таблицу обьекта и связанные если нужно
112
    """
113
    await view.init()
114
    return view.h.as_table
115

116

117
class LineSchema(BaseSchema):
118
    id: Optional[UUID4 | int] = None
119
    mode: Optional[str] = 'get'
120

121
    class Config:
122
        extra = "allow"
123

124

125
@router.post("/line", response_class=HTMLResponse)
126
async def line(cls: ClassView = Depends(get_view)):
127
    """
128
     Универсальный запрос, который отдает/изменяет обьект
129
    """
130
    match cls.v.schema.method:
131
        case Method.UPDATE:
132
            """Отдать обьект на редактирование, в зависимости от mode (tr/div)"""
133
            line = await cls.get_lines(ids=[cls.v.schema.id], join_related=False)
134
            return getattr(line.h, f'as_{cls.v.schema.mode}_update')
135
        case Method.GET:
136
            """Отдать обьект на чтение, в зависимости от mode (tr/div)"""
137
            line = await cls.get_lines(ids=[cls.v.schema.id], join_related=False)
138
            return getattr(line.h, f'as_{cls.v.schema.mode}_get')
139
        case Method.CREATE:
140
            """Отдать обьект на создание, в зависимости от mode (tr/div)"""
141
            return cls.h.as_tr_create
142
        case Method.DELETE:
143
            """Отдать обьект на удаление, в не зависимости от mode (tr/div)"""
144
            if isinstance(cls.v.schema.id, int):
145
                """Если это временная запись, то просто удалить"""
146
                return
147
            line = await cls.get_lines(ids=[cls.v.schema.id], join_related=False)
148
            return line.h.get_modal_delete
149
        case Method.UPDATE_SAVE:
150
            """Сохранение записи при измененнии"""
151
            data = clean_filter(cls.v.schema.model_extra, cls.v.schema.key)
152
            await cls.update_lines(id=cls.v.schema.id, data=data)
153
        case Method.CREATE_SAVE:
154
            """Сохранение записи при создании"""
155
            data = clean_filter(cls.v.schema.model_extra, cls.v.schema.key)
156
            line = await cls.create_lines(data)
157
            return line.h.as_div_update
158
        case Method.DELETE_SAVE:
159
            await cls.delete_lines(ids=[cls.v.schema.id])
160
            """Отдать обьект на удаление, в не зависимости от mode (tr/div)"""
161

162

163
class ModalSchema(BaseSchema):
164
    id: Optional[UUID4] = None
165

166
    class Config:
167
        extra = 'allow'
168

169
@router.post("/modal", response_class=HTMLResponse)
170
async def modal(cls: ClassView = Depends(get_view)):
171
    """
172
     Универсальный запрос модалки, который отдает форму модели
173
    """
174
    cls.reset_key()
175
    match cls.v.schema.method:
176
        case Method.GET:
177
            line = await cls.get_lines(ids=[cls.v.schema.id])
178
            return line.h.get_modal_get
179
        case Method.UPDATE:
180
            line = await cls.get_lines(ids=[cls.v.schema.id])
181
            return line.h.get_modal_update
182
        case Method.DELETE:
183
            line = await cls.get_lines(ids=[cls.v.schema.id])
184
            return line.h.get_modal_delete
185
        case Method.CREATE:
186
            return cls.h.get_modal_create
187

188

189
class ActionSchema(BaseSchema):
190
    action: str
191
    ids: Optional[list[str]] = []
192
    schema: Any = None
193
    commit: Optional[bool] = False
194

195
    class Config:
196
        extra = 'allow'
197

198

199
@router.post("/action", response_class=HTMLResponse)
200
async def action(cls: ClassView = Depends(get_view)):
201
    """
202
     Универсальный запрос, который отдает форму модели (черпает из ModelUpdateSchema
203
    """
204
    cls: ClassView = ClassView(request, schema.model)
205
    func: Callable = getattr(cls.model.adapter, schema.action)
206
    result = []
207
    if schema.commit and schema.method == 'update':
208
        action_schema = cls.actions[schema.action]['schema']
209
        if data := schema.model_extra:
210
            _json: dict = {}
211
            data = clean_filter(data, schema.key)
212
            for line in data:  # type: ignore
213
                obj = action_schema(**line)
214
                res = await func(obj)
215
                result += res
216
    elif schema.method == 'update':
217
        res = await func(payload=schema.model_dump_json())
218
        return cls.send_message(message=f'Action {schema.action} done')
219
    elif schema.method == 'get':
220
        action_schema = cls.actions[schema.action]['schema']
221
        return await cls.get_action(action=schema.action, ids=schema.ids, schema=action_schema)
222

223
    return cls.send_message('Action Done')
224

225
class ImportSchema(BaseSchema):
226
    class Config:
227
        extra = 'allow'
228

229
@router.post("/import", response_class=HTMLResponse)
230
async def modal(cls: ClassView = Depends(get_view)):
231
    """
232
     Универсальный запрос модалки, который отдает форму модели
233
    """
234
    cls = ClassView(request, schema.model, force_init=False)
235
    match schema.method:
236
        case Method.GET:
237
            return cls.get_import
238
        case Method.UPDATE_SAVE:
239
            data = clean_filter(schema.model_extra, schema.key)
240
            lines = await import_save(cls.model_name, data)
241
            await cls.init(params={}, data=lines, join_related=True)
242
            return cls.as_table
243

244
@router.post("/import_upload", response_class=HTMLResponse)
245
async def import_upload(cls: ClassView = Depends(get_view)):
246
    """
247
     Универсальный запрос модалки, который отдает форму модели
248
    """
249
    model, key = request.query_params.values()
250
    cls: ClassView = ClassView(request, model, key=key, force_init=False)
251
    format: str = file.filename.split('.')[-1]
252
    data: list = []
253
    header: list = []
254
    import_schema: BaseSchema = cls.model.schemas.create
255
    match format:
256
        case 'csv':
257
            ...
258
        case 'xlsx':
259
            # Read it, 'f' type is bytes
260
            f = await file.read()
261
            xlsx = io.BytesIO(f)
262
            wb = openpyxl.load_workbook(xlsx)
263
            ws = wb[wb.sheetnames[0]]
264

265
            for i, cells in enumerate(ws.iter_rows()):
266
                if i == 0:
267
                   header = [cell.value.lower() for cell in cells]
268
                   continue
269
                line: dict = {}
270
                for col, label in enumerate(header):
271
                    line[label] = cells[col].value
272
                data.append(line)
273
    if not data:
274
        return f"{cls.send_message(message=f'No data in file')} {cls.get_import}"
275
    task = await import_prepare_data.kiq(model=cls.model_name, data=data)
276
    task_result = await task.wait_result()
277
    cls.errors = task_result.return_value[0]
278
    lines = task_result.return_value[1]
279
    if task_result.return_value[2] == 'update':
280
        import_schema = cls.model.schemas.update
281
    await cls.init(data=lines, schema=import_schema)
282
    return f"{cls.get_import_errors}\n{cls.as_table_update}"
283

284

285

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.