2
* Copyright (c) 2021 OceanBase
3
* OceanBase CE is licensed under Mulan PubL v2.
4
* You can use this software according to the terms and conditions of the Mulan PubL v2.
5
* You may obtain a copy of Mulan PubL v2 at:
6
* http://license.coscl.org.cn/MulanPubL-2.0
7
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
* See the Mulan PubL v2 for more details.
19
#define USSL_SCRAMBLE_LEN 16
20
#define USSL_MAX_KEY_LEN 16
21
#define AUTH_TYPE_STRING_MAX_LEN 32
24
SEND_FIRST_NEGO_MESSAGE = 1,
25
DOING_SSL_HANSHAKE = 2,
29
SERVER_ACCEPT_CONNECTION = 1,
30
SERVER_ACK_NEGO_AND_AUTH = 2,
31
SERVER_ACK_NEGO_AND_SSL = 3,
34
enum ob_rpc_connection_type {
35
OB_CONNECTION_COMMON_TYPE,
36
OB_CONNECTION_AUTH_BYPASS_TYPE,
39
static const int MAX_FD_NUM = 1024 * 1024;
40
static uint8_t gs_connection_type_arr[MAX_FD_NUM];
42
int ussl_set_rpc_connection_type(int fd, int type)
45
if (fd >= 0 && fd < MAX_FD_NUM) {
46
gs_connection_type_arr[fd] = type;
53
void ussl_reset_rpc_connection_type(int fd)
55
if (fd >= 0 && fd < MAX_FD_NUM) {
56
gs_connection_type_arr[fd] = 0;
60
static void auth_type_to_str(int auth_type, char *buf, size_t len)
62
if (USSL_AUTH_NONE == auth_type) {
63
strncpy(buf, "NONE", len);
64
} else if (USSL_AUTH_SSL_HANDSHAKE == auth_type) {
65
strncpy(buf, "SSL_NO_ENCRYPT", len);
66
} else if (USSL_AUTH_SSL_IO == auth_type) {
67
strncpy(buf, "SSL_IO", len);
71
static void get_client_addr(int fd, char *buf, int len)
73
struct sockaddr_storage addr;
74
socklen_t sock_len = sizeof(addr);
75
if (0 != getsockname(fd, (struct sockaddr *)&addr, &sock_len)) {
76
ussl_log_warn("getsockname failed, fd:%d, errno:%d", fd, errno);
78
char src_addr[INET6_ADDRSTRLEN];
79
if (AF_INET == addr.ss_family) {
80
struct sockaddr_in *s = (struct sockaddr_in *)&addr;
81
if (NULL != inet_ntop(AF_INET, &s->sin_addr, src_addr, INET_ADDRSTRLEN)) {
82
if (snprintf(buf, len, "%s:%d", src_addr, ntohs(s->sin_port)) < 0) {
83
ussl_log_warn("snprintf failed, errno:%d", errno);
86
ussl_log_warn("call inet_ntop for AF_INET failed, errno:%d", errno);
89
struct sockaddr_in6 *s = (struct sockaddr_in6 *)&addr;
90
if (NULL != inet_ntop(AF_INET6, &s->sin6_addr, src_addr, INET6_ADDRSTRLEN)) {
91
if (snprintf(buf, len, "[%s]:%d", src_addr, ntohs(s->sin6_port)) < 0) {
92
ussl_log_warn("snprintf failed, errno:%d", errno);
95
ussl_log_warn("call inet_ntop for AF_INET6 failed, errno:%d", errno);
101
static int is_local_ip_address(const char *addr)
104
if (NULL != strstr(addr, "127.0.0.1")) {
110
static int handle_client_writable_event(ussl_sock_t *s)
115
int need_giveback = 0;
116
socklen_t len = sizeof(err);
117
clientfd_sk_t *cs = (clientfd_sk_t *)s;
118
if (-1 == (err = getsockopt(cs->fd, SOL_SOCKET, SO_ERROR, (void *)&so_error, &len))) {
119
ussl_log_error("call getsockopt failed, fd:%d, errno:%d", cs->fd, errno);
120
} else if (0 != so_error) {
121
ussl_log_warn("there is an error on the socket, fd:%d, so_error:%d", cs->fd, so_error);
123
// 1.remode EPOLLOUT & add EPOLLIN
124
cs->mask &= ~EPOLLOUT;
125
struct epoll_event event;
126
uint32_t new_flags = EPOLLIN | EPOLLERR;
127
int client_am = get_client_auth_methods();
128
if (0 != (err = libc_epoll_ctl(cs->ep->fd, EPOLL_CTL_MOD, cs->fd,
129
ussl_make_epoll_event(&event, new_flags, (ussl_sock_t *)cs)))) {
130
ussl_log_error("modify epoll flag failed, fd:%d, errno:%d", cs->fd, errno);
131
} else { // 3.send negotiation message
132
int need_send_negotiation = 1;
133
if (USSL_AUTH_NONE == client_am) {
134
if (1 == cs->fd_info.send_negotiation) {
135
need_send_negotiation = 1;
137
need_send_negotiation = 0;
141
need_send_negotiation = 1;
143
if (1 == need_send_negotiation) {
144
negotiation_message_t nego_msg;
145
nego_msg.type = client_am;
146
nego_msg.client_gid = cs->fd_info.client_gid;
147
if (0 != (err = send_negotiation_message(cs->fd, (char *)&nego_msg, sizeof(nego_msg)))) {
148
ussl_log_warn("send negotiation message failed, fd:%d, err:%d, errno:%d", cs->fd, err,
150
} else { // 4.add to timeout list (if needed)
152
char client_addr[IP_STRING_MAX_LEN] = {0};
153
get_client_addr(cs->fd, client_addr, IP_STRING_MAX_LEN);
154
char auth_type[AUTH_TYPE_STRING_MAX_LEN] = {0};
155
auth_type_to_str(nego_msg.type, auth_type, AUTH_TYPE_STRING_MAX_LEN);
156
ussl_log_info("client send negotiation message succ, fd:%d, addr:%s, auth_method:%s, gid:0x%lx",
157
cs->fd, client_addr, auth_type, cs->fd_info.client_gid);
158
if (USSL_AUTH_NONE == client_am) {
161
if (is_local_ip_address(client_addr)) {
164
cs->start_time = time(NULL);
165
add_to_timeout_list(&cs->timeout_link);
166
cs->fd_info.stage = SEND_FIRST_NEGO_MESSAGE;
174
if (0 != err || 0 != so_error || need_giveback) {
175
s->has_error = ((err != 0) || (so_error != 0)) ? 1 : 0;
181
static int client_do_ssl_handshake(clientfd_sk_t *cs)
185
err = ssl_do_handshake(cs->fd);
187
// stop timer and give back
190
} else if (EAGAIN == err) {
193
ussl_log_warn("client do ssl handshake failed, fd:%d, err:%d, errno:%d", cs->fd, err, errno);
200
static int handle_client_readable_event(ussl_sock_t *s)
203
clientfd_sk_t *cs = (clientfd_sk_t *)s;
204
char client_addr[IP_STRING_MAX_LEN] = {0};
205
get_client_addr(cs->fd, client_addr, IP_STRING_MAX_LEN);
206
char auth_type[AUTH_TYPE_STRING_MAX_LEN] = {0};
207
auth_type_to_str(cs->fd_info.auth_methods, auth_type, AUTH_TYPE_STRING_MAX_LEN);
208
if (SEND_FIRST_NEGO_MESSAGE == cs->fd_info.stage) {
210
char buf[USSL_BUF_LEN];
212
while ((rbytes = recv(cs->fd, buf, sizeof(buf), MSG_PEEK)) < 0 && EINTR == errno)
217
ussl_log_info("read EOF, fd:%d, src_addr:%s", cs->fd, client_addr);
218
} else if (rbytes < 0) {
219
if (EINTR == errno) {
221
} else if (EAGAIN == errno || EWOULDBLOCK == errno) {
227
ussl_log_warn("read failed, fd:%d, errno:%d", s->fd, errno);
229
} else if (rbytes < sizeof(negotiation_head_t)) {
230
ussl_log_warn("recv message is not complete, close connection, rbytes:%ld, fd:%d", rbytes, cs->fd);
233
} else { // get mag len & read msg
234
negotiation_head_t msg_head;
235
memcpy(&msg_head, buf, sizeof(msg_head));
236
if (NEGOTIATION_MAGIC != msg_head.magic) {
239
} else if (rbytes < sizeof(negotiation_head_t) + msg_head.len) {
240
ussl_log_warn("recv message is not complete, close connection, rbytes:%ld, fd:%d", rbytes, cs->fd);
244
while ((rbytes = recv(cs->fd, buf, sizeof(msg_head) + msg_head.len, 0)) < 0 &&
247
if (rbytes != sizeof(msg_head) + msg_head.len) {
248
ussl_log_warn("recv data failed, fd:%d, errno:%d, rbytes:%ld", cs->fd, errno, rbytes);
252
negotiation_message_t nego_msg;
253
memcpy(&nego_msg, buf + sizeof(msg_head), sizeof(nego_msg));
254
if (USSL_AUTH_SSL_HANDSHAKE == nego_msg.type || USSL_AUTH_SSL_IO == nego_msg.type) {
257
(ret = fd_enable_ssl_for_client(cs->fd, cs->fd_info.ssl_ctx_id, nego_msg.type))) {
259
ussl_log_error("create SSL failed, fd:%d, errno:%d", s->fd, errno);
261
ussl_log_info("client do ssl handshake first, fd:%d, addr:%s, auth_method:%s", cs->fd,
262
client_addr, auth_type);
263
ret = client_do_ssl_handshake(cs);
265
cs->fd_info.stage = DOING_SSL_HANSHAKE;
273
ussl_log_info("client do ssl handshake again, fd:%d, addr:%s, auth_method:%s", cs->fd,
274
client_addr, auth_type);
275
ret = client_do_ssl_handshake(cs);
280
int clientfd_sk_handle_event(clientfd_sk_t *s)
283
if (ussl_skt(s, OUT)) {
284
ret = handle_client_writable_event((ussl_sock_t *)s);
285
} else if (ussl_skt(s, IN)) {
286
ret = handle_client_readable_event((ussl_sock_t *)s);
291
void ussl_get_peer_addr(int fd, char *buf, int len)
293
struct sockaddr_storage addr;
294
socklen_t sock_len = sizeof(addr);
295
if (0 != getpeername(fd, (struct sockaddr *)&addr, &sock_len)) {
296
ussl_log_warn("getpeername failed, fd:%d, errno:%d", fd, errno);
298
char src_addr[INET6_ADDRSTRLEN];
299
if (AF_INET == addr.ss_family) {
300
struct sockaddr_in *s = (struct sockaddr_in *)&addr;
301
if (NULL != inet_ntop(AF_INET, &s->sin_addr, src_addr, INET_ADDRSTRLEN)) {
302
if (snprintf(buf, len, "%s:%d", src_addr, ntohs(s->sin_port)) < 0) {
303
ussl_log_warn("snprintf failed, errno:%d", errno);
306
ussl_log_warn("call inet_ntop for AF_INET failed, errno:%d", errno);
309
struct sockaddr_in6 *s = (struct sockaddr_in6 *)&addr;
310
if (NULL != inet_ntop(AF_INET6, &s->sin6_addr, src_addr, INET6_ADDRSTRLEN)) {
311
if (snprintf(buf, len, "[%s]:%d", src_addr, ntohs(s->sin6_port)) < 0) {
312
ussl_log_warn("snprintf failed, errno:%d", errno);
315
ussl_log_warn("call inet_ntop for AF_INET6 failed, errno:%d", errno);
321
static int acceptfd_handle_first_readable_event(acceptfd_sk_t *s)
324
char buf[USSL_BUF_LEN];
326
while ((rbytes = recv(s->fd, buf, sizeof(buf), MSG_PEEK)) < 0 && EINTR == errno);
327
negotiation_head_t *h = (typeof(h))buf;
328
char src_addr[IP_STRING_MAX_LEN] = {0};
329
ussl_get_peer_addr(s->fd, src_addr, IP_STRING_MAX_LEN);
333
ussl_log_info("read EOF, fd:%d, src_addr:%s", s->fd, src_addr);
334
} else if (rbytes < 0) {
335
if (EINTR == errno) {
336
} else if (EAGAIN == errno || EWOULDBLOCK == errno) {
342
ussl_log_info("recv failed, fd:%d, errno:%d, src_addr:%s", s->fd, errno, src_addr);
344
} else if (rbytes < sizeof(negotiation_head_t)) {
347
ussl_log_info("read EOF, fd:%d, src_addr:%s", s->fd, src_addr);
348
} else if (h->magic != NEGOTIATION_MAGIC) {
349
int need_dispatch = 0;
350
if (test_server_auth_methods(USSL_AUTH_NONE)) {
352
} else if (is_local_ip_address(src_addr)) {
353
ussl_log_info("local ip address:%s, need dispatch", src_addr);
355
} else if (is_net_keepalive_connection(rbytes, buf)) {
357
ussl_log_info("net keepalive negotation message, need dispatch, src:%s, fd:%d", src_addr, s->fd);
359
//if enable rpc auth bypass, all connections are allowed, including tableapi, liboblog,
360
//else, only tableapi connections are allowed
361
if (ussl_get_auth_bypass_flag()) {
362
ussl_log_info("rpc auth enable bypass, need dispatch, src:%s, fd:%d", src_addr, s->fd);
365
if (ob_judge_is_tableapi_pcode_from_raw_packet(buf, rbytes)) {
366
ussl_log_info("tableapi connection, need dispatch, src:%s, fd:%d", src_addr, s->fd);
371
if (0 == ussl_set_rpc_connection_type(s->fd, OB_CONNECTION_AUTH_BYPASS_TYPE)) {
373
ussl_log_warn("ussl_set_rpc_connection_type failed, need close, src:%s, fd:%d", src_addr, s->fd);
380
s->fd_info.client_gid = UINT64_MAX;
381
ussl_log_info("recv non-negotiation message, the fd will be dispatched, fd:%d, src_addr:%s, magic:0x%x",
382
s->fd, src_addr, h->magic);
384
char auth_type[AUTH_TYPE_STRING_MAX_LEN] = {0};
385
auth_type_to_str(get_server_auth_methods(), auth_type, AUTH_TYPE_STRING_MAX_LEN);
388
ussl_log_warn("connection is not allowed, fd:%d, src_addr:%s, server_auth_method:%s, "
389
"rbytes:%ld, magic:%x",
390
s->fd, src_addr, auth_type, rbytes, h->magic);
392
} else if (h->len + sizeof(*h) > rbytes) {
395
ussl_log_warn("recv message is not complete, close connection, rbytes:%ld, fd:%d", rbytes, s->fd);
397
while ((rbytes = recv(s->fd, buf, h->len + sizeof(negotiation_head_t), 0)) < 0 &&
400
if (rbytes != h->len + sizeof(negotiation_head_t)) {
403
ussl_log_warn("consume nego message failed, rbytes:%ld, fd:%d, errno:%d", rbytes, s->fd,
406
if (is_local_ip_address(src_addr)) {
408
//if observer use local loop ip to start service, there will be error here
410
s->fd_info.client_gid = UINT64_MAX;
411
ussl_log_info("local ip address:%s, dispatch after consume", src_addr);
413
negotiation_message_t *nego_message = (typeof(nego_message))(h + 1);
414
s->fd_info.client_gid = nego_message->client_gid;
415
char auth_type[AUTH_TYPE_STRING_MAX_LEN] = {0};
416
auth_type_to_str(nego_message->type, auth_type, AUTH_TYPE_STRING_MAX_LEN);
417
if (USSL_AUTH_NONE == nego_message->type) {
418
if (test_server_auth_methods(USSL_AUTH_NONE)) {
420
s->fd_info.client_gid = nego_message->client_gid;
421
ussl_log_info("auth mothod is NONE, the fd will be dispatched, fd:%d, src_addr:%s", s->fd,
423
} else if (ussl_get_auth_bypass_flag()) {
424
if (0 == ussl_set_rpc_connection_type(s->fd, OB_CONNECTION_AUTH_BYPASS_TYPE)) {
426
ussl_log_warn("enable bypass connection, allow connect, src:%s, fd:%d", src_addr, s->fd);
434
ussl_log_warn("ussl server not support mode:%s, fd:%d", auth_type, s->fd);
436
} else if (USSL_AUTH_SSL_IO == nego_message->type ||
437
USSL_AUTH_SSL_HANDSHAKE == nego_message->type) {
438
if (test_server_auth_methods(USSL_AUTH_SSL_IO) ||
439
test_server_auth_methods(USSL_AUTH_SSL_HANDSHAKE)) {
440
if (-1 == ssl_config_ctx_id) {
443
ussl_log_warn("ssl config not configured or not load completely!");
445
negotiation_message_t nego_message_ack;
446
nego_message_ack.type = nego_message->type;
447
int has_method_none = test_server_auth_methods(USSL_AUTH_NONE);
448
if (0 != fd_enable_ssl_for_server(s->fd, ssl_config_ctx_id, nego_message->type,
452
ussl_log_error("fd_enable_ssl_for_server failed, fd:%d", s->fd);
453
} else if (0 != send_negotiation_message(s->fd, (char *)&nego_message_ack,
454
sizeof(nego_message_ack))) {
457
ussl_log_warn("send_negotiation_message failed, auth-mode:%d, fd:%d",
458
nego_message->type, s->fd);
460
ussl_log_info("auth method is SSL_NO_ENCRYPT or SSL_IO, and the negotiation message "
461
"has be sent, fd:%d, src_addr:%s",
463
s->fd_info.stage = SERVER_ACK_NEGO_AND_SSL;
470
ussl_log_warn("ussl server not support mode:%s, fd:%d", auth_type, s->fd);
479
static int acceptfd_handle_ssl_event(acceptfd_sk_t *s)
482
char src_addr[IP_STRING_MAX_LEN] = {0};
483
ussl_get_peer_addr(s->fd, src_addr, IP_STRING_MAX_LEN);
484
ret = ssl_do_handshake(s->fd);
487
ussl_log_info("ssl_do_handshake succ, fd:%d, client_gid:%lu, src_addr:%s", s->fd, s->fd_info.client_gid, src_addr);
488
} else if (EAGAIN == ret) {
491
ussl_log_warn("ssl_do_handshake failed, fd:%d, ret:%d, src_addr:%s", s->fd, ret, src_addr);
496
int acceptfd_sk_handle_event(acceptfd_sk_t *s)
499
if (ussl_skt(s, IN)) {
500
if (SERVER_ACCEPT_CONNECTION == s->fd_info.stage) {
501
ret = acceptfd_handle_first_readable_event(s);
502
} else if (SERVER_ACK_NEGO_AND_SSL == s->fd_info.stage) {
503
ret = acceptfd_handle_ssl_event(s);
509
int ussl_check_pcode_mismatch_connection(int fd, uint32_t pcode)
512
if (fd >= 0 && fd < MAX_FD_NUM) {
513
ret = (gs_connection_type_arr[fd] & OB_CONNECTION_AUTH_BYPASS_TYPE) &&
514
!ob_is_bypass_pcode(pcode);