5
from collections import defaultdict
6
from idlelib.debugobj import ClassTreeItem
7
from typing import Optional, Any
10
from fastapi import HTTPException
11
from jinja2_fragments import render_block
12
from pydantic import BaseModel, ValidationError
13
from starlette.datastructures import QueryParams
16
from core.frontend.enviroment import environment
17
from core.frontend.exceptions import HTMXException
18
from core.frontend.field import Fields
19
from core.frontend.types import MethodType, ViewVars
20
from core.frontend.types import LineType
21
from core.utils import timeit
22
from core.utils.timeit import timed
27
Обьект описывающий обьект отданный из другого сервиса или класса с помощью Pydantic модели
33
id: Optional[uuid.UUID] = None
37
copied_line = copy.copy(self)
38
copied_line.fields = Fields()
39
for field in self.fields:
40
new_field = field.copy()
41
new_field.line = copied_line
42
setattr(copied_line.fields, field.field_name, new_field)
46
def __init__(self, cls, type,id=None, is_last=False, schema=None, **data):
49
self.is_last = is_last
50
self.id = id or uuid.uuid4()
51
if self.type != LineType.FILTER:
52
self.fields = self.cls._get_schema_fields(
53
schema=schema if schema else cls._view.model.schemas.get,
57
if self.type == LineType.FILTER:
58
self.fields = self.cls._get_schema_fields(
59
schema=schema if schema else cls._view.model.schemas.filter,
66
return self.lines.model_name
68
def domain_name(self):
69
return self.lines.domain_name
72
return self.lines.actions
75
return self.lines.cls.key
78
def display_title(self):
79
return self.fields.title.val if hasattr(self.fields, 'title') else self.id
83
"""Проп генерации UI ключа для обьекта"""
84
if self.type == LineType.LINE:
86
elif self.type == LineType.NEW:
90
return f'{self.lines.key}--{key}'
93
def ui_key(self) -> str:
94
"""Сгенерировать ключ обьекта для UI"""
95
return f'{self.model_name}--{self.id}'
97
def _change_assign_line(self) -> None:
98
"""Присвоение нового обьекта Line'у"""
99
for _, field in self.fields:
102
def render(self, block_name: str, method: MethodType = MethodType.GET, last=False) -> str:
104
block_name: имя блока в шаблоне
105
edit: Редактируемые ли поля внутри или нет
108
rendered_html = render_block(
109
environment=environment,
110
template_name=f'line/line.html',
111
block_name=block_name,
116
except Exception as ex:
121
def button_view(self) -> str:
122
"""Сгенерировать кнопку на просмотр обьекта"""
123
return self.render('button_view')
126
def button_update(self) -> str:
127
"""Сгенерировать кнопку на редактирование обьекта"""
128
return self.render('button_update')
131
def button_create(self) -> str:
132
"""Сгенерировать кнопку на создание обьекта"""
133
return self.render('button_create')
136
def button_delete(self) -> str:
137
"""Сгенерировать кнопку на удаление обьекта"""
138
return self.render(block_name='button_delete')
141
def button_save(self) -> str:
142
"""Кнопка сохранения обьекта"""
143
return self.render(block_name='button_save')
146
def button_actions(self) -> str:
147
"""Сгенерировать кнопку на меню доступных методов обьекта"""
148
return self.render(block_name='button_actions')
151
def as_tr_get(self) -> str:
152
"""Отобразить обьект как строку таблицы на просмотр"""
153
return self.render(block_name='as_tr', method=MethodType.GET)
156
def as_tr_header(self) -> str:
157
"""Отобразить обьект как строку заголовок таблицы"""
158
return self.render(block_name='as_tr_header', method=MethodType.GET)
161
def as_tr_placeholder(self) -> str:
162
"""Отобразить обьект как строку заголовок таблицы"""
163
return self.render(block_name='as_tr_placeholder', method=MethodType.GET)
166
def as_tr_update(self) -> str:
167
"""Отобразить обьект как строку таблицы на редактирование"""
168
return self.render(block_name='as_tr', method=MethodType.UPDATE)
171
def as_tr_create(self) -> str:
172
"""Отобразить обьект как строку таблицы на создание"""
173
return self.render(block_name='as_tr', method=MethodType.CREATE)
176
def as_item(self) -> str:
177
"""Отобразить обьект как айтем с заголовком"""
178
return self.render(block_name='as_item', method=MethodType.CREATE)
181
def as_div_get(self) -> str:
182
"""Отобразить обьект как строку таблицы на просмотр"""
183
return self.render(block_name='as_div', method=MethodType.GET)
186
def as_div_update(self) -> str:
187
"""Отобразить обьект как строку таблицы на просмотр"""
188
return self.render(block_name='as_div', method=MethodType.UPDATE)
191
def as_div_create(self) -> str:
192
"""Отобразить обьект как строку таблицы на просмотр"""
193
return self.render(block_name='as_div', method=MethodType.CREATE)
196
def as_card(self) -> str:
197
"""Отобразить обьект как строку таблицы на просмотр"""
198
return self.render(block_name='as_card', method=MethodType.GET)
201
def get_update(self) -> str:
202
"""Метод отдает модалку на редактирование обьекта"""
204
environment=environment,
205
template_name=f'line/modal.html',
206
method=MethodType.UPDATE,
212
def get_get(self) -> str:
213
"""Метод отдает модалку на просмотр обьекта"""
215
environment=environment,
216
template_name=f'line/modal.html',
217
method=MethodType.GET,
223
def get_delete(self) -> str:
224
"""Метод отдает модалку на удаление обьекта"""
226
environment=environment,
227
template_name=f'line/modal.html',
228
method=MethodType.DELETE,
234
def get_create(self) -> str:
235
"""Метод отдает модалку на создание нового обьекта"""
237
environment=environment,
238
template_name=f'line/modal.html',
239
method=MethodType.CREATE,
245
class FilterLine(Line):
246
"""Обертка для определения класса фильтра"""
251
"""Делаем класс похожий на List и уже работаем с ним"""
252
parent_field: Optional[Any] = None
253
line_header: Optional[Line] = None
254
line_new: Optional[Line] = None
255
line_filter: Optional[Line] = None
256
lines: list['Line'] = []
259
filter_fields: Optional['Fields'] = None
260
line_fields: Optional['Fields'] = None
262
def __init__(self, cls):
265
self.line_header = Line(
266
type=LineType.HEADER,
269
self.line_filter = Line(
270
type=LineType.FILTER,
273
self.line_new = Line(
281
return self.cls.actions
284
return self.cls.params
286
def join_related(self):
287
return self.cls.join_related
289
def join_fields(self):
290
return self.cls.join_fields
297
def class_key(self) -> str:
300
def domain_name(self) -> str:
301
return self.cls.model.domain.domain_name
304
def model_name(self) -> str:
305
return self.cls.model_name
307
def key(self) -> str:
308
return self.parent_field.key if self.parent_field else self.class_key
310
def __bool__(self) -> bool:
319
params: QueryParams | dict | None = None,
320
data: list | dict | None = None,
321
schema: BaseModel | None = None,
322
join_related: bool = False,
325
"""Метод собирает данные для конструктора модели"""
330
if not self.cls.model.adapter.domain.domain_type == 'INTERNAL':
331
async with self.cls.model.adapter as a:
332
resp_data = await a.list(params=params)
333
data = resp_data['data']
335
data_obj = await self.cls.model.service.list(_filter=params)
336
data = [i.__dict__ for i in data_obj]
337
self.data = {i['id']: i for i in data}
338
await self.fill_lines(self.data, join_related, self.join_fields, schema)
342
async def fill_lines(
345
join_related: bool = False,
346
join_fields: list = [],
347
schema: BaseModel | None = None
349
if isinstance(data, list):
350
data = {i['id']: i for i in data}
354
join_fields = join_fields or self.join_fields
355
for _id, row in data.items():
356
line_copied = self.line_header.copy()
357
line_copied.type = LineType.LINE
365
line_copied.is_last = False
366
line_copied.id = row.get('id', id(line_copied))
367
line_copied.lsn = row.get('lsn')
368
for col in line_copied.fields:
369
col.val = row.get(col.field_name, None)
370
if col.type in ('date', 'datetime'):
371
if isinstance(col.val, datetime.datetime):
373
elif isinstance(col.val, str):
374
col.val = datetime.datetime.fromisoformat(col.val)
375
elif col.type == 'id':
378
elif col.type.endswith('list_rel'):
380
col.lines.parent_field = col
381
await col.lines.fill_lines(data=col.val, join_related=False)
384
self.lines.append(line_copied)
386
if join_related or join_fields:
387
missing_fields = defaultdict(list)
388
for _line in self.lines:
389
"""Достаем все релейтед обьекты у которых модуль отличается"""
390
assert _line.fields, "Проверяем что все поля есть"
391
for field_name, field in _line.fields.get_fields():
392
if field.type in ('uuid',):
395
missing_fields[field.field_name].append((field.val, field))
396
elif field.field_name in join_fields:
397
missing_fields[field.field_name].append((field.val, field))
399
for miss_key, miss_value in missing_fields.items():
401
_vals, _fields = [i[0] for i in miss_value], [i[1] for i in miss_value]
403
_corutine_data = None
404
if isinstance(_vals, list):
405
miss_value_str = ','.join([i for i in set(_vals) if i])
407
qp = {'id__in': miss_value_str}
408
_corutine_data = asyncio.create_task(self.cls.env[_fields[0].model_name].adapter.list(params=qp))
409
to_serialize.append((_vals, _fields, _corutine_data))
410
for _vals, _fields, _corutine_data in to_serialize:
413
_data = await _corutine_data
414
_join_lines = {i['id']: i for i in _data['data']}
415
for _val, _field in zip(_vals, _fields):
416
if isinstance(_val, list):
419
__val = _join_lines.get(_v)
421
_new_vals.append(__val)
422
_field.val = _new_vals
424
_field.val = _join_lines.get(_val)
425
if _field.type == 'uuid':
427
elif _field.type == 'list_uuid':
428
_field.type = 'list_rel'
432
detail=f'Wrong field name {_field.field_name} in table model {_field.model}'
434
for col in missing_fields.keys():
435
for _field_name, _header_col in self.line_header.fields.get_fields():
436
if col == _field_name:
437
_header_col.type = _header_col.type.replace('uuid', 'rel')
438
_header_col.type = _header_col.type.replace('list_uuid', 'list_rel')
440
async def get_lines(self, ids: list[uuid.UUID], join_related: bool = False) -> list[Line]:
442
schema=self.cls.model.schemas.get,
443
params={'id__in': ids},
444
join_related=join_related or self.join_related,
445
join_fields=self.join_fields,
449
async def update_lines(self, data: dict, id: uuid.UUID) -> list[Line]:
450
"""Метод обновления обьектов"""
452
for raw_line in data:
454
method_schema_obj = self.cls.model.schemas.update(**raw_line)
455
except ValidationError as e:
456
raise HTTPException(status_code=406, detail=f"Error: {str(e)}")
457
_json = method_schema_obj.model_dump(mode='json', exclude_unset=True)
458
line = await self.cls.model.adapter.update(id=id, json=_json)
459
new_data.append(line)
460
await self.fill_lines(new_data)
463
async def create_lines(self, data: dict) -> list[Line]:
464
"""Метод создания обьектов"""
466
for raw_line in data:
468
method_schema_obj = self.cls.model.schemas.create(**raw_line)
469
except ValidationError as e:
470
raise HTTPException(status_code=406, detail=f"Error: {str(e)}")
471
_json = method_schema_obj.model_dump(mode='json', exclude_unset=True)
472
line = await self.cls.model.adapter.create(json=_json)
473
new_data.append(line)
474
await self.fill_lines(new_data)
477
async def delete_lines(self, ids: list[uuid.UUID]) -> bool:
478
"""Метод удаления обьектов"""
480
await self.cls.model.adapter.delete(id=_id)
484
def as_table_update(self) -> str:
485
"""Метод отдает список обьектов как таблицу на редактирование"""
487
for i, line in enumerate(self.lines):
488
if i == len(self.lines) - 1:
490
rendered_html += line.as_tr_update
494
def as_table_get(self) -> str:
495
"""Метод отдает список обьектов как таблицу на просмотр"""
497
for i, line in enumerate(self.lines):
498
if i == len(self.lines) - 1:
500
rendered_html += line.as_tr_get
504
def as_card_kanban(self) -> str:
506
for i, line in enumerate(self.lines):
507
if i == len(self.lines) - 1:
509
rendered_html += f'<div class="col-6">{line.as_card}</div>'
515
def as_table_header(self) -> str:
516
return self.lines[0].as_tr_header if self.lines else self.line_header.as_tr_header