lavkach3

Форк
0
516 строк · 19.8 Кб
1
import asyncio
2
import copy
3
import datetime
4
import uuid
5
from collections import defaultdict
6
from idlelib.debugobj import ClassTreeItem
7
from typing import Optional, Any
8
from uuid import uuid4
9

10
from fastapi import HTTPException
11
from jinja2_fragments import render_block
12
from pydantic import BaseModel, ValidationError
13
from starlette.datastructures import QueryParams
14

15
#from core.frontend.constructor import ClassView
16
from core.frontend.enviroment import environment
17
from core.frontend.exceptions import HTMXException
18
from core.frontend.field import Fields
19
from core.frontend.types import MethodType, ViewVars
20
from core.frontend.types import LineType
21
from core.utils import timeit
22
from core.utils.timeit import timed
23

24

25
class Line:
26
    """
27
        Обьект описывающий обьект отданный из другого сервиса или класса с помощью Pydantic модели
28
    """
29
    cls: Any                          # Список, которому принадлежит строка
30
    type: LineType                          # Тип поля СМ LineType
31
    is_last: bool = False                   # True Если обьект последний в Lines
32
    is_rel: bool = False                    # True если обьек является relation от поля родителя
33
    id: Optional[uuid.UUID] = None          # ID обьекта (если есть)
34
    fields: Fields                          # Поля обьекта
35

36
    def copy(self):
37
        copied_line = copy.copy(self)
38
        copied_line.fields = Fields()
39
        for field in self.fields:
40
            new_field = field.copy()
41
            new_field.line = copied_line
42
            setattr(copied_line.fields, field.field_name, new_field)
43
        return copied_line
44

45

46
    def __init__(self, cls, type,id=None, is_last=False, schema=None, **data):
47
        self.type = type
48
        self.cls = cls
49
        self.is_last = is_last
50
        self.id = id or uuid.uuid4()
51
        if self.type != LineType.FILTER:
52
            self.fields = self.cls._get_schema_fields(
53
                schema=schema if schema else cls._view.model.schemas.get,
54
                cls=cls,
55
                line=self.cls
56
            )
57
        if self.type == LineType.FILTER:
58
            self.fields = self.cls._get_schema_fields(
59
                schema=schema if schema else cls._view.model.schemas.filter,
60
                cls=cls,
61
                line=self.cls
62
            )
63

64
    @property
65
    def model_name(self):
66
        return self.lines.model_name
67
    @property
68
    def domain_name(self):
69
        return self.lines.domain_name
70
    @property
71
    def actions(self):
72
        return self.lines.actions
73
    @property
74
    def class_key(self):
75
        return self.lines.cls.key
76

77
    @property
78
    def display_title(self):
79
        return self.fields.title.val if hasattr(self.fields, 'title') else self.id
80

81
    @property
82
    def key(self) -> str:
83
        """Проп генерации UI ключа для обьекта"""
84
        if self.type == LineType.LINE:
85
            key = self.id
86
        elif self.type == LineType.NEW:
87
            key = id(self)  # type: ignore
88
        else:
89
            key = self.type.value
90
        return f'{self.lines.key}--{key}'
91

92
    @property
93
    def ui_key(self) -> str:
94
        """Сгенерировать ключ обьекта для UI"""
95
        return f'{self.model_name}--{self.id}'
96

97
    def _change_assign_line(self) -> None:
98
        """Присвоение нового обьекта Line'у"""
99
        for _, field in self.fields:  # type: ignore
100
            field.line = self
101

102
    def render(self, block_name: str, method: MethodType = MethodType.GET, last=False) -> str:
103
        """
104
            block_name: имя блока в шаблоне
105
            edit: Редактируемые ли поля внутри или нет
106
        """
107
        try:
108
            rendered_html = render_block(
109
                environment=environment,
110
                template_name=f'line/line.html',
111
                block_name=block_name,
112
                method=method,
113
                line=self,
114
                last=last
115
            )
116
        except Exception as ex:
117
            raise
118
        return rendered_html
119

120
    @property
121
    def button_view(self) -> str:
122
        """Сгенерировать кнопку на просмотр обьекта"""
123
        return self.render('button_view')
124

125
    @property
126
    def button_update(self) -> str:
127
        """Сгенерировать кнопку на редактирование обьекта"""
128
        return self.render('button_update')
129

130
    @property
131
    def button_create(self) -> str:
132
        """Сгенерировать кнопку на создание обьекта"""
133
        return self.render('button_create')
134

135
    @property
136
    def button_delete(self) -> str:
137
        """Сгенерировать кнопку на удаление обьекта"""
138
        return self.render(block_name='button_delete')
