2
* Copyright (c) 2021 OceanBase
3
* OceanBase CE is licensed under Mulan PubL v2.
4
* You can use this software according to the terms and conditions of the Mulan PubL v2.
5
* You may obtain a copy of Mulan PubL v2 at:
6
* http://license.coscl.org.cn/MulanPubL-2.0
7
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
* See the Mulan PubL v2 for more details.
13
#define USING_LOG_PREFIX SERVER
17
#include "lib/worker.h"
18
#include "lib/profile/ob_trace_id.h"
19
#include "lib/profile/ob_perf_event.h"
20
#include "lib/stat/ob_diagnose_info.h"
21
#include "lib/stat/ob_session_stat.h"
22
#include "lib/string/ob_sql_string.h"
23
#include "lib/utility/ob_macro_utils.h"
24
#include "lib/utility/utility.h"
25
#include "rpc/ob_request.h"
26
#include "rpc/obmysql/ob_mysql_packet.h"
27
#include "rpc/obmysql/packet/ompk_change_user.h"
28
#include "rpc/obmysql/packet/ompk_error.h"
29
#include "rpc/obmysql/packet/ompk_ok.h"
30
#include "rpc/obmysql/packet/ompk_eof.h"
31
#include "rpc/obmysql/packet/ompk_row.h"
32
#include "observer/mysql/obsm_row.h"
33
#include "observer/mysql/obsm_utils.h" // ObSMUtils
34
#include "rpc/obmysql/ob_mysql_request_utils.h"
35
#include "share/config/ob_server_config.h"
36
#include "share/config/ob_server_config.h"
37
#include "share/inner_table/ob_inner_table_schema_constants.h"
38
#include "share/client_feedback/ob_feedback_partition_struct.h"
39
#include "share/resource_manager/ob_resource_manager.h"
40
#include "sql/session/ob_sql_session_mgr.h"
41
#include "observer/mysql/obmp_utils.h"
42
#include "rpc/obmysql/obsm_struct.h"
43
#include "observer/mysql/ob_mysql_result_set.h"
44
#include "observer/mysql/ob_query_driver.h"
45
#include "share/config/ob_server_config.h"
46
#include "storage/tx/ob_trans_define.h"
47
#include "share/ob_lob_access_utils.h"
48
#include "sql/monitor/flt/ob_flt_utils.h"
49
#include "sql/session/ob_sess_info_verify.h"
50
#ifdef OB_BUILD_ORACLE_XML
51
#include "sql/engine/expr/ob_expr_xml_func_helper.h"
57
using namespace obmysql;
58
using namespace common;
59
using namespace transaction;
60
using namespace share::schema;
69
ObMPBase::ObMPBase(const ObGlobalContext &gctx)
70
: gctx_(gctx), process_timestamp_(0), proxy_version_(0)
76
// wakeup_request内部会判断has_req_wakeup_标,
77
// 这里调一次兜底异常路径忘记flush_buffer的场景
78
if (!THIS_WORKER.need_retry()) {
79
packet_sender_.finish_sql_request();
83
int ObMPBase::response(const int retcode)
87
if (!THIS_WORKER.need_retry()) {
88
if (OB_FAIL(flush_buffer(true))) {
89
LOG_WARN("failed to flush_buffer", K(ret));
95
int ObMPBase::setup_packet_sender()
98
if (OB_FAIL(packet_sender_.init(req_))) {
99
LOG_ERROR("packet sender init fail", KP(req_), K(ret));
100
send_error_packet(ret, NULL);
105
int ObMPBase::before_process()
107
int ret = OB_SUCCESS;
108
process_timestamp_ = common::ObTimeUtility::current_time();
112
int ObMPBase::update_transmission_checksum_flag(const ObSQLSessionInfo &session)
114
return packet_sender_.update_transmission_checksum_flag(session);
117
int ObMPBase::update_proxy_sys_vars(ObSQLSessionInfo &session)
119
int ret = OB_SUCCESS;
120
ObSMConnection* conn = get_conn();
121
if (OB_UNLIKELY(NULL == conn)) {
122
ret = OB_CONNECT_ERROR;
123
LOG_WARN("connection in error, maybe has disconnected", K(ret));
124
} else if (OB_FAIL(session.set_proxy_user_privilege(session.get_user_priv_set()))) {
125
LOG_WARN("fail to set proxy user privilege system variables", K(ret));
126
} else if (OB_FAIL(session.set_proxy_capability(conn->proxy_cap_flags_.capability_))) {
127
LOG_WARN("fail to set proxy capability", K(ret));
132
int ObMPBase::after_process(int error_code)
134
int ret = OB_SUCCESS;
135
if (!lib::is_diagnose_info_enabled()) {
137
NG_TRACE_EXT(process_end, OB_ID(run_ts), get_run_timestamp());
138
const int64_t elapsed_time = common::ObTimeUtility::current_time() - get_receive_timestamp();
139
bool is_slow = (elapsed_time > GCONF.trace_log_slow_query_watermark)
140
&& !THIS_WORKER.need_retry();
142
if (THIS_WORKER.need_retry() && OB_TRY_LOCK_ROW_CONFLICT == error_code) {
143
//如果是锁冲突,且接下来会重试,则不需要打印这条日志了
145
FORCE_PRINT_TRACE(THE_TRACE, "[slow query]");
147
// slow query will flush cache
150
} else if (can_force_print(error_code)) {
151
// 需要打印TRACE日志的错误码添加在这里
152
int process_ret = error_code;
153
NG_TRACE_EXT(process_ret, OB_Y(process_ret));
154
FORCE_PRINT_TRACE(THE_TRACE, "[err query]");
155
} else if (THIS_WORKER.need_retry()) {
156
if (OB_TRY_LOCK_ROW_CONFLICT != error_code) {
157
FORCE_PRINT_TRACE(THE_TRACE, "[packet retry query]");
160
PRINT_TRACE(THE_TRACE);
163
if (common::OB_SUCCESS != error_code) {
167
ObFLTUtils::clean_flt_env();
171
void ObMPBase::cleanup()
173
ObActiveSessionGuard::setup_default_ash();
176
void ObMPBase::disconnect()
178
return packet_sender_.disconnect();
181
void ObMPBase::force_disconnect()
183
return packet_sender_.force_disconnect();
186
int ObMPBase::clean_buffer()
188
return packet_sender_.clean_buffer();
191
int ObMPBase::flush_buffer(const bool is_last)
193
return packet_sender_.is_disable_response()? OB_SUCCESS: packet_sender_.flush_buffer(is_last);
196
ObSMConnection* ObMPBase::get_conn() const
198
return packet_sender_.get_conn();
201
int ObMPBase::get_conn_id(uint32_t &sessid) const
203
return packet_sender_.get_conn_id(sessid);
206
int ObMPBase::read_packet(obmysql::ObICSMemPool& mem_pool, obmysql::ObMySQLPacket *&pkt)
208
return packet_sender_.read_packet(mem_pool, pkt);
211
int ObMPBase::release_packet(obmysql::ObMySQLPacket* pkt)
213
return packet_sender_.release_packet(pkt);
215
int ObMPBase::send_error_packet(int err,
217
bool is_partition_hit /* = true */,
218
void *extra_err_info /* = NULL */)
220
return packet_sender_.send_error_packet(err, errmsg, is_partition_hit, extra_err_info);
223
int ObMPBase::send_switch_packet(ObString &auth_name, ObString& auth_data)
225
int ret = OB_SUCCESS;
226
OMPKChangeUser packet;
227
packet.set_auth_plugin_name(auth_name);
228
packet.set_auth_response(auth_data);
229
if (OB_FAIL(response_packet(packet, NULL))) {
230
LOG_WARN("failed to send switch packet", K(packet), K(ret));
235
int ObMPBase::load_system_variables(const ObSysVariableSchema &sys_variable_schema, ObSQLSessionInfo &session) const
237
int ret = OB_SUCCESS;
238
ObArenaAllocator calc_buf(ObModIds::OB_SQL_SESSION);
239
for (int64_t i = 0; OB_SUCC(ret) && i < sys_variable_schema.get_sysvar_count(); ++i) {
240
const ObSysVarSchema *sysvar = NULL;
241
sysvar = sys_variable_schema.get_sysvar_schema(i);
242
if (sysvar != NULL) {
243
LOG_DEBUG("load system variable", K(*sysvar));
244
if (OB_FAIL(session.load_sys_variable(calc_buf, sysvar->get_name(), sysvar->get_data_type(),
245
sysvar->get_value(), sysvar->get_min_val(),
246
sysvar->get_max_val(), sysvar->get_flags(), true))) {
247
LOG_WARN("load sys variable failed", K(ret), K(*sysvar));
253
session.set_global_vars_version(sys_variable_schema.get_schema_version());
255
if (OB_FAIL(session.gen_sys_var_in_pc_str())) {
256
LOG_WARN("fail to gen sys var in pc str", K(ret));
257
} else if (OB_FAIL(session.gen_configs_in_pc_str())) {
258
LOG_WARN("fail to gen configs in pc string", K(ret));
264
int ObMPBase::send_ok_packet(ObSQLSessionInfo &session, ObOKPParam &ok_param, obmysql::ObMySQLPacket* pkt)
266
return packet_sender_.send_ok_packet(session, ok_param, pkt);
269
int ObMPBase::send_eof_packet(const ObSQLSessionInfo &session, const ObMySQLResultSet &result, ObOKPParam *ok_param)
271
return packet_sender_.send_eof_packet(session, result, ok_param);
274
int ObMPBase::create_session(ObSMConnection *conn, ObSQLSessionInfo *&sess_info)
276
int ret = OB_SUCCESS;
277
if (OB_ISNULL(conn)) {
278
ret = OB_ERR_UNEXPECTED;
279
LOG_ERROR("get connection fail", K(ret));
280
} else if (OB_ISNULL(gctx_.session_mgr_)) {
281
ret = OB_ERR_UNEXPECTED;
282
LOG_ERROR("session manager is null", K(ret));
284
if (OB_FAIL(gctx_.session_mgr_->create_session(conn, sess_info))) {
285
LOG_WARN("create session fail", "sessid", conn->sessid_,
286
"proxy_sessid", conn->proxy_sessid_, K(ret));
288
LOG_DEBUG("create session successfully", "sessid", conn->sessid_,
289
"proxy_sessid", conn->proxy_sessid_);
290
conn->is_sess_alloc_ = true;
291
sess_info->set_user_session();
292
sess_info->set_shadow(false);
293
if (SQL_REQ_OP.get_sql_ssl_st(req_) != NULL) {
294
sess_info->set_ssl_cipher(SSL_get_cipher_name((SSL*)SQL_REQ_OP.get_sql_ssl_st(req_)));
296
sess_info->set_ssl_cipher("");
298
sess_info->set_client_sessid(conn->client_sessid_);
299
sess_info->gen_gtt_session_scope_unique_id();
300
sess_info->gen_gtt_trans_scope_unique_id();
306
int ObMPBase::free_session()
308
int ret = OB_SUCCESS;
309
ObSMConnection* conn = NULL;
310
if (NULL == (conn = packet_sender_.get_conn())) {
311
ret = OB_CONNECT_ERROR;
312
LOG_WARN("connection already disconnected", K(ret));
313
} else if (OB_ISNULL(gctx_.session_mgr_)) {
314
ret = OB_ERR_UNEXPECTED;
315
LOG_ERROR("session manager is null", K(ret));
317
bool is_need_clear = false;
318
ObFreeSessionCtx ctx;
319
ctx.tenant_id_ = conn->tenant_id_;
320
ctx.sessid_ = conn->sessid_;
321
ctx.proxy_sessid_ = conn->proxy_sessid_;
322
ctx.has_inc_active_num_ = conn->has_inc_active_num_;
323
if (OB_FAIL(gctx_.session_mgr_->free_session(ctx))) {
324
LOG_WARN("fail to free session", K(ctx), K(ret));
326
LOG_INFO("free session successfully", K(ctx));
327
conn->is_sess_free_ = true;
328
if (OB_UNLIKELY(OB_FAIL(sql::ObSQLSessionMgr::is_need_clear_sessid(conn, is_need_clear)))) {
329
LOG_ERROR("fail to judge need clear", K(ret), "sessid", conn->sessid_, "server_id", GCTX.server_id_);
330
} else if (is_need_clear) {
331
if (OB_FAIL(GCTX.session_mgr_->mark_sessid_unused(conn->sessid_))) {
332
LOG_WARN("mark session id unused failed", K(ret), "sessid", conn->sessid_);
334
LOG_INFO("mark session id unused", "sessid", conn->sessid_);
342
int ObMPBase::get_session(ObSQLSessionInfo *&sess_info)
344
return packet_sender_.get_session(sess_info);
347
int ObMPBase::revert_session(ObSQLSessionInfo *sess_info)
349
return packet_sender_.revert_session(sess_info);
352
int ObMPBase::init_process_var(sql::ObSqlCtx &ctx,
353
const ObMultiStmtItem &multi_stmt_item,
354
sql::ObSQLSessionInfo &session) const
356
int ret = OB_SUCCESS;
357
bool enable_udr = false;
358
if (!packet_sender_.is_conn_valid()) {
359
ret = OB_CONNECT_ERROR;
360
LOG_WARN("connection already disconnected", K(ret));
362
const int64_t debug_sync_timeout = GCONF.debug_sync_timeout;
363
// ignore session debug sync action actions to thread local actions error
364
if (debug_sync_timeout > 0) {
365
int tmp_ret = GDS.set_thread_local_actions(session.get_debug_sync_actions());
366
if (OB_UNLIKELY(OB_SUCCESS != tmp_ret)) {
367
LOG_WARN("set session debug sync actions to thread local actions failed", K(tmp_ret));
370
// construct sql context
371
ctx.multi_stmt_item_ = multi_stmt_item;
372
ctx.session_info_ = &session;
373
session.set_rpc_tenant_id(THIS_WORKER.get_rpc_tenant());
374
const ObMySQLRawPacket &pkt = reinterpret_cast<const ObMySQLRawPacket&>(req_->get_packet());
376
if (0 == multi_stmt_item.get_seq_num()) {
378
ctx.can_reroute_sql_ = (pkt.can_reroute_pkt() && get_conn()->is_support_proxy_reroute());
380
ctx.is_protocol_weak_read_ = pkt.is_weak_read();
381
ctx.set_enable_strict_defensive_check(GCONF.enable_strict_defensive_check());
382
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(session.get_effective_tenant_id()));
383
if (tenant_config.is_valid()) {
384
enable_udr = tenant_config->enable_user_defined_rewrite_rules;
386
ctx.set_enable_user_defined_rewrite(enable_udr);
387
LOG_TRACE("protocol flag info", K(ctx.can_reroute_sql_), K(ctx.is_protocol_weak_read_),
388
K(ctx.get_enable_strict_defensive_check()), K(enable_udr));
393
//外层调用会忽略do_after_process的错误码,因此这里将set_session_state的错误码返回也没有意义。
394
//因此这里忽略set_session_state错误码,warning buffer的reset和trace log 记录的流程不收影响。
395
int ObMPBase::do_after_process(sql::ObSQLSessionInfo &session,
397
bool async_resp_used) const
399
int ret = OB_SUCCESS;
401
// reset warning buffers
402
// 注意,此处req_has_wokenup_可能为true,不能再访问req对象
404
if (!async_resp_used) { // 异步回包不重置warning buffer,重置操作在callback中做
405
session.reset_warnings_buf();
406
session.set_session_sleep();
408
// clear tsi warning buffer
409
ob_setup_tsi_warning_buffer(NULL);
413
int ObMPBase::record_flt_trace(sql::ObSQLSessionInfo &session) const
415
int ret = OB_SUCCESS;
417
if (lib::is_diagnose_info_enabled()) {
420
if (session.is_use_trace_log()) {
422
// show trace will always show last request info
423
if (OB_FAIL(ObFLTUtils::clean_flt_show_trace_env(session))) {
424
LOG_WARN("failed to clean flt show trace env", K(ret));
427
// not need to record
430
if (OB_FAIL(session.set_last_flt_trace_id(trace_id))) {
431
LOG_WARN("failed to reset last flt trace id", K(ret));
438
int ObMPBase::setup_user_resource_group(
439
ObSMConnection &conn,
440
const uint64_t tenant_id,
441
sql::ObSQLSessionInfo *session)
443
int ret = OB_SUCCESS;
444
uint64_t group_id = 0;
445
uint64_t user_id = session->get_user_id();
446
if (OB_INVALID_ID != session->get_expect_group_id()) {
447
// Session->expected_group_id_ is set when hit plan cache or resolve a query, and find that
448
// expcted group is consistent with current group.
449
// Set group_id of req_ so that the req_ will be put in the corresponding queue when do packet retry.
451
req_->set_group_id(session->get_expect_group_id());
453
// also set conn.group_id_. It means use current consumer group when execute next query for first time.
454
conn.group_id_ = session->get_expect_group_id();
455
// reset to invalid because session.expected_group_id is single_use.
456
session->set_expect_group_id(OB_INVALID_ID);
457
} else if (!is_valid_tenant_id(tenant_id)) {
458
ret = OB_INVALID_ARGUMENT;
459
LOG_WARN("Invalid tenant", K(tenant_id), K(ret));
460
} else if (conn.group_id_ == OBCG_DIAG_TENANT) {
461
// OBCG_DIAG_TENANT was set in check_update_tenant_id, DO NOT overlap it.
462
} else if (OB_FAIL(G_RES_MGR.get_mapping_rule_mgr().get_group_id_by_user(
463
tenant_id, user_id, group_id))) {
464
LOG_WARN("fail get group id by user", K(user_id), K(tenant_id), K(ret));
466
// 将 group id 设置到调度层,之后这个 session 上的所有请求都是用这个 cgroup 的资源
467
conn.group_id_ = group_id;
469
LOG_DEBUG("setup user resource group", K(user_id), K(tenant_id), K(ret));
473
// force refresh schema if local schema version < last schema version
474
int ObMPBase::check_and_refresh_schema(uint64_t login_tenant_id,
475
uint64_t effective_tenant_id,
476
ObSQLSessionInfo *session_info)
478
int ret = OB_SUCCESS;
479
int64_t local_version = 0;
480
int64_t last_version = 0;
482
if (login_tenant_id != effective_tenant_id) {
485
} else if (OB_ISNULL(gctx_.schema_service_)) {
486
ret = OB_INVALID_ARGUMENT;
487
LOG_WARN("null schema service", K(ret), K(gctx_));
489
bool need_revert_session = false;
490
if (NULL == session_info) {
491
if (OB_FAIL(get_session(session_info))) {
492
LOG_WARN("get session failed");
493
} else if (OB_ISNULL(session_info)) {
494
ret = OB_INVALID_ARGUMENT;
495
LOG_WARN("invalid session info", K(ret), K(session_info));
497
need_revert_session = true;
501
if (OB_FAIL(gctx_.schema_service_->get_tenant_refreshed_schema_version(effective_tenant_id, local_version))) {
502
LOG_WARN("fail to get tenant refreshed schema version", K(ret));
503
} else if (OB_FAIL(session_info->get_ob_last_schema_version(last_version))) {
504
LOG_WARN("failed to get_sys_variable", K(OB_SV_LAST_SCHEMA_VERSION));
505
} else if (local_version >= last_version) {
507
} else if (OB_FAIL(gctx_.schema_service_->async_refresh_schema(effective_tenant_id, last_version))) {
508
LOG_WARN("failed to refresh schema", K(ret), K(effective_tenant_id), K(last_version));
510
if (need_revert_session && OB_LIKELY(NULL != session_info)) {
511
revert_session(session_info);
518
int ObMPBase::response_row(ObSQLSessionInfo &session,
519
common::ObNewRow &row,
520
const ColumnsFieldIArray *fields,
523
int ret = OB_SUCCESS;
524
ObArenaAllocator allocator;
526
bool has_charset_convert = false;
527
if (OB_ISNULL(fields) || row.get_count() != fields->count()) {
528
ret = OB_INVALID_ARGUMENT;
529
LOG_WARN("fields is null", K(ret), KP(fields));
530
} else if (OB_FAIL(ob_write_row(allocator, row, tmp_row))) {
531
LOG_WARN("deep copy row fail.", K(ret));
533
for (int64_t i = 0; OB_SUCC(ret) && i < tmp_row.get_count(); ++i) {
534
ObObj &value = tmp_row.get_cell(i);
535
ObCharsetType charset_type = CHARSET_INVALID;
536
ObCharsetType ncharset_type = CHARSET_INVALID;
538
if (!is_packed && value.get_type() != fields->at(i).type_.get_type()) {
539
ObCastCtx cast_ctx(&allocator, NULL, CM_WARN_ON_FAIL, fields->at(i).type_.get_collation_type());
540
if (ObDecimalIntType == fields->at(i).type_.get_type()) {
541
cast_ctx.res_accuracy_ = const_cast<ObAccuracy*>(&fields->at(i).accuracy_);
543
if (OB_FAIL(common::ObObjCaster::to_type(fields->at(i).type_.get_type(),
547
LOG_WARN("failed to cast object", K(ret), K(value), K(i),
548
K(value.get_type()), K(fields->at(i).type_.get_type()));
552
} else if (is_packed) {
554
} else if (OB_FAIL(session.get_character_set_results(charset_type))) {
555
LOG_WARN("fail to get result charset", K(ret));
556
} else if (OB_FAIL(session.get_ncharacter_set_connection(ncharset_type))) {
557
LOG_WARN("fail to get result charset", K(ret));
559
if (lib::is_oracle_mode()
560
&& (value.is_nchar() || value.is_nvarchar2())
561
&& ncharset_type != CHARSET_INVALID
562
&& ncharset_type != CHARSET_BINARY) {
563
charset_type = ncharset_type;
565
if (ob_is_string_tc(value.get_type())
566
&& CS_TYPE_INVALID != value.get_collation_type()
567
&& OB_FAIL(value.convert_string_value_charset(charset_type, allocator))) {
568
LOG_WARN("convert string value charset failed", K(ret), K(value));
569
} else if (value.is_clob_locator()
570
&& OB_FAIL(ObQueryDriver::convert_lob_value_charset(value, charset_type, allocator))) {
571
LOG_WARN("convert lob value charset failed", K(ret));
572
} else if (ob_is_text_tc(value.get_type())
573
&& OB_FAIL(ObQueryDriver::convert_text_value_charset(value, charset_type, allocator, &session))) {
574
LOG_WARN("convert text value charset failed", K(ret));
577
} else if(OB_FAIL(ObQueryDriver::process_lob_locator_results(value,
578
session.is_client_use_lob_locator(),
579
session.is_client_support_lob_locatorv2(),
582
LOG_WARN("convert lob locator to longtext failed", K(ret));
583
#ifdef OB_BUILD_ORACLE_XML
584
} else if (value.is_user_defined_sql_type()
585
&& OB_FAIL(ObXMLExprHelper::process_sql_udt_results(value,
588
LOG_WARN("convert udt to client format failed", K(ret), K(value.get_udt_subschema_id()));
595
const ObDataTypeCastParams dtc_params = ObBasicSessionInfo::create_dtc_params(&session);
596
ObSMRow sm_row(obmysql::BINARY, tmp_row, dtc_params, fields);
597
sm_row.set_packed(is_packed);
598
obmysql::OMPKRow rp(sm_row);
599
rp.set_is_packed(is_packed);
600
if (OB_FAIL(response_packet(rp, &session))) {
601
ret = OB_ERR_UNEXPECTED;
602
LOG_WARN("response packet fail", K(ret));
609
int ObMPBase::process_extra_info(sql::ObSQLSessionInfo &session,
610
const obmysql::ObMySQLRawPacket &pkt, bool &need_response_error)
612
int ret = OB_SUCCESS;
613
SessionInfoVerifacation sess_info_verification;
614
LOG_TRACE("process extra info", K(ret),K(pkt.get_extra_info().exist_sess_info_veri()));
615
if (FALSE_IT(session.set_has_query_executed(true))) {
616
} else if (pkt.get_extra_info().exist_sync_sess_info()
617
&& OB_FAIL(ObMPUtils::sync_session_info(session,
618
pkt.get_extra_info().get_sync_sess_info()))) {
619
// won't response error, disconnect will let proxy sens failure
620
need_response_error = false;
621
LOG_WARN("fail to update sess info", K(ret));
622
} else if (pkt.get_extra_info().exist_sess_info_veri()
623
&& OB_FAIL(ObSessInfoVerify::sync_sess_info_veri(session,
624
pkt.get_extra_info().get_sess_info_veri(),
625
sess_info_verification))) {
626
LOG_WARN("fail to get verify info requied", K(ret));
627
} else if (pkt.get_extra_info().exist_sess_info_veri() &&
628
pkt.is_proxy_switch_route() &&
629
OB_FAIL(ObSessInfoVerify::verify_session_info(session,
630
sess_info_verification))) {
631
LOG_WARN("fail to verify sess info", K(ret));
636
// The obmp layer handles the kill client session logic.
637
int ObMPBase::process_kill_client_session(sql::ObSQLSessionInfo &session, bool is_connect)
639
int ret = OB_SUCCESS;
640
uint64_t create_time = 0;
641
if (OB_ISNULL(gctx_.session_mgr_)) {
642
ret = OB_ERR_UNEXPECTED;
643
LOG_ERROR("invalid session mgr", K(ret), K(gctx_));
644
} else if (OB_UNLIKELY(session.is_mark_killed())) {
645
ret = OB_ERR_KILL_CLIENT_SESSION;
646
LOG_WARN("client session need be killed", K(session.get_session_state()),
647
K(session.get_sessid()), "proxy_sessid", session.get_proxy_sessid(),
648
K(session.get_client_sessid()), K(ret));
649
} else if (is_connect) {
650
if (OB_UNLIKELY(OB_HASH_NOT_EXIST != (gctx_.session_mgr_->get_kill_client_sess_map().
651
get_refactored(session.get_client_sessid(), create_time)))) {
652
if (session.get_client_create_time() == create_time) {
653
ret = OB_ERR_KILL_CLIENT_SESSION;
654
LOG_WARN("client session need be killed", K(session.get_session_state()),
655
K(session.get_sessid()), "proxy_sessid", session.get_proxy_sessid(),
656
K(session.get_client_sessid()), K(ret),K(create_time));
658
LOG_DEBUG("client session is created later", K(create_time),
659
K(session.get_client_create_time()),
660
K(session.get_sessid()), "proxy_sessid", session.get_proxy_sessid(),
661
K(session.get_client_sessid()));
669
int ObMPBase::update_charset_sys_vars(ObSMConnection &conn, ObSQLSessionInfo &sess_info)
671
int ret = OB_SUCCESS;
672
int64_t cs_type = conn.client_cs_type_;
673
const int64_t LATIN1_CS = 8;
674
//background: mysqltest give a default connect_charset=latin1
675
// but for history reason, oceanbase use utf8 as
676
// default charset for mysqltest
677
//TODO: after obclient&mysqltest support default charset = utf8
678
// login for cs_type != LATIN1_CS would be deleted
679
if (ObCharset::is_valid_collation(cs_type)) {
680
if (OB_FAIL(sess_info.update_sys_variable(SYS_VAR_CHARACTER_SET_CLIENT, cs_type))) {
681
SQL_ENG_LOG(WARN, "failed to update sys var", K(ret));
682
} else if (OB_FAIL(sess_info.update_sys_variable(SYS_VAR_CHARACTER_SET_RESULTS, cs_type))) {
683
SQL_ENG_LOG(WARN, "failed to update sys var", K(ret));
684
} else if (OB_FAIL(sess_info.update_sys_variable(SYS_VAR_CHARACTER_SET_CONNECTION, cs_type))) {
685
SQL_ENG_LOG(WARN, "failed to update sys var", K(ret));
686
} else if (OB_FAIL(sess_info.update_sys_variable(SYS_VAR_COLLATION_CONNECTION, cs_type))) {
687
SQL_ENG_LOG(WARN, "failed to update sys var", K(ret));
693
} // namespace observer
694
} // namespace oceanbase