oceanbase

Форк
0
/
obmp_base.cpp 
694 строки · 25.5 Кб
1
/**
2
 * Copyright (c) 2021 OceanBase
3
 * OceanBase CE is licensed under Mulan PubL v2.
4
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
5
 * You may obtain a copy of Mulan PubL v2 at:
6
 *          http://license.coscl.org.cn/MulanPubL-2.0
7
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
 * See the Mulan PubL v2 for more details.
11
 */
12

13
#define USING_LOG_PREFIX SERVER
14

15
#include "obmp_base.h"
16

17
#include "lib/worker.h"
18
#include "lib/profile/ob_trace_id.h"
19
#include "lib/profile/ob_perf_event.h"
20
#include "lib/stat/ob_diagnose_info.h"
21
#include "lib/stat/ob_session_stat.h"
22
#include "lib/string/ob_sql_string.h"
23
#include "lib/utility/ob_macro_utils.h"
24
#include "lib/utility/utility.h"
25
#include "rpc/ob_request.h"
26
#include "rpc/obmysql/ob_mysql_packet.h"
27
#include "rpc/obmysql/packet/ompk_change_user.h"
28
#include "rpc/obmysql/packet/ompk_error.h"
29
#include "rpc/obmysql/packet/ompk_ok.h"
30
#include "rpc/obmysql/packet/ompk_eof.h"
31
#include "rpc/obmysql/packet/ompk_row.h"
32
#include "observer/mysql/obsm_row.h"
33
#include "observer/mysql/obsm_utils.h"            // ObSMUtils
34
#include "rpc/obmysql/ob_mysql_request_utils.h"
35
#include "share/config/ob_server_config.h"
36
#include "share/config/ob_server_config.h"
37
#include "share/inner_table/ob_inner_table_schema_constants.h"
38
#include "share/client_feedback/ob_feedback_partition_struct.h"
39
#include "share/resource_manager/ob_resource_manager.h"
40
#include "sql/session/ob_sql_session_mgr.h"
41
#include "observer/mysql/obmp_utils.h"
42
#include "rpc/obmysql/obsm_struct.h"
43
#include "observer/mysql/ob_mysql_result_set.h"
44
#include "observer/mysql/ob_query_driver.h"
45
#include "share/config/ob_server_config.h"
46
#include "storage/tx/ob_trans_define.h"
47
#include "share/ob_lob_access_utils.h"
48
#include "sql/monitor/flt/ob_flt_utils.h"
49
#include "sql/session/ob_sess_info_verify.h"
50
#ifdef OB_BUILD_ORACLE_XML
51
#include "sql/engine/expr/ob_expr_xml_func_helper.h"
52
#endif
53
namespace oceanbase
54
{
55
using namespace share;
56
using namespace rpc;
57
using namespace obmysql;
58
using namespace common;
59
using namespace transaction;
60
using namespace share::schema;
61
namespace sql
62
{
63
  class ObPiece;
64
}
65

66
namespace observer
67
{
68

69
// Stores a reference to the global context and zero-initializes the
// processing timestamp and the proxy version.
ObMPBase::ObMPBase(const ObGlobalContext &gctx)
  : gctx_(gctx),
    process_timestamp_(0),
    proxy_version_(0)
{}
73

74
// Finishes the SQL request on destruction unless the worker reports that a
// retry is needed.
ObMPBase::~ObMPBase()
{
  // wakeup_request checks the has_req_wakeup_ flag internally; this call is a
  // safety net for error paths that forgot to call flush_buffer.
  if (THIS_WORKER.need_retry()) {
    // the worker asked for a retry: do not finish the request here
  } else {
    packet_sender_.finish_sql_request();
  }
}
82

83
int ObMPBase::response(const int retcode)
84
{
85
  UNUSED(retcode);
86
  int ret = OB_SUCCESS;
87
  if (!THIS_WORKER.need_retry()) {
88
    if (OB_FAIL(flush_buffer(true))) {
89
      LOG_WARN("failed to flush_buffer", K(ret));
90
    }
91
  }
92
  return ret;
93
}
94

95
int ObMPBase::setup_packet_sender()
96
{
97
  int ret = OB_SUCCESS;
98
  if (OB_FAIL(packet_sender_.init(req_))) {
99
    LOG_ERROR("packet sender init fail", KP(req_), K(ret));
100
    send_error_packet(ret, NULL);
101
  }
102
  return ret;
103
}
104

105
int ObMPBase::before_process()
106
{
107
  int ret = OB_SUCCESS;
108
  process_timestamp_ = common::ObTimeUtility::current_time();
109
  return ret;
110
}
111

112
int ObMPBase::update_transmission_checksum_flag(const ObSQLSessionInfo &session)
113
{
114
  return packet_sender_.update_transmission_checksum_flag(session);
115
}
116

117
// Copies the session's user privilege set and the connection's proxy
// capability flags into the session's proxy-related settings.
// @return OB_CONNECT_ERROR when no connection is available, otherwise the
//         first error returned by the session setters, or OB_SUCCESS.
int ObMPBase::update_proxy_sys_vars(ObSQLSessionInfo &session)
{
  int ret = OB_SUCCESS;
  ObSMConnection *conn = get_conn();
  if (OB_ISNULL(conn)) {
    ret = OB_CONNECT_ERROR;
    LOG_WARN("connection in error, maybe has disconnected", K(ret));
  }
  if (OB_SUCC(ret)
      && OB_FAIL(session.set_proxy_user_privilege(session.get_user_priv_set()))) {
    LOG_WARN("fail to set proxy user privilege system variables", K(ret));
  }
  if (OB_SUCC(ret)
      && OB_FAIL(session.set_proxy_capability(conn->proxy_cap_flags_.capability_))) {
    LOG_WARN("fail to set proxy capability", K(ret));
  }
  return ret;
}
131

132
int ObMPBase::after_process(int error_code)
133
{
134
  int ret = OB_SUCCESS;
135
  if (!lib::is_diagnose_info_enabled()) {
136
  } else {
137
    NG_TRACE_EXT(process_end, OB_ID(run_ts), get_run_timestamp());
138
    const int64_t elapsed_time = common::ObTimeUtility::current_time() - get_receive_timestamp();
139
    bool is_slow = (elapsed_time > GCONF.trace_log_slow_query_watermark)
140
      && !THIS_WORKER.need_retry();
141
    if (is_slow) {
142
      if (THIS_WORKER.need_retry() && OB_TRY_LOCK_ROW_CONFLICT == error_code) {
143
        //如果是锁冲突,且接下来会重试,则不需要打印这条日志了
144
      } else {
145
        FORCE_PRINT_TRACE(THE_TRACE, "[slow query]");
146

147
        // slow query will flush cache
148
        FLUSH_TRACE();
149
      }
150
    } else if (can_force_print(error_code)) {
151
      // 需要打印TRACE日志的错误码添加在这里
152
      int process_ret = error_code;
153
      NG_TRACE_EXT(process_ret, OB_Y(process_ret));
154
      FORCE_PRINT_TRACE(THE_TRACE, "[err query]");
155
    } else if (THIS_WORKER.need_retry()) {
156
      if (OB_TRY_LOCK_ROW_CONFLICT != error_code) {
157
        FORCE_PRINT_TRACE(THE_TRACE, "[packet retry query]");
158
      }
159
    } else {
160
      PRINT_TRACE(THE_TRACE);
161
    }
162

163
    if (common::OB_SUCCESS != error_code) {
164
      FLUSH_TRACE();
165
    }
166
  }
167
  ObFLTUtils::clean_flt_env();
168
  return ret;
169
}
170

171
void ObMPBase::cleanup()
172
{
173
  ObActiveSessionGuard::setup_default_ash();
174
}
175

176
void ObMPBase::disconnect()
177
{
178
  return packet_sender_.disconnect();
179
}
180

181
void ObMPBase::force_disconnect()
182
{
183
  return packet_sender_.force_disconnect();
184
}
185

186
int ObMPBase::clean_buffer()
187
{
188
  return packet_sender_.clean_buffer();
189
}
190

191
int ObMPBase::flush_buffer(const bool is_last)
192
{
193
  return packet_sender_.is_disable_response()? OB_SUCCESS: packet_sender_.flush_buffer(is_last);
194
}
195

196
// Returns the connection object held by the packet sender (callers in this
// file treat NULL as "disconnected").
ObSMConnection* ObMPBase::get_conn() const
{
  ObSMConnection *conn = packet_sender_.get_conn();
  return conn;
}
200

201
// Forwards to the packet sender's get_conn_id(); sessid is its out-parameter.
int ObMPBase::get_conn_id(uint32_t &sessid) const
{
  const int ret = packet_sender_.get_conn_id(sessid);
  return ret;
}
205

206
// Forwards to the packet sender's read_packet(); pkt is its out-parameter.
int ObMPBase::read_packet(obmysql::ObICSMemPool& mem_pool, obmysql::ObMySQLPacket *&pkt)
{
  const int ret = packet_sender_.read_packet(mem_pool, pkt);
  return ret;
}
210

211
// Forwards to the packet sender's release_packet() and returns its result.
int ObMPBase::release_packet(obmysql::ObMySQLPacket* pkt)
{
  const int ret = packet_sender_.release_packet(pkt);
  return ret;
}
215
int ObMPBase::send_error_packet(int err,
216
                                const char* errmsg,
217
                                bool is_partition_hit /* = true */,
218
                                void *extra_err_info /* = NULL */)
219
{
220
  return packet_sender_.send_error_packet(err, errmsg, is_partition_hit, extra_err_info);
221
}
222

223
// Builds an OMPKChangeUser packet from the given auth plugin name and auth
// data and sends it through response_packet().
// @return OB_SUCCESS or the error returned by response_packet().
int ObMPBase::send_switch_packet(ObString &auth_name, ObString& auth_data)
{
  OMPKChangeUser packet;
  packet.set_auth_plugin_name(auth_name);
  packet.set_auth_response(auth_data);

  int ret = response_packet(packet, NULL);
  if (OB_SUCCESS != ret) {
    LOG_WARN("failed to send switch packet", K(packet), K(ret));
  }
  return ret;
}
234

235
int ObMPBase::load_system_variables(const ObSysVariableSchema &sys_variable_schema, ObSQLSessionInfo &session) const
236
{
237
  int ret = OB_SUCCESS;
238
  ObArenaAllocator calc_buf(ObModIds::OB_SQL_SESSION);
239
  for (int64_t i = 0; OB_SUCC(ret) && i < sys_variable_schema.get_sysvar_count(); ++i) {
240
    const ObSysVarSchema *sysvar = NULL;
241
    sysvar = sys_variable_schema.get_sysvar_schema(i);
242
    if (sysvar != NULL) {
243
      LOG_DEBUG("load system variable", K(*sysvar));
244
      if (OB_FAIL(session.load_sys_variable(calc_buf, sysvar->get_name(), sysvar->get_data_type(),
245
                                            sysvar->get_value(), sysvar->get_min_val(),
246
                                            sysvar->get_max_val(), sysvar->get_flags(), true))) {
247
        LOG_WARN("load sys variable failed", K(ret), K(*sysvar));
248
      }
249
    }
250
  }
251
  if (OB_SUCC(ret)) {
252
    //设置系统变量的最大版本号
253
    session.set_global_vars_version(sys_variable_schema.get_schema_version());
254
    //将影响plan的系统变量序列化并缓存
255
    if (OB_FAIL(session.gen_sys_var_in_pc_str())) {
256
      LOG_WARN("fail to gen sys var in pc str", K(ret));
257
    } else if (OB_FAIL(session.gen_configs_in_pc_str())) {
258
      LOG_WARN("fail to gen configs in pc string", K(ret));
259
    }
260
  }
261
  return ret;
262
}
263

264
// Forwards to the packet sender's send_ok_packet() and returns its result.
int ObMPBase::send_ok_packet(ObSQLSessionInfo &session, ObOKPParam &ok_param, obmysql::ObMySQLPacket* pkt)
{
  const int ret = packet_sender_.send_ok_packet(session, ok_param, pkt);
  return ret;
}
268

269
int ObMPBase::send_eof_packet(const ObSQLSessionInfo &session, const ObMySQLResultSet &result, ObOKPParam *ok_param)
270
{
271
  return packet_sender_.send_eof_packet(session, result, ok_param);
272
}
273

274
// Creates a session for the connection through the session manager and
// initializes it: marks it as a non-shadow user session, records the SSL
// cipher name (empty string when the request has no SSL state), copies the
// client session id and generates the GTT unique ids.
// @param conn       connection the session belongs to; must not be NULL.
// @param sess_info  out: the created session.
// @return OB_ERR_UNEXPECTED when conn or the session manager is NULL,
//         otherwise the result of the session manager's create_session().
int ObMPBase::create_session(ObSMConnection *conn, ObSQLSessionInfo *&sess_info)
{
  int ret = OB_SUCCESS;
  if (OB_ISNULL(conn)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_ERROR("get connection fail", K(ret));
  } else if (OB_ISNULL(gctx_.session_mgr_)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_ERROR("session manager is null", K(ret));
  } else if (OB_FAIL(gctx_.session_mgr_->create_session(conn, sess_info))) {
    LOG_WARN("create session fail", "sessid", conn->sessid_,
              "proxy_sessid", conn->proxy_sessid_, K(ret));
  } else {
    LOG_DEBUG("create session successfully", "sessid", conn->sessid_,
             "proxy_sessid", conn->proxy_sessid_);
    conn->is_sess_alloc_ = true;
    sess_info->set_user_session();
    sess_info->set_shadow(false);
    if (NULL == SQL_REQ_OP.get_sql_ssl_st(req_)) {
      sess_info->set_ssl_cipher("");
    } else {
      sess_info->set_ssl_cipher(SSL_get_cipher_name((SSL*)SQL_REQ_OP.get_sql_ssl_st(req_)));
    }
    sess_info->set_client_sessid(conn->client_sessid_);
    sess_info->gen_gtt_session_scope_unique_id();
    sess_info->gen_gtt_trans_scope_unique_id();
  }
  return ret;
}
305

306
int ObMPBase::free_session()
307
{
308
  int ret = OB_SUCCESS;
309
  ObSMConnection* conn = NULL;
310
  if (NULL == (conn = packet_sender_.get_conn())) {
311
    ret = OB_CONNECT_ERROR;
312
    LOG_WARN("connection already disconnected", K(ret));
313
  } else if (OB_ISNULL(gctx_.session_mgr_)) {
314
    ret = OB_ERR_UNEXPECTED;
315
    LOG_ERROR("session manager is null", K(ret));
316
  } else {
317
    bool is_need_clear = false;
318
    ObFreeSessionCtx ctx;
319
    ctx.tenant_id_ = conn->tenant_id_;
320
    ctx.sessid_ = conn->sessid_;
321
    ctx.proxy_sessid_ = conn->proxy_sessid_;
322
    ctx.has_inc_active_num_ = conn->has_inc_active_num_;
323
    if (OB_FAIL(gctx_.session_mgr_->free_session(ctx))) {
324
      LOG_WARN("fail to free session", K(ctx), K(ret));
325
    } else {
326
      LOG_INFO("free session successfully", K(ctx));
327
      conn->is_sess_free_ = true;
328
      if (OB_UNLIKELY(OB_FAIL(sql::ObSQLSessionMgr::is_need_clear_sessid(conn, is_need_clear)))) {
329
        LOG_ERROR("fail to judge need clear", K(ret), "sessid", conn->sessid_, "server_id", GCTX.server_id_);
330
      } else if (is_need_clear) {
331
        if (OB_FAIL(GCTX.session_mgr_->mark_sessid_unused(conn->sessid_))) {
332
          LOG_WARN("mark session id unused failed", K(ret), "sessid", conn->sessid_);
333
        } else {
334
          LOG_INFO("mark session id unused", "sessid", conn->sessid_);
335
        }
336
      }
337
    }
338
  }
339
  return ret;
340
}
341

342
// Forwards to the packet sender's get_session(); sess_info is its
// out-parameter.
int ObMPBase::get_session(ObSQLSessionInfo *&sess_info)
{
  const int ret = packet_sender_.get_session(sess_info);
  return ret;
}
346

347
// Forwards to the packet sender's revert_session() and returns its result.
int ObMPBase::revert_session(ObSQLSessionInfo *sess_info)
{
  const int ret = packet_sender_.revert_session(sess_info);
  return ret;
}
351

352
int ObMPBase::init_process_var(sql::ObSqlCtx &ctx,
353
                               const ObMultiStmtItem &multi_stmt_item,
354
                               sql::ObSQLSessionInfo &session) const
355
{
356
  int ret = OB_SUCCESS;
357
  bool enable_udr = false;
358
  if (!packet_sender_.is_conn_valid()) {
359
    ret = OB_CONNECT_ERROR;
360
    LOG_WARN("connection already disconnected", K(ret));
361
  } else {
362
    const int64_t debug_sync_timeout = GCONF.debug_sync_timeout;
363
    // ignore session debug sync action actions to thread local actions error
364
    if (debug_sync_timeout > 0) {
365
      int tmp_ret = GDS.set_thread_local_actions(session.get_debug_sync_actions());
366
      if (OB_UNLIKELY(OB_SUCCESS != tmp_ret)) {
367
        LOG_WARN("set session debug sync actions to thread local actions failed", K(tmp_ret));
368
      }
369
    }
370
    // construct sql context
371
    ctx.multi_stmt_item_ = multi_stmt_item;
372
    ctx.session_info_ = &session;
373
    session.set_rpc_tenant_id(THIS_WORKER.get_rpc_tenant());
374
    const ObMySQLRawPacket &pkt = reinterpret_cast<const ObMySQLRawPacket&>(req_->get_packet());
375

376
    if (0 == multi_stmt_item.get_seq_num()) {
377
      // 第一条sql
378
      ctx.can_reroute_sql_ = (pkt.can_reroute_pkt() && get_conn()->is_support_proxy_reroute());
379
    }
380
    ctx.is_protocol_weak_read_ = pkt.is_weak_read();
381
    ctx.set_enable_strict_defensive_check(GCONF.enable_strict_defensive_check());
382
    omt::ObTenantConfigGuard tenant_config(TENANT_CONF(session.get_effective_tenant_id()));
383
    if (tenant_config.is_valid()) {
384
      enable_udr = tenant_config->enable_user_defined_rewrite_rules;
385
    }
386
    ctx.set_enable_user_defined_rewrite(enable_udr);
387
    LOG_TRACE("protocol flag info", K(ctx.can_reroute_sql_), K(ctx.is_protocol_weak_read_),
388
        K(ctx.get_enable_strict_defensive_check()), K(enable_udr));
389
  }
390
  return ret;
391
}
392

393
//外层调用会忽略do_after_process的错误码,因此这里将set_session_state的错误码返回也没有意义。
394
//因此这里忽略set_session_state错误码,warning buffer的reset和trace log 记录的流程不收影响。
395
int ObMPBase::do_after_process(sql::ObSQLSessionInfo &session,
396
                               sql::ObSqlCtx &ctx,
397
                               bool async_resp_used) const
398
{
399
  int ret = OB_SUCCESS;
400

401
  // reset warning buffers
402
  // 注意,此处req_has_wokenup_可能为true,不能再访问req对象
403
  // @todo 重构wb逻辑
404
  if (!async_resp_used) { // 异步回包不重置warning buffer,重置操作在callback中做
405
    session.reset_warnings_buf();
406
    session.set_session_sleep();
407
  }
408
  // clear tsi warning buffer
409
  ob_setup_tsi_warning_buffer(NULL);
410
  return ret;
411
}
412

413
// Marks the end of the query in the trace and updates the session's
// full-link-trace state. Does nothing when diagnose info is disabled.
//   - trace log in use:     clean the "show trace" environment;
//   - trace log not in use: reset the session's last flt trace id.
// @return OB_SUCCESS or the error from the ObFLTUtils / session call.
int ObMPBase::record_flt_trace(sql::ObSQLSessionInfo &session) const
{
  int ret = OB_SUCCESS;
  if (!lib::is_diagnose_info_enabled()) {
    // diagnose info disabled: nothing to record
  } else {
    // trace end
    NG_TRACE(query_end);

    if (!session.is_use_trace_log()) {
      // not need to record
      ObString trace_id;
      trace_id.reset();
      if (OB_FAIL(session.set_last_flt_trace_id(trace_id))) {
        LOG_WARN("failed to reset last flt trace id", K(ret));
      }
    } else if (OB_FAIL(ObFLTUtils::clean_flt_show_trace_env(session))) {
      // does not affect the normal logic;
      // show trace will always show last request info
      LOG_WARN("failed to clean flt show trace env", K(ret));
    }
  }
  return ret;
}
437

438
int ObMPBase::setup_user_resource_group(
439
    ObSMConnection &conn,
440
    const uint64_t tenant_id,
441
    sql::ObSQLSessionInfo *session)
442
{
443
  int ret = OB_SUCCESS;
444
  uint64_t group_id = 0;
445
  uint64_t user_id = session->get_user_id();
446
  if (OB_INVALID_ID != session->get_expect_group_id()) {
447
    // Session->expected_group_id_ is set when hit plan cache or resolve a query, and find that
448
    // expcted group is consistent with current group.
449
    // Set group_id of req_ so that the req_ will be put in the corresponding queue when do packet retry.
450
    if (NULL != req_) {
451
      req_->set_group_id(session->get_expect_group_id());
452
    }
453
    // also set conn.group_id_. It means use current consumer group when execute next query for first time.
454
    conn.group_id_ = session->get_expect_group_id();
455
    // reset to invalid because session.expected_group_id is single_use.
456
    session->set_expect_group_id(OB_INVALID_ID);
457
  } else if (!is_valid_tenant_id(tenant_id)) {
458
    ret = OB_INVALID_ARGUMENT;
459
    LOG_WARN("Invalid tenant", K(tenant_id), K(ret));
460
  } else if (conn.group_id_ == OBCG_DIAG_TENANT) {
461
    // OBCG_DIAG_TENANT was set in check_update_tenant_id, DO NOT overlap it.
462
  } else if (OB_FAIL(G_RES_MGR.get_mapping_rule_mgr().get_group_id_by_user(
463
              tenant_id, user_id, group_id))) {
464
    LOG_WARN("fail get group id by user", K(user_id), K(tenant_id), K(ret));
465
  } else {
466
    // 将 group id 设置到调度层,之后这个 session 上的所有请求都是用这个 cgroup 的资源
467
    conn.group_id_ = group_id;
468
  }
469
  LOG_DEBUG("setup user resource group", K(user_id), K(tenant_id), K(ret));
470
  return ret;
471
}
472

473
// force refresh schema if local schema version < last schema version
474
int ObMPBase::check_and_refresh_schema(uint64_t login_tenant_id,
475
                                       uint64_t effective_tenant_id,
476
                                       ObSQLSessionInfo *session_info)
477
{
478
  int ret = OB_SUCCESS;
479
  int64_t local_version = 0;
480
  int64_t last_version = 0;
481

482
  if (login_tenant_id != effective_tenant_id) {
483
    // do nothing
484
    //
485
  } else if (OB_ISNULL(gctx_.schema_service_)) {
486
    ret = OB_INVALID_ARGUMENT;
487
    LOG_WARN("null schema service", K(ret), K(gctx_));
488
  } else {
489
    bool need_revert_session = false;
490
    if (NULL == session_info) {
491
      if (OB_FAIL(get_session(session_info))) {
492
        LOG_WARN("get session failed");
493
      } else if (OB_ISNULL(session_info)) {
494
        ret = OB_INVALID_ARGUMENT;
495
        LOG_WARN("invalid session info", K(ret), K(session_info));
496
      } else {
497
        need_revert_session = true;
498
      }
499
    }
500
    if (OB_SUCC(ret)) {
501
      if (OB_FAIL(gctx_.schema_service_->get_tenant_refreshed_schema_version(effective_tenant_id, local_version))) {
502
        LOG_WARN("fail to get tenant refreshed schema version", K(ret));
503
      } else if (OB_FAIL(session_info->get_ob_last_schema_version(last_version))) {
504
        LOG_WARN("failed to get_sys_variable", K(OB_SV_LAST_SCHEMA_VERSION));
505
      } else if (local_version >= last_version) {
506
        // skip
507
      } else if (OB_FAIL(gctx_.schema_service_->async_refresh_schema(effective_tenant_id, last_version))) {
508
        LOG_WARN("failed to refresh schema", K(ret), K(effective_tenant_id), K(last_version));
509
      }
510
      if (need_revert_session && OB_LIKELY(NULL != session_info)) {
511
        revert_session(session_info);
512
      }
513
    }
514
  }
515
  return ret;
516
}
517

518
int ObMPBase::response_row(ObSQLSessionInfo &session,
519
                           common::ObNewRow &row,
520
                           const ColumnsFieldIArray *fields,
521
                           bool is_packed)
522
{
523
  int ret = OB_SUCCESS;
524
  ObArenaAllocator allocator;
525
  ObNewRow tmp_row;
526
  bool has_charset_convert = false;
527
  if (OB_ISNULL(fields) || row.get_count() != fields->count()) {
528
    ret = OB_INVALID_ARGUMENT;
529
    LOG_WARN("fields is null", K(ret), KP(fields));
530
  } else if (OB_FAIL(ob_write_row(allocator, row, tmp_row))) {
531
    LOG_WARN("deep copy row fail.", K(ret));
532
  } else {
533
    for (int64_t i = 0; OB_SUCC(ret) && i < tmp_row.get_count(); ++i) {
534
      ObObj &value = tmp_row.get_cell(i);
535
      ObCharsetType charset_type = CHARSET_INVALID;
536
      ObCharsetType ncharset_type = CHARSET_INVALID;
537
      // need at ps mode
538
      if (!is_packed && value.get_type() != fields->at(i).type_.get_type()) {
539
        ObCastCtx cast_ctx(&allocator, NULL, CM_WARN_ON_FAIL, fields->at(i).type_.get_collation_type());
540
        if (ObDecimalIntType == fields->at(i).type_.get_type()) {
541
          cast_ctx.res_accuracy_ = const_cast<ObAccuracy*>(&fields->at(i).accuracy_);
542
        }
543
        if (OB_FAIL(common::ObObjCaster::to_type(fields->at(i).type_.get_type(),
544
                                          cast_ctx,
545
                                          value,
546
                                          value))) {
547
          LOG_WARN("failed to cast object", K(ret), K(value), K(i),
548
                    K(value.get_type()), K(fields->at(i).type_.get_type()));
549
        }
550
      }
551
      if (OB_FAIL(ret)) {
552
      } else if (is_packed) {
553
        // do nothing
554
      } else if (OB_FAIL(session.get_character_set_results(charset_type))) {
555
        LOG_WARN("fail to get result charset", K(ret));
556
      } else if (OB_FAIL(session.get_ncharacter_set_connection(ncharset_type))) {
557
        LOG_WARN("fail to get result charset", K(ret));
558
      } else {
559
        if (lib::is_oracle_mode()
560
            && (value.is_nchar() || value.is_nvarchar2())
561
            && ncharset_type != CHARSET_INVALID
562
            && ncharset_type != CHARSET_BINARY) {
563
          charset_type = ncharset_type;
564
        }
565
        if (ob_is_string_tc(value.get_type())
566
            && CS_TYPE_INVALID != value.get_collation_type()
567
            && OB_FAIL(value.convert_string_value_charset(charset_type, allocator))) {
568
          LOG_WARN("convert string value charset failed", K(ret), K(value));
569
        } else if (value.is_clob_locator()
570
                    && OB_FAIL(ObQueryDriver::convert_lob_value_charset(value, charset_type, allocator))) {
571
          LOG_WARN("convert lob value charset failed", K(ret));
572
        } else if (ob_is_text_tc(value.get_type())
573
                    && OB_FAIL(ObQueryDriver::convert_text_value_charset(value, charset_type, allocator, &session))) {
574
          LOG_WARN("convert text value charset failed", K(ret));
575
        }
576
        if (OB_FAIL(ret)) {
577
        } else if(OB_FAIL(ObQueryDriver::process_lob_locator_results(value,
578
                                    session.is_client_use_lob_locator(),
579
                                    session.is_client_support_lob_locatorv2(),
580
                                    &allocator,
581
                                    &session))) {
582
          LOG_WARN("convert lob locator to longtext failed", K(ret));
583
#ifdef OB_BUILD_ORACLE_XML
584
        } else if (value.is_user_defined_sql_type()
585
                   && OB_FAIL(ObXMLExprHelper::process_sql_udt_results(value,
586
                                    &allocator,
587
                                    &session))) {
588
          LOG_WARN("convert udt to client format failed", K(ret), K(value.get_udt_subschema_id()));
589
#endif
590
        }
591
      }
592
    }
593

594
    if (OB_SUCC(ret)) {
595
      const ObDataTypeCastParams dtc_params = ObBasicSessionInfo::create_dtc_params(&session);
596
      ObSMRow sm_row(obmysql::BINARY, tmp_row, dtc_params, fields);
597
      sm_row.set_packed(is_packed);
598
      obmysql::OMPKRow rp(sm_row);
599
      rp.set_is_packed(is_packed);
600
      if (OB_FAIL(response_packet(rp, &session))) {
601
        ret = OB_ERR_UNEXPECTED;
602
        LOG_WARN("response packet fail", K(ret));
603
      }
604
    }
605
  }
606
  return ret;
607
}
608

609
int ObMPBase::process_extra_info(sql::ObSQLSessionInfo &session,
610
              const obmysql::ObMySQLRawPacket &pkt, bool &need_response_error)
611
{
612
  int ret = OB_SUCCESS;
613
  SessionInfoVerifacation sess_info_verification;
614
  LOG_TRACE("process extra info", K(ret),K(pkt.get_extra_info().exist_sess_info_veri()));
615
  if (FALSE_IT(session.set_has_query_executed(true))) {
616
  } else if (pkt.get_extra_info().exist_sync_sess_info()
617
              && OB_FAIL(ObMPUtils::sync_session_info(session,
618
                          pkt.get_extra_info().get_sync_sess_info()))) {
619
    // won't response error, disconnect will let proxy sens failure
620
    need_response_error = false;
621
    LOG_WARN("fail to update sess info", K(ret));
622
  } else if (pkt.get_extra_info().exist_sess_info_veri()
623
              && OB_FAIL(ObSessInfoVerify::sync_sess_info_veri(session,
624
                        pkt.get_extra_info().get_sess_info_veri(),
625
                        sess_info_verification))) {
626
    LOG_WARN("fail to get verify info requied", K(ret));
627
  } else if (pkt.get_extra_info().exist_sess_info_veri() &&
628
              pkt.is_proxy_switch_route() &&
629
              OB_FAIL(ObSessInfoVerify::verify_session_info(session,
630
              sess_info_verification))) {
631
    LOG_WARN("fail to verify sess info", K(ret));
632
  }
633
  return ret;
634
}
635

636
// The obmp layer handles the kill client session logic.
637
int ObMPBase::process_kill_client_session(sql::ObSQLSessionInfo &session, bool is_connect)
638
{
639
  int ret = OB_SUCCESS;
640
  uint64_t create_time = 0;
641
  if (OB_ISNULL(gctx_.session_mgr_)) {
642
    ret = OB_ERR_UNEXPECTED;
643
    LOG_ERROR("invalid session mgr", K(ret), K(gctx_));
644
  } else if (OB_UNLIKELY(session.is_mark_killed())) {
645
    ret = OB_ERR_KILL_CLIENT_SESSION;
646
    LOG_WARN("client session need be killed", K(session.get_session_state()),
647
            K(session.get_sessid()), "proxy_sessid", session.get_proxy_sessid(),
648
            K(session.get_client_sessid()), K(ret));
649
  } else if (is_connect) {
650
    if (OB_UNLIKELY(OB_HASH_NOT_EXIST != (gctx_.session_mgr_->get_kill_client_sess_map().
651
              get_refactored(session.get_client_sessid(), create_time)))) {
652
      if (session.get_client_create_time() == create_time) {
653
        ret = OB_ERR_KILL_CLIENT_SESSION;
654
        LOG_WARN("client session need be killed", K(session.get_session_state()),
655
                K(session.get_sessid()), "proxy_sessid", session.get_proxy_sessid(),
656
                K(session.get_client_sessid()), K(ret),K(create_time));
657
      } else {
658
        LOG_DEBUG("client session is created later", K(create_time),
659
                K(session.get_client_create_time()),
660
                K(session.get_sessid()), "proxy_sessid", session.get_proxy_sessid(),
661
                K(session.get_client_sessid()));
662
      }
663
    }
664
  } else {
665
  }
666
  return ret;
667
}
668

669
// Applies the client's collation (conn.client_cs_type_) to the session's
// character_set_client, character_set_results, character_set_connection and
// collation_connection system variables. Nothing is changed when the value is
// not a valid collation.
// @return OB_SUCCESS or the first error from update_sys_variable().
int ObMPBase::update_charset_sys_vars(ObSMConnection &conn, ObSQLSessionInfo &sess_info)
{
  int ret = OB_SUCCESS;
  int64_t cs_type = conn.client_cs_type_;
  // Background: mysqltest gives a default connect_charset=latin1, but for
  //             historical reasons oceanbase uses utf8 as the default charset
  //             for mysqltest.
  // TODO: after obclient & mysqltest support default charset = utf8, the
  //       logic for cs_type != latin1 (collation id 8) would be deleted.
  // NOTE(review): no latin1 special case is visible in this function any
  //       more; the unused LATIN1_CS constant has been removed - confirm the
  //       TODO is still relevant.
  if (ObCharset::is_valid_collation(cs_type)) {
    if (OB_FAIL(sess_info.update_sys_variable(SYS_VAR_CHARACTER_SET_CLIENT, cs_type))) {
      SQL_ENG_LOG(WARN, "failed to update sys var", K(ret));
    } else if (OB_FAIL(sess_info.update_sys_variable(SYS_VAR_CHARACTER_SET_RESULTS, cs_type))) {
      SQL_ENG_LOG(WARN, "failed to update sys var", K(ret));
    } else if (OB_FAIL(sess_info.update_sys_variable(SYS_VAR_CHARACTER_SET_CONNECTION, cs_type))) {
      SQL_ENG_LOG(WARN, "failed to update sys var", K(ret));
    } else if (OB_FAIL(sess_info.update_sys_variable(SYS_VAR_COLLATION_CONNECTION, cs_type))) {
      SQL_ENG_LOG(WARN, "failed to update sys var", K(ret));
    }
  }
  return ret;
}
692

693
} // namespace observer
694
} // namespace oceanbase
695

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.