139

140
    @property
141
    def button_save(self) -> str:
142
        """Кнопка сохранения обьекта"""
143
        return self.render(block_name='button_save')
144

145
    @property
146
    def button_actions(self) -> str:
147
        """Сгенерировать кнопку на меню доступных методов обьекта"""
148
        return self.render(block_name='button_actions')
149

150
    @property
151
    def as_tr_get(self) -> str:
152
        """Отобразить обьект как строку таблицы на просмотр"""
153
        return self.render(block_name='as_tr', method=MethodType.GET)
154

155
    @property
156
    def as_tr_header(self) -> str:
157
        """Отобразить обьект как строку заголовок таблицы"""
158
        return self.render(block_name='as_tr_header', method=MethodType.GET)
159

160
    @property
161
    def as_tr_placeholder(self) -> str:
162
        """Отобразить обьект как строку заголовок таблицы"""
163
        return self.render(block_name='as_tr_placeholder', method=MethodType.GET)
164

165
    @property
166
    def as_tr_update(self) -> str:
167
        """Отобразить обьект как строку таблицы на редактирование"""
168
        return self.render(block_name='as_tr', method=MethodType.UPDATE)
169

170
    @property
171
    def as_tr_create(self) -> str:
172
        """Отобразить обьект как строку таблицы на создание"""
173
        return self.render(block_name='as_tr', method=MethodType.CREATE)
174

175
    @property
176
    def as_item(self) -> str:
177
        """Отобразить обьект как айтем с заголовком"""
178
        return self.render(block_name='as_item', method=MethodType.CREATE)
179

180
    @property
181
    def as_div_get(self) -> str:
182
        """Отобразить обьект как строку таблицы на просмотр"""
183
        return self.render(block_name='as_div', method=MethodType.GET)
184

185
    @property
186
    def as_div_update(self) -> str:
187
        """Отобразить обьект как строку таблицы на просмотр"""
188
        return self.render(block_name='as_div', method=MethodType.UPDATE)
189

190
    @property
191
    def as_div_create(self) -> str:
192
        """Отобразить обьект как строку таблицы на просмотр"""
193
        return self.render(block_name='as_div', method=MethodType.CREATE)
194

195
    @property
196
    def as_card(self) -> str:
197
        """Отобразить обьект как строку таблицы на просмотр"""
198
        return self.render(block_name='as_card', method=MethodType.GET)
199

200
    @property
201
    def get_update(self) -> str:
202
        """Метод отдает модалку на редактирование обьекта"""
203
        return render_block(
204
            environment=environment,
205
            template_name=f'line/modal.html',
206
            method=MethodType.UPDATE,
207
            block_name='modal',
208
            line=self
209
        )
210

211
    @property
212
    def get_get(self) -> str:
213
        """Метод отдает модалку на просмотр обьекта"""
214
        return render_block(
215
            environment=environment,
216
            template_name=f'line/modal.html',
217
            method=MethodType.GET,
218
            block_name='modal',
219
            line=self
220
        )
221

222
    @property
223
    def get_delete(self) -> str:
224
        """Метод отдает модалку на удаление обьекта"""
225
        return render_block(
226
            environment=environment,
227
            template_name=f'line/modal.html',
228
            method=MethodType.DELETE,
229
            block_name='delete',
230
            line=self,
231
        )
232

233
    @property
234
    def get_create(self) -> str:
235
        """Метод отдает модалку на создание нового обьекта"""
236
        return render_block(
237
            environment=environment,
238
            template_name=f'line/modal.html',
239
            method=MethodType.CREATE,
240
            block_name='modal',
241
            line=self,
242
        )
243

244

245
class FilterLine(Line):
246
    """Обертка для определения класса фильтра"""
247
    ...
248

249

250
class Lines:
251
    """Делаем класс похожий на List и уже работаем с ним"""
252
    parent_field: Optional[Any] = None    # Поле родитель, если класс конструктор это Field другого класса
253
    line_header: Optional[Line] = None    # Обьект для заголовка
254
    line_new: Optional[Line] = None       # Пустой обьект
255
    line_filter: Optional[Line] = None    # Обьект, описывающий фильтр
256
    lines: list['Line'] = []              # Список обьектов
257
    cls: Any                              # Класс конструктора ClassView
258
    data: dict = {}                       # Данные
259
    filter_fields: Optional['Fields'] = None           # Поля обьекта
260
    line_fields: Optional['Fields'] = None  # Поля обьекта
261

262
    def __init__(self, cls):
263
        self.cls = cls
264
        self.lines = []
265
        self.line_header = Line(
266
            type=LineType.HEADER,
267
            lines=self,
268
        )
