lavkach3
105 строк · 3.6 Кб
1import os2from enum import Enum3from uuid import UUID4
5import yaml6from sqlalchemy import select7from starlette.exceptions import HTTPException8
9#from app.basic.user.models import Role
10#from core.service.base import BaseService
11
12Allow = 'ALLOW'13Deny = 'Deny'14
15
16class Permissions(Enum):17...18
19
20def get_all_permit_fiels():21paths = []22path = os.path.abspath(__file__).split('/')23''.join(path[:-3])24for root, dirs, files in os.walk('/'.join(path[:-3]) + '/app'):25for file in files:26if file.endswith(".yaml"):27paths.append(os.path.join(root, file))28return paths29
30
31paths = get_all_permit_fiels()32permits = {}33for path in paths:34with open(path, "r") as stream:35try:36yam = yaml.safe_load(stream)37if yam:38for k, v in yam.get('permits').items():39permits.update(40{k: v}41)42except yaml.YAMLError as exc:43print(exc)44
45
46def permit(*arg):47"""48На вход дается пермит, далее логика такова
49Ищется в списке по пермиту роли, если роль сразу есть в списке пользователя, то функция дает True и заканчивается
50Если же роль не найдена, то функия идет рекурсивно по родителям ролей и так же пытается найти роль пользователя в родителях
51"""
52
53def inner_decorator(f):54async def wrapped(*args, **kwargs):55
56service = None57res = False58for a in args:59if isinstance(a, object):60service = a61break62if not service:63raise HTTPException(status_code=403, detail=f"User not found")64role = service.env['role'].model65if not service.user.is_admin:66service_roles = [UUID(i) for i in service.user.role_ids]67query = select(role).where(68role.permission_allow_list.contains(arg)69).where(70role.company_id.in_(service.user.company_ids)71)72result = await service.session.execute(query)73roles = result.scalars().all()74
75if set(service_roles) & set([i.id for i in roles]):76res = True77if not res:78parents = []79for r in roles:80if r.role_ids:81parents += r.role_ids82while parents:83query = select(role).where(84role.id.in_(parents)85).where(86role.company_id.in_(service.user.company_ids)87)88result = await service.session.execute(query)89roles = result.scalars().all()90if set(service_roles) & set([i.id for i in roles]):91res = True92break93parents = []94for r in roles:95if r.parents:96parents += r.parents97if not res:98raise HTTPException(status_code=403,99detail=f"The user ({service.user}) does not have permission to {arg}")100response = f(*args, **kwargs)101return await response102
103return wrapped104
105return inner_decorator106