2
* Copyright (c) 2021 OceanBase
3
* OceanBase CE is licensed under Mulan PubL v2.
4
* You can use this software according to the terms and conditions of the Mulan PubL v2.
5
* You may obtain a copy of Mulan PubL v2 at:
6
* http://license.coscl.org.cn/MulanPubL-2.0
7
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
* See the Mulan PubL v2 for more details.
13
#define USING_LOG_PREFIX SERVER
16
#include "util/easy_mod_stat.h"
17
#include "observer/mysql/obmp_connect.h"
18
#include "lib/mysqlclient/ob_mysql_result.h"
19
#include "lib/net/ob_net_util.h"
20
#include "lib/string/ob_sql_string.h"
21
#include "lib/oblog/ob_log.h"
22
#include "lib/stat/ob_session_stat.h"
23
#include "lib/mysqlclient/ob_mysql_proxy.h"
24
#include "common/object/ob_object.h"
25
#include "common/ob_string_buf.h"
26
#include "share/schema/ob_multi_version_schema_service.h"
27
#include "share/schema/ob_schema_getter_guard.h"
28
#include "share/ob_cluster_version.h"
29
#include "share/ob_get_compat_mode.h"
30
#include "share/resource_manager/ob_resource_manager.h"
31
#include "rpc/ob_request.h"
32
#include "rpc/obmysql/packet/ompk_ok.h"
33
#include "rpc/obmysql/packet/ompk_error.h"
34
#include "sql/engine/expr/ob_expr_operator.h"
35
#include "sql/session/ob_sql_session_mgr.h"
36
#include "sql/ob_sql.h"
37
#include "observer/ob_server.h"
38
#include "rpc/obmysql/obsm_struct.h"
39
#include "observer/omt/ob_multi_tenant.h"
40
#include "observer/omt/ob_tenant.h"
41
#include "observer/ob_req_time_service.h"
42
#include "storage/tx/wrs/ob_weak_read_util.h" //ObWeakReadUtil
43
#include "sql/monitor/ob_security_audit_utils.h"
44
#include "sql/privilege_check/ob_privilege_check.h"
45
#include "sql/privilege_check/ob_ora_priv_check.h"
47
using namespace oceanbase::share;
48
using namespace oceanbase::common;
49
using namespace oceanbase::sql;
50
using namespace oceanbase::obmysql;
51
using namespace oceanbase::observer;
52
using namespace oceanbase::share::schema;
58
// Extracts the bare user name from `in`.
// When `in` is longer than one character and both starts and ends with a single
// quote, the result is pointed at the text between the two quotes.
// NOTE(review): the declaration of `user_name`, the non-quoted branch and the return
// statement are not visible in this excerpt — presumably `in` is returned unchanged
// when it is not quoted; confirm against the full file.
ObString extract_user_name(const ObString &in)
61
// Quoted form, e.g. 'root': skip the first and the last character.
if (in.length() > 1 && '\'' == in[0] && '\'' == in[in.length() - 1]) {
62
// assign_ptr is handed a pointer into `in`'s own buffer (ptr + 1, length - 2).
user_name.assign_ptr(in.ptr() + 1, in.length() - 2);
69
// Splits a login string of the form user@tenant into its two parts.
//   in          - the raw login name sent by the client
//   user_name   - out: the user part, passed through extract_user_name()
//   tenant_name - out: the tenant part, or an empty string
// Error codes set in the visible lines: OB_ERR_USER_EMPTY and OB_ERR_INVALID_TENANT_NAME.
// NOTE(review): the condition that selects between the "no tenant" path and the
// "user@tenant" path is not visible in this excerpt — presumably a null check on
// at_pos; confirm against the full file.
int extract_user_tenant(const ObString &in, ObString &user_name, ObString &tenant_name)
72
const char *user_pos = in.ptr();
73
const char *at_pos = in.find('@'); // use @ as separator, e.g. xiaochu@tenant
74
// NOTE(review): computed before any check on at_pos that is visible here; it is only
// meaningful when '@' was found.
const char *tenant_pos = at_pos + 1;
77
// Path without a tenant part: the whole input is the user name, the tenant name is empty.
user_name = extract_user_name(in);
78
tenant_name = ObString::make_empty_string();
79
LOG_INFO("username and tenantname", K(user_name), K(tenant_name));
81
// Accept empty username. Empty username is one of normal
82
// usernames that we can create user with empty name.
85
// Only a negative distance is rejected, so a zero-length user part ("@tenant") passes.
if (at_pos - user_pos < 0) {
86
ret = OB_ERR_USER_EMPTY;
87
LOG_WARN("Must Provide user name to login", K(ret));
89
// Number of characters that follow the '@'.
int64_t tenant_len = in.length() - (tenant_pos - user_pos);
90
// The tenant part must be non-empty and no longer than OB_MAX_TENANT_NAME_LENGTH.
if (tenant_len > OB_MAX_TENANT_NAME_LENGTH || tenant_len <= 0) {
91
ret = OB_ERR_INVALID_TENANT_NAME;
92
LOG_WARN("Violate with tenant length limit", "max", OB_MAX_TENANT_NAME_LENGTH, "actual", tenant_len, K(ret));
96
// Both strings are built from pointers into `in`: [user_pos, at_pos) and
// [tenant_pos, end of in).
ObString username(at_pos - user_pos, user_pos);
97
ObString tenantname(in.length() - (tenant_pos - user_pos), tenant_pos);
98
user_name = extract_user_name(username);
99
tenant_name = tenantname;
100
LOG_DEBUG("username and tenantname", K(user_name), K(tenant_name));
107
// Resolves a tenant name to its tenant id.
//   tenant_name - name to look up; an empty name selects the sys tenant
//   tenant_id   - out: set to OB_INVALID_ID first, then to the resolved id
// Returns OB_SUCCESS as initialised, OB_ERR_UNEXPECTED when GCTX.schema_service_ is
// null, or the failure code of the schema guard calls.
int extract_tenant_id(const ObString &tenant_name, uint64_t &tenant_id)
109
int ret = OB_SUCCESS;
110
tenant_id = OB_INVALID_ID;
112
if (tenant_name.empty()) {
113
tenant_id = OB_SYS_TENANT_ID; // default to sys tenant
115
if (OB_ISNULL(GCTX.schema_service_)) {
116
ret = OB_ERR_UNEXPECTED;
117
LOG_ERROR("invalid schema service", K(ret), K(GCTX.schema_service_));
120
share::schema::ObSchemaGetterGuard guard;
121
// The name lookup goes through a schema guard obtained for the sys tenant.
if (OB_FAIL(GCTX.schema_service_->get_tenant_schema_guard(OB_SYS_TENANT_ID, guard))) {
122
LOG_WARN("get_schema_guard failed", K(ret));
123
} else if (OB_FAIL(guard.get_tenant_id(tenant_name, tenant_id))) {
124
LOG_WARN("get_tenant_id failed", K(ret), K(tenant_name));
132
} // namespace observer
133
} // namespace oceanbase
135
// Constructs the connect processor from the global context.
// Visible initialisation: deser_ret_ starts as OB_SUCCESS and the three character
// buffers are set to empty C strings.
// NOTE(review): the rest of the member-initializer list is not visible in this excerpt.
ObMPConnect::ObMPConnect(const ObGlobalContext &gctx)
141
deser_ret_(OB_SUCCESS)
143
// Start every buffer with a terminating NUL so it reads as an empty string.
client_ip_buf_[0] = '\0';
144
user_name_var_[0] = '\0';
145
db_name_var_[0] = '\0';
148
// Destructor.
// NOTE(review): the body is not visible in this excerpt.
ObMPConnect::~ObMPConnect()
153
// Decodes the handshake response carried by req_ into hsr_ and copies the negotiated
// attributes (capability flags, charset, SQL request level, client kind) onto the
// connection object; also remembers the requested database name in db_name_.
// Any failure is stored in deser_ret_ and ret is reset to OB_SUCCESS afterwards, so
// the error is not reported through this function's own ret; process() starts from
// deser_ret_.
int ObMPConnect::deserialize()
155
int ret = OB_SUCCESS;
157
ObSMConnection *conn = get_conn();
159
if (OB_ISNULL(conn)) {
160
ret = OB_ERR_UNEXPECTED;
161
LOG_ERROR("invalid conn", K(ret), K(conn));
162
} else if (OB_ISNULL(req_)) {
163
ret = OB_ERR_UNEXPECTED;
164
LOG_ERROR("invalid req_", K(ret), K(req_));
166
// The request packet is reinterpreted as a handshake response and copied into hsr_.
hsr_ = reinterpret_cast<const OMPKHandshakeResponse&>(req_->get_packet());
167
if (OB_FAIL(hsr_.decode())) {
168
LOG_WARN("decode hsr fail", K(ret));
170
// Copy what the client announced onto the connection.
conn->cap_flags_ = hsr_.get_capability_flags();
171
conn->client_cs_type_ = hsr_.get_char_set();
172
conn->sql_req_level_ = hsr_.get_sql_request_level();
174
// Client-kind flags: each flag is set when the matching hsr_ predicate is true.
if (hsr_.is_obproxy_client_mode()) {
175
conn->is_proxy_ = true;
177
if (hsr_.is_java_client_mode()) {
178
conn->is_java_client_ = true;
180
if (hsr_.is_oci_client_mode()) {
181
conn->is_oci_client_ = true;
183
if (hsr_.is_jdbc_client_mode()) {
184
conn->is_jdbc_client_ = true;
187
// client_type_: JDBC, OCI, or non-standard.
// NOTE(review): the branch line preceding the non-standard assignment is not
// visible — presumably a plain else; confirm.
if (hsr_.is_ob_client_jdbc()) {
188
conn->client_type_ = common::OB_CLIENT_JDBC;
189
} else if (hsr_.is_ob_client_oci()) {
190
conn->client_type_ = common::OB_CLIENT_OCI;
192
conn->client_type_ = common::OB_CLIENT_NON_STANDARD;
194
db_name_ = hsr_.get_database();
195
LOG_DEBUG("database name", K(hsr_.get_database()));
198
deser_ret_ = ret; // record deserialize ret code.
199
ret = OB_SUCCESS; // return OB_SUCCESS anyway.
204
// Executes one statement of the connection's init SQL on the given session.
//   multi_stmt_item - the statement to run (its text is read via get_sql())
//   session         - session the statement runs in
//   has_more_result - forwarded to the result set via set_has_more_result()
// Visible sequence: set up the per-statement context, fetch a schema guard for the
// session's effective tenant, mark the session active, run the statement through
// gctx_.sql_engine_->stmt_query(), open and close the result set, then call
// do_after_process(). An open failure takes precedence over the close result.
// NOTE(review): the declaration of `ctx` is not visible in this excerpt.
int ObMPConnect::init_process_single_stmt(const ObMultiStmtItem &multi_stmt_item,
205
ObSQLSessionInfo &session,
206
bool has_more_result) const
208
int ret = OB_SUCCESS;
209
const ObString &sql = multi_stmt_item.get_sql();
210
ObVirtualTableIteratorFactory vt_iter_factory(*gctx_.vt_iter_creator_);
211
// NOTE(review): get_conn() and its tenant_ are dereferenced without a null check
// visible here — confirm the caller guarantees both.
ObSessionStatEstGuard stat_est_guard(get_conn()->tenant_->id(), session.get_sessid());
212
ObSchemaGetterGuard schema_guard;
213
// init_connect may execute query and DML statements, so req_timeinfo_guard is required
214
observer::ObReqTimeGuard req_timeinfo_guard;
215
//Do not change the order of SqlCtx and Allocator. ObSqlCtx uses the resultset's allocator to
216
//allocate memory for ObSqlCtx::base_constraints_. The allocator must be deconstructed after sqlctx.
217
ObArenaAllocator allocator(ObModIds::OB_SQL_SESSION);
219
ctx.exec_type_ = MpQuery;
220
session.init_use_rich_format();
221
if (OB_FAIL(init_process_var(ctx, multi_stmt_item, session))) {
222
LOG_WARN("init process var failed.", K(ret), K(multi_stmt_item));
223
} else if (OB_FAIL(gctx_.schema_service_->get_tenant_schema_guard(
224
session.get_effective_tenant_id(), schema_guard))) {
225
LOG_WARN("get schema guard failed.", K(ret));
226
} else if (OB_FAIL(set_session_active(sql, session, ObTimeUtil::current_time()))) {
227
LOG_WARN("fail to set session active", K(ret));
229
const bool enable_trace_log = lib::is_trace_log_enabled();
230
if (enable_trace_log) {
231
//set session log_level.Must use ObThreadLogLevelUtils::clear() in pair
232
ObThreadLogLevelUtils::init(session.get_log_id_level_map());
234
ctx.retry_times_ = 0; // this runs the init SQL while the connection is being established; no retry
235
ctx.schema_guard_ = &schema_guard;
236
// The result set is created through HEAP_VAR with this function's allocator.
HEAP_VAR(ObMySQLResultSet, result, session, allocator) {
237
result.set_has_more_result(has_more_result);
238
result.get_exec_context().get_task_exec_ctx().set_min_cluster_version(GET_MIN_CLUSTER_VERSION());
239
if (OB_FAIL(result.init())) {
240
LOG_WARN("result set init failed");
241
} else if (OB_ISNULL(gctx_.sql_engine_)) {
242
ret = OB_ERR_UNEXPECTED;
243
LOG_ERROR("invalid sql engine", K(ret), K(gctx_));
244
} else if (OB_FAIL(gctx_.sql_engine_->stmt_query(sql, ctx, result))) {
245
LOG_WARN("sql execute failed", K(multi_stmt_item), K(sql), K(ret));
247
// The open result is kept separately so that close() still runs and its own
// return code can be reported when open succeeded.
int open_ret = result.open();
249
LOG_WARN("failed to do result set open", K(open_ret));
251
if (OB_FAIL(result.close())) {
252
LOG_WARN("result close failed, disconnect.", K(ret));
254
// An open failure wins over whatever close() returned.
ret = (open_ret != OB_SUCCESS) ? open_ret : ret;
256
// Paired with the ObThreadLogLevelUtils::init() call above.
if (enable_trace_log) {
257
ObThreadLogLevelUtils::clear();
261
// trace-log handling does not affect the normal logic; its error code need not be assigned to ret
262
int tmp_ret = OB_SUCCESS;
263
tmp_ret = do_after_process(session, ctx, false); // not an asynchronous response
269
// Splits `init_sql` into individual statements and runs each one on `session`
// through init_process_single_stmt().
//   init_sql - the init SQL text, possibly containing several statements
//   session  - session the statements are executed in
// A split that succeeds but yields no statement is reported as OB_ERR_UNEXPECTED.
// NOTE(review): the declaration of `has_more` is not visible in this excerpt.
int ObMPConnect::init_connect_process(ObString &init_sql,
270
ObSQLSessionInfo &session) const
272
int ret = OB_SUCCESS;
273
ObSEArray<ObString, 4> queries;
274
ObArenaAllocator allocator(ObModIds::OB_SQL_PARSER);
275
// The parser is configured with the session's SQL mode and charsets.
ObParser parser(allocator, session.get_sql_mode(), session.get_charsets4parser());
276
ObMPParseStat parse_stat;
277
if (OB_SUCC(parser.split_multiple_stmt(init_sql, queries, parse_stat))) {
278
if (OB_UNLIKELY(0 == queries.count())) {
279
ret = OB_ERR_UNEXPECTED;
280
LOG_WARN("empty query!", K(ret), K(init_sql));
283
ARRAY_FOREACH(queries, i) {
284
// True for every statement except the last one.
has_more = (queries.count() > i + 1);
285
if (OB_FAIL(init_process_single_stmt(ObMultiStmtItem(true, i, queries[i]), session, has_more))) {
286
LOG_WARN("process single stmt failed!", K(ret), K(queries[i]));
290
LOG_WARN("split multiple stmt failed!", K(ret));
295
// Handles a client login request.
// Starts from the code recorded by deserialize() (deser_ret_) and then walks one long
// else-if chain; the first failing step stops the chain. Visible steps, in order:
// resolve user and tenant name, reject logins while the server is initialising (non-sys
// tenants only) or stopping, resolve and switch to the tenant, check client properties,
// verify the connection, create the session, verify identity, handle killed client
// sessions, update checksum / proxy / charset variables and set up the user resource
// group. Afterwards it sends an OK or an error packet, optionally writes a security
// audit record, reverts the session, updates logon statistics and logs "MySQL LOGIN".
// NOTE(review): several lines are not visible in this excerpt, including the
// declarations of `proc_ret` and `ok_param` and the switch header of the error path.
int ObMPConnect::process()
297
int ret = deser_ret_;
298
ObSMConnection *conn = NULL;
299
uint64_t tenant_id = OB_INVALID_ID;
300
ObSQLSessionInfo *session = NULL;
301
bool autocommit = false;
302
MAKE_TENANT_SWITCH_SCOPE_GUARD(guard);
303
THIS_WORKER.set_timeout_ts(INT64_MAX); // avoid see a former timeout value
304
if (THE_TRACE != nullptr) {
308
// NOTE(review): the condition guarding this log is not visible — presumably a
// failed `ret` carried over from deserialize(); confirm.
LOG_ERROR("deserialize failed", K(ret));
309
} else if (OB_ISNULL(req_)) {
310
ret = OB_ERR_UNEXPECTED;
311
LOG_ERROR("null ez_req", K(ret));
312
} else if (OB_ISNULL(conn = get_conn())) {
313
ret = OB_ERR_UNEXPECTED;
314
LOG_ERROR("null conn", K(ret));
315
} else if (OB_ISNULL(GCTX.session_mgr_)) {
316
ret = OB_ERR_UNEXPECTED;
317
LOG_ERROR("session mgr is NULL", K(ret));
319
// Login chain: every step runs only when all earlier steps succeeded.
if (OB_FAIL(conn->ret_)) {
320
LOG_WARN("connection fail at obsm_handle process", K(conn->ret_));
321
} else if (OB_FAIL(get_user_tenant(*conn))) {
322
LOG_WARN("get user name and tenant name failed", K(ret));
323
} else if ((SS_INIT == GCTX.status_ || SS_STARTING == GCTX.status_)
324
&& !tenant_name_.empty()
325
&& 0 != tenant_name_.compare(OB_SYS_TENANT_NAME)) {
326
// accept system tenant for bootstrap, do not let other users login before observer start service
327
ret = OB_SERVER_IS_INIT;
328
LOG_WARN("server is initializing", K(ret));
329
} else if (SS_STOPPING == GCTX.status_) {
330
ret = OB_SERVER_IS_STOPPING;
331
LOG_WARN("server is stopping", K(ret));
332
} else if (OB_FAIL(check_update_tenant_id(*conn, tenant_id))) {
333
LOG_WARN("fail to check update tenant id", K(ret));
334
} else if (OB_FAIL(guard.switch_to(tenant_id))) {
335
LOG_WARN("switch to tenant fail", K(ret), K(tenant_id));
336
} else if (OB_FAIL(check_client_property(*conn))) {
337
LOG_WARN("check_client_property fail", K(ret));
338
} else if (OB_FAIL(verify_connection(tenant_id))) {
339
LOG_WARN("verify connection fail", K(ret));
340
} else if (OB_FAIL(create_session(conn, session))) {
341
LOG_WARN("alloc session fail", K(ret));
342
} else if (OB_ISNULL(session)) {
343
ret = OB_ERR_UNEXPECTED;
344
LOG_ERROR("null session", K(ret), K(session));
345
} else if (OB_FAIL(verify_identify(*conn, *session, tenant_id))) {
346
LOG_WARN("fail to verify_identify", K(ret));
347
} else if (OB_FAIL(process_kill_client_session(*session, true))) {
348
LOG_WARN("client session has been killed", K(ret));
349
} else if (OB_FAIL(update_transmission_checksum_flag(*session))) {
350
LOG_WARN("update transmisson checksum flag failed", K(ret));
351
} else if (OB_FAIL(update_proxy_sys_vars(*session))) {
352
LOG_WARN("update_proxy_sys_vars failed", K(ret));
353
} else if (OB_FAIL(update_charset_sys_vars(*conn, *session))) {
354
LOG_WARN("fail to update charset sys vars", K(ret));
355
} else if (OB_FAIL(setup_user_resource_group(*conn, tenant_id, session))) {
356
LOG_WARN("fail setup user resource group", K(ret));
358
// set connection info to session
359
session->set_ob20_protocol(conn->proxy_cap_flags_.is_ob_protocol_v2_support());
360
// set sql request level to session, to avoid sql request dead lock between OB cluster (eg. dblink)
361
session->set_sql_request_level(conn->sql_req_level_);
362
// set session var sync info.
363
session->set_session_var_sync(conn->proxy_cap_flags_.is_session_var_sync_support());
364
// proxy mode & direct mode
365
// Enabled when the proxy announces support, or when proxy_sessid_ is 0.
session->set_client_sessid_support(conn->proxy_cap_flags_.is_client_sessid_support()
366
|| (conn->proxy_sessid_ == 0));
367
session->set_session_sync_support(conn->proxy_cap_flags_.is_session_sync_support());
368
session->get_control_info().support_show_trace_ = conn->proxy_cap_flags_.is_flt_show_trace_support();
369
LOG_TRACE("setup user resource group OK",
370
"user_id", session->get_user_id(),
373
"group_id", conn->group_id_,
374
"sql_req_level", conn->sql_req_level_);
375
conn->set_auth_phase();
376
session->get_autocommit(autocommit);
380
// Peer address as text for the final login log; a placeholder is written when the
// conversion fails.
char client_ip_buf[OB_IP_STR_BUFF] = {};
381
if (!get_peer().ip_to_string(client_ip_buf, OB_IP_STR_BUFF)) {
382
LOG_WARN("fail to ip to string");
383
snprintf(client_ip_buf, OB_IP_STR_BUFF, "xxx.xxx.xxx.xxx");
385
// Host name as text; taken from the session when it has a non-empty client ip,
// otherwise the same placeholder is written.
char host_name_buf[OB_IP_STR_BUFF] = {};
386
if (NULL != session && !session->get_client_ip().empty()) {
387
session->get_host_name().to_string(host_name_buf, OB_IP_STR_BUFF);
389
snprintf(host_name_buf, OB_IP_STR_BUFF, "xxx.xxx.xxx.xxx");
391
const ObString host_name(host_name_buf);
392
// Copies of connection attributes, used by the audit text and the login log below.
const ObCSProtocolType protoType = conn->get_cs_protocol_type();
393
const uint32_t sessid = conn->sessid_;
394
const uint64_t proxy_sessid = conn->proxy_sessid_;
395
const uint32_t client_sessid = conn->client_sessid_;
396
const int64_t sess_create_time = conn->sess_create_time_;
397
const uint32_t capability = conn->cap_flags_.capability_;
398
const bool from_proxy = conn->is_proxy_;
399
const bool from_java_client = conn->is_java_client_;
400
const bool from_oci_client = conn->is_oci_client_;
401
const bool from_jdbc_client = conn->is_jdbc_client_;
402
const bool use_ssl = conn->cap_flags_.cap_flags_.OB_CLIENT_SSL;
403
const uint64_t proxy_capability = conn->proxy_cap_flags_.capability_;
405
// Success path: answer the client with an OK packet.
if (OB_SUCC(proc_ret)) {
406
// send packet for client
408
ok_param.is_on_connect_ = true;
409
ok_param.affected_rows_ = 0;
410
if (OB_FAIL(send_ok_packet(*session, ok_param))) {
411
LOG_WARN("fail to send ok packet", K(ok_param), K(ret));
414
// Failure path: format an error message into buf and send an error packet.
char buf[OB_MAX_ERROR_MSG_LEN];
416
// An invalid tenant name shares the branch that sets ret to OB_PASSWORD_WRONG, so the
// message is formatted for OB_PASSWORD_WRONG in both cases.
case OB_PASSWORD_WRONG:
417
case OB_ERR_INVALID_TENANT_NAME: {
418
ret = OB_PASSWORD_WRONG;
419
snprintf(buf, OB_MAX_ERROR_MSG_LEN, ob_errpkt_str_user_error(ret, lib::is_oracle_mode()),
420
user_name_.length(), user_name_.ptr(),
421
host_name.length(), host_name.ptr(),
422
(hsr_.get_auth_response().empty() ? "NO" : "YES"));
425
case OB_CLUSTER_NO_MATCH: {
426
snprintf(buf, OB_MAX_ERROR_MSG_LEN, ob_errpkt_str_user_error(ret, lib::is_oracle_mode()),
427
GCONF.cluster.str());
435
if (OB_FAIL(send_error_packet(ret, buf))) {
436
LOG_WARN("response fail packet fail", K(ret));
440
if (NULL != session) {
441
#ifdef OB_BUILD_AUDIT_SECURITY
442
// Audit builds only: compose a LOGIN comment and hand it to the security audit.
ObSqlString comment_text;
443
(void)comment_text.append_fmt("LOGIN: tenant_name=%.*s, user_name=%.*s, client_ip=%.*s, "
444
"sessid=%u, proxy_sessid=%lu, "
445
"capability=%X, proxy_capability=%lX, use_ssl=%s, protocol=%s",
446
tenant_name_.length(), tenant_name_.ptr(),
447
user_name_.length(), user_name_.ptr(),
448
host_name.length(), host_name.ptr(),
453
use_ssl ? "true" : "false",
454
get_cs_protocol_type_name(protoType));
456
(void)ObSecurityAuditUtils::handle_security_audit(*session,
458
ObString::make_string("CONNECT"),
459
comment_text.string(),
462
// oracle temp table need to be refactored
463
//if (OB_SUCCESS == proc_ret) {
464
// proc_ret = session->drop_reused_oracle_temp_tables();
466
//Action!!:must revert it after no use it
467
revert_session(session);
469
// Logon statistics are recorded under a stat guard for this tenant.
common::ObTenantStatEstGuard guard(tenant_id);
470
if (OB_SUCCESS != proc_ret) {
471
if (NULL != session) {
475
EVENT_INC(SQL_USER_LOGONS_FAILED_CUMULATIVE);
476
EVENT_INC(SQL_USER_LOGOUTS_CUMULATIVE);
479
EVENT_INC(SQL_USER_LOGONS_CUMULATIVE);
480
// Elapsed time between receiving the request and now.
EVENT_ADD(SQL_USER_LOGONS_COST_TIME_CUMULATIVE, common::ObTimeUtility::current_time() - req_->get_receive_timestamp());
482
LOG_INFO("MySQL LOGIN", "direct_client_ip", client_ip_buf, K_(client_ip),
483
K_(tenant_name), K(tenant_id), K_(user_name), K(host_name),
484
K(sessid), K(proxy_sessid), K(client_sessid), K(from_proxy),
485
K(from_java_client), K(from_oci_client), K(from_jdbc_client),
486
K(capability), K(proxy_capability), K(use_ssl),
487
"c/s protocol", get_cs_protocol_type_name(protoType),
488
K(autocommit), K(proc_ret), K(ret), K(conn->client_type_), K(conn->client_version_));
494
// Returns true when the login is the built-in proxyro user connecting directly:
// the connection is neither a proxy nor a java client, it belongs to the sys tenant,
// and `user_name` equals OB_PROXYRO_USERNAME.
inline bool is_inner_proxyro_user(const ObSMConnection &conn, const ObString &user_name)
496
const static ObString PROXYRO_USERNAME(OB_PROXYRO_USERNAME);
497
// Declared but not referenced in the visible return expression.
const static ObString PROXYRO_HOSTNAME(OB_DEFAULT_HOST_NAME);
498
return (!conn.is_proxy_
499
&& !conn.is_java_client_
500
&& OB_SYS_TENANT_ID == conn.tenant_id_
501
&& 0 == PROXYRO_USERNAME.compare(user_name));
504
// Overwrites the connection's scramble buffer with a fixed 20-character string and
// points login_info.scramble_str_ at that buffer.
//   conn       - connection whose scramble_buf_ is overwritten
//   login_info - login info whose scramble_str_ is re-pointed at conn.scramble_buf_
inline void reset_inner_proxyro_scramble(
505
ObSMConnection &conn,
506
oceanbase::share::schema::ObUserLoginInfo &login_info)
508
const ObString PROXYRO_OLD_SCRAMBLE("aaaaaaaabbbbbbbbbbbb");
509
// assumes scramble_buf_ holds at least PROXYRO_OLD_SCRAMBLE.length() bytes — TODO confirm
MEMCPY(conn.scramble_buf_, PROXYRO_OLD_SCRAMBLE.ptr(), PROXYRO_OLD_SCRAMBLE.length());
510
// The scramble string covers the whole buffer (sizeof), not just the copied length.
login_info.scramble_str_.assign_ptr(conn.scramble_buf_, sizeof(conn.scramble_buf_));
513
// Authenticates the login and loads the user's privileges into `session`.
// Visible sequence:
//   1. obtain a schema guard for the connection's tenant and record the client mode;
//   2. default an empty tenant name to the sys tenant and reject names containing '$';
//   3. length-check db_name_ / user_name_ and copy them into member buffers;
//   4. convert the names, load tenant system variables and build the login info;
//   5. call schema_guard.check_user_access(); on failure try the unlock helpers, an
//      asynchronous schema refresh followed by one more attempt, and the proxyro
//      legacy-scramble fallback;
//   6. update login statistics (Oracle and MySQL paths) and check password expiry;
//   7. copy privileges, user, client address and default database into the session,
//      validate the tenant schema, and resolve the default database id (with one
//      schema-refresh retry).
// NOTE(review): many lines are not visible in this excerpt, including the
// declarations of `host_name`, `login_ret` and `pre_ret` and several branch
// headers; comments below describe only what the visible lines do.
int ObMPConnect::load_privilege_info(ObSQLSessionInfo &session)
515
LOG_DEBUG("load privilege info");
516
int ret = OB_SUCCESS;
517
ObSMConnection *conn = get_conn();
518
ObSchemaGetterGuard schema_guard;
519
if (OB_ISNULL(gctx_.schema_service_) || OB_ISNULL(conn)) {
520
ret = OB_INVALID_ARGUMENT;
521
LOG_WARN("invalid argument", K(gctx_.schema_service_));
522
} else if (OB_FAIL(gctx_.schema_service_->get_tenant_schema_guard(conn->tenant_id_, schema_guard))) {
523
LOG_WARN("get schema guard failed", K(ret));
526
// Record the client mode on the session. These are independent ifs, so when several
// flags are set the last matching set_client_mode() call is the one that runs last.
if (conn->is_java_client_) {
527
session.set_client_mode(OB_JAVA_CLIENT_MODE);
529
if (conn->is_proxy_) {
530
session.set_client_mode(OB_PROXY_CLIENT_MODE);
532
if (conn->is_oci_client_) {
533
session.set_client_mode(OB_OCI_CLIENT_MODE);
535
if (conn->is_jdbc_client_) {
536
session.set_client_mode(OB_JDBC_CLIENT_MODE);
540
uint64_t client_attr_cap_flags = 0;
543
// No tenant given: fall back to the sys tenant name.
if (tenant_name_.empty()) {
544
tenant_name_ = ObString::make_string(OB_SYS_TENANT_NAME);
545
OB_LOG(INFO, "no tenant name set, use default tenant name", K_(tenant_name));
548
// A '$' anywhere in the tenant name is rejected.
if (OB_NOT_NULL(tenant_name_.find('$'))) {
549
ret = OB_ERR_INVALID_TENANT_NAME;
550
LOG_WARN("invalid tenant name. “$” is not allowed in tenant name.", K(ret), K_(tenant_name));
553
// Under an Oracle tenant db_name and user_name must be converted (double quotes and letter case)
554
// Under a MySQL tenant nothing is converted; they are simply copied
556
if (db_name_.length() > OB_MAX_DATABASE_NAME_LENGTH ||
557
user_name_.length() > OB_MAX_USER_NAME_LENGTH) {
558
ret = OB_INVALID_ARGUMENT_FOR_LENGTH;
559
LOG_WARN("invalid length for db_name or user_name", K(db_name_), K(user_name_), K(ret));
561
// Copy both names into the member buffers, NUL-terminate them, and re-point the
// strings at those buffers with unchanged lengths.
MEMCPY(db_name_var_, db_name_.ptr(), db_name_.length());
562
db_name_var_[db_name_.length()] = '\0';
563
MEMCPY(user_name_var_, user_name_.ptr(), user_name_.length());
564
user_name_var_[user_name_.length()] = '\0';
565
user_name_.assign_ptr(user_name_var_, user_name_.length());
566
db_name_.assign_ptr(db_name_var_, db_name_.length());
570
share::schema::ObSessionPrivInfo session_priv;
571
const ObSysVariableSchema *sys_variable_schema = NULL;
573
} else if (OB_FAIL(convert_oracle_object_name(conn->tenant_id_, user_name_))) {
574
LOG_WARN("fail to convert oracle user name", K(ret));
575
} else if (OB_FAIL(convert_oracle_object_name(conn->tenant_id_, db_name_))) {
576
LOG_WARN("fail to convert oracle db name", K(ret));
577
} else if (OB_FAIL(schema_guard.get_sys_variable_schema(conn->tenant_id_, sys_variable_schema))) {
578
LOG_WARN("get sys variable schema failed", K(ret));
579
} else if (OB_ISNULL(sys_variable_schema)) {
580
ret = OB_ERR_UNEXPECTED;
581
LOG_WARN("sys variable schema is null", K(ret));
582
} else if (OB_FAIL(session.init_tenant(tenant_name_, conn->tenant_id_))) {
583
LOG_WARN("failed to init_tenant", K(ret));
584
} else if (OB_FAIL(session.load_all_sys_vars(*sys_variable_schema, false))) {
585
LOG_WARN("load system variables failed", K(ret));
587
// Assemble the login info that check_user_access() is called with.
share::schema::ObUserLoginInfo login_info;
588
login_info.tenant_name_ = tenant_name_;
589
login_info.user_name_ = user_name_;
590
login_info.client_ip_ = client_ip_;
591
SSL *ssl_st = SQL_REQ_OP.get_sql_ssl_st(req_);
592
const ObUserInfo *user_info = NULL;
593
// In Oracle mode, when the login does not specify a schema name, default it to the user name
594
if (OB_SUCC(ret) && ORACLE_MODE == session.get_compatibility_mode() && db_name_.empty()) {
595
login_info.db_ = user_name_;
596
} else if (!db_name_.empty()) {
597
// A database was requested: check and convert its name before storing it.
ObString db_name = db_name_;
598
ObNameCaseMode mode = OB_NAME_CASE_INVALID;
599
bool perserve_lettercase = true;
600
ObCollationType cs_type = CS_TYPE_INVALID;
601
if (OB_FAIL(session.get_collation_connection(cs_type))) {
602
LOG_WARN("fail to get collation_connection", K(ret));
603
} else if (OB_FAIL(session.get_name_case_mode(mode))) {
604
LOG_WARN("fail to get name case mode", K(mode), K(ret));
605
// Letter case is preserved in Oracle mode, and in MySQL mode unless the name case
// mode is OB_LOWERCASE_AND_INSENSITIVE.
} else if (FALSE_IT(perserve_lettercase = ORACLE_MODE == session.get_compatibility_mode()
606
? true : (mode != OB_LOWERCASE_AND_INSENSITIVE))) {
607
} else if (OB_FAIL(ObSQLUtils::check_and_convert_db_name(
608
cs_type, perserve_lettercase, db_name))) {
609
LOG_WARN("fail to check and convert database name", K(db_name), K(ret));
610
} else if (OB_FAIL(ObSQLUtils::cvt_db_name_to_org(schema_guard, &session, db_name))) {
611
LOG_WARN("fail to convert db name to org");
613
login_info.db_ = db_name;
617
// Scramble and the client's auth response are what the password check consumes.
login_info.scramble_str_.assign_ptr(conn->scramble_buf_, sizeof(conn->scramble_buf_));
618
login_info.passwd_ = hsr_.get_auth_response();
622
} else if (OB_FAIL(schema_guard.check_user_access(login_info, session_priv, ssl_st, user_info))) {
624
// First attempt failed. inner_ret / tmp_ret keep helper errors separate from ret.
int inner_ret = OB_SUCCESS;
625
bool is_unlocked = false;
626
// Oracle mode with a locked user: try the time-based unlock helper.
if (ORACLE_MODE == session.get_compatibility_mode()
627
&& OB_ERR_USER_IS_LOCKED == ret) {
628
if (OB_SUCCESS != (inner_ret = unlock_user_if_time_is_up(conn->tenant_id_, schema_guard, is_unlocked))) {
629
LOG_WARN("fail to check user unlock", K(inner_ret));
632
// MySQL mode with a locked user: same idea through the MySQL helper.
if (MYSQL_MODE == session.get_compatibility_mode()
633
&& OB_ERR_USER_IS_LOCKED == ret) {
634
if (OB_SUCCESS != (inner_ret = unlock_user_if_time_is_up_mysql(conn->tenant_id_,
635
session_priv.user_id_,
638
LOG_WARN("fail to check user unlock", K(inner_ret));
642
int tmp_ret = OB_SUCCESS;
643
ObMultiVersionSchemaService *schema_service = gctx_.schema_service_;
644
int64_t local_version = OB_INVALID_VERSION;
645
int64_t global_version = OB_INVALID_VERSION;
646
if (OB_SUCCESS != (tmp_ret = schema_service->get_tenant_refreshed_schema_version(conn->tenant_id_, local_version))) {
647
LOG_WARN("fail to get local version", K(ret), K(tmp_ret), "tenant_id", conn->tenant_id_);
648
} else if (OB_SUCCESS != (tmp_ret = schema_service->get_tenant_received_broadcast_version(conn->tenant_id_, global_version))) {
649
LOG_WARN("fail to get local version", K(ret), K(tmp_ret), "tenant_id", conn->tenant_id_);
650
// Refresh when the refreshed version is behind the broadcast version, or when the
// user was just unlocked.
} else if (local_version < global_version || is_unlocked) {
651
uint64_t tenant_id = conn->tenant_id_;
652
LOG_INFO("try to refresh schema", K(tenant_id), K(is_unlocked),
653
K(local_version), K(global_version));
654
if (OB_SUCCESS != (tmp_ret = gctx_.schema_service_->async_refresh_schema(
655
tenant_id, global_version))) {
656
LOG_WARN("failed to refresh schema", K(tmp_ret), K(tenant_id), K(global_version));
657
} else if (OB_SUCCESS != (tmp_ret = gctx_.schema_service_->get_tenant_schema_guard(
658
tenant_id, schema_guard))) {
659
LOG_WARN("get schema guard failed", K(ret), K(tmp_ret), K(tenant_id));
660
} else if (OB_SUCCESS == inner_ret) {
661
// schema refresh succeeded and the inner execution had no error either; retry the login
662
if (OB_FAIL(schema_guard.check_user_access(login_info, session_priv,
663
ssl_st, user_info))) {
664
LOG_WARN("User access denied", K(login_info), K(ret));
670
// Wrong password for the built-in proxyro user: retry once with the fixed legacy scramble.
if (OB_PASSWORD_WRONG == ret && is_inner_proxyro_user(*conn, user_name_)) {
671
reset_inner_proxyro_scramble(*conn, login_info);
673
if (OB_FAIL(schema_guard.check_user_access(login_info, session_priv,
674
ssl_st, user_info))) {
675
LOG_WARN("User access denied", K(login_info), K(pre_ret),K(ret));
678
LOG_WARN("User access denied", K(login_info), K(ret));
683
if (OB_FAIL(session.on_user_connect(session_priv, user_info))) {
684
LOG_WARN("session on user connect failed", K(ret));
690
// Oracle mode: record the login outcome (success or wrong password) in a transaction.
if (ORACLE_MODE == session.get_compatibility_mode()
691
&& (OB_SUCC(ret) || OB_PASSWORD_WRONG == ret)) {
693
if (OB_FAIL(update_login_stat_in_trans(conn->tenant_id_, OB_SUCCESS == login_ret, schema_guard))) {
694
LOG_WARN("fail to update login stat in trans", K(ret));
696
ret = login_ret; // restore the error code
700
// MySQL mode: record the outcome for success, wrong password and locked user.
if (MYSQL_MODE == session.get_compatibility_mode()
701
&& (OB_SUCC(ret) || OB_PASSWORD_WRONG == ret || OB_ERR_USER_IS_LOCKED == ret)) {
703
bool is_unlocked_now = false;
704
if (OB_FAIL(update_login_stat_mysql(conn->tenant_id_, is_valid_id(session_priv.user_id_),
705
schema_guard, is_unlocked_now))) {
706
LOG_WARN("fail to update login stat", K(ret));
707
// The login failed as "locked" but the stat update reports the user as unlocked now:
// report a wrong password instead.
} else if (OB_ERR_USER_IS_LOCKED == login_ret && is_unlocked_now) {
708
ret = OB_PASSWORD_WRONG;
709
LOG_WARN("user under connnection control and temporarily not locked",
710
K(conn->tenant_id_), K(user_name_), K(ret));
712
ret = login_ret; // recover return code
717
if (OB_FAIL(check_password_expired(conn->tenant_id_, schema_guard, session))) {
718
LOG_WARN("fail to check password expired", K(ret));
723
// Attention!! must set session capability firstly
724
if (ORACLE_MODE == session.get_compatibility_mode()) {
726
hsr_.set_client_found_rows();
728
// Copy capability flags and the privilege sets produced by check_user_access().
session.set_capability(hsr_.get_capability_flags());
729
session.set_user_priv_set(session_priv.user_priv_set_);
730
session.set_db_priv_set(session_priv.db_priv_set_);
731
session.set_enable_role_array(session_priv.enable_role_id_array_);
732
host_name = session_priv.host_name_;
733
uint64_t db_id = OB_INVALID_ID;
734
const ObTenantSchema *tenant_schema = NULL;
735
if (OB_FAIL(session.set_user(user_name_, session_priv.host_name_, session_priv.user_id_))) {
736
LOG_WARN("failed to set_user", K(ret));
737
} else if (OB_FAIL(session.set_real_client_ip_and_port(client_ip_, client_port_))) {
738
LOG_WARN("failed to set_real_client_ip_and_port", K(ret));
739
} else if (OB_FAIL(session.set_default_database(session_priv.db_))) {
740
LOG_WARN("failed to set default database", K(ret), K(session_priv.db_));
741
} else if (OB_FAIL(schema_guard.get_tenant_info(session_priv.tenant_id_, tenant_schema))) {
742
LOG_WARN("get tenant info failed", K(ret));
743
} else if (OB_ISNULL(tenant_schema)) {
744
ret = OB_TENANT_NOT_EXIST;
745
LOG_WARN("tenant_schema is null", K(ret));
746
// A tenant in the recycle bin is reported as not existing.
} else if (tenant_schema->is_in_recyclebin()) {
747
ret = OB_TENANT_NOT_EXIST;
748
LOG_WARN("tenant is in recyclebin", KR(ret), K(session_priv.tenant_id_));
749
// A tenant that is being restored rejects the login with OB_STATE_NOT_MATCH.
} else if (tenant_schema->is_restore()) {
750
ret = OB_STATE_NOT_MATCH;
751
LOG_WARN("tenant is in restore", KR(ret), K(session_priv.tenant_id_));
752
} else if (OB_FAIL(session.update_database_variables(&schema_guard))) {
753
LOG_WARN("failed to update database variables", K(ret));
754
} else if (OB_FAIL(session.update_max_packet_size())) {
755
LOG_WARN("failed to update max packet size", K(ret));
756
#ifdef OB_BUILD_AUDIT_SECURITY
757
} else if (OB_FAIL(check_audit_user(session_priv.tenant_id_, user_name_))) {
758
LOG_WARN("fail to check audit user privilege", K(ret));
760
} else if (OB_FAIL(get_client_attribute_capability(client_attr_cap_flags))) {
761
LOG_WARN("failed to get client attribute capability", K(ret));
763
session.set_client_attrbuite_capability(client_attr_cap_flags);
766
// Resolve the id of the default database, if the session has one.
if (OB_SUCC(ret) && !session.get_database_name().empty()) {
767
if (OB_FAIL(schema_guard.get_database_id(session.get_effective_tenant_id(),
768
session.get_database_name(),
770
// Lookup failed: if the local schema is behind the broadcast version, refresh it
// and look the database up once more.
int tmp_ret = OB_SUCCESS;
771
LOG_WARN("failed to get database id", K(ret), K(session.get_database_name()));
772
ObMultiVersionSchemaService *schema_service = gctx_.schema_service_;
773
int64_t local_version = OB_INVALID_VERSION;
774
int64_t global_version = OB_INVALID_VERSION;
775
const uint64_t effective_tenant_id = session.get_effective_tenant_id();
776
if (OB_SUCCESS != (tmp_ret = schema_service->get_tenant_refreshed_schema_version(effective_tenant_id, local_version))) {
777
LOG_WARN("fail to get local version", K(ret), K(tmp_ret), "tenant_id", effective_tenant_id);
778
} else if (OB_SUCCESS != (tmp_ret = schema_service->get_tenant_received_broadcast_version(effective_tenant_id, global_version))) {
779
LOG_WARN("fail to get local version", K(ret), K(tmp_ret), "tenant_id", effective_tenant_id);
780
} else if (local_version < global_version) {
781
LOG_INFO("try to refresh schema", K(effective_tenant_id),
782
K(local_version), K(global_version));
783
if (OB_SUCCESS != (tmp_ret = gctx_.schema_service_->async_refresh_schema(
784
effective_tenant_id, global_version))) {
785
LOG_WARN("failed to refresh schema", K(tmp_ret),
786
K(effective_tenant_id), K(global_version));
787
} else if (OB_SUCCESS != (tmp_ret = gctx_.schema_service_->get_tenant_schema_guard(effective_tenant_id, schema_guard))) {
788
LOG_WARN("get schema guard failed", K(ret), K(tmp_ret));
789
} else if (OB_SUCCESS != (tmp_ret = schema_guard.get_database_id(effective_tenant_id, session.get_database_name(), db_id))) {
790
LOG_WARN("failed to get database id", K(ret), K(tmp_ret));
792
// reset the error code only when the schema was refreshed successfully
798
session.set_database_id(db_id);
803
LOG_DEBUG("obmp connect info:", K_(tenant_name), K_(user_name),
804
K(host_name), K_(client_ip), "database", hsr_.get_database(),
805
K(hsr_.get_capability_flags().capability_),
806
K(session.is_client_use_lob_locator()),
807
K(session.is_client_support_lob_locatorv2()));
812
// Locks or unlocks the user that is currently logging in by delegating to
// switch_lock_status_for_user() with host "%" and ORACLE_MODE.
//   tenant_id - tenant the user belongs to
//   do_lock   - forwarded to switch_lock_status_for_user()
// NOTE(review): the remainder of the OZ(...) call and the return statement are not
// visible in this excerpt.
int ObMPConnect::switch_lock_status_for_current_login_user(const uint64_t tenant_id, bool do_lock)
814
int ret = OB_SUCCESS;
815
OZ(switch_lock_status_for_user(tenant_id, ObString::make_string("%"), ORACLE_MODE, do_lock),
820
// Builds and executes an "ALTER USER ... ACCOUNT LOCK|UNLOCK" statement for user_name_.
//   tenant_id   - tenant to run the statement in; must be a valid tenant id
//   host_name   - host part, appended as @'host' in MySQL mode only
//   compat_mode - selects the quote character (double quote for Oracle, single quote
//                 otherwise) and is passed on to sql_proxy->write()
//   do_lock     - true appends LOCK, false appends UNLOCK
// Returns OB_INVALID_ARGUMENT for an invalid tenant id, OB_ERR_UNEXPECTED when
// gctx_.sql_proxy_ is null, or the failure code of the formatting / write calls.
// NOTE(review): user_name_ and host_name are formatted into the SQL text with no
// escaping visible here — confirm they cannot contain the quote character.
int ObMPConnect::switch_lock_status_for_user(const uint64_t tenant_id, const ObString &host_name,
821
ObCompatibilityMode compat_mode, bool do_lock)
823
int ret = OB_SUCCESS;
825
ObSqlString lock_user_sql;
826
common::ObMySQLProxy *sql_proxy = nullptr;
827
int64_t affected_rows = 0;
828
const char *name_quote = ORACLE_MODE == compat_mode ? "\"" : "'";
830
if (!is_valid_tenant_id(tenant_id)) {
831
ret = OB_INVALID_ARGUMENT;
832
LOG_WARN("invalid id", K(tenant_id), K(ret));
833
// ALTER USER <quote>name<quote>
} else if (OB_FAIL(lock_user_sql.append_fmt("ALTER USER %s%.*s%s", name_quote,
834
user_name_.length(), user_name_.ptr(), name_quote))) {
835
LOG_WARN("append string failed", K(ret));
836
// MySQL mode only: append @<quote>host<quote>.
} else if (MYSQL_MODE == compat_mode && OB_FAIL(lock_user_sql.append_fmt("@%s%.*s%s",
837
name_quote, host_name.length(), host_name.ptr(), name_quote))) {
838
LOG_WARN("append string failed", K(ret));
839
} else if (OB_FAIL(lock_user_sql.append_fmt(" ACCOUNT %s", do_lock ? "LOCK" : "UNLOCK"))) {
840
LOG_WARN("append string failed", K(ret));
841
} else if (OB_ISNULL(sql_proxy = gctx_.sql_proxy_)) {
842
ret = OB_ERR_UNEXPECTED;
843
LOG_WARN("sql proxy is null", K(ret));
844
} else if (OB_FAIL(sql_proxy->write(tenant_id, lock_user_sql.ptr(), affected_rows, compat_mode))) {
845
LOG_WARN("fail to execute lock user", K(ret));
847
LOG_INFO("user ddl has been sent, change user lock status to ", K(tenant_id), K(user_name_),
848
K(host_name), K(do_lock));
853
// Profile-based path: unlocks the current login user once the profile lock time has elapsed.
// Visible flow: validate tenant_id -> resolve user_id via schema_guard.check_user_exist ->
// skip Oracle super users -> read the profile's failed-login limits -> unless the limit is
// INT64_MAX (unlimited), start a transaction against the meta tenant, read the stored
// failed-login record and, when the failure count has reached the limit and the lock time has
// passed, unlock the account and reset the stored counter to 0.
// Returns OB_INVALID_ARGUMENT for an invalid tenant_id and OB_ERR_UNEXPECTED when the user does
// not exist; other errors are propagated from the callees. A started transaction is always
// ended: committed when ret is OB_SUCCESS, rolled back otherwise.
int ObMPConnect::unlock_user_if_time_is_up(const uint64_t tenant_id,
854
ObSchemaGetterGuard &schema_guard,
857
int ret = OB_SUCCESS;
858
uint64_t user_id = OB_INVALID_ID;
860
bool is_exist = false;
861
int64_t failed_login_limit_num = INT64_MAX;
862
int64_t failed_login_limit_time = INT64_MAX;
863
int64_t current_failed_login_num = 0;
864
int64_t last_failed_login_timestamp = 0;
865
int64_t current_gmt = ObTimeUtil::current_time();
866
ObMySQLTransaction trans;
868
if (!is_valid_tenant_id(tenant_id)) {
869
ret = OB_INVALID_ARGUMENT;
870
LOG_WARN("Invalid tenant", K(tenant_id), K(ret));
871
} else if (OB_FAIL(schema_guard.check_user_exist(tenant_id,
873
ObString(OB_DEFAULT_HOST_NAME),
876
LOG_WARN("fail to check user exist", K(ret));
877
} else if (!is_exist) {
878
ret = OB_ERR_UNEXPECTED;
879
LOG_WARN("user not exist", K(ret));
880
} else if (ObOraSysChecker::is_super_user(user_id)) {
881
//do nothing for oracle sys user
882
} else if (OB_FAIL(schema_guard.get_user_profile_failed_login_limits(tenant_id,
884
failed_login_limit_num,
885
failed_login_limit_time))) {
886
LOG_WARN("fail to get user id and profile limit", K(ret), K(tenant_id), K(user_id));
887
} else if (failed_login_limit_num == INT64_MAX) {
888
//unlimited do nothing
889
} else if (OB_FAIL(trans.start(gctx_.sql_proxy_, gen_meta_tenant_id(tenant_id)))) {
890
LOG_WARN("fail to start trans", K(ret), K(tenant_id));
891
} else if (OB_FAIL(get_last_failed_login_info(tenant_id,
894
current_failed_login_num,
895
last_failed_login_timestamp))) {
896
LOG_WARN("fail to check current login user need unlock", K(user_id), K(user_name_), K(ret));
897
} else if (current_failed_login_num >= failed_login_limit_num
898
&& current_gmt - last_failed_login_timestamp >= failed_login_limit_time) { // the lock time has elapsed
899
if (OB_FAIL(switch_lock_status_for_current_login_user(tenant_id, false))) {
900
// This branch is the unlock DDL failing, so say so (the old text claimed a status check).
LOG_WARN("fail to unlock current login user", K(ret));
901
} else if (OB_FAIL(update_current_user_failed_login_num(
902
tenant_id, user_id, trans, 0))) {
903
LOG_WARN("fail to clear failed login num", K(ret));
911
if (trans.is_started()) {
912
int temp_ret = OB_SUCCESS;
913
if (OB_SUCCESS != (temp_ret = trans.end(OB_SUCC(ret)))) {
914
LOG_WARN("trans end failed", "is_commit", OB_SUCCESS == ret, K(ret), K(temp_ret));
915
// Keep the first error: only surface the end() failure when nothing failed before it.
ret = OB_SUCC(ret) ? temp_ret : ret;
919
LOG_DEBUG("user is locked, check timeout", K(failed_login_limit_num), K(current_failed_login_num),
920
"need unlock", (current_gmt - last_failed_login_timestamp >= failed_login_limit_time));
925
// Connection-control path (MySQL mode): clears the failed-login record and unlocks the account
// when get_connection_control_stat reports that the user is no longer locked.
// Does nothing when connection control is disabled for the tenant or when user_id is not valid.
// Returns OB_INVALID_ARGUMENT for an invalid tenant_id and OB_ERR_UNEXPECTED when the schema
// guard yields no user info; other errors are propagated from the callees. A started
// transaction is always ended: committed when ret is OB_SUCCESS, rolled back otherwise.
int ObMPConnect::unlock_user_if_time_is_up_mysql(const uint64_t tenant_id,
926
const uint64_t user_id,
927
ObSchemaGetterGuard &schema_guard,
930
int ret = OB_SUCCESS;
931
int64_t current_failed_login_num = 0;
932
int64_t last_failed_login_timestamp = 0;
933
const ObUserInfo *user_info = NULL;
937
ObMySQLTransaction trans;
939
if (!is_valid_tenant_id(tenant_id)) {
940
ret = OB_INVALID_ARGUMENT;
941
LOG_WARN("Invalid tenant", K(tenant_id), K(ret));
942
} else if (!is_connection_control_enabled(tenant_id)) {
943
// do nothing when connection_control is disabled
944
} else if (!is_valid_id(user_id)) {
945
// user_id is valid only the password is correct, do nothing when password wrong
946
} else if (OB_FAIL(schema_guard.get_user_info(tenant_id,
949
LOG_WARN("fail to get user info", K(ret));
950
} else if (OB_ISNULL(user_info)) {
951
ret = OB_ERR_UNEXPECTED;
952
LOG_WARN("user not exist", K(ret));
953
} else if (OB_FAIL(trans.start(gctx_.sql_proxy_, gen_meta_tenant_id(tenant_id)))) {
954
LOG_WARN("fail to start trans", K(ret), K(tenant_id));
955
} else if (OB_FAIL(get_last_failed_login_info(tenant_id, user_id, trans,
956
current_failed_login_num,
957
last_failed_login_timestamp))) {
958
LOG_WARN("fail to get current user failed login num", K(ret));
959
} else if (OB_FAIL(get_connection_control_stat(tenant_id, current_failed_login_num,
960
last_failed_login_timestamp,
961
need_lock, is_locked_now))) {
962
// Distinct text so this failure is not confused with the read of the failed-login record above.
LOG_WARN("fail to get connection control stat", K(ret));
963
} else if (!is_locked_now) { // time's up
964
if (OB_FAIL(clear_current_user_failed_login_num(tenant_id, user_id, trans))) {
965
LOG_WARN("fail to clear failed login num", K(ret));
966
} else if (OB_FAIL(switch_lock_status_for_user(tenant_id, user_info->get_host_name_str(),
967
MYSQL_MODE, false))) {
968
// This branch is the unlock DDL failing (the old text claimed a status check).
LOG_WARN("fail to unlock user", K(ret));
974
if (trans.is_started()) {
975
int temp_ret = OB_SUCCESS;
976
if (OB_SUCCESS != (temp_ret = trans.end(OB_SUCC(ret)))) {
977
LOG_WARN("trans end failed", "is_commit", OB_SUCCESS == ret, K(ret), K(temp_ret));
978
// Keep the first error: only surface the end() failure when nothing failed before it.
ret = OB_SUCC(ret) ? temp_ret : ret;
984
#ifdef OB_BUILD_AUDIT_SECURITY
985
// Rejects a login as the audit user (OB_ORA_AUDITOR_NAME, compared case-insensitively) on a
// MySQL-mode tenant unless the tenant config _audit_mode equals "ORACLE" (case-insensitive).
// Returns OB_ERR_NO_PRIVILEGE in that case; errors from the tenant-mode lookup are propagated.
// Any other user name leaves ret at OB_SUCCESS.
int ObMPConnect::check_audit_user(const uint64_t tenant_id, ObString &user_name)
987
int ret = OB_SUCCESS;
988
lib::Worker::CompatMode compat_mode = lib::Worker::CompatMode::MYSQL;
989
if (0 == user_name.case_compare(OB_ORA_AUDITOR_NAME)) {
990
if (OB_FAIL(ObCompatModeGetter::get_tenant_mode(tenant_id, compat_mode))) {
991
LOG_WARN("fail to get tenant mode in check_audit_user", K(ret));
992
} else if (compat_mode == lib::Worker::CompatMode::MYSQL) {
993
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id));
994
if (tenant_config.is_valid()) {
995
ObString audit_mode(tenant_config->_audit_mode.get_value());
996
if (0 != audit_mode.case_compare("ORACLE")) {
997
ret = OB_ERR_NO_PRIVILEGE;
998
// Adjacent literals are concatenated verbatim, so the first one must end with a space.
LOG_WARN("audit user cannot login in because of the "
999
"audit_mode not be oracle", K(ret));
1008
// Profile-based path: records the outcome of a login attempt for the current user.
//   tenant_id     - tenant of the login; OB_INVALID_ARGUMENT if it is not valid
//   is_login_succ - true: clear a non-zero failed-login counter;
//                   false: store counter + 1 and lock the account when the new value hits the
//                   profile limit, or exceeds it more than 10 seconds after the last failure
//   schema_guard  - used to resolve the user id and the profile's failed-login limits
// Nothing is recorded when the user does not exist, is an Oracle super user, or the profile
// limit is INT64_MAX (unlimited). The transaction is committed only when ret is OB_SUCCESS and
// `commit` is true; `commit` is set from (current_failed_login_num < failed_login_limit_num).
int ObMPConnect::update_login_stat_in_trans(const uint64_t tenant_id,
1009
const bool is_login_succ,
1010
ObSchemaGetterGuard &schema_guard)
1012
int ret = OB_SUCCESS;
1013
uint64_t user_id = OB_INVALID_ID;
1014
bool is_exist = false;
1015
int64_t current_failed_login_num = INT64_MAX;
1016
int64_t last_failed_login_timestamp = INT64_MAX;
1017
int64_t failed_login_limit_num = INT64_MAX;
1018
int64_t failed_login_limit_time = INT64_MAX;
1019
ObMySQLTransaction trans;
1022
if (!is_valid_tenant_id(tenant_id)) {
1023
ret = OB_INVALID_ARGUMENT;
1024
LOG_WARN("Invalid tenant", K(tenant_id), K(ret));
1025
} else if (OB_FAIL(schema_guard.check_user_exist(tenant_id,
1027
ObString(OB_DEFAULT_HOST_NAME),
1030
LOG_WARN("fail to check user exist", K(ret));
1031
} else if (!is_exist) {
1033
} else if (ObOraSysChecker::is_super_user(user_id)) {
1034
//do nothing for oracle sys user
1035
} else if (OB_FAIL(schema_guard.get_user_profile_failed_login_limits(tenant_id,
1037
failed_login_limit_num,
1038
failed_login_limit_time))) {
1039
LOG_WARN("fail to get user id and profile limit", K(ret), K(tenant_id), K(user_id));
1040
} else if (failed_login_limit_num == INT64_MAX) {
1041
//unlimited do nothing
1042
} else if (OB_FAIL(trans.start(gctx_.sql_proxy_, gen_meta_tenant_id(tenant_id)))) {
1043
LOG_WARN("fail to start transaction", K(ret));
1044
} else if (OB_FAIL(get_last_failed_login_info(tenant_id, user_id, trans,
1045
current_failed_login_num, last_failed_login_timestamp))) {
1046
LOG_WARN("fail to get current user failed login num", K(ret));
1047
} else if (OB_LIKELY(is_login_succ)) {
1048
// on a successful login, clear the previously recorded failed-login statistics
1049
if (OB_UNLIKELY(current_failed_login_num != 0)) {
1050
if (OB_FAIL(clear_current_user_failed_login_num(tenant_id, user_id, trans))) {
1051
LOG_WARN("fail to clear current user failed login", K(ret));
1054
} else { //login failed with wrong password
1055
// on a failed login, count the failure and lock the user once the threshold is reached
1056
if (OB_FAIL(update_current_user_failed_login_num(
1057
tenant_id, user_id, trans, current_failed_login_num + 1))) {
1058
// This is the increment failing, not a clear (the old text was copied from the branch above).
LOG_WARN("fail to update current user failed login num", K(ret));
1059
} else if (current_failed_login_num + 1 == failed_login_limit_num
1060
|| (current_failed_login_num + 1 > failed_login_limit_num
1061
&& ObTimeUtil::current_time() - last_failed_login_timestamp > USECS_PER_SEC * 10)) {
1062
if (OB_FAIL(switch_lock_status_for_current_login_user(tenant_id, true))) {
1063
LOG_WARN("fail to lock current login user", K(ret));
1066
commit = (current_failed_login_num < failed_login_limit_num);
1069
if (trans.is_started()) {
1070
int temp_ret = OB_SUCCESS;
1071
if (OB_SUCCESS != (temp_ret = trans.end(OB_SUCC(ret) && commit))) {
1072
LOG_WARN("trans end failed", "is_commit", OB_SUCCESS == ret && commit, K(ret), K(temp_ret));
1073
// Keep the first error: only surface the end() failure when nothing failed before it.
ret = OB_SUCC(ret) ? temp_ret : ret;
1076
LOG_DEBUG("update_login_stat_in_trans check", K(commit),
1077
K(current_failed_login_num), K(last_failed_login_timestamp),
1078
K(failed_login_limit_num), K(failed_login_limit_time));
1082
// Connection-control path (MySQL mode): updates the failed-login statistics of every account
// named user_name_ whose host pattern matches client_ip_.
//   tenant_id       - tenant of the login; OB_INVALID_ARGUMENT if it is not valid
//   is_login_succ   - forwarded to update_login_stat_in_trans_mysql for each matching account
//   schema_guard    - used to list the accounts that share user_name_
//   is_unlocked_now - out: reset to false on entry, later set to !is_locked_now, where
//                     is_locked_now is the OR of the per-account lock results
// Accounts are only examined when is_connection_control_enabled(tenant_id) is true.
int ObMPConnect::update_login_stat_mysql(const uint64_t tenant_id,
1083
const bool is_login_succ,
1084
ObSchemaGetterGuard &schema_guard,
1085
bool &is_unlocked_now) {
1086
int ret = OB_SUCCESS;
1087
ObSEArray<const ObUserInfo*, 1> user_infos;
1088
const ObUserInfo *user_info = NULL;
1089
bool is_locked_tmp = false;
1090
bool is_locked_now = false;
1091
is_unlocked_now = false;
1092
if (!is_valid_tenant_id(tenant_id)) {
1093
ret = OB_INVALID_ARGUMENT;
1094
LOG_WARN("Invalid tenant", K(tenant_id), K(ret));
1096
if (OB_SUCC(ret) && is_connection_control_enabled(tenant_id)) {
1097
OZ(schema_guard.get_user_info(tenant_id, user_name_, user_infos), tenant_id, user_name_);
1098
for (int64_t i = 0; OB_SUCC(ret) && i < user_infos.count(); ++i) {
1099
user_info = user_infos.at(i);
1100
if (OB_ISNULL(user_info)) {
1101
ret = OB_ERR_UNEXPECTED;
1102
LOG_WARN("user info is null", K(tenant_id), K(user_name_), K(ret));
1103
} else if (!obsys::ObNetUtil::is_match(client_ip_, user_info->get_host_name_str())) {
1104
LOG_INFO("account not matched, try next", KPC(user_info), K(client_ip_));
1105
} else if (OB_FAIL(update_login_stat_in_trans_mysql(tenant_id, *user_info, is_login_succ,
1107
// Carry the error code like every other warning here, so the failure can be diagnosed.
LOG_WARN("fail to update login stat in trans mysql", K(ret));
1109
is_locked_now = is_locked_now || is_locked_tmp;
1112
OX(is_unlocked_now = !is_locked_now);
1117
// Connection-control path (MySQL mode): records the outcome of a login attempt for one account
// inside a transaction started against the meta tenant.
//   tenant_id     - tenant of the login
//   user_info     - the account whose failed-login record is read and updated
//   is_login_succ - true: delete a non-zero failed-login record; false: store counter + 1
//   is_locked_now - out: true if the threshold is exceeded and the delay has not elapsed
//                   (as computed by get_connection_control_stat)
// When get_connection_control_stat asks for a lock and the account is not yet locked, the
// account is locked through switch_lock_status_for_user(..., MYSQL_MODE, true).
// The transaction is committed when ret is OB_SUCCESS and rolled back otherwise.
int ObMPConnect::update_login_stat_in_trans_mysql(const uint64_t tenant_id,
1118
const ObUserInfo &user_info,
1119
const bool is_login_succ,
1120
bool &is_locked_now) {
1121
int ret = OB_SUCCESS;
1122
int64_t current_failed_login_num = INT64_MAX;
1123
int64_t last_failed_login_timestamp = INT64_MAX;
1124
bool need_lock = false; // true if need to lock user
1125
is_locked_now = false; // true if exceed the threshold and time's not up
1126
ObMySQLTransaction trans;
1127
if (OB_FAIL(trans.start(gctx_.sql_proxy_, gen_meta_tenant_id(tenant_id)))) {
1128
LOG_WARN("fail to start transaction", K(ret));
1129
} else if (OB_FAIL(get_last_failed_login_info(tenant_id, user_info.get_user_id(),
1130
trans, current_failed_login_num,
1131
last_failed_login_timestamp))) {
1132
LOG_WARN("fail to get current user failed login num", K(ret));
1133
} else if (OB_FAIL(get_connection_control_stat(tenant_id, current_failed_login_num,
1134
last_failed_login_timestamp,
1135
need_lock, is_locked_now))) {
1136
// Distinct text so this failure is not confused with the read of the failed-login record above.
LOG_WARN("fail to get connection control stat", K(ret));
1137
} else if (OB_UNLIKELY(is_locked_now)) {
1139
LOG_WARN("user locked by connection control", K(tenant_id), K(user_info), K(client_ip_),
1140
K(is_login_succ), K(current_failed_login_num), K(last_failed_login_timestamp), K(ret));
1141
} else if (OB_LIKELY(is_login_succ)) {
1142
// clear the failed login num if login succ
1143
if (OB_UNLIKELY(current_failed_login_num != 0)) {
1144
if (OB_FAIL(clear_current_user_failed_login_num(tenant_id, user_info.get_user_id(), trans))) {
1145
LOG_WARN("fail to clear current user failed login", K(ret));
1149
// increase login failed num if login with wrong password
1150
if (OB_FAIL(update_current_user_failed_login_num(
1151
tenant_id, user_info.get_user_id(), trans, current_failed_login_num + 1))) {
1152
// This is the increment failing, not a clear (the old text was copied from the branch above).
LOG_WARN("fail to update current user failed login num", K(ret));
1155
if (OB_SUCC(ret) && need_lock && !user_info.get_is_locked()) {
1156
if (OB_FAIL(switch_lock_status_for_user(tenant_id, user_info.get_host_name(),
1157
MYSQL_MODE, true))) {
1158
LOG_WARN("fail to lock user", K(ret));
1162
if (trans.is_started()) {
1163
int temp_ret = OB_SUCCESS;
1164
if (OB_SUCCESS != (temp_ret = trans.end(OB_SUCC(ret)))) {
1165
LOG_WARN("trans end failed", "is_commit", OB_SUCCESS == ret, K(ret), K(temp_ret));
1166
// Keep the first error: only surface the end() failure when nothing failed before it.
ret = OB_SUCC(ret) ? temp_ret : ret;
1173
// Reads the stored failed-login record of (tenant_id, user_id) with SELECT ... FOR UPDATE.
//   tenant_id / user_id      - key of the record; OB_INVALID_ARGUMENT if either is invalid
//   sql_client               - client the query runs on (callers pass their transaction)
//   current_failed_login_num - out: failed_login_attempts, or 0 when no row exists
//   last_failed_timestamp    - out: gmt_modified of the row, or 0 when no row exists
// The query is executed against the meta tenant of tenant_id. More than one matching row is
// reported as OB_ERR_UNEXPECTED. A failure of result->close() is returned only when nothing
// failed before it.
int ObMPConnect::get_last_failed_login_info(
1174
const uint64_t tenant_id,
1175
const uint64_t user_id,
1176
ObISQLClient &sql_client,
1177
int64_t &current_failed_login_num,
1178
int64_t &last_failed_timestamp)
1181
int ret = OB_SUCCESS;
1182
const uint64_t exec_tenant_id = gen_meta_tenant_id(tenant_id);
1183
ObSqlString select_sql;
1184
SMART_VAR(ObMySQLProxy::MySQLResult, res) {
1185
sqlclient::ObMySQLResult *result = NULL;
1186
if (!is_valid_tenant_id(tenant_id)
1187
|| !is_valid_id(user_id)) {
1188
ret = OB_INVALID_ARGUMENT;
1189
LOG_WARN("invalid id", K(tenant_id), K(user_id), K(ret));
1190
} else if (OB_FAIL(select_sql.append_fmt("SELECT failed_login_attempts, gmt_modified FROM `%s`"
1191
" WHERE tenant_id = %lu and user_id = %lu FOR UPDATE",
1192
OB_ALL_TENANT_USER_FAILED_LOGIN_STAT_TNAME,
1195
LOG_WARN("append string failed", K(ret));
1196
} else if (OB_FAIL(sql_client.read(res, exec_tenant_id, select_sql.ptr()))) {
1197
// This is the SELECT failing; the old text ("execute lock user") was copied from the DDL path.
LOG_WARN("fail to read failed login info", KR(ret), K(tenant_id), K(exec_tenant_id));
1198
} else if (OB_ISNULL(result = res.get_result())) {
1199
ret = OB_ERR_UNEXPECTED;
1200
LOG_WARN("result is null", K(ret));
1201
} else if (OB_FAIL(result->next())) {
1202
// OB_ITER_END on the first fetch means no record exists: report zero failures.
if (OB_ITER_END == ret) {
1204
current_failed_login_num = 0;
1205
last_failed_timestamp = 0;
1207
LOG_WARN("get result failed", K(ret));
1209
} else if (OB_FAIL(result->get_int("failed_login_attempts", current_failed_login_num))) {
1210
LOG_WARN("fail to get int value", K(ret));
1211
} else if (OB_FAIL(result->get_timestamp("gmt_modified", NULL, last_failed_timestamp))) {
1212
LOG_WARN("fail get timestamp value", K(ret));
1213
} else if (result->next() != OB_ITER_END) {
1214
ret = OB_ERR_UNEXPECTED;
1215
LOG_WARN("more than one row returned", K(ret));
1220
int temp_ret = OB_SUCCESS;
1221
if (OB_SUCCESS != (temp_ret = result->close())) {
1222
LOG_WARN("fail to close", K(temp_ret));
1224
// Keep the first error: only surface the close() failure when nothing failed before it.
ret = OB_SUCC(ret) ? temp_ret : ret;
1232
// Deletes the failed-login record of (tenant_id, user_id).
//   tenant_id / user_id - key of the record
//   sql_client          - client the DELETE runs on (callers pass their transaction)
// Returns OB_INVALID_ARGUMENT when either id is invalid or user_name_ is empty, and
// OB_ERR_UNEXPECTED when the statement does not affect exactly one row.
// The statement is executed against the meta tenant of tenant_id.
int ObMPConnect::clear_current_user_failed_login_num(
1233
const uint64_t tenant_id,
1234
const uint64_t user_id,
1235
ObISQLClient &sql_client)
1237
int ret = OB_SUCCESS;
1238
const uint64_t exec_tenant_id = gen_meta_tenant_id(tenant_id);
1240
int64_t affected_rows = 0;
1241
if (!is_valid_id(user_id)
1242
|| !is_valid_tenant_id(tenant_id)
1243
|| user_name_.empty()) {
1244
ret = OB_INVALID_ARGUMENT;
1245
LOG_WARN("invalid id", K(user_id), K(tenant_id), K_(user_name), K(ret));
1246
} else if (OB_FAIL(sql.assign_fmt("DELETE FROM `%s` "
1247
" WHERE tenant_id = %lu and user_id = %lu",
1248
OB_ALL_TENANT_USER_FAILED_LOGIN_STAT_TNAME,
1249
tenant_id, user_id))) {
1250
LOG_WARN("append table name failed", K(ret));
1251
} else if (OB_FAIL(sql_client.write(exec_tenant_id,
1254
LOG_WARN("fail to do update", KR(ret), K(sql), K(exec_tenant_id), K(tenant_id));
1255
// Callers in this file only clear when the stored counter is non-zero, i.e. a row is expected.
} else if (!is_single_row(affected_rows)) {
1256
ret = OB_ERR_UNEXPECTED;
1257
LOG_WARN("unexpected affected rows", K(ret), K(affected_rows), K(sql));
1262
// Upserts the failed-login record of (tenant_id, user_id) with a new attempt count.
//   tenant_id / user_id  - key of the record
//   sql_client           - client the statement runs on (callers pass their transaction)
//   new_failed_login_num - value stored in failed_login_attempts
// Builds "INSERT INTO ... VALUES (..., now(6)) ON DUPLICATE KEY UPDATE failed_login_attempts,
// last_failed_login_svr_ip" and runs it against the meta tenant of tenant_id.
// Returns OB_INVALID_ARGUMENT when either id is invalid or user_name_ is empty, and
// OB_ERR_UNEXPECTED when the affected-row count is neither a single nor a double row.
int ObMPConnect::update_current_user_failed_login_num(
1263
const uint64_t tenant_id,
1264
const uint64_t user_id,
1265
ObISQLClient &sql_client,
1266
int64_t new_failed_login_num)
1268
int ret = OB_SUCCESS;
1269
const uint64_t exec_tenant_id = gen_meta_tenant_id(tenant_id);
1272
int64_t affected_rows = 0;
1274
if (!is_valid_id(user_id)
1275
|| !is_valid_tenant_id(tenant_id)
1276
|| user_name_.empty()) {
1277
ret = OB_INVALID_ARGUMENT;
1278
LOG_WARN("invalid id", K(user_id), K(tenant_id), K_(user_name), K(ret));
1279
} else if (OB_FAIL(sql.assign_fmt("INSERT INTO `%s` (", OB_ALL_TENANT_USER_FAILED_LOGIN_STAT_TNAME))) {
1280
LOG_WARN("append table name failed", K(ret));
1282
// NOTE(review): the SQL_COL_APPEND_* macros presumably append the column name to `sql` and
// the formatted value to `values` -- confirm against their definitions.
SQL_COL_APPEND_VALUE(sql, values, tenant_id, "tenant_id", "%lu");
1283
SQL_COL_APPEND_VALUE(sql, values, user_id, "user_id", "%lu");
1284
SQL_COL_APPEND_ESCAPE_STR_VALUE(sql, values, user_name_.ptr(), user_name_.length(), "user_name");
1285
SQL_COL_APPEND_VALUE(sql, values, new_failed_login_num, "failed_login_attempts", "%ld");
1289
// When the counter is reset to 0 the IP precision is 0, so an empty string is stored as
// last_failed_login_svr_ip; otherwise client_ip_ is stored.
if (OB_FAIL(sql.append_fmt(", gmt_modified) VALUES (%.*s, now(6))"
1290
" ON DUPLICATE KEY UPDATE"
1291
" failed_login_attempts = %ld"
1292
", last_failed_login_svr_ip = \"%.*s\"",
1293
static_cast<int32_t>(values.length()), values.ptr(),
1294
new_failed_login_num,
1295
(new_failed_login_num == 0 ? 0 : client_ip_.length()),
1296
client_ip_.ptr()))) {
1297
LOG_WARN("append sql failed", K(ret));
1298
} else if (OB_FAIL(sql_client.write(exec_tenant_id,
1301
LOG_WARN("fail to do update", K(ret), K(sql), K(exec_tenant_id), K(tenant_id));
1302
// NOTE(review): presumably one row for a fresh insert and two for the duplicate-key update --
// confirm against is_single_row / is_double_row.
} else if (!is_single_row(affected_rows) && !is_double_row(affected_rows)) {
1303
ret = OB_ERR_UNEXPECTED;
1304
LOG_WARN("unexpected affected rows", K(ret), K(affected_rows), K(sql));
1311
// Reports whether connection control applies to this login.
// Visible logic: the sys tenant and the user named OB_SYS_USER_NAME are tested first and take a
// separate branch (its body is not visible here; presumably it leaves is_enabled false --
// confirm); otherwise the feature counts as enabled when the tenant config
// connection_control_failed_connections_threshold is greater than 0.
bool ObMPConnect::is_connection_control_enabled(const uint64_t tenant_id)
1313
bool is_enabled = false;
1314
if (OB_SYS_TENANT_ID == tenant_id || 0 == user_name_.compare(OB_SYS_USER_NAME)) {
1317
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id));
1318
if (tenant_config.is_valid()) {
1319
int64_t threshold = tenant_config->connection_control_failed_connections_threshold;
1320
is_enabled = threshold > 0;
1326
// Derives the connection-control state of a user from its stored failed-login record and the
// tenant configuration (failed_connections_threshold, min/max_connection_delay).
//   tenant_id                   - tenant whose configuration is read
//   current_failed_login_num    - failures recorded so far (the current attempt is counted as +1)
//   last_failed_login_timestamp - time of the last recorded failure
//   need_lock                   - out: NOTE(review) its assignment is not visible in this
//                                 listing; presumably set in the branch documented by the
//                                 "achieve the threshold" comments below -- confirm
//   is_locked                   - out: true while the current time has not passed
//                                 last_failed_login_timestamp + delay
// The delay is (failures + 1 - threshold) * MSECS_PER_SEC clamped to [min_delay, max_delay], and
// is multiplied by USECS_PER_MSEC before being compared with ObTimeUtil::current_time().
int ObMPConnect::get_connection_control_stat(const uint64_t tenant_id,
1327
const int64_t current_failed_login_num, const int64_t last_failed_login_timestamp,
1328
bool &need_lock, bool &is_locked)
1330
int ret = OB_SUCCESS;
1333
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id));
1334
if (tenant_config.is_valid()) {
1335
int64_t threshold = tenant_config->connection_control_failed_connections_threshold;
1337
int64_t min_delay = tenant_config->connection_control_min_connection_delay;
1338
int64_t max_delay = tenant_config->connection_control_max_connection_delay;
1339
int64_t current_gmt = ObTimeUtil::current_time();
1340
// Feature disabled, or still below the threshold even counting this attempt.
if (threshold <= 0 || current_failed_login_num + 1 < threshold) {
1342
} else if (current_failed_login_num + 1 == threshold ||
1343
(current_failed_login_num + 1 > threshold &&
1344
current_gmt - last_failed_login_timestamp > USECS_PER_SEC * 10)) {
1345
// 1. failed_login_num achieve the threshold exactly
1346
// 2. user is unlocked manually need to be locked again, the interval 10s is used to reduce
1347
// concurrent ddl operation
1350
delay = MIN(MAX((current_failed_login_num + 1 - threshold) * MSECS_PER_SEC, min_delay), max_delay);
1351
is_locked = current_gmt <= delay * USECS_PER_MSEC + last_failed_login_timestamp;
1357
// Checks at connection time whether the password of the current login user has expired.
//   tenant_id    - tenant of the login; OB_INVALID_ARGUMENT if it is not valid
//   schema_guard - used to resolve the user id and passed on to the privilege check
//   session      - passed on to ObPrivilegeCheck::check_password_expired_on_connection
// A user that does not exist takes a separate branch whose body is not visible in this listing;
// the expiry check itself is only reached for an existing user.
int ObMPConnect::check_password_expired(const uint64_t tenant_id,
1358
ObSchemaGetterGuard &schema_guard,
1359
ObSQLSessionInfo &session)
1361
int ret = OB_SUCCESS;
1362
uint64_t user_id = OB_INVALID_ID;
1363
bool is_exist = false;
1364
if (!is_valid_tenant_id(tenant_id)) {
1365
ret = OB_INVALID_ARGUMENT;
1366
LOG_WARN("Invalid tenant", K(tenant_id), K(ret));
1367
} else if (OB_FAIL(schema_guard.check_user_exist(tenant_id,
1369
ObString(OB_DEFAULT_HOST_NAME),
1372
LOG_WARN("fail to check user exist", K(ret));
1373
} else if (!is_exist) {
1375
} else if (OB_FAIL(ObPrivilegeCheck::check_password_expired_on_connection(
1376
tenant_id, user_id, schema_guard, session))) {
1377
LOG_WARN("fail to check password expired", K(ret), K(tenant_id), K(user_id));
1382
// Fills user_name_ and tenant_name_ for this login.
// When conn.user_name_buf_ is non-empty both names are taken from the connection's buffers
// (the ObStrings point into those buffers, no copy is made); otherwise they are extracted from
// the user name carried by the handshake response (hsr_).
int ObMPConnect::get_user_tenant(ObSMConnection &conn)
1384
// resolve tenantname and username
1385
int ret = OB_SUCCESS;
1386
if (0 != STRLEN(conn.user_name_buf_)) {
1387
user_name_.assign_ptr(conn.user_name_buf_, static_cast<int32_t>(STRLEN(conn.user_name_buf_)));
1388
tenant_name_.assign_ptr(conn.tenant_name_buf_, static_cast<int32_t>(STRLEN(conn.tenant_name_buf_)));
1389
} else if (OB_FAIL(extract_user_tenant(hsr_.get_username(), user_name_, tenant_name_))) {
1390
LOG_WARN("extract username and tenantname failed", K(ret), "str", hsr_.get_username());
1395
// Resolves the tenant id for this login and checks that the tenant can be logged into.
//   conn      - connection; its tenant_id_ is used directly when already valid, and its
//               group_id_ is set to OBCG_DIAG_TENANT for a diagnostic-tenant login
//   tenant_id - out: reset to OB_INVALID_ID on entry, then set from conn or from tenant_name_
// For the diagnostic tenant name (OB_DIAG_TENANT_NAME, case-insensitive) the user name is used
// as the tenant name. Returns OB_NOT_SUPPORTED for a meta tenant, OB_ERR_UNEXPECTED when the
// global schema service or ob_service is null, and OB_SERVER_IS_INIT when the tenant schema has
// not been refreshed yet (the sys tenant on an empty server, i.e. during bootstrap, is exempt).
int ObMPConnect::get_tenant_id(ObSMConnection &conn, uint64_t &tenant_id)
1397
int ret = OB_SUCCESS;
1398
tenant_id = OB_INVALID_ID;
1399
if (is_valid_tenant_id(conn.tenant_id_)) {
1400
tenant_id = conn.tenant_id_;
1402
if (tenant_name_.case_compare(OB_DIAG_TENANT_NAME) == 0) {
1403
tenant_name_ = user_name_;
1404
conn.group_id_ = share::OBCG_DIAG_TENANT;
1406
if (OB_FAIL(extract_tenant_id(tenant_name_, tenant_id))) {
1407
LOG_WARN("extract_tenant_id failed", K(ret), K_(tenant_name));
1411
if (is_meta_tenant(tenant_id)) {
1412
ret = OB_NOT_SUPPORTED;
1413
LOG_WARN("can't login meta tenant", KR(ret), K_(tenant_name), K(tenant_id));
1414
LOG_USER_ERROR(OB_NOT_SUPPORTED, "login meta tenant");
1415
} else if (OB_ISNULL(GCTX.schema_service_) || OB_ISNULL(GCTX.ob_service_)) {
1416
ret = OB_ERR_UNEXPECTED;
1417
LOG_WARN("schema_service or ob_service is NULL", KR(ret), K(tenant_id));
1418
} else if (!GCTX.schema_service_->is_tenant_refreshed(tenant_id)) {
1419
bool is_empty = false;
1420
if (is_sys_tenant(tenant_id)
1421
&& OB_FAIL(GCTX.ob_service_->check_server_empty(is_empty))) {
1422
LOG_WARN("fail to check server is empty", KR(ret));
1423
} else if (is_sys_tenant(tenant_id) && is_empty) {
1424
//in bootstrap, we could use sys to login
1426
ret = OB_SERVER_IS_INIT;
1427
LOG_WARN("tenant schema not refreshed yet", KR(ret), K(tenant_id));
1434
// Always returns the constant OB_SYS_USER_ID; no login state is consulted.
int64_t ObMPConnect::get_user_id()
1436
return OB_SYS_USER_ID;
1439
// Always returns the constant OB_SYS_DATABASE_ID; no login state is consulted.
int64_t ObMPConnect::get_database_id()
1441
return OB_SYS_DATABASE_ID;
1444
// Looks up the OB_MYSQL_CONNECTION_ID connect attribute of the handshake response and casts its
// value to uint32.
//   conn_id - out: the parsed connection id
// Returns OB_ENTRY_NOT_EXIST when the attribute is absent; a cast failure is logged and its
// error code is kept in ret.
int ObMPConnect::get_conn_id(uint32_t &conn_id) const
1446
int ret = OB_SUCCESS;
1447
bool is_found = false;
1449
key_str.assign_ptr(OB_MYSQL_CONNECTION_ID , static_cast<int32_t>(STRLEN(OB_MYSQL_CONNECTION_ID)));
1450
for (int64_t i = 0; i < hsr_.get_connect_attrs().count() && OB_SUCC(ret) && !is_found; ++i) {
1451
// Bind by const reference like the sibling getters; the entry is only read.
const ObStringKV &kv = hsr_.get_connect_attrs().at(i);
1452
if (key_str == kv.key_) {
1454
value.set_varchar(kv.value_);
1455
ObArenaAllocator allocator(ObModIds::OB_SQL_EXPR);
1456
ObCastCtx cast_ctx(&allocator, NULL, CM_NONE, ObCharset::get_system_collation());
1457
// NOTE(review): the EXPR_GET_* macro presumably uses cast_ctx and sets ret -- confirm.
EXPR_GET_UINT32_V2(value, conn_id);
1459
LOG_WARN("fail to cast connection id to uint32", K(kv.value_), K(ret));
1466
if (OB_SUCC(ret) && !is_found) {
1467
ret = OB_ENTRY_NOT_EXIST;
1473
// Looks up the OB_MYSQL_PROXY_CONNECTION_ID connect attribute of the handshake response and
// casts its value to uint64.
//   proxy_conn_id - out: the parsed proxy connection id
// A missing attribute is tolerated for compatibility with old proxies (the statements of that
// branch are not visible in this listing).
int ObMPConnect::get_proxy_conn_id(uint64_t &proxy_conn_id) const
1475
int ret = OB_SUCCESS;
1476
bool is_found = false;
1478
key_str.assign_ptr(OB_MYSQL_PROXY_CONNECTION_ID , static_cast<int32_t>(STRLEN(OB_MYSQL_PROXY_CONNECTION_ID)));
1479
for (int64_t i = 0; i < hsr_.get_connect_attrs().count() && OB_SUCC(ret) && !is_found; ++i) {
1480
const ObStringKV &kv = hsr_.get_connect_attrs().at(i);
1481
if (key_str == kv.key_) {
1483
value.set_varchar(kv.value_);
1484
ObArenaAllocator allocator(ObModIds::OB_SQL_EXPR);
1485
ObCastCtx cast_ctx(&allocator, NULL, CM_NONE, ObCharset::get_system_collation());
1486
// NOTE(review): the EXPR_GET_* macro presumably uses cast_ctx and sets ret -- confirm.
EXPR_GET_UINT64_V2(value, proxy_conn_id);
1488
// The target type is uint64 (see the macro above), so the message names uint64.
LOG_WARN("fail to cast proxy connection id to uint64", K(kv.value_), K(ret));
1495
if (OB_SUCC(ret) && !is_found) {
1496
//if fail to find proxy_connection_id, ignore it, compatible with old obproxyro's connection
1502
// Looks up the OB_MYSQL_CLIENT_ADDR_PORT connect attribute of the handshake response and casts
// its value to int32.
//   client_addr_port - out: the parsed port, or 0 when the attribute is absent (tolerated for
//                      compatibility with old proxies)
int ObMPConnect::get_client_addr_port(int32_t &client_addr_port) const
1504
int ret = OB_SUCCESS;
1505
bool is_found = false;
1507
key_str.assign_ptr(OB_MYSQL_CLIENT_ADDR_PORT , static_cast<int32_t>(STRLEN(OB_MYSQL_CLIENT_ADDR_PORT)));
1508
for (int64_t i = 0; i < hsr_.get_connect_attrs().count() && OB_SUCC(ret) && !is_found; ++i) {
1509
const ObStringKV &kv = hsr_.get_connect_attrs().at(i);
1510
if (key_str == kv.key_) {
1512
value.set_varchar(kv.value_);
1513
ObArenaAllocator allocator(ObModIds::OB_SQL_EXPR);
1514
ObCastCtx cast_ctx(&allocator, NULL, CM_NONE, ObCharset::get_system_collation());
1515
// NOTE(review): the EXPR_GET_* macro presumably uses cast_ctx and sets ret -- confirm.
EXPR_GET_INT32_V2(value, client_addr_port);
1517
// The value being cast is the client address port, not a connection id.
LOG_WARN("fail to cast client addr port to int32", K(kv.value_), K(ret));
1524
if (OB_SUCC(ret) && !is_found) {
1525
//if fail to find client addr port, ignore it, compatible with old obproxyro's connection
1526
client_addr_port = 0;
1531
// Looks up the OB_MYSQL_CLIENT_SESSION_ID connect attribute of the handshake response and casts
// its value to uint32.
//   client_sessid - out: the parsed client session id, or INVALID_SESSID when the attribute is
//                   absent (tolerated for compatibility with old proxies)
int ObMPConnect::get_client_conn_id(uint32_t &client_sessid) const
1533
int ret = OB_SUCCESS;
1534
bool is_found = false;
1536
key_str.assign_ptr(OB_MYSQL_CLIENT_SESSION_ID , static_cast<int32_t>(STRLEN(OB_MYSQL_CLIENT_SESSION_ID)));
1537
for (int64_t i = 0; i < hsr_.get_connect_attrs().count() && OB_SUCC(ret) && !is_found; ++i) {
1538
const ObStringKV &kv = hsr_.get_connect_attrs().at(i);
1539
if (key_str == kv.key_) {
1541
value.set_varchar(kv.value_);
1542
ObArenaAllocator allocator(ObModIds::OB_SQL_EXPR);
1543
ObCastCtx cast_ctx(&allocator, NULL, CM_NONE, ObCharset::get_system_collation());
1544
// NOTE(review): the EXPR_GET_* macro presumably uses cast_ctx and sets ret -- confirm.
EXPR_GET_UINT32_V2(value, client_sessid);
1546
LOG_WARN("fail to cast client connection id to uint32", K(kv.value_), K(ret));
1553
if (OB_SUCC(ret) && !is_found) {
1554
// if fail to find the client session id, ignore it, compatible with old obproxyro's connection
1555
client_sessid = INVALID_SESSID;
1560
// Looks up the OB_MYSQL_CLIENT_CONNECT_TIME_US connect attribute of the handshake response and
// casts its value to int64.
//   client_create_time - out: the parsed value, or 0 when the attribute is absent (tolerated
//                        for compatibility with old proxies)
int ObMPConnect::get_client_create_time(int64_t &client_create_time) const
1562
int ret = OB_SUCCESS;
1563
bool is_found = false;
1565
key_str.assign_ptr(OB_MYSQL_CLIENT_CONNECT_TIME_US , static_cast<int32_t>(STRLEN(OB_MYSQL_CLIENT_CONNECT_TIME_US)));
1566
for (int64_t i = 0; i < hsr_.get_connect_attrs().count() && OB_SUCC(ret) && !is_found; ++i) {
1567
const ObStringKV &kv = hsr_.get_connect_attrs().at(i);
1568
if (key_str == kv.key_) {
1570
value.set_varchar(kv.value_);
1571
ObArenaAllocator allocator(ObModIds::OB_SQL_EXPR);
1572
ObCastCtx cast_ctx(&allocator, NULL, CM_NONE, ObCharset::get_system_collation());
1573
// NOTE(review): the EXPR_GET_* macro presumably uses cast_ctx and sets ret -- confirm.
EXPR_GET_INT64_V2(value, client_create_time);
1575
LOG_WARN("fail to cast client create time", K(kv.value_), K(ret));
1582
if (OB_SUCC(ret) && !is_found) {
1583
//if fail to find client_create_time, ignore it, compatible with old obproxyro's connection
1584
client_create_time = 0;
1589
// When connected through a proxy, fetches the creation time of the client->proxy connection.
// Looks up the OB_MYSQL_PROXY_SESSION_CREATE_TIME_US connect attribute of the handshake response
// and casts its value to int64.
//   sess_create_time - out: the parsed value, or 0 when the attribute is absent (tolerated for
//                      compatibility with old proxies)
int ObMPConnect::get_proxy_sess_create_time(int64_t &sess_create_time) const
1592
int ret = OB_SUCCESS;
1593
bool is_found = false;
1595
key_str.assign_ptr(OB_MYSQL_PROXY_SESSION_CREATE_TIME_US , static_cast<int32_t>(STRLEN(OB_MYSQL_PROXY_SESSION_CREATE_TIME_US)));
1596
for (int64_t i = 0; i < hsr_.get_connect_attrs().count() && OB_SUCC(ret) && !is_found; ++i) {
1597
const ObStringKV &kv = hsr_.get_connect_attrs().at(i);
1598
if (key_str == kv.key_) {
1600
value.set_varchar(kv.value_);
1601
ObArenaAllocator allocator(ObModIds::OB_SQL_EXPR);
1602
ObCastCtx cast_ctx(&allocator, NULL, CM_NONE, ObCharset::get_system_collation());
1603
// NOTE(review): the EXPR_GET_* macro presumably uses cast_ctx and sets ret -- confirm.
EXPR_GET_INT64_V2(value, sess_create_time);
1605
LOG_WARN("fail to cast proxy session create time", K(kv.value_), K(ret));
1612
if (OB_SUCC(ret) && !is_found) {
1613
//if fail to find __proxy_sess_create_time, ignore it, compatible with old obproxyro's connection
1614
sess_create_time = 0;
1619
// Reads the proxy capability flags the client sent as the OB_MYSQL_CAPABILITY_FLAG connect
// attribute and casts the value to uint64.
//   cap - out: the parsed capability bits; only written when the attribute is present in the
//         visible code (any handling of the absent case is not visible in this listing)
int ObMPConnect::get_proxy_capability(uint64_t &cap) const
1621
int ret = OB_SUCCESS;
1623
bool is_capability_flag_found = false;
1625
// The scan stops at the first matching attribute; kv then still holds that entry.
for (int64_t i = 0; !is_capability_flag_found && i < hsr_.get_connect_attrs().count(); ++i) {
1626
kv = hsr_.get_connect_attrs().at(i);
1627
if (kv.key_ == OB_MYSQL_CAPABILITY_FLAG) {
1628
is_capability_flag_found = true;
1632
if (is_capability_flag_found) {
1634
value.set_varchar(kv.value_);
1635
ObArenaAllocator allocator(ObModIds::OB_SQL_EXPR);
1636
ObCastCtx cast_ctx(&allocator, NULL, CM_NONE, ObCharset::get_system_collation());
1637
// NOTE(review): the EXPR_GET_* macro presumably uses cast_ctx and sets ret -- confirm.
EXPR_GET_UINT64_V2(value, cap);
1639
LOG_WARN("fail to cast capability flag to uint64", K_(kv.value), K(ret));
1645
// Reads the client attribute capability flags sent as the
// OB_MYSQL_CLIENT_ATTRIBUTE_CAPABILITY_FLAG connect attribute and casts the value to uint64.
//   cap - out: the parsed capability bits; only written when the attribute is present in the
//         visible code (any handling of the absent case is not visible in this listing)
int ObMPConnect::get_client_attribute_capability(uint64_t &cap) const
1647
int ret = OB_SUCCESS;
1649
bool is_capability_flag_found = false;
1651
// The scan stops at the first matching attribute; kv then still holds that entry.
for (int64_t i = 0; !is_capability_flag_found && i < hsr_.get_connect_attrs().count(); ++i) {
1652
kv = hsr_.get_connect_attrs().at(i);
1653
if (kv.key_ == OB_MYSQL_CLIENT_ATTRIBUTE_CAPABILITY_FLAG) {
1654
is_capability_flag_found = true;
1658
if (is_capability_flag_found) {
1660
value.set_varchar(kv.value_);
1661
ObArenaAllocator allocator(ObModIds::OB_SQL_EXPR);
1662
ObCastCtx cast_ctx(&allocator, NULL, CM_NONE, ObCharset::get_system_collation());
1663
// NOTE(review): the EXPR_GET_* macro presumably uses cast_ctx and sets ret -- confirm.
EXPR_GET_UINT64_V2(value, cap);
1665
LOG_WARN("fail to cast client attribute capability flag to uint64", K_(kv.value), K(ret));
1671
// Negotiates the proxy capability flags for this connection: reads the flags the client sent
// (get_proxy_capability), builds the set this server supports, and stores the bitwise AND of
// the two in conn.proxy_cap_flags_.capability_.
int ObMPConnect::check_update_proxy_capability(ObSMConnection &conn) const
1673
int ret = OB_SUCCESS;
1674
uint64_t client_proxy_cap = 0;
1675
// NOTE(review): is_monotonic_weak_read is not referenced by any line visible in this listing.
bool is_monotonic_weak_read = transaction::ObWeakReadUtil::enable_monotonic_weak_read(conn.tenant_id_);
1676
if (OB_FAIL(get_proxy_capability(client_proxy_cap))) {
1677
LOG_WARN("get proxy capability fail", K(ret));
1679
// set proxy_capability_ to tell proxy which features observer supports
1680
ObProxyCapabilityFlags server_proxy_cap_flag;
1681
server_proxy_cap_flag.cap_flags_.OB_CAP_PARTITION_TABLE = 1;
1682
server_proxy_cap_flag.cap_flags_.OB_CAP_CHANGE_USER = 1;
1683
server_proxy_cap_flag.cap_flags_.OB_CAP_READ_WEAK = 1;
1684
server_proxy_cap_flag.cap_flags_.OB_CAP_CHECKSUM = 1;
1685
server_proxy_cap_flag.cap_flags_.OB_CAP_SAFE_WEAK_READ = 0;
1686
server_proxy_cap_flag.cap_flags_.OB_CAP_PRIORITY_HIT = 1;
1687
server_proxy_cap_flag.cap_flags_.OB_CAP_CHECKSUM_SWITCH = 1;
1688
server_proxy_cap_flag.cap_flags_.OB_CAP_EXTRA_OK_PACKET_FOR_OCJ = 1;
1689
server_proxy_cap_flag.cap_flags_.OB_CAP_OB_PROTOCOL_V2 = 1;
1690
server_proxy_cap_flag.cap_flags_.OB_CAP_EXTRA_OK_PACKET_FOR_STATISTICS = 1;
1691
server_proxy_cap_flag.cap_flags_.OB_CAP_ABUNDANT_FEEDBACK = 1;
1692
server_proxy_cap_flag.cap_flags_.OB_CAP_PL_ROUTE = 1;
1693
server_proxy_cap_flag.cap_flags_.OB_CAP_PROXY_REROUTE = 1;
1694
server_proxy_cap_flag.cap_flags_.OB_CAP_PROXY_SESSIOIN_SYNC = 1;
1695
server_proxy_cap_flag.cap_flags_.OB_CAP_PROXY_FULL_LINK_TRACING = 1;
1696
server_proxy_cap_flag.cap_flags_.OB_CAP_PROXY_NEW_EXTRA_INFO = 1;
1697
// Session-variable sync is gated on the minimum cluster version here ...
if (GET_MIN_CLUSTER_VERSION() >= CLUSTER_VERSION_4_1_0_0) {
1698
server_proxy_cap_flag.cap_flags_.OB_CAP_PROXY_SESSION_VAR_SYNC = 1;
1700
server_proxy_cap_flag.cap_flags_.OB_CAP_PROXY_SESSION_VAR_SYNC = 0;
1702
// NOTE(review): ... but this assignment appears to run unconditionally after the gate above
// and would overwrite its result, making the version check ineffective. Confirm which of the
// two is intended before removing either.
server_proxy_cap_flag.cap_flags_.OB_CAP_PROXY_SESSION_VAR_SYNC = 1;
1703
server_proxy_cap_flag.cap_flags_.OB_CAP_PROXY_FULL_LINK_TRACING_EXT = 1;
1704
server_proxy_cap_flag.cap_flags_.OB_CAP_SERVER_DUP_SESS_INFO_SYNC = 1;
1705
server_proxy_cap_flag.cap_flags_.OB_CAP_LOCAL_FILES = 1;
1706
// Client session ids are gated on the minimum cluster version as well.
if (GET_MIN_CLUSTER_VERSION() >= CLUSTER_VERSION_4_3_0_0) {
1707
server_proxy_cap_flag.cap_flags_.OB_CAP_PROXY_CLIENT_SESSION_ID = 1;
1709
server_proxy_cap_flag.cap_flags_.OB_CAP_PROXY_CLIENT_SESSION_ID = 0;
1711
// Only capabilities advertised by both sides survive the AND.
conn.proxy_cap_flags_.capability_ = (server_proxy_cap_flag.capability_ & client_proxy_cap);//if old java client, set it 0
1713
LOG_DEBUG("Negotiated capability",
1714
K(conn.proxy_cap_flags_.is_proxy_reroute_support()),
1715
K(conn.proxy_cap_flags_.is_ob_protocol_v2_support()));
1720
// Looks up the OB_MYSQL_SCRAMBLE connect attribute of the handshake response.
//   proxy_scramble - out: points at the attribute's value inside the handshake response (no
//                    copy is made), or is reset when the attribute is absent (tolerated for
//                    compatibility with old proxies)
int ObMPConnect::get_proxy_scramble(ObString &proxy_scramble) const
1722
int ret = OB_SUCCESS;
1723
bool is_found = false;
1725
key_str.assign_ptr(OB_MYSQL_SCRAMBLE , static_cast<int32_t>(STRLEN(OB_MYSQL_SCRAMBLE)));
1726
for (int64_t i = 0; i < hsr_.get_connect_attrs().count() && OB_SUCC(ret) && !is_found; ++i) {
1727
const ObStringKV &kv = hsr_.get_connect_attrs().at(i);
1728
if (key_str == kv.key_) {
1729
proxy_scramble.assign_ptr(kv.value_.ptr(), kv.value_.length());
1734
if (OB_SUCC(ret) && !is_found) {
1735
//if fail to find proxy_scramble, ignore it, compatible with old proxy
1736
proxy_scramble.reset();
1741
// Looks up the OB_MYSQL_CLIENT_IP connect attribute of the handshake response.
//   client_ip - out: points at the attribute's value inside the handshake response (no copy is
//               made). A missing attribute is tolerated for compatibility with old proxies (the
//               statements of that branch are not visible in this listing).
int ObMPConnect::get_client_ip(ObString &client_ip) const
1743
int ret = OB_SUCCESS;
1744
bool is_found = false;
1746
key_str.assign_ptr(OB_MYSQL_CLIENT_IP , static_cast<int32_t>(STRLEN(OB_MYSQL_CLIENT_IP)));
1747
for (int64_t i = 0; i < hsr_.get_connect_attrs().count() && OB_SUCC(ret) && !is_found; ++i) {
1748
const ObStringKV &kv = hsr_.get_connect_attrs().at(i);
1749
if (key_str == kv.key_) {
1750
client_ip.assign_ptr(kv.value_.ptr(), kv.value_.length());
1755
if (OB_SUCC(ret) && !is_found) {
1756
//if fail to find, ignore it, compatible with old proxy
1762
// Verifies that the cluster name and cluster id sent by the client as connect attributes
// (OB_MYSQL_CLUSTER_NAME / OB_MYSQL_CLUSTER_ID) match this server.
//   server_cluster    - this server's cluster name, compared with the attribute value
//   server_cluster_id - this server's cluster id, compared with the attribute value cast to int64
// Returns OB_CLUSTER_NO_MATCH on a name mismatch, an id mismatch, or an id that cannot be cast.
// The scan stops once both attributes have been seen or an error occurred; attributes that are
// absent are not checked by the visible code.
int ObMPConnect::check_user_cluster(const ObString &server_cluster, const int64_t server_cluster_id) const
1764
int ret = OB_SUCCESS;
1765
ObString cluster_kv(OB_MYSQL_CLUSTER_NAME);
1766
ObString cluster_id_key(OB_MYSQL_CLUSTER_ID);
1768
bool found_cluster = false;
1769
bool found_cluster_id = false;
1770
for (int64_t i = 0; OB_SUCC(ret) && (!found_cluster || !found_cluster_id) && i < hsr_.get_connect_attrs().count(); ++i) {
1771
const ObStringKV &kv = hsr_.get_connect_attrs().at(i);
1772
if (!found_cluster && cluster_kv == kv.key_) {
1773
if (server_cluster != kv.value_) {
1774
ret = OB_CLUSTER_NO_MATCH;
1775
LOG_WARN("user cluster is not match to server cluster",
1776
"user cluster", kv.value_, "server cluster", server_cluster);
1778
found_cluster = true;
1779
} else if (!found_cluster_id && cluster_id_key == kv.key_) {
1780
int64_t user_cluster_id = 0;
1782
value.set_varchar(kv.value_);
1783
ObArenaAllocator allocator(ObModIds::OB_SQL_EXPR);
1784
ObCastCtx cast_ctx(&allocator, NULL, CM_NONE, ObCharset::get_system_collation());
1785
// NOTE(review): the EXPR_GET_* macro presumably uses cast_ctx and sets ret -- confirm.
EXPR_GET_INT64_V2(value, user_cluster_id);
1787
// An id that cannot be parsed is reported the same way as a mismatching id.
ret = OB_CLUSTER_NO_MATCH;
1788
LOG_WARN("fail to cast user_cluster_id to int64", K(kv.value_), K(ret));
1790
if (server_cluster_id != user_cluster_id) {
1791
ret = OB_CLUSTER_NO_MATCH;
1792
LOG_WARN("user cluster id is not match to server cluster id",
1793
"user cluster id", kv.value_, "server cluster id", server_cluster_id);
1796
found_cluster_id = true;
1804
// check common property for obproxy or OCJ
1805
int ObMPConnect::check_common_property(ObSMConnection &conn, ObMySQLCapabilityFlags &client_cap) {
1806
int ret = OB_SUCCESS;
1807
uint64_t proxy_sessid = 0;
1808
uint32_t client_sessid = INVALID_SESSID;
1809
int32_t client_addr_port = 0;
1810
int64_t client_create_time = 0;
1811
int64_t sess_create_time = 0;
1812
if (OB_FAIL(check_user_cluster(ObString::make_string(GCONF.cluster), GCONF.cluster_id))) {
1813
LOG_WARN("fail to check user cluster", K(ret));
1814
} else if (OB_FAIL(check_update_proxy_capability(conn))) {
1815
LOG_WARN("fail to check_update_proxy_capability", K(ret));
1816
} else if (OB_FAIL(get_proxy_conn_id(proxy_sessid))) {
1817
LOG_WARN("get proxy connection id fail", K(ret));
1818
} else if (OB_FAIL(get_client_conn_id(client_sessid))) {
1819
LOG_WARN("get client connection id fail", K(ret), K(client_sessid));
1820
} else if (OB_FAIL(get_client_addr_port(client_addr_port))) {
1821
LOG_WARN("get client connection id fail", K(ret), K(client_addr_port));
1822
} else if (OB_FAIL(get_client_create_time(client_create_time))) {
1823
LOG_WARN("get client connection id fail", K(ret), K(client_addr_port));
1824
} else if (OB_FAIL(get_proxy_sess_create_time(sess_create_time))) {
1825
LOG_WARN("get proxy session create time fail", K(ret));
1827
conn.proxy_sessid_ = proxy_sessid;
1828
conn.client_sessid_ = client_sessid;
1829
conn.client_addr_port_ = client_addr_port;
1830
conn.client_create_time_ = client_create_time;
1831
conn.sess_create_time_ = sess_create_time;
1833
LOG_DEBUG("construct session id", K(conn.client_sessid_), K(conn.sessid_),
1834
K(conn.client_addr_port_), K(conn.client_create_time_) ,K(conn.proxy_sessid_));
1835
if (conn.proxy_cap_flags_.is_ob_protocol_v2_support()) {
1836
// when used 2.0 protocol, do not use mysql compress
1837
client_cap.cap_flags_.OB_CLIENT_COMPRESS = 0;
1839
if (conn.proxy_cap_flags_.is_checksum_support()) {
1840
client_cap.cap_flags_.OB_CLIENT_COMPRESS = 1;
1842
client_cap.cap_flags_.OB_CLIENT_COMPRESS = 0;
1849
int ObMPConnect::check_client_property(ObSMConnection &conn)
1851
int ret = OB_SUCCESS;
1852
ObMySQLCapabilityFlags client_cap = hsr_.get_capability_flags();
1854
if (OB_FAIL(set_client_version(conn))) {
1855
LOG_WARN("get proxy version fail", K(ret));
1860
} else if (conn.is_java_client_) {
1861
// the connection is from oceanbase-connector-java(OCJ)
1862
if (OB_FAIL(check_common_property(conn, client_cap))) {
1863
LOG_WARN("fail to check common property", K(ret));
1865
// if ocj enable extra_ok_packet, then track the system variables
1866
if (conn.proxy_cap_flags_.is_extra_ok_packet_for_ocj_support()) {
1867
client_cap.cap_flags_.OB_CLIENT_SESSION_TRACK = 1;
1870
} else if (conn.is_proxy_) {
1871
// the connection is from obproxy, set CLIENT_SESSION_TRACK flag
1872
client_cap.cap_flags_.OB_CLIENT_SESSION_TRACK = 1;
1874
ObString proxy_scramble;
1875
if (OB_FAIL(check_common_property(conn, client_cap))) {
1876
LOG_WARN("fail to check common property", K(ret));
1877
} else if (OB_FAIL(get_proxy_scramble(proxy_scramble))) {
1878
LOG_WARN("get proxy scramble fail", K(ret));
1879
} else if (OB_FAIL(extract_real_scramble(proxy_scramble))) {
1880
LOG_WARN("extract real scramble fail", K(ret));
1881
} else if (OB_FAIL(get_client_ip(client_ip))) {
1882
LOG_WARN("get client ip fail", K(ret));
1883
} else if (OB_FAIL(set_proxy_version(conn))) {
1884
LOG_WARN("get proxy version fail", K(ret));
1886
} else if (conn.is_jdbc_client_ || conn.is_oci_client_) {
1887
if (OB_FAIL(check_common_property(conn, client_cap))) {
1888
LOG_WARN("fail to check common property", K(ret));
1890
// jdbc and oci will never use compressed mysql protocol
1891
client_cap.cap_flags_.OB_CLIENT_COMPRESS = 0;
1894
//login observer directly
1898
if (client_ip.empty()) {
1899
get_peer().ip_to_string(client_ip_buf_, common::MAX_IP_ADDR_LENGTH);
1900
const char *peer_ip = client_ip_buf_;
1901
client_ip_.assign_ptr(peer_ip, static_cast<int32_t>(STRLEN(peer_ip)));
1903
client_ip_ = client_ip;
1905
// Distinguish client addr port between proxy mode and direct connection mode
1906
if (conn.client_addr_port_ == 0) {
1907
client_port_ = get_peer().get_port();
1909
client_port_ = conn.client_addr_port_;
1911
hsr_.set_capability_flags(client_cap);
1912
conn.cap_flags_ = client_cap;
1916
int ObMPConnect::extract_real_scramble(const ObString &proxy_scramble)
1918
int ret = OB_SUCCESS;
1919
ObSMConnection &conn = *get_conn();
1920
if (OB_UNLIKELY(STRLEN(conn.scramble_buf_) != ObSMConnection::SCRAMBLE_BUF_SIZE)) {
1921
ret = OB_ERR_UNEXPECTED;
1922
LOG_WARN("server orign scramble is unexpected", "length", STRLEN(conn.scramble_buf_),
1923
K(conn.scramble_buf_), K(ret));
1925
if (ObSMConnection::SCRAMBLE_BUF_SIZE == proxy_scramble.length()) {
1926
unsigned char real_scramble_buf[ObSMConnection::SCRAMBLE_BUF_SIZE] = {0};
1927
// The value of '__proxy_scramble' is not real scramble of proxy
1928
// In fact, it __proxy_scramble = proxy's xor server's scramble, just for simple encrypt
1929
// Here we need get orig proxy's scramble by this -- proxy's scramble = __proxy_scramble xor server's scramble
1930
if (OB_FAIL(ObEncryptedHelper::my_xor(reinterpret_cast<const unsigned char *>(proxy_scramble.ptr()),
1931
reinterpret_cast<const unsigned char *>(conn.scramble_buf_),
1932
static_cast<uint32_t>(ObSMConnection::SCRAMBLE_BUF_SIZE),
1933
real_scramble_buf))) {
1934
LOG_WARN("failed to calc xor real_scramble_buf", K(ret));
1936
MEMCPY(conn.scramble_buf_, real_scramble_buf, ObSMConnection::SCRAMBLE_BUF_SIZE);
1939
const ObString old_scramble("aaaaaaaabbbbbbbbbbbb");
1940
MEMCPY(conn.scramble_buf_, old_scramble.ptr(), old_scramble.length());
1946
// Decide whether a login for `tenant_id` may proceed.
//
// Visible steps: (1) the IP white list is verified through verify_ip_white_list()
// unless one of the two bypass conditions below holds; (2) a per-tenant session
// limit is enforced and OB_ERR_CON_COUNT_ERROR is produced when it is reached.
//
// NOTE(review): this listing has lost its short lines (braces, `else`,
// `return ret;`, ...) and carries stray numeric lines, so the exact nesting of
// step (2) relative to step (1) cannot be read from here -- restore the text from
// version control before relying on it.
int ObMPConnect::verify_connection(const uint64_t tenant_id) const
1948
int ret = OB_SUCCESS;
1949
const char *IPV4_LOCAL_STR = "127.0.0.1";
1950
const char *IPV6_LOCAL_STR = "::1";
1951
// NOTE(review): no null check of `conn` is visible before `conn->tenant_id_` is read
// further down -- confirm one exists in the lines missing from this listing.
ObSMConnection *conn = get_conn();
1954
//if normal tenant can not login with error variables, sys tenant can recover the error variables
1955
//but if sys tenant set error variables, no one can recover it.
1956
//so we need leave a backdoor for root@sys from 127.0.0.1 to skip this verifying
1957
// bypass #1: sys tenant + OB_SYS_USER_NAME + loopback address (IPv4 or IPv6)
if (OB_SYS_TENANT_ID == tenant_id
1958
&& 0 == user_name_.compare(OB_SYS_USER_NAME)
1959
&& (0 == client_ip_.compare(IPV4_LOCAL_STR)
1960
|| 0 == client_ip_.compare(IPV6_LOCAL_STR))) {
1961
LOG_DEBUG("this is root@sys user from local host, no need verify_ip_white_list", K(ret));
1962
// bypass #2: sys tenant while the server status is SS_INIT or SS_STARTING
} else if (OB_SYS_TENANT_ID == tenant_id
1963
&& (SS_INIT == GCTX.status_ || SS_STARTING == GCTX.status_)) {
1964
LOG_INFO("server is initializing, ignore verify_ip_white_list", "status", GCTX.status_, K(ret));
1965
} else if (OB_FAIL(verify_ip_white_list(tenant_id))) {
1966
LOG_WARN("failed to verify_ip_white_list", K(ret));
1968
// NOTE: this local shadows the `tenant_id` parameter and is taken from the
// connection (int64_t here, uint64_t in the signature).
const int64_t tenant_id = conn->tenant_id_;
1969
// sys tenant or root(SYS) user is considered as vip
1970
bool check_max_sess = tenant_id != OB_SYS_TENANT_ID;
1971
if (check_max_sess) {
1972
lib::Worker::CompatMode compat_mode = lib::Worker::CompatMode::INVALID;
1973
if (OB_FAIL(ObCompatModeGetter::get_tenant_mode(tenant_id, compat_mode))) {
1974
LOG_WARN("get_compat_mode failed", K(ret), K(tenant_id));
1975
// the exempt user name depends on the tenant's compatibility mode
} else if (Worker::CompatMode::MYSQL == compat_mode) {
1976
check_max_sess = user_name_.compare(OB_SYS_USER_NAME) != 0;
1977
} else if (Worker::CompatMode::ORACLE == compat_mode) {
1978
check_max_sess = user_name_.compare(OB_ORA_SYS_USER_NAME) != 0;
1981
if (OB_SUCC(ret) && check_max_sess) {
1982
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id));
1983
if (tenant_config.is_valid()) {
1984
int64_t max_sess_num = 0;
1985
// NOTE(review): sess_count is not used in any visible line.
int64_t sess_count = 0;
1986
// the limit is obtained from the tenant context, seeded with the tenant config
// item _resource_limit_max_session_num
MTL_SWITCH(tenant_id) {
1987
auto *tenant_base = MTL_CTX();
1988
max_sess_num = tenant_base->get_max_session_num(tenant_config->_resource_limit_max_session_num);
1993
// a limit of 0 disables the check
if (max_sess_num != 0) {
1994
bool tenant_exists = false;
1995
uint64_t cur_connections = 0;
1996
if (OB_FAIL(gctx_.conn_res_mgr_->get_tenant_cur_connections(tenant_id, tenant_exists,
1997
cur_connections))) {
1998
LOG_WARN("fail to get session count", K(ret));
1999
// NOTE(review): compares uint64_t cur_connections with int64_t max_sess_num
} else if (tenant_exists && cur_connections >= max_sess_num) {
2000
ret = OB_ERR_CON_COUNT_ERROR;
2001
LOG_WARN("too much sessions", K(ret), K(tenant_id), K(cur_connections), K(max_sess_num),
2002
K(tenant_name_), K(user_name_));
2012
// Resolve the tenant id for this login through get_tenant_id() and record it on
// the connection.
//
// @param [in,out] conn       receives tenant_id_ and resource_group_id_ (both set to
//                            the resolved tenant id)
// @param [out]    tenant_id  filled by get_tenant_id()
//
// NOTE(review): this listing has lost its short lines (braces, `else`,
// `return ret;`, ...) and carries stray numeric lines; the nesting of the final
// easy-transport block cannot be determined from here.
int ObMPConnect::check_update_tenant_id(ObSMConnection &conn, uint64_t &tenant_id)
2014
int ret = OB_SUCCESS;
2015
if (OB_FAIL(get_tenant_id(conn, tenant_id))) {
2016
// a locked tenant is additionally reported to the user, other errors are only logged
if (OB_ERR_TENANT_IS_LOCKED == ret) {
2017
LOG_WARN("tenant is locked", K(ret), K_(tenant_name));
2018
LOG_USER_ERROR(OB_ERR_TENANT_IS_LOCKED, tenant_name_.length(), tenant_name_.ptr());
2020
LOG_WARN("get_tenant_id failed", K(ret));
2023
conn.tenant_id_ = tenant_id;
2024
conn.resource_group_id_ = tenant_id;
2025
// connections in the OBCG_DIAG_TENANT group have user_name_ replaced by the
// built-in sys user matching the tenant's compatibility mode
if (OBCG_DIAG_TENANT == conn.group_id_) {
2026
lib::Worker::CompatMode compat_mode = lib::Worker::CompatMode::INVALID;
2027
if (OB_FAIL(ObCompatModeGetter::get_tenant_mode(tenant_id, compat_mode))) {
2028
LOG_WARN("get_compat_mode failed", K(ret), K(tenant_id));
2029
} else if (Worker::CompatMode::MYSQL == compat_mode) {
2030
user_name_ = ObString::make_string(OB_SYS_USER_NAME);
2031
} else if (Worker::CompatMode::ORACLE == compat_mode) {
2032
user_name_ = ObString::make_string(OB_ORA_SYS_USER_NAME);
2034
// NOTE(review): no error code assignment is visible before this warning --
// confirm whether an unknown compat mode is meant to fail the login.
LOG_WARN("invalid compat mode", K(ret), K(tenant_id), K(compat_mode));
2039
// for the easy transport, attach the tenant's mod stat to the connection pool
// NOTE(review): req_ is dereferenced here without a visible null check.
if (rpc::ObRequest::TRANSPORT_PROTO_EASY == req_->get_nio_protocol()) {
2040
easy_connection_t *c = req_->get_ez_req()->ms->c;
2041
c->pool->mod_stat = easy_fetch_mod_stat(tenant_id);
2047
// Authenticate the user and finish wiring the freshly created session.
//
// Visible steps, in source order: load privilege info into `session`; reject
// Oracle-mode sessions from clients lacking OB_CLIENT_SUPPORT_ORACLE_MODE; record
// socket, peer address, tenant and proxy capability data on the session; pin the
// tenant through omt and refuse tenants that have stopped; count the session in
// ACTIVE_SESSIONS; run the init_connect statement for users without OB_PRIV_SUPER;
// finally put the session into SESSION_SLEEP.
//
// @param [in,out] conn       connection being established
// @param [in,out] session    session to populate; its query lock is held for the
//                            whole function through lock_guard
// @param [in]     tenant_id  tenant the user logs into
//
// NOTE(review): this listing has lost its short lines (braces, `else`,
// `return ret;`, short declarations and conditions) and carries stray numeric
// lines; the guards around several steps are therefore not visible here.
int ObMPConnect::verify_identify(ObSMConnection &conn, ObSQLSessionInfo &session,
2048
const uint64_t tenant_id)
2050
int ret = OB_SUCCESS;
2051
//at this point, tenant_id and sessid are valid
2052
ObSessionStatEstGuard guard(tenant_id, conn.sessid_);
2053
ObSQLSessionInfo::LockGuard lock_guard(session.get_query_lock());
2054
if (OB_ISNULL(req_)) {
2055
ret = OB_ERR_UNEXPECTED;
2056
LOG_ERROR("null request", K(ret));
2057
} else if (OB_FAIL(load_privilege_info(session))) {
2059
// while the server is still in SS_INIT the failure is reported as OB_SERVER_IS_INIT
if (SS_INIT == GCTX.status_) {
2060
ret = OB_SERVER_IS_INIT;
2062
// NOTE(review): the declaration of pre_ret is not visible in this listing.
LOG_WARN("load privilege info fail", K(pre_ret), K(ret), K(GCTX.status_));
2063
// an Oracle-mode tenant requires a client that announced Oracle-mode support
} else if (ORACLE_MODE == session.get_compatibility_mode()
2064
&& 0 == hsr_.get_capability_flags().cap_flags_.OB_CLIENT_SUPPORT_ORACLE_MODE) {
2065
ret = OB_NOT_SUPPORTED;
2066
LOG_USER_ERROR(OB_NOT_SUPPORTED, "Oracle tenant for current client driver is");
2068
session.update_last_active_time();
2069
SQL_REQ_OP.get_sock_desc(req_, session.get_sock_desc());
2070
SQL_REQ_OP.set_sql_session_to_sock_desc(req_, (void *)&session);
2071
// both the peer address and the client address are initialised from the socket peer
session.set_peer_addr(get_peer());
2072
session.set_client_addr(get_peer());
2073
session.set_trans_type(transaction::ObTxClass::USER);
2074
session.set_tenant(tenant_name_, tenant_id);
2075
session.set_proxy_cap_flags(conn.proxy_cap_flags_);
2076
session.set_login_tenant_id(tenant_id);
2077
session.set_client_non_standard(common::OB_CLIENT_NON_STANDARD == conn.client_type_ ? true : false);
2078
// Check tenant after set tenant session is necessary!
2079
// Because if another client is deleting this tenant while the
2080
// session has not been constructed completely, omt
2081
// wouldn't be aware of this session. So this session
2082
// may run normally although the tenant has already been deleted.
2083
if (NULL != gctx_.omt_) {
2084
if (OB_FAIL(gctx_.omt_->get_tenant_with_tenant_lock(conn.resource_group_id_, *conn.handle_, conn.tenant_))) {
2085
LOG_WARN("can't get tenant", K_(conn.tenant_id), K(ret));
2086
} else if (OB_ISNULL(conn.tenant_)) {
2087
ret = OB_ERR_UNEXPECTED;
2088
LOG_ERROR("null tenant", K(ret), K(conn.tenant_id_));
2089
// FALSE_IT is used to run the assignment inside the else-if chain: remember that
// the tenant lock is held before looking at the tenant state
} else if (FALSE_IT(conn.is_tenant_locked_ = true)) {
2090
} else if (conn.tenant_->has_stopped()) {
2091
ret = OB_TENANT_NOT_IN_SERVER;
2092
LOG_WARN("tenant is deleting, reject connecting", K(ret), "tenant_id", conn.tenant_id_);
2096
//at this point, conn.tenant_id_ and sessid are already set and won't be modified
2097
if (conn.tenant_id_ != 0 && conn.sessid_ != 0) {
2098
EVENT_INC(ACTIVE_SESSIONS);
2099
// remembered on the connection so the counter increment can be tracked
conn.has_inc_active_num_ = true;
2102
// init_connect is not executed for users that have the super privilege
2104
&& !(OB_PRIV_SUPER & session.get_user_priv_set())) {
2106
if (OB_FAIL(session.get_init_connect(sql_str))) {
2107
LOG_WARN("get sys variable init_connect failed.", K(ret));
2109
// an empty init_connect value is compared against here
if (0 == sql_str.compare("")) {
2112
if (OB_FAIL(init_connect_process(sql_str, session))) {
2113
LOG_WARN("init connect failed.", K(sql_str), K(ret));
2117
LOG_DEBUG("INIT_CONNECT", K(ret), K(sql_str));
2118
//a statement that has an error will cause client connections to fail
2119
if (OB_SUCCESS != ret) {
2126
if(OB_FAIL(session.set_session_state(SESSION_SLEEP))) {
2127
LOG_WARN("fail to set session state", K(ret));
2134
int ObMPConnect::verify_ip_white_list(const uint64_t tenant_id) const
2136
int ret = OB_SUCCESS;
2137
const ObTenantSchema *tenant_schema = NULL;
2138
const ObSysVariableSchema *sys_variable_schema = NULL;
2139
share::schema::ObSchemaGetterGuard schema_guard;
2140
ObString var_name(OB_SV_TCP_INVITED_NODES);
2141
const ObSysVarSchema *sysvar = NULL;
2142
if (OB_UNLIKELY(client_ip_.empty())) {
2143
ret = OB_ERR_UNEXPECTED;
2144
LOG_WARN("client_ip is empty", K(ret));
2145
} else if (0 == client_ip_.compare(UNIX_SOCKET_CLIENT_IP)) {
2146
LOG_INFO("match unix socket connection", K(tenant_id), K(client_ip_));
2147
} else if (OB_FAIL(gctx_.schema_service_->get_tenant_schema_guard(tenant_id, schema_guard))) {
2148
LOG_WARN("get_schema_guard failed", K(ret));
2149
} else if (OB_FAIL(schema_guard.get_tenant_info(tenant_id, tenant_schema))) {
2150
LOG_WARN("get tenant info failed", K(ret));
2151
} else if (OB_ISNULL(tenant_schema)) {
2152
ret = OB_ERR_UNEXPECTED;
2153
LOG_WARN("tenant_schema is null", K(ret));
2154
} else if (OB_FAIL(schema_guard.get_sys_variable_schema(tenant_id, sys_variable_schema))) {
2155
LOG_WARN("get sys variable schema failed", K(ret));
2156
} else if (OB_ISNULL(sys_variable_schema)) {
2157
ret = OB_ERR_UNEXPECTED;
2158
LOG_WARN("sys variable schema is null", K(ret));
2159
} else if (OB_FAIL(sys_variable_schema->get_sysvar_schema(var_name, sysvar))) {
2160
LOG_WARN("fail to get_sysvar_schema", K(ret));
2162
ObString var_value = sysvar->get_value();
2163
if (!obsys::ObNetUtil::is_in_white_list(client_ip_, var_value)) {
2164
ret = OB_ERR_NO_PRIVILEGE;
2165
LOG_WARN("client is not invited into this tenant", K(ret));
2171
int ObMPConnect::convert_oracle_object_name(const uint64_t tenant_id, ObString &object_name)
2173
int ret = OB_SUCCESS;
2174
lib::Worker::CompatMode compat_mode = lib::Worker::CompatMode::MYSQL;
2175
if (object_name.empty()) {
2176
//这里传进来的obj_name是可能为空的,所以不赋错误码
2177
LOG_DEBUG("object name is null when try to convert it");
2178
} else if (OB_FAIL(ObCompatModeGetter::get_tenant_mode(tenant_id, compat_mode))) {
2179
LOG_WARN("fail to get tenant mode in convert_oracle_object_name", K(ret));
2180
} else if (compat_mode == lib::Worker::CompatMode::ORACLE) {
2181
if (object_name.length() > 1 &&
2182
'\"' == object_name[0] &&
2183
'\"' == object_name[object_name.length() - 1]) {
2184
//如果object_name是带上了双引号,则不作任何转换
2186
if (2 != object_name.length()) {
2187
object_name.assign_ptr(object_name.ptr() + 1, object_name.length() - 2);
2189
object_name.reset();
2192
ObCharset::caseup(CS_TYPE_UTF8MB4_BIN, object_name);
2198
int ObMPConnect::set_proxy_version(ObSMConnection &conn)
2200
int ret = OB_SUCCESS;
2201
bool is_found = false;
2203
const char *proxy_version_str = NULL;
2205
key_str.assign_ptr(OB_MYSQL_PROXY_VEERSION,
2206
static_cast<int32_t>(STRLEN(OB_MYSQL_PROXY_VEERSION)));
2207
for (int64_t i = 0; !is_found && i < hsr_.get_connect_attrs().count(); ++i) {
2208
const ObStringKV &kv = hsr_.get_connect_attrs().at(i);
2209
if (key_str == kv.key_) {
2210
proxy_version_str = kv.value_.ptr();
2211
length = kv.value_.length();
2215
int64_t min_len = 5;//传过来的合法version字符串最短的为“1.1.1”,长度至少为5
2216
if (!is_found || OB_ISNULL(proxy_version_str) || length < min_len) {
2217
conn.proxy_version_ = 0;
2219
const int64_t VERSION_ITEM = 3;//版本号只需要取前三位就行,比如“1.7.6.1” 只需要取“1.7.6” 就能决定;
2220
ObArenaAllocator allocator(ObModIds::OB_SQL_EXPR);
2221
char *buff = static_cast<char *>(allocator.alloc(length + 1));
2222
if (OB_ISNULL(buff)) {
2223
ret = OB_SIZE_OVERFLOW;
2224
LOG_WARN("failed to alloc memory.", K(buff), K(ret));
2226
memset(buff, 0, length + 1);
2227
int64_t cur_item = 0;
2228
for (int64_t i = 0; cur_item != VERSION_ITEM && i < length; ++i) {
2229
if (proxy_version_str[i] == '.') {
2232
if (cur_item != VERSION_ITEM) {
2233
buff[i] = proxy_version_str[i];
2236
if (OB_FAIL(ObClusterVersion::get_version(buff, conn.proxy_version_))) {
2237
LOG_WARN("failed to get version", K(ret));
2238
} else {/*do nothing*/}
2244
int ObMPConnect::set_client_version(ObSMConnection &conn)
2246
int ret = OB_SUCCESS;
2247
bool is_found = false;
2249
const char *client_version_str = NULL;
2251
key_str.assign_ptr(OB_MYSQL_CLIENT_VERSION,
2252
static_cast<int32_t>(STRLEN(OB_MYSQL_CLIENT_VERSION)));
2253
for (int64_t i = 0; !is_found && i < hsr_.get_connect_attrs().count(); ++i) {
2254
const ObStringKV &kv = hsr_.get_connect_attrs().at(i);
2255
if (key_str == kv.key_) {
2256
client_version_str = kv.value_.ptr();
2257
length = kv.value_.length();
2261
int64_t min_len = 5;//传过来的合法version字符串最短的为“1.1.1”,长度至少为5
2262
if (!is_found || OB_ISNULL(client_version_str) || length < min_len) {
2263
conn.client_version_ = 0;
2265
const int64_t VERSION_ITEM = 3;//版本号只需要取前三位就行,比如“1.7.6.1” 只需要取“1.7.6” 就能决定;
2266
char buff[OB_MAX_VERSION_LENGTH];
2267
memset(buff, 0, OB_MAX_VERSION_LENGTH);
2268
int64_t cur_item = 0;
2269
for (int64_t i = 0; cur_item != VERSION_ITEM && i < length; ++i) {
2270
if (client_version_str[i] == '.') {
2273
if (cur_item != VERSION_ITEM) {
2274
buff[i] = client_version_str[i];
2277
if (OB_FAIL(ObClusterVersion::get_version(buff, conn.client_version_))) {
2278
LOG_WARN("failed to get version", K(ret));
2279
} else {/*do nothing*/}