269
        self.line_filter = Line(
270
            type=LineType.FILTER,
271
            lines=self,
272
        )
273
        self.line_new = Line(
274
            type=LineType.NEW,
275
            lines=self,
276
        )
277

278

279
    @property
280
    def actions(self):
281
        return self.cls.actions
282
    @property
283
    def params(self):
284
        return self.cls.params
285
    @property
286
    def join_related(self):
287
        return self.cls.join_related
288
    @property
289
    def join_fields(self):
290
        return self.cls.join_fields
291

292
    @property
293
    def vars(self):
294
        return self.cls.vars
295

296
    @property
297
    def class_key(self) -> str:
298
        return self.cls.key
299
    @property
300
    def domain_name(self) -> str:
301
        return self.cls.model.domain.domain_name
302

303
    @property
304
    def model_name(self) -> str:
305
        return self.cls.model_name
306
    @property
307
    def key(self) -> str:
308
        return self.parent_field.key if self.parent_field else self.class_key
309

310
    def __bool__(self) -> bool:
311
        if not self.lines:
312
            return False
313
        else:
314
            return True
315

316

317
    async def get_data(
318
            self,
319
            params: QueryParams | dict | None = None,
320
            data: list | dict | None = None,
321
            schema: BaseModel | None = None,
322
            join_related: bool = False,
323
            **kwargs
324
    ) -> None:
325
        """Метод собирает данные для конструктора модели"""
326

327
        if not params:
328
            params = self.params
329
        if not data:
330
            if not self.cls.model.adapter.domain.domain_type == 'INTERNAL':
331
                async with self.cls.model.adapter as a:  # type: ignore
332
                    resp_data = await a.list(params=params)
333
                    data = resp_data['data']
334
            else:
335
                data_obj = await self.cls.model.service.list(_filter=params)
336
                data = [i.__dict__ for i in data_obj]
337
        self.data = {i['id']: i for i in data}
338
        await self.fill_lines(self.data, join_related, self.join_fields, schema)
339

340

341

342
    async def fill_lines(
343
            self,
344
            data: list|dict,
345
            join_related: bool = False,
346
            join_fields: list = [],
347
            schema: BaseModel | None = None
348
    ) -> None:
349
        if isinstance(data, list):
350
            data = {i['id']: i for i in data}
351
        if not data:
352
            data = {}
353
        self.data = data
354
        join_fields = join_fields or self.join_fields
355
        for _id, row in data.items():
356
            line_copied = self.line_header.copy()
357
            line_copied.type = LineType.LINE
358
            # line_copied = Line(
359
            #     type=LineType.LINE,
360
            #     lines=self,
361
            #     id=row.get('id'),
362
            #     is_last=False,
363
            #     schema=schema
364
            # )
365
            line_copied.is_last = False
366
            line_copied.id = row.get('id', id(line_copied))
367
            line_copied.lsn = row.get('lsn')
368
            for col in line_copied.fields:
369
                col.val = row.get(col.field_name, None)
370
                if col.type in ('date', 'datetime'):
371
                    if isinstance(col.val, datetime.datetime):
372
                        pass
373
                    elif isinstance(col.val, str):
374
                        col.val = datetime.datetime.fromisoformat(col.val)
375
                elif col.type == 'id':
376
                    if not col.val:
377
                        col.val = []
378
                elif col.type.endswith('list_rel'):
379
                    ...
380
                    col.lines.parent_field = col
381
                    await col.lines.fill_lines(data=col.val, join_related=False)
382
                #row[col.field_name] = col.val
383
                #setattr(line_copied, col.field_name, col.val)
384
            self.lines.append(line_copied)
385

386
        if join_related or join_fields:
387
            missing_fields = defaultdict(list)
388
            for _line in self.lines:
389
                """Достаем все релейтед обьекты у которых модуль отличается"""
390
                assert _line.fields,   "Проверяем что все поля есть"
391
                for field_name, field in _line.fields.get_fields():
392
                    if field.type in ('uuid',):
393
                        # if field.widget.get('table'):  # TODO: может все надо а не ток table
394
                        if not join_fields:
395
                            missing_fields[field.field_name].append((field.val, field))
396
                        elif field.field_name in join_fields:
397
                            missing_fields[field.field_name].append((field.val, field))
398
            to_serialize = []
399
            for miss_key, miss_value in missing_fields.items():
400
                # _data = []
401
                _vals, _fields = [i[0] for i in miss_value], [i[1] for i in miss_value]
402
                miss_value_str = ''
403
                _corutine_data = None
404
                if isinstance(_vals, list):
405
                    miss_value_str = ','.join([i for i in set(_vals) if i])
