openvpn-2fa-otp-freeradius-ldap
50 строк · 1.5 Кб
1import radiusd2from sqlalchemy import select, create_engine, URL, String3from sqlalchemy.orm import DeclarativeBase, Session4from typing import Optional, TypedDict5from sqlalchemy.orm import Mapped6from sqlalchemy.orm import mapped_column7from config import DBConfig8from util import log_msg9
class Base(DeclarativeBase):
    """Declarative base class that the ORM models in this module inherit from."""
13
class User(Base):
    """ORM mapping for a row of the ``users`` table."""

    __tablename__ = "users"

    # Primary key of the row.
    id: Mapped[int] = mapped_column(primary_key=True)
    # String(n) *instantiates* the column type with a maximum length of n.
    # The previous String[n] subscripted the class itself, which does not
    # configure a length and fails when the class body is evaluated.
    login: Mapped[str] = mapped_column(String(200))
    otp_secret: Mapped[str] = mapped_column(String(50))
21
class DBConnection:
    """Owns a SQLAlchemy engine built from a ``DBConfig`` and looks up OTP secrets."""

    def __init__(self, config: DBConfig):
        """Store *config* and build the engine from it right away."""
        self.config = config
        self.conn = self.connect()

    def connect(self):
        """Create and return an engine for the configured PostgreSQL database.

        The URL is assembled from the ``user``, ``password``, ``host``,
        ``port`` and ``db_name`` keys of ``self.config`` and uses the
        ``postgresql+psycopg2`` driver. SQL echoing is disabled.
        """
        cfg = self.config
        db_url = URL.create(
            "postgresql+psycopg2",
            username=cfg["user"],
            password=cfg["password"],
            host=cfg["host"],
            port=cfg["port"],
            database=cfg["db_name"],
        )
        engine = create_engine(db_url, echo=False)
        return engine

    def get_otp_secret(self, login: str) -> Optional[str]:
        """Return the ``otp_secret`` of the user whose login equals *login*.

        Returns ``None`` when no matching row exists. Any exception raised
        while querying is logged through ``log_msg`` at ``radiusd.L_ERR``
        and also results in ``None``.
        """
        try:
            with Session(self.conn) as db_session:
                query = select(User.otp_secret).filter_by(login=login)
                row = db_session.execute(query).first()
                # first() yields None when the query matched no rows.
                return None if row is None else row[0]
        except Exception as exc:
            message = "{}".format(exc)
            log_msg(radiusd.L_ERR, message)
            return None