openvpn-2fa-otp-freeradius-ldap

Форк
0
112 строк · 4.2 Кб
1
from typing import List, TypedDict
2
from ldap3 import REUSABLE, ASYNC, Server, Connection, ServerPool, SUBTREE, core, AUTO_BIND_TLS_BEFORE_BIND, IP_V4_ONLY
3
import yaml
4
from base64 import b64encode, b64decode
5
from config import LDAPConfig
6
import radiusd
7
from util import log_msg
8

9

10
def connect_async(config: LDAPConfig, server: Server)->Connection | None:
11
    try:
12
        connection = Connection(
13
            server=server,
14
            user=config["bind_dn"],
15
            password=config["password"],
16
            read_only=True,
17
            client_strategy=ASYNC
18
        )
19
        if not connection.bind():
20
            log_msg(
21
                radiusd.L_ERR, 'Can not connect to {0}'.format(config["host"]))
22
            return None
23
        return connection
24
    except Exception as ex:
25
        log_msg(radiusd.L_ERR, 'Can not connect to {0}. Exception message {1}'.format(
26
            config["host"], ex))
27
        return None
28
    
29

30

31

32

33
class LDAPConnection:
34
    def __init__(self, config: LDAPConfig):
35
        self.config = config
36

37
        self.server = Server(
38
            self.config["host"],
39
            port=config["port"],
40
            use_ssl=config["use_ssl"],
41
            get_info='NO_INFO',
42
            connect_timeout=10,  # 10 seconds
43
            mode=IP_V4_ONLY,
44
        )
45
        self.connection = connect_async(config, self.server)
46
        # try:
47
        #     self.connection = Connection(
48
        #         server=self.server,
49
        #         user=config["bind_dn"],
50
        #         password=config["password"],
51
        #         read_only=True,
52
        #         client_strategy=ASYNC
53
        #     )
54
        #     if not self.connection.bind():
55
        #         log_msg(
56
        #             radiusd.L_ERR, 'Can not connect to {0}'.format(self.config["host"]))
57
        # except Exception as ex:
58
        #     log_msg(radiusd.L_ERR, 'Can not connect to {0}. Exception message {1}'.format(
59
        #         self.config["host"], ex))
60

61
    def authenticate(self, login: str, pwd: str) -> bool:
62
        filter = self.config["search_filter"] % login
63
        response = None
64
        result = None
65
        if self.connection is None:
66
            log_msg(
67
                radiusd.L_WARN, "No connection to ldap server. Is LDAP server alive? Trying to reconnect")
68
            self.connection = connect_async(self.config, self.server)
69
            if self.connection is None:
70
                log_msg(radiusd.L_ERR, 'Can not connect to {0}'.format(
71
                    self.config["host"]))
72
                return False
73
        if self.connection.closed:
74
            log_msg(
75
                radiusd.L_DBG, "Connection to ldap server is closed. Trying to reconnect.")
76
            self.connection = connect_async(self.config, self.server)
77
            if self.connection is None:
78
                log_msg(radiusd.L_ERR, 'Can not reconnect to {0}'.format(
79
                    self.config["host"]))
80
                return False
81
        try:
82
            msg_id = self.connection.search(
83
                search_base=self.config["search_base"],
84
                search_filter=filter,
85
                search_scope=SUBTREE,
86
            )
87
            response, result = self.connection.get_response(msg_id, timeout=10)
88
        except Exception as ex:
89
            log_msg(
90
                radiusd.L_ERR, "Trying to search user {0}. Exception message: {1}".format(login, ex))
91
            return False
92
        if result['result'] != 0:
93
            log_msg(
94
                radiusd.L_ERR, "protocol error. Code: {}\n{}".format(result, response))
95
            return False
96
        length = len(response)
97
        if length == 0:
98
            log_msg(
99
                radiusd.L_INFO,"user {} not found in LDAP".format(login))
100
            return False
101
        dn = response[0]["dn"]
102
        log_msg(
103
                radiusd.L_DBG,"found user dn {}".format(dn))
104
        auth_result = self.connection.rebind(user=dn, password=pwd)
105
        result = self.connection.rebind(
106
            user=self.config["bind_dn"], password=self.config["password"])
107
        if result is False:
108
            log_msg(radiusd.L_ERR, "error rebinding connection to bind_dn user. Consider user {} is not authenticated.".format(login))
109
            return False
110
        if auth_result is False:
111
            return False
112
        return True

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.