406
                if miss_value_str:
407
                    qp = {'id__in': miss_value_str}
408
                    _corutine_data = asyncio.create_task(self.cls.env[_fields[0].model_name].adapter.list(params=qp))
409
                to_serialize.append((_vals, _fields, _corutine_data))
410
            for _vals, _fields, _corutine_data in to_serialize:
411
                _join_lines = {}
412
                if _corutine_data:
413
                    _data = await _corutine_data
414
                    _join_lines = {i['id']: i for i in _data['data']}
415
                for _val, _field in zip(_vals, _fields):
416
                    if isinstance(_val, list):
417
                        _new_vals = []
418
                        for _v in _val:
419
                            __val = _join_lines.get(_v)
420
                            if __val:
421
                                _new_vals.append(__val)
422
                        _field.val = _new_vals
423
                    else:
424
                        _field.val = _join_lines.get(_val)
425
                    if _field.type == 'uuid':
426
                        _field.type = 'rel'
427
                    elif _field.type == 'list_uuid':
428
                        _field.type = 'list_rel'
429
                    else:
430
                        raise HTMXException(
431
                            status_code=500,
432
                            detail=f'Wrong field name {_field.field_name} in table model {_field.model}'
433
                        )
434
            for col in missing_fields.keys():
435
                for _field_name, _header_col in self.line_header.fields.get_fields():  # type: ignore
436
                    if col == _field_name:
437
                        _header_col.type = _header_col.type.replace('uuid', 'rel')
438
                        _header_col.type = _header_col.type.replace('list_uuid', 'list_rel')
439

440
    async def get_lines(self, ids: list[uuid.UUID], join_related: bool = False) -> list[Line]:
441
        await self.get_data(
442
            schema=self.cls.model.schemas.get,
443
            params={'id__in': ids},
444
            join_related=join_related or self.join_related,
445
            join_fields=self.join_fields,
446
        )
447
        return self.lines
448

449
    async def update_lines(self, data: dict, id: uuid.UUID) -> list[Line]:
450
        """Метод обновления обьектов"""
451
        new_data = []
452
        for raw_line in data:
453
            try:
454
                method_schema_obj = self.cls.model.schemas.update(**raw_line)
455
            except ValidationError as e:
456
                raise HTTPException(status_code=406, detail=f"Error: {str(e)}")
457
            _json = method_schema_obj.model_dump(mode='json', exclude_unset=True)
458
            line = await self.cls.model.adapter.update(id=id, json=_json)
459
            new_data.append(line)
460
        await self.fill_lines(new_data)
461
        return self.lines
462

463
    async def create_lines(self, data: dict) -> list[Line]:
464
        """Метод создания обьектов"""
465
        new_data = []
466
        for raw_line in data:
467
            try:
468
                method_schema_obj = self.cls.model.schemas.create(**raw_line)
469
            except ValidationError as e:
470
                raise HTTPException(status_code=406, detail=f"Error: {str(e)}")
471
            _json = method_schema_obj.model_dump(mode='json', exclude_unset=True)
472
            line = await self.cls.model.adapter.create(json=_json)
473
            new_data.append(line)
474
        await self.fill_lines(new_data)
475
        return self.lines
476

477
    async def delete_lines(self, ids: list[uuid.UUID]) -> bool:
478
        """Метод удаления обьектов"""
479
        for _id in ids:
480
            await self.cls.model.adapter.delete(id=_id)
481
        return True
482

483
    @property
484
    def as_table_update(self) -> str:
485
        """Метод отдает список обьектов как таблицу на редактирование"""
486
        rendered_html = ''
487
        for i, line in enumerate(self.lines):
488
            if i == len(self.lines) - 1:
489
                line.is_last = True
490
            rendered_html += line.as_tr_update
491
        return rendered_html
492

493
    @property
494
    def as_table_get(self) -> str:
495
        """Метод отдает список обьектов как таблицу на просмотр"""
496
        rendered_html = ''
497
        for i, line in enumerate(self.lines):
498
            if i == len(self.lines) - 1:
499
                line.is_last = True
500
            rendered_html += line.as_tr_get
501
        return rendered_html
502

503
    @property
504
    def as_card_kanban(self) -> str:
505
        rendered_html = ''
506
        for i, line in enumerate(self.lines):
507
            if i == len(self.lines) - 1:
508
                line.is_last = True
509
            rendered_html += f'<div class="col-6">{line.as_card}</div>'
510
        return rendered_html
511

512

513

514
    @property
515
    def as_table_header(self) -> str:
516
        return self.lines[0].as_tr_header if self.lines else self.line_header.as_tr_header  # type: ignore
517

518

519

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.