6
from collections import defaultdict
7
from datetime import datetime
9
from inspect import isclass
10
from typing import Optional, get_args, get_origin, Any
11
from starlette.exceptions import HTTPException
12
from fastapi import Query
13
from fastapi_filter.contrib.sqlalchemy import Filter
14
from jinja2_fragments import render_block
15
from pydantic import BaseModel, UUID4, model_validator
16
from pydantic.fields import FieldInfo, ComputedFieldInfo
17
from pydantic.v1.schema import schema
18
from pydantic_core import ValidationError
19
from starlette.datastructures import QueryParams
20
from starlette.requests import Request
22
from app.front.utills import BaseClass
23
from core.env import Model, Env
24
from core.frontend.enviroment import passed_classes, readonly_fields, hidden_fields, table_fields, \
25
reserved_fields, environment, _crud_filter
26
from core.frontend.exceptions import HTMXException
27
from core.frontend.field import Field, Fields
28
from core.frontend.types import LineType, ViewVars, MethodType
29
from core.frontend.utils import clean_filter
30
from core.schemas import BaseFilter
31
from core.schemas.basic_schemes import ActionBaseSchame, BasicField
32
from core.utils.timeit import timed
36
"""Генерирует уникальный идетификатор для конструктора модели"""
37
return f'A{uuid.uuid4().hex[:10]}'
40
def get_types(annotation: object, _class: list = []) -> list[object]:
42
Рекурсивно берем типы из анотации типа
44
if isclass(annotation):
45
_class.append(annotation)
48
origin = get_origin(annotation)
49
annotate = get_args(annotation)
50
if origin and origin not in passed_classes:
53
get_types(annotate[0], _class)
54
except Exception as ex:
55
_class.append(annotation)
61
Класс описывает контекст конструктора модели,
62
ВАЖНО!! класс этот шерится между всеми лайнами,
63
<поэтому в нем не должно быть никаких данных
65
request: Request = None
67
vars: Optional[dict] = {
68
'button_update': True,
71
params: Optional[QueryParams] | dict | None
72
join_related: Optional[bool] | None = None
73
join_fields: Optional[list] = []
75
parent_field: Optional['ClassView'] = None
77
exclude: Optional[list] = [None]
88
Класс инкапсулирует все вспомогательные проперти
95
key = f'{self.cls.v.key}--{self.cls._id}'
99
def ui_key(self) -> str:
100
"""Сгенерировать ключ обьекта для UI"""
101
return f'{self.cls.v.model.name}--{self.cls._id}'
104
def lsn(self) -> str:
105
"""Сгенерировать ключ обьекта для UI"""
110
"""Сгенерировать ключ обьекта для UI"""
114
def model_name(self) -> str:
115
return self.cls.v.model.name
118
def domain_name(self):
119
return self.cls.v.model.domain.name
123
return self.cls.v.actions
128
return self.cls.v.key
131
def display_title(self):
132
return self.cls.title.val if hasattr(self.cls, 'title') else self.cls._id
137
Класс инкапсулирует все htmx вызовы шаблонов
142
def button_view(self) -> str:
143
"""Сгенерировать кнопку на просмотр обьекта"""
144
return self.cls.render_line('button_view')
147
def button_update(self) -> str:
148
"""Сгенерировать кнопку на редактирование обьекта"""
149
return self.cls.render_line('button_update')
152
def button_create(self) -> str:
153
"""Сгенерировать кнопку на создание обьекта"""
154
return self.cls.render_line('button_create')
157
def button_delete(self) -> str:
158
"""Сгенерировать кнопку на удаление обьекта"""
159
return self.cls.render_line(block_name='button_delete')
162
def button_save(self) -> str:
163
"""Кнопка сохранения обьекта"""
164
return self.cls.render_line(block_name='button_save')
167
def button_actions(self) -> str:
168
"""Сгенерировать кнопку на меню доступных методов обьекта"""
169
return self.cls.render_line(block_name='button_actions')
172
def as_tr_get(self) -> str:
173
"""Отобразить обьект как строку таблицы на просмотр"""
174
return self.cls.render_line(block_name='as_tr', method=MethodType.GET)
177
def as_table_header(self) -> str:
178
return self.cls.render_line('as_tr_header')
181
def as_tr_header(self) -> str:
182
"""Отобразить обьект как строку заголовок таблицы"""
183
return self.cls.render_line(block_name='as_tr_header', method=MethodType.GET)
186
def as_tr_placeholder(self) -> str:
187
"""Отобразить обьект как строку заголовок таблицы"""
188
return self.cls.render_line(block_name='as_tr_placeholder', method=MethodType.GET)
191
def as_tr_update(self) -> str:
192
"""Отобразить обьект как строку таблицы на редактирование"""
193
return self.cls.render_line(block_name='as_tr', method=MethodType.UPDATE)
196
def as_tr_create(self) -> str:
197
"""Отобразить обьект как строку таблицы на создание"""
198
return self.cls.render_line(block_name='as_tr', method=MethodType.CREATE)
201
def as_item(self) -> str:
202
"""Отобразить обьект как айтем с заголовком"""
203
return self.cls.render_line(block_name='as_item', method=MethodType.CREATE)
206
def as_div_get(self) -> str:
207
"""Отобразить обьект как строку таблицы на просмотр"""
208
return self.cls.render_line(block_name='as_div', method=MethodType.GET)
211
def as_div_update(self) -> str:
212
"""Отобразить обьект как строку таблицы на просмотр"""
213
return self.cls.render_line(block_name='as_div', method=MethodType.UPDATE)
216
def as_div_create(self) -> str:
217
"""Отобразить обьект как строку таблицы на просмотр"""
218
return self.cls.render_line(block_name='as_div', method=MethodType.CREATE)
221
def as_card(self) -> str:
222
"""Отобразить обьект как строку таблицы на просмотр"""
223
return self.cls.render_line(block_name='as_card', method=MethodType.GET)
226
def get_modal_update(self) -> str:
227
"""Метод отдает модалку на редактирование обьекта"""
229
environment=environment,
230
template_name=f'line/modal.html',
231
method=MethodType.UPDATE,
237
def get_modal_get(self) -> str:
238
"""Метод отдает модалку на просмотр обьекта"""
240
environment=environment,
241
template_name=f'line/modal.html',
242
method=MethodType.GET,
248
def get_modal_delete(self) -> str:
249
"""Метод отдает модалку на удаление обьекта"""
251
environment=environment,
252
template_name=f'line/modal.html',
253
method=MethodType.DELETE,
259
def get_modal_create(self) -> str:
260
"""Метод отдает модалку на создание нового обьекта"""
262
environment=environment,
263
template_name=f'line/modal.html',
264
method=MethodType.CREATE,
270
def get_button_view(self) -> str:
271
"""Сгенерировать кнопку на просмотр обьекта"""
272
return self.cls.render_line('button_view')
275
def get_button_update(self) -> str:
276
"""Сгенерировать кнопку на редактирование обьекта"""
277
return self.cls.render('button_update')
280
def get_button_create(self) -> str:
281
"""Сгенерировать кнопку на создание обьекта"""
282
return self.cls.render('button_create')
285
def get_button_delete(self) -> str:
286
"""Сгенерировать кнопку на удаление обьекта"""
287
return self.cls.render(block_name='button_delete')
290
def get_button_save(self) -> str:
291
"""Кнопка сохранения обьекта"""
292
return self.cls.render(block_name='button_save')
295
def get_button_actions(self) -> str:
296
"""Сгенерировать кнопку на меню доступных методов обьекта"""
297
return self.cls.render(block_name='button_actions')
300
def as_filter(self) -> str:
301
"""Метод отдает фильтр , те столбцы с типами для HTMX шаблонов"""
303
environment=environment, template_name=f'cls/filter.html',
304
block_name='filter', method=MethodType.UPDATE, cls=self.cls
308
def as_table(self) -> str:
309
"""Метод отдает Таблицу с хидером на просмотр"""
311
environment=environment, template_name=f'cls/table.html',
312
block_name='as_table', method=MethodType.GET, cls=self.cls
316
def as_table_update(self) -> str:
317
"""Метод отдает Таблицу с хидером на редакетирование"""
319
environment=environment, template_name=f'cls/table.html',
320
block_name='as_table', method=MethodType.UPDATE, cls=self
324
def as_table_widget(self) -> str:
325
"""Отдает виджет HTMX для построение таблицы"""
327
environment=environment,
328
template_name=f'cls/table.html',
330
method=MethodType.GET,
335
def as_card_kanban(self) -> str:
336
"""Метод отдает Таблицу с хидером на просмотр"""
337
return f'<div class="row" id="{self.key}">{self.lines.as_card_kanban}</div>'
340
def as_card_list(self) -> str:
341
"""Метод отдает Таблицу с хидером на просмотр"""
342
return f'<div class="row" id="{self.key}">{self.lines.as_card_list}</div>'
345
def as_filter_widget(self) -> str:
346
"""Отдает виджет HTMX для построение фильтра"""
348
environment=environment,
349
template_name=f'cls/filter.html',
355
def as_header_widget(self) -> str:
356
"""Отдает виджет HTMX для построения заголовка страницы обьекта"""
358
environment=environment,
359
template_name=f'cls/header.html',
365
def get_import(self) -> str:
366
"""Метод отдает фильтр , те столбцы с типами для HTMX шаблонов"""
368
environment=environment, template_name=f'cls/import.html',
369
block_name='import_get', method=MethodType.GET, cls=self.cls
373
def get_import_errors(self) -> str:
374
"""Метод отдает фильтр , те столбцы с типами для HTMX шаблонов"""
376
environment=environment, template_name=f'cls/import.html',
377
block_name='import_errors', method=MethodType.GET, cls=self.cls
383
Классконструктор модели для манипулирование уже их UI HTMX
385
_id: str | int = None
386
_lsn: int | None = None
393
def __setattr__(self, key, value):
394
super().__setattr__(key, value)
400
self._view.key = _get_key()
403
instance = copy.copy(self)
404
for k, attr in instance.__dict__.items():
405
if isinstance(attr, Field):
406
new_attr = attr.copy()
407
setattr(instance, k, new_attr)
411
new_attr.cls = instance
413
instance.p.cls = instance
415
instance.h.cls = instance
419
"""Если использовать конструктор как итератор, то он будет возвращать строки"""
421
line = self._lines[self.__state]
429
return f'{self.v.model.name}:{self._id}'
436
join_related: bool = False,
437
join_fields: list | None = None,
438
is_inline: bool = False,
439
key: str | None = None,
440
is_rel: bool = False,
441
vars: dict | None = None,
442
schema: BaseModel = None,
451
self._view.request = request
453
self._view.vars = vars
454
self._view.model = request.scope['env'][model]
455
assert self._view.model, 'Model is not defined'
456
self._view.model_name = self._view.model.name
457
self._view.is_rel = is_rel
458
self._view.actions = self._view.model.adapter.get_actions()
459
self._view.env = request.scope['env']
460
self._view.key = key or _get_key()
461
self._view.exclude = exclude or []
462
self._view.params = params or {}
463
self._view.join_related = join_related
464
self._view.join_fields = join_fields or []
465
self._view.schema = schema
466
self._view.submodels = {}
467
config_sort = self._view.model.sort
469
self._view.sort = {v: i for i, v in enumerate(config_sort)}
472
self._view.is_inline = is_inline
475
self._get_schema_fields(exclude=exclude)
479
return self.v.request
481
async def init(self, params: dict | None = None, join_related: bool = False,
482
data: list | dict = None, schema: BaseModel = None) -> None:
483
"""Майнинг данных по params"""
485
request_data = await self._view.request.json()
486
qp = clean_filter(request_data, self._view.key)
488
params = {i: v for i, v in qp[0].items() if v}
489
if isinstance(data, list):
490
data = {i['id']: i for i in data}
491
await self.get_data(params=params, data=data)
493
def _get_view_vars_by_fieldinfo(self, fielinfo: FieldInfo | None = None) -> ViewVars:
507
'required': fielinfo.is_required() if isinstance(fielinfo, FieldInfo) else False,
508
'title': fielinfo.title or str(fielinfo),
509
'hidden': fielinfo.json_schema_extra.get('hidden',
510
False) if fielinfo.json_schema_extra else False,
512
'color_map': fielinfo.json_schema_extra.get('color_map',
513
{}) if fielinfo.json_schema_extra else {},
515
'readonly': fielinfo.json_schema_extra.get('readonly',
516
False) if fielinfo.json_schema_extra else False,
518
'filter': fielinfo.json_schema_extra.get('filter',
519
{}) if fielinfo.json_schema_extra else {},
521
'table': fielinfo.json_schema_extra.get('table',
522
False) if fielinfo.json_schema_extra else False,
524
'form': fielinfo.json_schema_extra.get('form',
525
False) if fielinfo.json_schema_extra else False,
527
'description': fielinfo.description,
530
def _get_view_vars(self, fieldname: str, is_filter: bool) -> dict[str, ViewVars]:
531
"""Костыльный метод собирания ViewVars"""
532
if fieldname == 'id':
534
create_fields = self._view.model.schemas.create.model_fields | self._view.model.schemas.create.model_computed_fields
535
update_fields = self._view.model.schemas.update.model_fields | self._view.model.schemas.update.model_computed_fields
536
get_fields = self._view.model.schemas.get.model_fields | self._view.model.schemas.get.model_computed_fields
537
filter_fields = self._view.model.schemas.filter.model_fields | self._view.model.schemas.filter.model_computed_fields
538
create_fieldinfo = create_fields.get(fieldname)
539
update_fieldinfo = update_fields.get(fieldname)
540
get_fieldinfo = get_fields.get(fieldname)
541
filter_fieldinfo = filter_fields.get(fieldname)
543
'create': self._get_view_vars_by_fieldinfo(create_fieldinfo),
544
'update': self._get_view_vars_by_fieldinfo(
545
update_fieldinfo) if not is_filter else self._get_view_vars_by_fieldinfo(
547
'get': self._get_view_vars_by_fieldinfo(get_fieldinfo),
550
def _get_field(self, field_name: str, fields_merged: dict) -> Field:
552
Преобразование поля из Pydantic(Field) в схему Field для HTMX
554
fielinfo: FieldInfo = fields_merged[field_name]
557
model: Model | None = None
558
model_name: str = self._view.model.name
559
is_filter: bool = True if self._view.model.schemas.filter.model_fields.get(
560
field_name) else False
561
submodel: ClassView | None = None
562
if not isinstance(fielinfo, ComputedFieldInfo):
563
class_types: list = get_types(fielinfo.annotation, [])
565
class_types: list = [str]
567
if fielinfo.json_schema_extra:
568
if fielinfo.json_schema_extra.get('model'):
569
model_name = fielinfo.json_schema_extra.get('model')
570
model = self._view.env[model_name]
571
for i, c in enumerate(class_types):
574
if field_name == 'id':
577
elif issubclass(c, enum.Enum):
580
elif issubclass(c, BaseModel):
582
model_name = c.Config.orm_model.__tablename__
583
except Exception as ex:
584
model_name = c.Config.__name__.lower()
586
model = self._view.env[model_name]
587
submodel = ClassView(
588
request=self._view.request,
592
self._view.submodels.update({field_name: submodel})
594
res += c.__name__.lower()
596
if not model and model_name:
597
if model_name == self._view.model.name:
598
model = self._view.model
599
elif model_name != self._view.model.name:
600
model = self._view.env[model_name]
601
assert model, f'Model for field {field_name} is not defined'
603
**self._get_view_vars(field_name, is_filter),
604
'is_filter': is_filter,
605
'field_name': field_name,
606
'is_reserved': True if field_name in reserved_fields else False,
608
'model_name': model.name,
609
'domain_name': model.domain.name,
611
'sort_idx': self._view.sort.get(field_name, 999),
616
def _get_schema_fields(self, exclude: list = []) -> Fields:
617
"""Переделывает Pydantic схему на Схему для рендеринга в HTMX и Jinja2 - а зачем?"""
618
fields: list[tuple[str, Field]] = []
620
fields_merged = self._view.model.schemas.get.model_fields | self._view.model.schemas.get.model_computed_fields | self._view.model.schemas.filter.model_fields
621
for k, v in fields_merged.items():
624
f = self._get_field(field_name=k, fields_merged=fields_merged)
625
fields.append((k, f))
627
fields = sorted(fields, key=lambda x: x[1].sort_idx)
628
for field_name, field in fields:
629
setattr(self, field_name, field)
631
async def get_data(self, params: QueryParams | dict | None = None,
632
data: list | dict | None = None, ) -> None:
633
"""Метод собирает данные для конструктора модели"""
636
params = self._view.params
638
if not self._view.model.adapter.domain.domain_type == 'INTERNAL':
639
async with self._view.model.adapter as a:
640
resp_data = await a.list(params=params)
641
data = resp_data['data']
643
data_obj = await self.cls.model.service.list(_filter=params)
644
data = [i.__dict__ for i in data_obj]
645
self._view.data = {i['id']: i for i in data}
646
await self.fill_lines(self._view.data, self.v.join_related, self.v.join_fields)
648
async def fill_lines(self, data: dict | list, join_related: bool = False,
649
join_fields: list = []):
650
if isinstance(data, list):
651
data = {i['id']: i for i in data}
654
self._view.data = data
655
join_fields = join_fields or self._view.join_fields
656
for _id, row in data.items():
657
line_copied = self.copy() if self._lines else self
658
line_copied._id = row.get('id', id(line_copied))
659
line_copied._lsn = row.get('lsn')
660
for col in line_copied.get_fields():
661
val = row.get(col.field_name, None)
662
if col.type in ('date', 'datetime'):
663
if isinstance(val, datetime):
665
elif isinstance(val, str):
666
val = datetime.fromisoformat(val)
667
elif col.type == 'id':
670
elif col.type.endswith('list_rel'):
671
submodel = line_copied.v.submodels[col.field_name].copy()
672
val = await submodel.fill_lines(data=val, join_related=False)
673
submodel.v.key = col.key
674
submodel.v.parent_field = col
675
elif col.type.endswith('rel'):
676
submodel = line_copied.v.submodels[col.field_name].copy()
678
val = await submodel.fill_lines(data=[val], join_related=False)
679
submodel.v.key = col.key
680
submodel.v.parent_field = col
682
line_copied._lines.append(line_copied)
684
if join_related or join_fields:
685
missing_fields = defaultdict(list)
686
for _line in self.lines:
687
"""Достаем все релейтед обьекты у которых модуль отличается"""
688
assert _line.fields, "Проверяем что все поля есть"
689
for field_name, field in _line.fields.get_fields():
690
if field.type in ('uuid',):
693
missing_fields[field.field_name].append((field.val, field))
694
elif field.field_name in join_fields:
695
missing_fields[field.field_name].append((field.val, field))
697
for miss_key, miss_value in missing_fields.items():
699
_vals, _fields = [i[0] for i in miss_value], [i[1] for i in miss_value]
701
_corutine_data = None
702
if isinstance(_vals, list):
703
miss_value_str = ','.join([i for i in set(_vals) if i])
705
qp = {'id__in': miss_value_str}
706
_corutine_data = asyncio.create_task(
707
self.cls.env[_fields[0].model_name].adapter.list(params=qp))
708
to_serialize.append((_vals, _fields, _corutine_data))
709
for _vals, _fields, _corutine_data in to_serialize:
712
_data = await _corutine_data
713
_join_lines = {i['id']: i for i in _data['data']}
714
for _val, _field in zip(_vals, _fields):
715
if isinstance(_val, list):
718
__val = _join_lines.get(_v)
720
_new_vals.append(__val)
721
_field.val = _new_vals
723
_field.val = _join_lines.get(_val)
724
if _field.type == 'uuid':
726
elif _field.type == 'list_uuid':
727
_field.type = 'list_rel'
731
detail=f'Wrong field name {_field.field_name} in table model {_field.model}'
733
for col in missing_fields.keys():
734
for _field_name, _header_col in self.line_header.fields.get_fields():
735
if col == _field_name:
736
_header_col.type = _header_col.type.replace('uuid', 'rel')
737
_header_col.type = _header_col.type.replace('list_uuid', 'list_rel')
740
def get_fields(self, display_view: str = None, method: MethodType = MethodType.GET) -> Fields:
741
fields = [k for i, k in self.__dict__.items() if isinstance(k, Field)]
742
return _crud_filter(fields, method, display_view)
744
def render(self, template_name: str, block_name: str,
745
method: MethodType = MethodType.GET) -> str:
750
environment=environment,
751
template_name=template_name,
752
block_name=block_name,
757
def render_line(self, block_name: str, method: MethodType = MethodType.GET) -> str:
758
"""Рендеринг подразумеваем 1 лайны из шаблога line"""
760
environment=environment,
761
template_name='/line/line.html',
762
block_name=block_name,
771
def send_message(self, message: str) -> str:
772
"""Отправить пользователю сообщение """
774
environment=environment,
775
template_name=f'components/message.html',
776
block_name='success',
781
async def delete_lines(self, ids: list[uuid.UUID]) -> bool:
782
"""Метод удаления обьектов"""
784
await self.v.model.adapter.delete(id=_id)
787
async def get_lines(self, ids: list[uuid.UUID], join_related: bool = False) -> 'ClassView':
788
await self.get_data(params={'id__in': ids})
791
async def update_lines(self, data: dict, id: uuid.UUID) -> 'ClassView':
792
"""Метод обновления обьектов"""
794
for raw_line in data:
796
method_schema_obj = self.v.model.schemas.update(**raw_line)
797
except ValidationError as e:
798
raise HTTPException(status_code=406, detail=f"Error: {str(e)}")
799
_json = method_schema_obj.model_dump(mode='json', exclude_unset=True)
800
line = await self.v.model.adapter.update(id=id, json=_json)
801
new_data.append(line)
802
lines = await self.fill_lines(new_data)
805
async def create_lines(self, data: dict) -> 'ClassView':
806
"""Метод создания обьектов"""
808
for raw_line in data:
809
raw_line.update({'id': uuid.uuid4()})
811
method_schema_obj = self.v.model.schemas.create(**raw_line)
812
except ValidationError as e:
813
raise HTTPException(status_code=406, detail=f"Error: {str(e)}")
814
_json = method_schema_obj.model_dump(mode='json', exclude_unset=True)
815
line = await self.v.model.adapter.create(json=_json)
816
new_data.append(line)
817
await self.fill_lines(new_data)
820
async def get_action(self, action: str, ids: list[uuid.UUID], schema: BaseModel) -> str:
821
"""Метод отдает апдейт схему , те столбцы с типами для HTMX шаблонов"""
822
data = {k: ids if k == 'ids' else None for k, v in schema.model_fields.items()}
823
self.action_line = await self._get_line(schema=schema, type=LineType.ACTION)
824
self.action_lines = Lines(cls=self, class_key=self.key, line_header=self.action_line,
825
line_new=self.action_line)
826
await self.action_lines.get_data(
834
environment=environment,
835
template_name=f'cls/action.html',
836
block_name='action', cls=self, action=action
841
class Method(str, enum.Enum):
843
CREATE: str = 'create'
844
UPDATE: str = 'update'
845
DELETE: str = 'delete'
846
UPDATE_SAVE: str = 'save'
847
CREATE_SAVE: str = 'save_create'
848
DELETE_SAVE: str = 'delete_delete'
851
class BaseSchema(BaseModel):
854
method: Method = Method.GET
859
id: UUID4 | int = None
864
commit: Optional[bool] = False
866
@model_validator(mode='before')
867
def _filter(cls, value):
869
Так же убираем все пустые params
871
if f := value.get('filter'):
872
if isinstance(f, str):
874
value['filter'] = eval(f)
875
except TypeError as ex:
883
async def get_view(request: Request, schema: BaseSchema) -> ClassView:
884
body = await request.json() or {}