oceanbase

Форк
0
/
obmp_query.cpp 
1396 строк · 62.9 Кб
1
/**
2
 * Copyright (c) 2021 OceanBase
3
 * OceanBase CE is licensed under Mulan PubL v2.
4
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
5
 * You may obtain a copy of Mulan PubL v2 at:
6
 *          http://license.coscl.org.cn/MulanPubL-2.0
7
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
 * See the Mulan PubL v2 for more details.
11
 */
12

13
#define USING_LOG_PREFIX SERVER
14

15
#include "observer/mysql/obmp_query.h"
16

17
#include "lib/utility/ob_macro_utils.h"
18
#include "lib/utility/ob_tracepoint.h"
19
#include "lib/worker.h"
20
#include "lib/stat/ob_session_stat.h"
21
#include "lib/profile/ob_perf_event.h"
22
#include "share/ob_debug_sync.h"
23
#include "share/config/ob_server_config.h"
24
#include "share/schema/ob_multi_version_schema_service.h"
25
#include "share/schema/ob_schema_getter_guard.h"
26
#include "share/client_feedback/ob_feedback_partition_struct.h"
27
#include "share/ob_resource_limit.h"
28
#include "rpc/ob_request.h"
29
#include "rpc/obmysql/ob_mysql_packet.h"
30
#include "rpc/obmysql/ob_mysql_request_utils.h"
31
#include "rpc/obmysql/packet/ompk_ok.h"
32
#include "rpc/obmysql/packet/ompk_error.h"
33
#include "rpc/obmysql/packet/ompk_resheader.h"
34
#include "rpc/obmysql/packet/ompk_field.h"
35
#include "rpc/obmysql/packet/ompk_eof.h"
36
#include "rpc/obmysql/packet/ompk_row.h"
37
#include "sql/ob_sql_context.h"
38
#include "sql/ob_sql.h"
39
#include "sql/ob_sql_trans_util.h"
40
#include "sql/session/ob_sql_session_mgr.h"
41
#include "sql/resolver/cmd/ob_variable_set_stmt.h"
42
#include "sql/engine/px/ob_px_admission.h"
43
#include "observer/mysql/ob_mysql_result_set.h"
44
#include "rpc/obmysql/obsm_struct.h"
45
#include "observer/mysql/ob_sync_plan_driver.h"
46
#include "observer/mysql/ob_sync_cmd_driver.h"
47
#include "observer/mysql/ob_async_cmd_driver.h"
48
#include "observer/mysql/ob_async_plan_driver.h"
49
#include "observer/ob_req_time_service.h"
50
#include "observer/omt/ob_tenant.h"
51
#include "observer/ob_server.h"
52
#include "observer/virtual_table/ob_virtual_table_iterator_factory.h"
53
#include "sql/monitor/ob_phy_plan_monitor_info.h"
54
#include "sql/monitor/ob_security_audit.h"
55
#include "lib/rc/context.h"
56
#include "sql/monitor/ob_security_audit_utils.h"
57
#include "observer/mysql/obmp_utils.h"
58
#include "lib/ash/ob_active_session_guard.h"
59
#include "lib/trace/ob_trace.h"
60

61
using namespace oceanbase::rpc;
62
using namespace oceanbase::obmysql;
63
using namespace oceanbase::common;
64
using namespace oceanbase::observer;
65
using namespace oceanbase::share;
66
using namespace oceanbase::share::schema;
67
using namespace oceanbase::trace;
68
using namespace oceanbase::sql;
69
/**
 * Constructs a query processor on top of ObMPBase.
 *
 * All timing fields and the parameter buffer start out zeroed/empty, and the
 * SQL context is tagged with the MpQuery execution type.
 *
 * @param gctx global context forwarded to the ObMPBase constructor
 */
ObMPQuery::ObMPQuery(const ObGlobalContext &gctx)
    : ObMPBase(gctx)
    , single_process_timestamp_(0)
    , exec_start_timestamp_(0)
    , exec_end_timestamp_(0)
    , is_com_filed_list_(false)
    , params_value_len_(0)
    , params_value_(NULL)
{
  // Mark the context so downstream code can tell which processor type is running.
  ctx_.exec_type_ = MpQuery;
}
80

81

82
/// Nothing to release explicitly; members clean up after themselves.
ObMPQuery::~ObMPQuery() = default;
85

86

87
int ObMPQuery::process()
88
{
89
  int ret = OB_SUCCESS;
90
  int tmp_ret = OB_SUCCESS;
91
  ObSQLSessionInfo *sess = NULL;
92
  uint32_t sessid = 0;
93
  bool need_response_error = true;
94
  bool need_disconnect = true;
95
  bool async_resp_used = false; // 由事务提交线程异步回复客户端
96
  int64_t query_timeout = 0;
97
  ObCurTraceId::TraceId *cur_trace_id = ObCurTraceId::get_trace_id();
98
  ObSMConnection *conn = get_conn();
99
  static int64_t concurrent_count = 0;
100
  bool need_dec = false;
101
  bool do_ins_batch_opt = false;
102
  if (RL_IS_ENABLED) {
103
    if (ATOMIC_FAA(&concurrent_count, 1) > RL_CONF.get_max_concurrent_query_count()) {
104
      ret = OB_RESOURCE_OUT;
105
      LOG_WARN("reach max concurrent limit", K(ret), K(concurrent_count),
106
          K(RL_CONF.get_max_concurrent_query_count()));
107
    }
108
    need_dec = true;
109
  }
110
  DEFER(if (need_dec) (void)ATOMIC_FAA(&concurrent_count, -1));
111
  if (OB_FAIL(ret)) {
112
    // do-nothing
113
  } else if (OB_ISNULL(req_) || OB_ISNULL(conn) || OB_ISNULL(cur_trace_id)) {
114
    ret = OB_ERR_UNEXPECTED;
115
    LOG_WARN("null conn ptr", K_(sql), K_(req), K(conn), K(cur_trace_id), K(ret));
116
  } else if (OB_UNLIKELY(!conn->is_in_authed_phase())) {
117
    ret = OB_ERR_NO_PRIVILEGE;
118
    LOG_WARN("receive sql without session", K_(sql), K(ret));
119
  } else if (OB_ISNULL(conn->tenant_)) {
120
    ret = OB_ERR_UNEXPECTED;
121
    LOG_ERROR("invalid tenant", K_(sql), K(conn->tenant_), K(ret));
122
  } else if (OB_FAIL(get_session(sess))) {
123
    LOG_WARN("get session fail", K_(sql), K(ret));
124
  } else if (OB_ISNULL(sess)) {
125
    ret = OB_ERR_UNEXPECTED;
126
    LOG_WARN("session is NULL or invalid", K_(sql), K(sess), K(ret));
127
  } else {
128
    lib::CompatModeGuard g(sess->get_compatibility_mode() == ORACLE_MODE ?
129
                             lib::Worker::CompatMode::ORACLE : lib::Worker::CompatMode::MYSQL);
130
    THIS_WORKER.set_session(sess);
131
    ObSQLSessionInfo &session = *sess;
132
    ObSQLSessionInfo::LockGuard lock_guard(session.get_query_lock());
133
    session.set_current_trace_id(ObCurTraceId::get_trace_id());
134
    session.init_use_rich_format();
135
    int64_t val = 0;
136
    const bool check_throttle = !is_root_user(sess->get_user_id());
137

138
    if (check_throttle &&
139
        !sess->is_inner() &&
140
        sess->get_raw_audit_record().try_cnt_ == 0 &&
141
        lib::Worker::WS_OUT_OF_THROTTLE == THIS_THWORKER.check_rate_limiter()) {
142
      ret = OB_KILLED_BY_THROTTLING;
143
      LOG_WARN("query is throttled", K(ret), K(sess->get_user_id()));
144
      need_disconnect = false;
145
    } else if (OB_SUCC(sess->get_sql_throttle_current_priority(val))) {
146
      THIS_WORKER.set_sql_throttle_current_priority(check_throttle ? val : -1);
147
      if (lib::Worker::WS_OUT_OF_THROTTLE == THIS_THWORKER.check_qtime_throttle()) {
148
        ret = OB_KILLED_BY_THROTTLING;
149
        LOG_WARN("query is throttled", K(ret));
150
        need_disconnect = false;
151
      }
152
    } else {
153
      LOG_WARN("get system variable sql_throttle_current_priority fail", K(ret));
154
      // reset ret for compatibility.
155
      ret = OB_SUCCESS;
156
    }
157
    if (OB_SUCC(ret)) {
158
      sessid = conn->sessid_;
159
      int64_t tenant_version = 0;
160
      int64_t sys_version = 0;
161
      session.set_thread_id(GETTID());
162
      const ObMySQLRawPacket &pkt = reinterpret_cast<const ObMySQLRawPacket&>(req_->get_packet());
163
      int64_t packet_len = pkt.get_clen();
164
      req_->set_trace_point(ObRequest::OB_EASY_REQUEST_MPQUERY_PROCESS);
165
      if (OB_UNLIKELY(!session.is_valid())) {
166
        ret = OB_ERR_UNEXPECTED;
167
        LOG_ERROR("invalid session", K_(sql), K(ret));
168
      } else if (OB_FAIL(process_kill_client_session(session))) {
169
        LOG_WARN("client session has been killed", K(ret));
170
      } else if (OB_UNLIKELY(session.is_zombie())) {
171
        //session has been killed some moment ago
172
        ret = OB_ERR_SESSION_INTERRUPTED;
173
        LOG_WARN("session has been killed", K(session.get_session_state()), K_(sql),
174
                 K(session.get_sessid()), "proxy_sessid", session.get_proxy_sessid(), K(ret));
175
      } else if (OB_FAIL(session.check_and_init_retry_info(*cur_trace_id, sql_))) {
176
        // 注意,retry info和last query trace id的逻辑要写在query lock内,否则会有并发问题
177
        LOG_WARN("fail to check and init retry info", K(ret), K(*cur_trace_id), K_(sql));
178
      } else if (OB_FAIL(session.get_query_timeout(query_timeout))) {
179
        LOG_WARN("fail to get query timeout", K_(sql), K(ret));
180
      } else if (OB_FAIL(gctx_.schema_service_->get_tenant_received_broadcast_version(
181
                  session.get_effective_tenant_id(), tenant_version))) {
182
        LOG_WARN("fail get tenant broadcast version", K(ret));
183
      } else if (OB_FAIL(gctx_.schema_service_->get_tenant_received_broadcast_version(
184
                  OB_SYS_TENANT_ID, sys_version))) {
185
        LOG_WARN("fail get tenant broadcast version", K(ret));
186
      } else if (pkt.exist_trace_info()
187
                 && OB_FAIL(session.update_sys_variable(SYS_VAR_OB_TRACE_INFO,
188
                                                        pkt.get_trace_info()))) {
189
        LOG_WARN("fail to update trace info", K(ret));
190
      } else if (FALSE_IT(session.set_txn_free_route(pkt.txn_free_route()))) {
191
      } else if (OB_FAIL(process_extra_info(session, pkt, need_response_error))) {
192
        LOG_WARN("fail get process extra info", K(ret));
193
      } else if (FALSE_IT(session.post_sync_session_info())) {
194
      } else if (OB_UNLIKELY(packet_len > session.get_max_packet_size())) {
195
        //packet size check with session variable max_allowd_packet or net_buffer_length
196
        need_disconnect = false;
197
        ret = OB_ERR_NET_PACKET_TOO_LARGE;
198
        LOG_WARN("packet too large than allowed for the session", K_(sql), K(ret));
199
      } else if (OB_FAIL(sql::ObFLTUtils::init_flt_info(pkt.get_extra_info(), session,
200
                              conn->proxy_cap_flags_.is_full_link_trace_support()))) {
201
        LOG_WARN("failed to update flt extra info", K(ret));
202
      } else if (OB_FAIL(session.gen_configs_in_pc_str())) {
203
        LOG_WARN("fail to generate configuration strings that can influence execution plan", K(ret));
204
      } else {
205
        FLTSpanGuard(com_query_process);
206
        FLT_SET_TAG(log_trace_id, ObCurTraceId::get_trace_id_str(),
207
                    receive_ts, get_receive_timestamp(),
208
                    client_info, session.get_client_info(),
209
                    module_name, session.get_module_name(),
210
                    action_name, session.get_action_name(),
211
                    sess_id, session.get_sessid());
212

213
        THIS_WORKER.set_timeout_ts(get_receive_timestamp() + query_timeout);
214
        retry_ctrl_.set_tenant_global_schema_version(tenant_version);
215
        retry_ctrl_.set_sys_global_schema_version(sys_version);
216
        session.partition_hit().reset();
217
        session.set_pl_can_retry(true);
218
        ObLockWaitNode &lock_wait_node  = req_->get_lock_wait_node();
219
        lock_wait_node.set_session_info(session.get_sessid());
220

221
        bool has_more = false;
222
        bool force_sync_resp = false;
223
        need_response_error = false;
224
        ObParser parser(THIS_WORKER.get_sql_arena_allocator(),
225
                        session.get_sql_mode(), session.get_charsets4parser());
226
        //为了性能优化考虑,减少数组长度,降低无用元素的构造和析构开销
227
        ObSEArray<ObString, 1> queries;
228
        ObSEArray<ObString, 1> ins_queries;
229
        ObMPParseStat parse_stat;
230
        if (GCONF.enable_record_trace_id) {
231
          PreParseResult pre_parse_result;
232
          if (OB_FAIL(ObParser::pre_parse(sql_, pre_parse_result))) {
233
            LOG_WARN("fail to pre parse", K(ret));
234
          } else {
235
            session.set_app_trace_id(pre_parse_result.trace_id_);
236
            LOG_DEBUG("app trace id", "app_trace_id", pre_parse_result.trace_id_,
237
                                      "sessid", session.get_sessid(), K_(sql));
238
          }
239
        }
240

241
        if (OB_FAIL(ret)) {
242
          //do nothing
243
        } else if (OB_FAIL(parser.split_multiple_stmt(sql_, queries, parse_stat))) {
244
          // 进入本分支,说明push_back出错,OOM,委托外层代码返回错误码
245
          // 且进入此分支之后,要断连接
246
          need_response_error = true;
247
        } else if (OB_UNLIKELY(queries.count() <= 0)) {
248
          ret = OB_ERR_UNEXPECTED;
249
          need_response_error = true;//进入此分支之后,要断连接,极其严重错误
250
          LOG_ERROR("emtpy query count. client would have suspended. never be here!", K_(sql), K(ret));
251
        } else if (OB_UNLIKELY(1 == session.get_capability().cap_flags_.OB_CLIENT_MULTI_STATEMENTS)) {
252
          // 处理Multiple Statement
253
          /* MySQL处理Multi-Stmt出错时候的行为:
254
          * 遇到首次运行失败(包括解析或执行)的SQL后,停止读取后继数据
255
            *  例如:
256
            *  (1) select 1; selct 2; select 3;
257
            *  select 1执行成功,selct 2报语法错误,select 3不被执行
258
            *  (2) select 1; drop table not_exists_table; select 3;
259
            *  select 1执行成功,drop table not_exists_table报表不存在错误,select 3不被执行
260
            *
261
            * 特别注意:
262
            * split_multiple_stmt是根据分号来分割语句,但有可能遇到“语法错误”,
263
            * 这里说的“语法错误”不是说select写成了selct,而是“token”级别的语法错误,例如语句
264
            * select 1;`select 2; select 3;
265
            * 上面`和'都没有形成闭合的字符串token,token parser会报告语法错误
266
            * 上面的例子中,得到的queries.count() 等于 2,分别为select 1和 `select 2; select 3;
267
          */
268
          bool optimization_done = false;
269
          const char *p_normal_start = nullptr;
270
          if (queries.count() > 1 && session.is_txn_free_route_temp()) {
271
            need_disconnect = false;
272
            need_response_error = true;
273
            ret = OB_TRANS_FREE_ROUTE_NOT_SUPPORTED;
274
            LOG_WARN("multi stmt is not supported to be executed on txn temporary node", KR(ret),
275
                     "tx_free_route_ctx", session.get_txn_free_route_ctx(),
276
                     "trans_id", session.get_tx_id(), K(session));
277
          } else if (queries.count() > 1
278
            && OB_FAIL(try_batched_multi_stmt_optimization(session,
279
                                                          queries,
280
                                                          parse_stat,
281
                                                          optimization_done,
282
                                                          async_resp_used,
283
                                                          need_disconnect,
284
                                                          false))) {
285
            LOG_WARN("failed to try multi-stmt-optimization", K(ret));
286
          } else if (!optimization_done
287
                     && session.is_enable_batched_multi_statement()
288
                     && ObSQLUtils::is_enable_explain_batched_multi_statement()
289
                     && ObParser::is_explain_stmt(queries.at(0), p_normal_start)) {
290
            ret = OB_SUCC(ret) ? OB_NOT_SUPPORTED : ret;
291
            need_disconnect = false;
292
            need_response_error = true;
293
            LOG_WARN("explain batch statement failed", K(ret));
294
          } else if (!optimization_done) {
295
            ARRAY_FOREACH(queries, i) {
296
              // in multistmt sql, audit_record will record multistmt_start_ts_ when count over 1
297
              // queries.count()>1 -> batch,(m)sql1,(m)sql2,...    |    queries.count()=1 -> sql1
298
              if (i > 0) {
299
                session.get_raw_audit_record().exec_timestamp_.multistmt_start_ts_
300
                                                              = ObTimeUtility::current_time();
301
                // before handle multi-stmt's followers, re-calc the txn_free_route's baseline
302
                // in order to capture accurate state changed by current stmt
303
                session.prep_txn_free_route_baseline();
304
              }
305
              need_disconnect = true;
306
              //FIXME qianfu NG_TRACE_EXT(set_disconnect, OB_ID(disconnect), true, OB_ID(pos), "multi stmt begin");
307
              if (OB_UNLIKELY(parse_stat.parse_fail_
308
                  && (i == parse_stat.fail_query_idx_)
309
                  && ObSQLUtils::check_need_disconnect_parser_err(parse_stat.fail_ret_))) {
310
                // 进入本分支,说明在multi_query中的某条query parse失败,如果不是语法错,则进入该分支
311
                // 如果当前query_count 为1, 则不断连接;如果大于1,
312
                // 则需要在发错误包之后断连接,防止客户端一直在等接下来的回包
313
                // 这个改动是为了解决
314
                ret = parse_stat.fail_ret_;
315
                need_response_error = true;
316
                break;
317
              } else {
318
                has_more = (queries.count() > i + 1);
319
                // 本来可以做成不管queries.count()是多少,最后一个query都可以异步回包的,
320
                // 但是目前的代码实现难以在不同的线程处理同一个请求的回包,
321
                // 因此这里只允许只有一个query的multi query请求异步回包。
322
                force_sync_resp = queries.count() <= 1? false : true;
323
                // is_part_of_multi 表示当前sql是 multi stmt 中的一条,
324
                // 原来的值默认为true,会影响单条sql的二次路由,现在改为用 queries.count() 判断。
325
                bool is_part_of_multi = queries.count() > 1 ? true : false;
326
                ret = process_single_stmt(ObMultiStmtItem(is_part_of_multi, i, queries.at(i)),
327
                                          session,
328
                                          has_more,
329
                                          force_sync_resp,
330
                                          async_resp_used,
331
                                          need_disconnect);
332
              }
333
            }
334
          }
335
          // 以multiple query协议发过来的语句总数
336
          EVENT_INC(SQL_MULTI_QUERY_COUNT);
337
          // 以multiple query协议发过来,但实际只包含一条SQL的语句的个数
338
          if (queries.count() <= 1) {
339
            EVENT_INC(SQL_MULTI_ONE_QUERY_COUNT);
340
          }
341
        } else { // OB_CLIENT_MULTI_STATEMENTS not enabled
342
          if (OB_UNLIKELY(queries.count() != 1)) {
343
            ret = OB_ERR_PARSER_SYNTAX;
344
            need_disconnect = false;
345
            need_response_error = true;
346
            LOG_WARN("unexpected error. multi stmts sql while OB_CLIENT_MULTI_STATEMENTS not enabled.", K(ret), K(sql_));
347
          } else {
348
            EVENT_INC(SQL_SINGLE_QUERY_COUNT);
349
            // 处理普通的Single Statement
350
            ret = process_single_stmt(ObMultiStmtItem(false, 0, sql_),
351
                                      session,
352
                                      has_more,
353
                                      force_sync_resp,
354
                                      async_resp_used,
355
                                      need_disconnect);
356
          }
357
        }
358
        if (OB_FAIL(ret)) {
359
          FLT_SET_TAG(err_code, ret);
360
        }
361
      }
362
    }
363
    // THIS_WORKER.need_retry()是指是否扔回队列重试,包括大查询被扔回队列的情况。
364
    session.check_and_reset_retry_info(*cur_trace_id, THIS_WORKER.need_retry());
365
    session.set_last_trace_id(ObCurTraceId::get_trace_id());
366
    IGNORE_RETURN record_flt_trace(session);
367
  }
368

369
  if (OB_UNLIKELY(NULL != GCTX.cgroup_ctrl_) && GCTX.cgroup_ctrl_->is_valid() && is_conn_valid()) {
370
    int tmp_ret = OB_SUCCESS;
371
    // Call setup_user_resource_group no matter OB_SUCC or OB_FAIL
372
    // because we have to reset conn.group_id_ according to user_name.
373
    // Otherwise, suppose we execute a query with a mapping rule on the column in the query at first,
374
    // we switch to the defined consumer group, batch_group for example,
375
    // and after that, the next query will also be executed with batch_group.
376
    if (OB_UNLIKELY(OB_SUCCESS !=
377
            (tmp_ret = setup_user_resource_group(*conn, sess->get_effective_tenant_id(), sess)))) {
378
      LOG_WARN("fail setup user resource group", K(tmp_ret), K(ret));
379
      ret = OB_SUCC(ret) ? tmp_ret : ret;
380
    }
381
  }
382

383
  if (OB_FAIL(ret) && need_response_error && is_conn_valid()) {
384
    send_error_packet(ret, NULL);
385
  }
386
  if (OB_FAIL(ret) && OB_UNLIKELY(need_disconnect) && is_conn_valid()) {
387
    force_disconnect();
388
    LOG_WARN("disconnect connection", KR(ret));
389
  }
390

391
  // 如果已经异步回包,则这部分逻辑在cb中执行,这里跳过flush_buffer()
392
  if (!THIS_WORKER.need_retry()) {
393
    if (async_resp_used) {
394
      async_resp_used_ = true;
395
      packet_sender_.disable_response();
396
    } else if (OB_UNLIKELY(!is_conn_valid())) {
397
      tmp_ret = OB_CONNECT_ERROR;
398
      LOG_WARN("connection in error, maybe has disconnected", K(tmp_ret));
399
    } else if (OB_UNLIKELY(OB_SUCCESS != (tmp_ret = flush_buffer(true)))) {
400
      LOG_WARN("failed to flush_buffer", K(tmp_ret));
401
    }
402
  } else {
403
    need_retry_ = true;
404
  }
405

406
  // bugfix:
407
  // 必须总是将 THIS_WORKER 里的指针设置为 null
408
  THIS_WORKER.set_session(NULL); // clear session
409

410
  if (sess != NULL) {
411
    revert_session(sess); //current ignore revert session ret
412
  }
413

414
  return (OB_SUCCESS != ret) ? ret : tmp_ret;
415
}
416

417
/*
418
 * Try to evaluate multiple update queries as a single query to optimize rpc cost
419
 * for details, please ref to
420
 */
421
int ObMPQuery::try_batched_multi_stmt_optimization(sql::ObSQLSessionInfo &session,
422
                                                   common::ObIArray<ObString> &queries,
423
                                                   const ObMPParseStat &parse_stat,
424
                                                   bool &optimization_done,
425
                                                   bool &async_resp_used,
426
                                                   bool &need_disconnect,
427
                                                   bool is_ins_multi_val_opt)
428
{
429
  int ret = OB_SUCCESS;
430
  bool has_more = false;
431
  bool force_sync_resp = true;
432
  bool enable_batch_opt = session.is_enable_batched_multi_statement();
433
  bool use_plan_cache = session.get_local_ob_enable_plan_cache();
434
  optimization_done = false;
435
  if (queries.count() <= 1 || parse_stat.parse_fail_) {
436
    /*do nothing*/
437
  } else if (!enable_batch_opt) {
438
    // 未打开batch开关
439
  } else if (!use_plan_cache) {
440
    // 不打开plan_cache开关,则优化不支持
441
  } else if (OB_FAIL(process_single_stmt(ObMultiStmtItem(false, 0, sql_, &queries, is_ins_multi_val_opt),
442
                                         session,
443
                                         has_more,
444
                                         force_sync_resp,
445
                                         async_resp_used,
446
                                         need_disconnect))) {
447
    int tmp_ret = ret;
448
    if (THIS_WORKER.need_retry()) {
449
      // fail optimize, is a large query, just go back to large query queue and retry
450
    } else {
451
      ret = OB_SUCCESS;
452
    }
453
    LOG_WARN("failed to process batch stmt, cover the error code and reset retry flag",
454
        K(tmp_ret), K(ret), K(THIS_WORKER.need_retry()));
455
  } else {
456
    optimization_done = true;
457
  }
458

459
  LOG_TRACE("after to try batched multi-stmt optimization", K(optimization_done),
460
      K(queries), K(enable_batch_opt), K(ret), K(THIS_WORKER.need_retry()), K(retry_ctrl_.need_retry()), K(retry_ctrl_.get_retry_type()));
461
  return ret;
462
}
463

464
int ObMPQuery::process_single_stmt(const ObMultiStmtItem &multi_stmt_item,
465
                                   ObSQLSessionInfo &session,
466
                                   bool has_more_result,
467
                                   bool force_sync_resp,
468
                                   bool &async_resp_used,
469
                                   bool &need_disconnect)
470
{
471
  int ret = OB_SUCCESS;
472
  FLTSpanGuard(mpquery_single_stmt);
473
  ctx_.spm_ctx_.reset();
474
  bool need_response_error = true;
475
  const bool enable_trace_log = lib::is_trace_log_enabled();
476
  session.get_raw_audit_record().request_memory_used_ = 0;
477
  observer::ObProcessMallocCallback pmcb(0,
478
        session.get_raw_audit_record().request_memory_used_);
479
  lib::ObMallocCallbackGuard guard(pmcb);
480
  // 执行setup_wb后,所有WARNING都会写入到当前session的WARNING BUFFER中
481
  setup_wb(session);
482
  // 当新语句开始的时候,将该值归0,因为curr_trans_last_stmt_end_time是用于
483
  // 实现事务内部的语句执行间隔过长超时功能的。
484
  session.set_curr_trans_last_stmt_end_time(0);
485

486
  //============================ 注意这些变量的生命周期 ================================
487
  ObSessionStatEstGuard stat_est_guard(get_conn()->tenant_->id(), session.get_sessid());
488
  if (OB_FAIL(init_process_var(ctx_, multi_stmt_item, session))) {
489
    LOG_WARN("init process var failed.", K(ret), K(multi_stmt_item));
490
  } else {
491
    if (enable_trace_log) {
492
      //set session log_level.Must use ObThreadLogLevelUtils::clear() in pair
493
      ObThreadLogLevelUtils::init(session.get_log_id_level_map());
494
    }
495
    // obproxy may use 'SET @@last_schema_version = xxxx' to set newest schema,
496
    // observer will force refresh schema if local_schema_version < last_schema_version;
497
    if (OB_FAIL(check_and_refresh_schema(session.get_login_tenant_id(),
498
                                         session.get_effective_tenant_id(),
499
                                         &session))) {
500
      LOG_WARN("failed to check_and_refresh_schema", K(ret));
501
    } else if (OB_FAIL(session.update_timezone_info())) {
502
      LOG_WARN("fail to update time zone info", K(ret));
503
    } else {
504
      need_response_error = false;
505
      //每次执行不同sql都需要更新
506
      ctx_.self_add_plan_ = false;
507
      retry_ctrl_.reset_retry_times();//每个statement单独记录retry times
508
      oceanbase::lib::Thread::WaitGuard guard(oceanbase::lib::Thread::WAIT_FOR_LOCAL_RETRY);
509
      do {
510
        ret = OB_SUCCESS; //当发生本地重试的时候,需要重置错误码,不然无法推进重试
511
        need_disconnect = true;
512
        // do the real work
513
        //create a new temporary memory context for executing sql can
514
        //avoid the problem the memory cannot be released in time due to too many sql items
515
        //but it will drop the sysbench performance about 1~4%
516
        //so we execute the first sql with the default memory context and
517
        //execute the rest sqls with a temporary memory context to avoid memory dynamic leaks
518
        retry_ctrl_.clear_state_before_each_retry(session.get_retry_info_for_update());
519
        bool first_exec_sql = session.get_is_in_retry() ? false :
520
            (multi_stmt_item.is_part_of_multi_stmt() ? multi_stmt_item.get_seq_num() <= 1 : true);
521
        if (OB_LIKELY(first_exec_sql)) {
522
          ret = do_process(session,
523
                           has_more_result,
524
                           force_sync_resp,
525
                           async_resp_used,
526
                           need_disconnect);
527
          ctx_.clear();
528
        } else {
529
          ret = process_with_tmp_context(session,
530
                                         has_more_result,
531
                                         force_sync_resp,
532
                                         async_resp_used,
533
                                         need_disconnect);
534
        }
535
        //set session retry state
536
        session.set_session_in_retry(retry_ctrl_.need_retry());
537
      } while (RETRY_TYPE_LOCAL == retry_ctrl_.get_retry_type());
538
      //@notice: after the async packet is responsed,
539
      //the easy_buf_ hold by the sql string may have been released.
540
      //from here on, we can no longer access multi_stmt_item.sql_,
541
      //otherwise there is a risk of coredump
542
      //@TODO: need to determine a mechanism to ensure the safety of memory access here
543
    }
544
    ObThreadLogLevelUtils::clear();
545
    const int64_t debug_sync_timeout = GCONF.debug_sync_timeout;
546
    if (debug_sync_timeout > 0) {
547
      // ignore thread local debug sync actions to session actions failed
548
      int tmp_ret = OB_SUCCESS;
549
      tmp_ret = GDS.collect_result_actions(session.get_debug_sync_actions());
550
      if (OB_UNLIKELY(OB_SUCCESS != tmp_ret)) {
551
        LOG_WARN("set thread local debug sync actions to session actions failed", K(tmp_ret));
552
      }
553
    }
554
  }
555

556
  //对于tracelog的处理,不影响正常逻辑,错误码无须赋值给ret
557
  int tmp_ret = OB_SUCCESS;
558
  //清空WARNING BUFFER
559
  tmp_ret = do_after_process(session, ctx_, async_resp_used);
560

561
  // 设置上一条语句的结束时间,由于这里只用于实现事务内部的语句之间的执行超时,
562
  // 因此,首先,需要判断是否处于事务执行的过程中。然后对于事务提交的时候的异步回包,
563
  // 也不需要在这里设置结束时间,因为这已经相当于事务的最后一条语句了。
564
  // 最后,需要判断ret错误码,只有成功执行的sql才记录结束时间
565
  if (session.get_in_transaction() && !async_resp_used && OB_SUCC(ret)) {
566
    session.set_curr_trans_last_stmt_end_time(common::ObTimeUtility::current_time());
567
  }
568

569
  // need_response_error这个变量保证仅在
570
  // do { do_process } while(retry) 之前出错才会
571
  // 走到send_error_packet逻辑
572
  // 所以无需考虑当前为sync还是async模式
573
  if (!OB_SUCC(ret) && need_response_error && is_conn_valid()) {
574
    send_error_packet(ret, NULL);
575
  }
576
  ctx_.reset();
577
  return ret;
578
}
579

580
/**
 * Runs do_process() inside a temporary memory context.
 *
 * Used for retries and for the follow-up statements of a multi-query, so that
 * memory does not pile up across retries or across many statements.
 * After the call the first-plan hash, first outline data and the SQL context
 * are cleared.
 *
 * All parameters are forwarded unchanged to do_process(); the return value is
 * whatever do_process() returned.
 */
OB_NOINLINE int ObMPQuery::process_with_tmp_context(ObSQLSessionInfo &session,
                                                    bool has_more_result,
                                                    bool force_sync_resp,
                                                    bool &async_resp_used,
                                                    bool &need_disconnect)
{
  int ret = OB_SUCCESS;
  // Describe the temporary context that will back this execution.
  lib::ContextParam param;
  param.set_mem_attr(MTL_ID(), ObModIds::OB_SQL_EXECUTOR, ObCtxIds::DEFAULT_CTX_ID)
       .set_properties(lib::USE_TL_PAGE_OPTIONAL)
       .set_page_size(OB_MALLOC_REQ_NORMAL_BLOCK_SIZE)
       .set_ablock_size(lib::INTACT_MIDDLE_AOBJECT_SIZE);
  CREATE_WITH_TEMP_CONTEXT(param) {
    ret = do_process(session, has_more_result, force_sync_resp, async_resp_used, need_disconnect);
    // Drop per-execution state before leaving the temporary context.
    ctx_.first_plan_hash_ = 0;
    ctx_.first_outline_data_.reset();
    ctx_.clear();
  }
  return ret;
}
607

608
/**
 * Returns the schema guard cached in `cache_info`, refreshing it first when it is stale.
 *
 * The cached guard is refreshed when it has not been initialized, when it
 * belongs to a different tenant, or when either the tenant's or the sys
 * tenant's refreshed schema version differs from the version held by the
 * cached guard.
 *
 * @param tenant_id       tenant whose schema is needed
 * @param cache_info      holder of the cached schema guard
 * @param schema_guard    [out] points at the cached guard on success
 * @param tenant_version  [out] schema version of `tenant_id` as seen by the guard
 * @param sys_version     [out] schema version of the sys tenant as seen by the guard
 * @return OB_SUCCESS, or the first error reported by the schema service / guard
 */
OB_INLINE int ObMPQuery::get_tenant_schema_info_(const uint64_t tenant_id,
                                                ObTenantCachedSchemaGuardInfo *cache_info,
                                                ObSchemaGetterGuard *&schema_guard,
                                                int64_t &tenant_version,
                                                int64_t &sys_version)
{
  int ret = OB_SUCCESS;
  ObSchemaGetterGuard &cached_guard = cache_info->get_schema_guard();

  // First use of the guard, or the tenant changed: a refresh is always needed.
  bool need_refresh = !cached_guard.is_inited()
                      || tenant_id != cached_guard.get_tenant_id();

  if (!need_refresh) {
    // Same tenant: compare the latest refreshed versions with the cached ones.
    int64_t tmp_tenant_version = 0;
    int64_t tmp_sys_version = 0;
    if (OB_FAIL(gctx_.schema_service_->get_tenant_refreshed_schema_version(tenant_id, tmp_tenant_version))) {
      LOG_WARN("get tenant refreshed schema version error", K(ret), K(tenant_id));
    } else if (OB_FAIL(cached_guard.get_schema_version(tenant_id, tenant_version))) {
      LOG_WARN("fail get schema version", K(ret), K(tenant_id));
    } else if (tmp_tenant_version != tenant_version) {
      // tenant schema moved on, the guard must be fetched again
      need_refresh = true;
    } else if (OB_FAIL(gctx_.schema_service_->get_tenant_refreshed_schema_version(OB_SYS_TENANT_ID, tmp_sys_version))) {
      LOG_WARN("get sys tenant refreshed schema version error", K(ret), "sys_tenant_id", OB_SYS_TENANT_ID);
    } else if (OB_FAIL(cached_guard.get_schema_version(OB_SYS_TENANT_ID, sys_version))) {
      LOG_WARN("fail get sys schema version", K(ret));
    } else if (tmp_sys_version != sys_version) {
      // sys schema moved on, the guard must be fetched again
      need_refresh = true;
    }
  }

  if (OB_FAIL(ret)) {
    // version comparison failed; already logged above
  } else if (need_refresh && OB_FAIL(cache_info->refresh_tenant_schema_guard(tenant_id))) {
    LOG_WARN("refresh tenant schema guard failed", K(ret), K(tenant_id));
  } else {
    // hand out the latest schema guard cached on the session
    schema_guard = &(cache_info->get_schema_guard());
    if (need_refresh) {
      // The guard was just refreshed, so re-read both versions from it.
      if (OB_FAIL(schema_guard->get_schema_version(tenant_id, tenant_version))) {
        LOG_WARN("fail get schema version", K(ret), K(tenant_id));
      } else if (OB_FAIL(schema_guard->get_schema_version(OB_SYS_TENANT_ID, sys_version))) {
        LOG_WARN("fail get sys schema version", K(ret));
      }
    }
  }

  return ret;
}
666

667
// Execute the current statement (ctx_.multi_stmt_item_.get_sql()) once.
//
// Steps, in order:
//   1. obtain the schema guard and schema versions via get_tenant_schema_info_()
//      and publish them to ctx_ / retry_ctrl_;
//   2. run gctx_.sql_engine_->stmt_query(); on failure let retry_ctrl_ decide
//      whether the statement is retried, on success stream the result through
//      response_result();
//   3. record execution timestamps, wait events and statement statistics;
//   4. when no retry is pending, freeze partition hit, maintain the session
//      warning buffer and, if nobody else answered the client, send an error packet;
//   5. fill the sql audit record, update plan statistics and hand the audit
//      record to ObSQLUtils::handle_audit_record().
//
// @param session          session the statement runs in
// @param has_more_result  forwarded to result.set_has_more_result()
// @param force_sync_resp  forwarded to response_result(); disables the async end-trans path
// @param async_resp_used  [in/out] set by response_result() when the response is
//                         completed by the asynchronous end-trans callback
// @param need_disconnect  [out] true when the exec context asks for a disconnect
//                         and `ret` is not a "query killed" error
// @return OB_SUCCESS or the error of the failing step
OB_INLINE int ObMPQuery::do_process(ObSQLSessionInfo &session,
                                    bool has_more_result,
                                    bool force_sync_resp,
                                    bool &async_resp_used,
                                    bool &need_disconnect)
{
  int ret = OB_SUCCESS;
  ObAuditRecordData &audit_record = session.get_raw_audit_record();
  audit_record.try_cnt_++;
  bool is_diagnostics_stmt = false;
  bool need_response_error = true;
  const ObString &sql = ctx_.multi_stmt_item_.get_sql();
  const bool enable_perf_event = lib::is_diagnose_info_enabled();
  const bool enable_sql_audit =
    GCONF.enable_sql_audit && session.get_local_ob_enable_sql_audit();
  single_process_timestamp_ = ObTimeUtility::current_time();
  /* !!!
   * NOTE: req_timeinfo_guard must be declared before result
   * !!!
   */
  ObReqTimeGuard req_timeinfo_guard;
  ObPhysicalPlan *plan = nullptr;
  ObSchemaGetterGuard* schema_guard = nullptr;
  ObTenantCachedSchemaGuardInfo &cached_schema_info = session.get_cached_schema_guard_info();
  int64_t tenant_version = 0;
  int64_t sys_version = 0;
  common::ObSqlInfoGuard si_guard(sql);
  ObSqlFatalErrExtraInfoGuard extra_info_guard;
  extra_info_guard.set_cur_sql(sql);
  extra_info_guard.set_tenant_id(session.get_effective_tenant_id());
  ObIAllocator &allocator = CURRENT_CONTEXT->get_arena_allocator();
  SMART_VAR(ObMySQLResultSet, result, session, allocator) {
    if (OB_FAIL(get_tenant_schema_info_(session.get_effective_tenant_id(),
                                        &cached_schema_info,
                                        schema_guard,
                                        tenant_version,
                                        sys_version))) {
      LOG_WARN("get tenant schema info error", K(ret), K(session));
    } else if (OB_FAIL(session.update_query_sensitive_system_variable(*schema_guard))) {
      LOG_WARN("update query sensitive system vairable in session failed", K(ret));
    } else if (OB_FAIL(update_transmission_checksum_flag(session))) {
      LOG_WARN("update transmisson checksum flag failed", K(ret));
    } else if (OB_ISNULL(gctx_.sql_engine_)) {
      ret = OB_ERR_UNEXPECTED;
      LOG_ERROR("invalid sql engine", K(ret), K(gctx_));
    } else {
      session.set_current_execution_id(GCTX.sql_engine_->get_execution_id());
      // start pessimistic: the flag is read back into need_disconnect at the end
      result.get_exec_context().set_need_disconnect(true);
      ctx_.schema_guard_ = schema_guard;
      retry_ctrl_.set_tenant_local_schema_version(tenant_version);
      retry_ctrl_.set_sys_local_schema_version(sys_version);
      extra_info_guard.set_exec_context(&(result.get_exec_context()));
    }

    ObWaitEventStat total_wait_desc;
    ObDiagnoseSessionInfo *di = NULL;
    if (OB_SUCC(ret)) {
      if (enable_perf_event) {
        di = ObDiagnoseSessionInfo::get_local_diagnose_info();
      }
      ObMaxWaitGuard max_wait_guard(enable_perf_event ? &audit_record.exec_record_.max_wait_event_ : NULL, di);
      ObTotalWaitGuard total_wait_guard(enable_perf_event ? &total_wait_desc : NULL, di);
      if (enable_perf_event) {
        audit_record.exec_record_.record_start(di);
      }
      result.set_has_more_result(has_more_result);
      ObTaskExecutorCtx &task_ctx = result.get_exec_context().get_task_exec_ctx();
      task_ctx.schema_service_ = gctx_.schema_service_;
      task_ctx.set_query_tenant_begin_schema_version(retry_ctrl_.get_tenant_local_schema_version());
      task_ctx.set_query_sys_begin_schema_version(retry_ctrl_.get_sys_local_schema_version());
      task_ctx.set_min_cluster_version(GET_MIN_CLUSTER_VERSION());
      ctx_.retry_times_ = retry_ctrl_.get_retry_times();
      ctx_.enable_sql_resource_manage_ = true;
      //storage::ObPartitionService* ps = static_cast<storage::ObPartitionService *> (GCTX.par_ser_);
      //bool is_read_only = false;
      if (OB_FAIL(ret)) {
        // do nothing
      } else if (OB_ISNULL(ctx_.schema_guard_)) {
        ret = OB_INVALID_ARGUMENT;
        LOG_WARN("newest schema is NULL", K(ret));
      } else if (OB_FAIL(set_session_active(sql, session, single_process_timestamp_))) {
        LOG_WARN("fail to set session active", K(ret));
      } else if (OB_FAIL(gctx_.sql_engine_->stmt_query(sql, ctx_, result))) {
        exec_start_timestamp_ = ObTimeUtility::current_time();
        if (!THIS_WORKER.need_retry()) {
          int cli_ret = OB_SUCCESS;
          retry_ctrl_.test_and_save_retry_state(gctx_, ctx_, result, ret, cli_ret);
          if (OB_ERR_PROXY_REROUTE == ret) {
            LOG_DEBUG("run stmt_query failed, check if need retry",
                      K(ret), K(cli_ret), K(retry_ctrl_.need_retry()), K(sql));
          } else {
            LOG_WARN("run stmt_query failed, check if need retry",
                     K(ret), K(cli_ret), K(retry_ctrl_.need_retry()),
                     "sql", ctx_.is_sensitive_ ? ObString(OB_MASKED_STR) : sql);
          }
          // from here on report the client-facing error code
          ret = cli_ret;
          if (OB_ERR_PROXY_REROUTE == ret) {
            // This error code is set by the lower layer during the compile phase,
            // so async_resp_used is guaranteed to be false here and the response
            // can be sent synchronously. Keep need_response_error set so that an
            // error packet tells the client the request has to be rerouted.
            need_response_error = true;
          } else if (ctx_.multi_stmt_item_.is_batched_multi_stmt()) {
            // batch execute with error,should not response error packet
            need_response_error = false;
          } else if (OB_BATCHED_MULTI_STMT_ROLLBACK == ret) {
            need_response_error = false;
          }
        } else {
          retry_ctrl_.set_packet_retry(ret);
          session.get_retry_info_for_update().set_last_query_retry_err(ret);
          session.get_retry_info_for_update().inc_retry_cnt();
        }
      } else {
        // monitoring statistics start
        exec_start_timestamp_ = ObTimeUtility::current_time();
        result.get_exec_context().set_plan_start_time(exec_start_timestamp_);
        // Any error raised inside this branch is fully handled by response_result(),
        // so no additional error packet has to be sent afterwards.
        need_response_error = false;
        is_diagnostics_stmt = ObStmt::is_diagnostic_stmt(result.get_literal_stmt_type());
        ctx_.is_show_trace_stmt_ = ObStmt::is_show_trace_stmt(result.get_literal_stmt_type());
        plan = result.get_physical_plan();
        extra_info_guard.set_cur_plan(plan);

        if (get_is_com_filed_list()) {
          result.set_is_com_filed_list();
          result.set_wildcard_string(wild_str_);
        }

        //response_result
        if (OB_FAIL(ret)) {
        //TODO shengle, confirm whether 4.0 is required
        //} else if (OB_FAIL(fill_feedback_session_info(*result, session))) {
          //need_response_error = true;
          //LOG_WARN("failed to fill session info", K(ret));
        } else if (OB_FAIL(response_result(result,
                                           force_sync_resp,
                                           async_resp_used))) {
          ObPhysicalPlanCtx *plan_ctx = result.get_exec_context().get_physical_plan_ctx();
          if (OB_ISNULL(plan_ctx)) {
            LOG_ERROR("execute query fail, and plan_ctx is NULL", K(ret));
          } else {
            if (OB_TRANSACTION_SET_VIOLATION != ret && OB_REPLICA_NOT_READABLE != ret) {
              LOG_WARN("execute query fail", K(ret), "timeout_timestamp",
                       plan_ctx->get_timeout_timestamp());
            }
          }
        }
      }

      int tmp_ret = OB_SUCCESS;
      tmp_ret = OB_E(EventTable::EN_PRINT_QUERY_SQL) OB_SUCCESS;
      if (OB_SUCCESS != tmp_ret) {
        // Mask the text when the statement is flagged sensitive, consistent with
        // the other SQL log sites in this function.
        LOG_INFO("query info:",
                 "sql_", ctx_.is_sensitive_ ? ObString(OB_MASKED_STR) : sql_,
                 "sess_id", result.get_session().get_sessid(),
                 "trans_id", result.get_session().get_tx_id());
      }

      // monitoring statistics end
      exec_end_timestamp_ = ObTimeUtility::current_time();

      // some statistics must be recorded for plan stat, even though sql audit disabled
      bool first_record = (1 == audit_record.try_cnt_);
      ObExecStatUtils::record_exec_timestamp(*this, first_record, audit_record.exec_timestamp_);
      audit_record.exec_timestamp_.update_stage_time();

      if (enable_perf_event) {
        audit_record.exec_record_.record_end(di);
        record_stat(result.get_stmt_type(), exec_end_timestamp_);
        audit_record.exec_record_.wait_time_end_ = total_wait_desc.time_waited_;
        audit_record.exec_record_.wait_count_end_ = total_wait_desc.total_waits_;
        audit_record.update_event_stage_state();
      }

      if (enable_perf_event && !THIS_THWORKER.need_retry()
        && OB_NOT_NULL(result.get_physical_plan())) {
        const int64_t time_cost = exec_end_timestamp_ - get_receive_timestamp();
        ObSQLUtils::record_execute_time(result.get_physical_plan()->get_plan_type(), time_cost);
      }
      // A retry requires all of the following:
      // 1. rs.open failed
      // 2. nothing has been returned to the client, i.e. this execution had no side effects
      // 3. need_retry(result, ret): the schema or the location cache is stale
      // 4. the retry count is still below the limit
      if (OB_UNLIKELY(retry_ctrl_.need_retry())) {
        if (OB_TRANSACTION_SET_VIOLATION != ret && OB_REPLICA_NOT_READABLE != ret && OB_TRY_LOCK_ROW_CONFLICT != ret) {
          // lock-conflict retries are not logged, to avoid flooding the log
          LOG_WARN("try to execute again",
                   K(ret),
                   N_TYPE, result.get_stmt_type(),
                   "retry_type", retry_ctrl_.get_retry_type(),
                   "timeout_remain", THIS_WORKER.get_timeout_remain());
        }
      } else {

        // Freeze partition hit as soon as the first plan has finished executing;
        // once frozen, later try_set_bool operations have no effect.
        if (OB_LIKELY(NULL != result.get_physical_plan())) {
          session.partition_hit().freeze();
        }

        // store the warning message from the most recent statement in the current session
        if ((OB_SUCC(ret) && is_diagnostics_stmt) || async_resp_used) {
          // If diagnostic stmt execute successfully, it dosen't clear the warning message.
          // Or if it response to client asynchronously, it doesn't clear the warning message here,
          // but will do it in the callback thread.
          session.update_show_warnings_buf();
        } else {
          session.set_show_warnings_buf(ret); // TODO: moving this elsewhere would perform better (fewer wb copies)
        }

        if (OB_FAIL(ret) && !async_resp_used && need_response_error && is_conn_valid() && !THIS_WORKER.need_retry()) {
          if (OB_ERR_PROXY_REROUTE == ret) {
            LOG_DEBUG("query should be rerouted", K(ret), K(async_resp_used));
          } else {
            LOG_WARN("query failed", K(ret), K(session),
                     "sql", ctx_.is_sensitive_ ? ObString(OB_MASKED_STR) : sql,
                     K(retry_ctrl_.need_retry()));
          }
          // When need_retry is false, packets may or may not have been sent to the
          // client already. What is certain is that the request failed and is not
          // finished yet. Unless the asynchronous EndTrans has taken over the
          // wrap-up, an error packet must be sent here to finish the request;
          // otherwise nobody sends the error to the client, which could leave the
          // client hanging while it waits for a response.
          bool is_partition_hit = session.get_err_final_partition_hit(ret);
          int err = send_error_packet(ret, NULL, is_partition_hit, (void *)ctx_.get_reroute_info());
          if (OB_SUCCESS != err) {  // sending the error packet failed
            LOG_WARN("send error packet failed", K(ret), K(err));
          }
        }
      }
    }

    audit_record.status_ = (0 == ret || OB_ITER_END == ret)
        ? REQUEST_SUCC : (ret);
    if (enable_sql_audit) {
      audit_record.seq_ = 0;  //don't use now
      audit_record.execution_id_ = session.get_current_execution_id();
      audit_record.client_addr_ = session.get_peer_addr();
      audit_record.user_client_addr_ = session.get_user_client_addr();
      audit_record.user_group_ = THIS_WORKER.get_group_id();
      MEMCPY(audit_record.sql_id_, ctx_.sql_id_, (int32_t)sizeof(audit_record.sql_id_));
      if (NULL != plan) {
        audit_record.plan_type_ = plan->get_plan_type();
        audit_record.table_scan_ = plan->contain_table_scan();
        audit_record.plan_id_ = plan->get_plan_id();
        audit_record.plan_hash_ = plan->get_plan_hash_value();
        audit_record.rule_name_ = const_cast<char *>(plan->get_rule_name().ptr());
        audit_record.rule_name_len_ = plan->get_rule_name().length();
        audit_record.partition_hit_ = session.partition_hit().get_bool();
      }
      if (OB_FAIL(ret) && audit_record.trans_id_ == 0) {
        // normally trans_id is set in the `start-stmt` phase,
        // if `start-stmt` hasn't run, set trans_id from session if an active txn exist
        audit_record.trans_id_ = session.get_tx_id();
      }
      audit_record.affected_rows_ = result.get_affected_rows();
      audit_record.return_rows_ = result.get_return_rows();
      audit_record.partition_cnt_ = result.get_exec_context()
                                          .get_das_ctx()
                                          .get_related_tablet_cnt();
      audit_record.expected_worker_cnt_ = result.get_exec_context()
                                                .get_task_exec_ctx()
                                                .get_expected_worker_cnt();
      audit_record.used_worker_cnt_ = result.get_exec_context()
                                            .get_task_exec_ctx()
                                            .get_admited_worker_cnt();

      audit_record.is_executor_rpc_ = false;
      audit_record.is_inner_sql_ = false;
      audit_record.is_hit_plan_cache_ = result.get_is_from_plan_cache();
      audit_record.is_multi_stmt_ = session.get_capability().cap_flags_.OB_CLIENT_MULTI_STATEMENTS;
      audit_record.is_batched_multi_stmt_ = ctx_.multi_stmt_item_.is_batched_multi_stmt();

      // NOTE(review): OZ presumably skips the call when ret already holds an error,
      // in which case params_value_ keeps whatever an earlier call left behind -
      // confirm this is intended for failed statements.
      OZ (store_params_value_to_str(allocator, session, result.get_ps_params()));
      audit_record.params_value_ = params_value_;
      audit_record.params_value_len_ = params_value_len_;
      audit_record.is_perf_event_closed_ = !lib::is_diagnose_info_enabled();

      ObPhysicalPlanCtx *plan_ctx = result.get_exec_context().get_physical_plan_ctx();
      if (OB_ISNULL(plan_ctx)) {
        //do nothing
      } else {
        audit_record.consistency_level_ = plan_ctx->get_consistency_level();
      }
    }
      //update v$sql statistics
    if (session.get_local_ob_enable_plan_cache()
        && !retry_ctrl_.need_retry()) {
      ObIArray<ObTableRowCount> *table_row_count_list = NULL;
      ObPhysicalPlanCtx *plan_ctx = result.get_exec_context().get_physical_plan_ctx();
      if (OB_ISNULL(plan_ctx)) {
        // do nothing
      } else {
        table_row_count_list = &(plan_ctx->get_table_row_count_list());
        audit_record.table_scan_stat_ = plan_ctx->get_table_scan_stat();
      }
      if (NULL != plan) {
        if (!(ctx_.self_add_plan_) && ctx_.plan_cache_hit_) {
          plan->update_plan_stat(audit_record,
                                 false, // false mean not first update plan stat
                                 result.get_exec_context().get_is_evolution(),
                                 table_row_count_list);
          plan->update_cache_access_stat(audit_record.table_scan_stat_);
        } else if (ctx_.self_add_plan_ && !ctx_.plan_cache_hit_) {
          plan->update_plan_stat(audit_record,
                                 true,
                                 result.get_exec_context().get_is_evolution(),
                                 table_row_count_list);
          plan->update_cache_access_stat(audit_record.table_scan_stat_);
        } else if (ctx_.self_add_plan_ && ctx_.plan_cache_hit_) {
          // spm evolution plan first execute
          plan->update_plan_stat(audit_record,
                                 true,
                                 result.get_exec_context().get_is_evolution(),
                                 table_row_count_list);
          plan->update_cache_access_stat(audit_record.table_scan_stat_);
        }
      }
    }
    // reset thread waring buffer in sync mode
    if (!async_resp_used) {
      clear_wb_content(session);
    }

    need_disconnect = (result.get_exec_context().need_disconnect()
                       && !is_query_killed_return(ret)); // an explicit kill query must not drop the connection
    if (need_disconnect) {
      LOG_WARN("need disconnect", K(ret), K(need_disconnect));
    }
    bool is_need_retry = THIS_THWORKER.need_retry() ||
        RETRY_TYPE_NONE != retry_ctrl_.get_retry_type();
#ifdef OB_BUILD_SPM
    if (!is_need_retry) {
      (void)ObSQLUtils::handle_plan_baseline(audit_record, plan, ret, ctx_);
    }
#endif
    (void)ObSQLUtils::handle_audit_record(is_need_retry, EXECUTE_LOCAL, session,
        ctx_.is_sensitive_);
#ifdef OB_BUILD_AUDIT_SECURITY
    // Statements that trigger a retry are not audited here, so that one
    // statement is not audited several times.
    if (!retry_ctrl_.need_retry() && !async_resp_used) {
      (void)ObSecurityAuditUtils::handle_security_audit(result,
                                                        ctx_.schema_guard_,
                                                        ctx_.cur_stmt_,
                                                        ObString::make_empty_string(),
                                                        ret);
    }
#endif
  }
  return ret;
}
1018

1019
int ObMPQuery::store_params_value_to_str(ObIAllocator &allocator,
1020
                                         sql::ObSQLSessionInfo &session,
1021
                                         common::ParamStore &params)
1022
{
1023
  int ret = OB_SUCCESS;
1024
  int64_t pos = 0;
1025
  int64_t length = OB_MAX_SQL_LENGTH;
1026
  CK (OB_NOT_NULL(params_value_ = static_cast<char *>(allocator.alloc(OB_MAX_SQL_LENGTH))));
1027
  for (int64_t i = 0; OB_SUCC(ret) && i < params.count(); ++i) {
1028
    const common::ObObjParam &param = params.at(i);
1029
    if (param.is_ext()) {
1030
      pos = 0;
1031
      params_value_ = NULL;
1032
      params_value_len_ = 0;
1033
      break;
1034
    } else {
1035
      OZ (param.print_sql_literal(params_value_, length, pos, allocator, TZ_INFO(&session)));
1036
      if (i != params.count() - 1) {
1037
        OZ (databuff_printf(params_value_, length, pos, allocator, ","));
1038
      }
1039
    }
1040
  }
1041
  if (OB_FAIL(ret)) {
1042
    params_value_ = NULL;
1043
    params_value_len_ = 0;
1044
    ret = OB_SUCCESS;
1045
  } else {
1046
    params_value_len_ = pos;
1047
  }
1048
  return ret;
1049
}
1050

1051
//int ObMPQuery::fill_feedback_session_info(ObMySQLResultSet &result,
1052
//                                          ObSQLSessionInfo &session)
1053
//{
1054
//  int ret = OB_SUCCESS;
1055
//  ObPhysicalPlan *temp_plan = NULL;
1056
//  ObTaskExecutorCtx *temp_task_ctx = NULL;
1057
//  ObSchemaGetterGuard *schema_guard = NULL;
1058
//  if (session.is_abundant_feedback_support() &&
1059
//      NULL != (temp_plan = result.get_physical_plan()) &&
1060
//      NULL != (temp_task_ctx = result.get_exec_context().get_task_executor_ctx()) &&
1061
//      NULL != (schema_guard = ctx_.schema_guard_) &&
1062
//      temp_plan->get_plan_type() == ObPhyPlanType::OB_PHY_PLAN_REMOTE &&
1063
//      temp_plan->get_location_type() != ObPhyPlanType::OB_PHY_PLAN_UNCERTAIN &&
1064
//      temp_task_ctx->get_table_locations().count() == 1 &&
1065
//      temp_task_ctx->get_table_locations().at(0).get_partition_location_list().count() == 1) {
1066
//    bool is_cache_hit = false;
1067
//    ObFBPartitionParam param;
1068
//    //FIXME: should remove ObPartitionKey
1069
//    ObPartitionKey partition_key;
1070
//    ObPartitionLocation partition_loc;
1071
//    const ObTableSchema *table_schema = NULL;
1072
//    ObPartitionReplicaLocationIArray &pl_array =
1073
//        temp_task_ctx->get_table_locations().at(0).get_partition_location_list();
1074
//    if (OB_FAIL(pl_array.at(0).get_partition_key(partition_key))) {
1075
//      LOG_WARN("failed to get partition key", K(ret));
1076
//    } else if (OB_FAIL(temp_cache->get(partition_key,
1077
//                                       partition_loc,
1078
//                                       0,
1079
//                                       is_cache_hit))) {
1080
//      LOG_WARN("failed to get partition location", K(ret));
1081
//    } else if (OB_FAIL(schema_guard->get_table_schema(partition_key.get_tenant_id(),
1082
//                                                      partition_key.get_table_id(),
1083
//                                                      table_schema))) {
1084
//      LOG_WARN("failed to get table schema", K(ret), K(partition_key));
1085
//    } else if (OB_ISNULL(table_schema)) {
1086
//      ret = OB_ERR_UNEXPECTED;
1087
//      LOG_WARN("null table schema", K(ret));
1088
//    } else if (OB_FAIL(build_fb_partition_param(*table_schema, partition_loc, param))) {
1089
//      LOG_WARN("failed to build fb partition pararm", K(ret));
1090
//    } else if (OB_FAIL(session.set_partition_location_feedback(param))) {
1091
//      LOG_WARN("failed to set partition location feedback", K(param), K(ret));
1092
//    } else { /*do nothing*/ }
1093
//  } else { /*do nothing*/}
1094
//  return ret;
1095
//}
1096

1097
//int ObMPQuery::build_fb_partition_param(
1098
//    const ObTableSchema &table_schema,
1099
//    const ObPartitionLocation &partition_loc,
1100
//    ObFBPartitionParam &param) {
1101
//  INIT_SUCC(ret);
1102
//  param.schema_version_ = table_schema.get_schema_version();
1103
//  int64_t origin_partition_idx = OB_INVALID_ID;
1104
//  if (OB_FAIL(param.pl_.assign(partition_loc))) {
1105
//    LOG_WARN("fail to assign pl", K(partition_loc), K(ret));
1106
//  }
1107
//  // when table partition_id to client, we need convert it to
1108
//  // real partition idx(e.g. hash partition split)
1109
//  else if (OB_FAIL(table_schema.convert_partition_id_to_idx(
1110
//          partition_loc.get_partition_id(), origin_partition_idx))) {
1111
//    LOG_WARN("fail to convert partition id", K(partition_loc), K(ret));
1112
//  } else {
1113
//    param.original_partition_id_ = origin_partition_idx;
1114
//  }
1115
//
1116
//  return ret;
1117
//}
1118

1119
// Verify that the statement held by `result` is allowed in read-only mode.
//
// The statement must be classified as read-only by is_readonly_stmt(). A
// SELECT that touches at least one table must additionally use WEAK read
// consistency.
//
// @param result  result set of the resolved statement
// @return OB_SUCCESS when the statement is allowed, OB_ERR_READ_ONLY when it
//         is not, or the error returned by is_readonly_stmt() /
//         get_read_consistency().
int ObMPQuery::check_readonly_stmt(ObMySQLResultSet &result)
{
  int ret = OB_SUCCESS;
  bool is_readonly = false;
  // At this stage a SHOW statement has already been converted into a SELECT;
  // a literal_stmt_type other than stmt::T_NONE carries the original (SHOW) type.
  const stmt::StmtType type = stmt::T_NONE == result.get_literal_stmt_type() ?
                              result.get_stmt_type() :
                              result.get_literal_stmt_type();
  ObConsistencyLevel consistency = INVALID_CONSISTENCY;
  if (OB_FAIL(is_readonly_stmt(result, is_readonly))) {
    LOG_WARN("check stmt is readonly fail", K(ret), K(result));
  } else if (!is_readonly) {
    ret = OB_ERR_READ_ONLY;
    LOG_WARN("stmt is not readonly", K(ret), K(result));
  } else if (stmt::T_SELECT == type) {
    // Strong-consistency reads are forbidden for SELECT. Weak consistency is
    // requested either with the /*+read_consistency()*/ hint or by setting the
    // session variable ob_read_consistency = 2.
    const int64_t table_count = DAS_CTX(result.get_exec_context()).get_table_loc_list().size();
    if (0 == table_count) {
      // Special case: jdbc attaches `select @@session.tx_read_only;` to the
      // queries it sends. To keep obtest usable, SELECT statements without any
      // table location are not restricted.
    } else if (OB_FAIL(result.get_read_consistency(consistency))) {
      LOG_WARN("get read consistency fail", K(ret));
    } else if (WEAK != consistency) {
      ret = OB_ERR_READ_ONLY;
      LOG_WARN("strong consistency read is not allowed", K(ret), K(type), K(consistency));
    }
  }
  return ret;
}
1152

1153
// Classify the statement held by `result` as read-only or not.
//
// The literal statement type is used when it is set (not stmt::T_NONE),
// otherwise the resolved statement type. SELECT is read-only unless its plan
// has FOR UPDATE; a variable SET is read-only unless it touches a global
// variable; the listed SHOW/EXPLAIN/HELP/USE/SET NAMES/transaction control
// statements are read-only; everything else is not.
//
// @param result       result set of the resolved statement
// @param is_readonly  [out] classification, false on error
// @return OB_SUCCESS, or OB_ERR_UNEXPECTED when a SELECT has no physical plan
int ObMPQuery::is_readonly_stmt(ObMySQLResultSet &result, bool &is_readonly)
{
  int ret = OB_SUCCESS;
  is_readonly = false;
  const stmt::StmtType type = stmt::T_NONE == result.get_literal_stmt_type() ?
                              result.get_stmt_type() :
                              result.get_literal_stmt_type();
  switch (type) {
    case stmt::T_SELECT: {
      // select ... for update must be rejected as well
      ObPhysicalPlan *physical_plan = result.get_physical_plan();
      if (NULL == physical_plan) {
        ret = OB_ERR_UNEXPECTED;
        LOG_WARN("physical_plan should not be null", K(ret));
      } else {
        is_readonly = !physical_plan->has_for_update();
      }
      break;
    }
    case stmt::T_VARIABLE_SET: {
      // set @@global.variable is forbidden
      is_readonly = !result.has_global_variable();
      break;
    }
    case stmt::T_EXPLAIN:
    case stmt::T_SHOW_TABLES:
    case stmt::T_SHOW_DATABASES:
    case stmt::T_SHOW_COLUMNS:
    case stmt::T_SHOW_VARIABLES:
    case stmt::T_SHOW_TABLE_STATUS:
    case stmt::T_SHOW_SCHEMA:
    case stmt::T_SHOW_CREATE_DATABASE:
    case stmt::T_SHOW_CREATE_TABLE:
    case stmt::T_SHOW_CREATE_VIEW:
    case stmt::T_SHOW_PARAMETERS:
    case stmt::T_SHOW_SERVER_STATUS:
    case stmt::T_SHOW_INDEXES:
    case stmt::T_SHOW_WARNINGS:
    case stmt::T_SHOW_ERRORS:
    case stmt::T_SHOW_PROCESSLIST:
    case stmt::T_SHOW_CHARSET:
    case stmt::T_SHOW_COLLATION:
    case stmt::T_SHOW_TABLEGROUPS:
    case stmt::T_SHOW_STATUS:
    case stmt::T_SHOW_TENANT:
    case stmt::T_SHOW_CREATE_TENANT:
    case stmt::T_SHOW_TRACE:
    case stmt::T_SHOW_TRIGGERS:
    case stmt::T_SHOW_ENGINES:
    case stmt::T_SHOW_PRIVILEGES:
    case stmt::T_SHOW_RESTORE_PREVIEW:
    case stmt::T_SHOW_GRANTS:
    case stmt::T_SHOW_QUERY_RESPONSE_TIME:
    case stmt::T_SHOW_RECYCLEBIN:
    case stmt::T_SHOW_SEQUENCES:
    case stmt::T_HELP:
    case stmt::T_USE_DATABASE:
    case stmt::T_SET_NAMES: //read only not restrict it
    case stmt::T_START_TRANS:
    case stmt::T_END_TRANS: {
      is_readonly = true;
      break;
    }
    default: {
      is_readonly = false;
      break;
    }
  }
  return ret;
}
1229
int ObMPQuery::deserialize()
1230
{
1231
  int ret = OB_SUCCESS;
1232

1233
  //OB_ASSERT(req_);
1234
  //OB_ASSERT(req_->get_type() == ObRequest::OB_MYSQL);
1235
  if ( (OB_ISNULL(req_)) || (req_->get_type() != ObRequest::OB_MYSQL)) {
1236
    ret = OB_INVALID_ARGUMENT;
1237
    LOG_ERROR("invalid request", K(ret), K(req_));
1238
  } else if (get_is_com_filed_list()) {
1239
    if (OB_FAIL(deserialize_com_field_list())) {
1240
      LOG_WARN("failed to deserialize com field list", K(ret));
1241
    }
1242
  } else {
1243
    const ObMySQLRawPacket &pkt = reinterpret_cast<const ObMySQLRawPacket&>(req_->get_packet());
1244
    sql_.assign_ptr(const_cast<char *>(pkt.get_cdata()), pkt.get_clen()-1);
1245
  }
1246

1247
  return ret;
1248
}
1249

1250
// return false only if send packet fail.
1251
OB_INLINE int ObMPQuery::response_result(ObMySQLResultSet &result,
1252
                                         bool force_sync_resp,
1253
                                         bool &async_resp_used)
1254
{
1255
  int ret = OB_SUCCESS;
1256
  FLTSpanGuard(sql_execute);
1257
  //ac = 1时线程新启事务进行oracle临时表数据清理会和clog回调形成死锁, 这里改成同步方式
1258
  ObSQLSessionInfo &session = result.get_session();
1259
  CHECK_COMPATIBILITY_MODE(&session);
1260

1261
#ifndef OB_BUILD_SPM
1262
  bool need_trans_cb  = result.need_end_trans_callback() && (!force_sync_resp);
1263
#else
1264
  bool need_trans_cb  = result.need_end_trans_callback() &&
1265
                        (!force_sync_resp) &&
1266
                        (!ctx_.spm_ctx_.check_execute_status_);
1267
#endif
1268

1269
  // 通过判断 plan 是否为 null 来确定是 plan 还是 cmd
1270
  // 针对 plan 和 cmd 分开处理,逻辑会较为清晰。
1271
  if (OB_LIKELY(NULL != result.get_physical_plan())) {
1272
    if (need_trans_cb) {
1273
      ObAsyncPlanDriver drv(gctx_, ctx_, session, retry_ctrl_, *this);
1274
      // NOTE: sql_end_cb必须在drv.response_result()之前初始化好
1275
      ObSqlEndTransCb &sql_end_cb = session.get_mysql_end_trans_cb();
1276
      if (OB_FAIL(sql_end_cb.init(packet_sender_, &session))) {
1277
        LOG_WARN("failed to init sql end callback", K(ret));
1278
      } else if (OB_FAIL(drv.response_result(result))) {
1279
        LOG_WARN("fail response async result", K(ret));
1280
      }
1281
      async_resp_used = result.is_async_end_trans_submitted();
1282
    } else {
1283
      // 试点ObQuerySyncDriver
1284
      ObSyncPlanDriver drv(gctx_, ctx_, session, retry_ctrl_, *this);
1285
      ret = drv.response_result(result);
1286
    }
1287
  } else {
1288
    if (need_trans_cb) {
1289
      ObSqlEndTransCb &sql_end_cb = session.get_mysql_end_trans_cb();
1290
      ObAsyncCmdDriver drv(gctx_, ctx_, session, retry_ctrl_, *this);
1291
      if (OB_FAIL(sql_end_cb.init(packet_sender_, &session))) {
1292
        LOG_WARN("failed to init sql end callback", K(ret));
1293
      } else if (OB_FAIL(drv.response_result(result))) {
1294
        LOG_WARN("fail response async result", K(ret));
1295
      }
1296
      async_resp_used = result.is_async_end_trans_submitted();
1297
    } else {
1298
      ObSyncCmdDriver drv(gctx_, ctx_, session, retry_ctrl_, *this);
1299
      session.set_pl_query_sender(&drv);
1300
      session.set_ps_protocol(result.is_ps_protocol());
1301
      ret = drv.response_result(result);
1302
      session.set_pl_query_sender(NULL);
1303
    }
1304
  }
1305

1306
  return ret;
1307
}
1308

1309
// Bump the per-statement-type counter and accumulated time.
//
// SELECT, INSERT, REPLACE, UPDATE and DELETE have dedicated SQL_<type>_COUNT /
// SQL_<type>_TIME events; every other type is accounted under SQL_OTHER_*.
// Nothing is recorded while the worker is flagged for retry.
//
// @param type      statement type to account for
// @param end_time  end timestamp; the recorded cost is
//                  end_time - get_receive_timestamp()
inline void ObMPQuery::record_stat(const stmt::StmtType type, const int64_t end_time) const
{
#define ADD_STMT_STAT(type)                     \
  case stmt::T_##type:                          \
    EVENT_INC(SQL_##type##_COUNT);              \
    EVENT_ADD(SQL_##type##_TIME, time_cost);    \
    break
  const int64_t time_cost = end_time - get_receive_timestamp();
  if (!THIS_THWORKER.need_retry())
  {
    switch (type)
    {
      ADD_STMT_STAT(SELECT);
      ADD_STMT_STAT(INSERT);
      ADD_STMT_STAT(REPLACE);
      ADD_STMT_STAT(UPDATE);
      ADD_STMT_STAT(DELETE);
    default:
    {
      EVENT_INC(SQL_OTHER_COUNT);
      EVENT_ADD(SQL_OTHER_TIME, time_cost);
    }
    }
  }
#undef ADD_STMT_STAT
}
1335

1336
int ObMPQuery::deserialize_com_field_list()
1337
{
1338
  int ret = OB_SUCCESS;
1339
  //如果设置了只需要返回列定义,说明client传来的是COM_FIELD_LIST命令
1340
  /* mysql中的COM_FIELD_LIST命令用于获取table中的列定义,其从client to server的packet为:
1341
  *  1              [04] COM_FIELD_LIST
1342
  *  string[NUL]    table
1343
  *  string[EOF]    field wildcard
1344
  *  首先是CMD类型,然后是表名,最后是匹配条件
1345
  *
1346
  * server to client的packet为下面中的其中一个:
1347
  * 1. a ERR_Packet(返回一个错误包)
1348
  * 2. one or more Column Definition packets and a closing EOF_Packet(返回n个列定义+EOF)
1349
  *
1350
  * 由于普通的select的查询结果已经包含了column定义,同时最大限度复用当前的代码逻辑,因此可以将COM_FIELD_LIST的命令
1351
  * 等价于:
1352
  * select * from table limit 0 ==> 获取filed define ==> 根据field wildcard 按需反回 Column Definition
1353
  *
1354
  * 参考:https://dev.mysql.com/doc/internals/en/com-field-list.html
1355
   */
1356
  ObIAllocator *alloc = &THIS_WORKER.get_sql_arena_allocator();
1357
  if (OB_ISNULL(alloc)) {
1358
    ret = OB_ERR_UNEXPECTED;
1359
    LOG_WARN("get unexpected null", K(alloc), K(ret));
1360
  } else {
1361
    const ObMySQLRawPacket &pkt = reinterpret_cast<const ObMySQLRawPacket&>(req_->get_packet());
1362
    const char *str = pkt.get_cdata();
1363
    uint32_t length = pkt.get_clen();
1364
    const char *str1 = "select * from ";
1365
    const char *str2 = " where 0";
1366
    int64_t i = 0;
1367
    const int64_t str1_len = strlen(str1);
1368
    const int64_t str2_len = strlen(str2);
1369
    const int pre_size = str1_len + str2_len;
1370
    //寻找client传过来的table_name和filed wildcard之间的分隔符(table_name [NULL] filed wildcard)
1371
    for (; static_cast<int>(str[i]) != 0 && i < length; ++i) {}
1372
    char *dest_str = static_cast<char *>(alloc->alloc(length + pre_size));
1373
    if (OB_ISNULL(dest_str)) {
1374
      ret = OB_ALLOCATE_MEMORY_FAILED;
1375
      LOG_WARN("failed to alloc", K(dest_str));
1376
    } else {
1377
      char *buf = dest_str;
1378
      uint32_t real_len = 0;
1379
      MEMSET(buf, 0, length + pre_size);
1380
      MEMCPY(buf, str1, str1_len);
1381
      buf = buf + str1_len;
1382
      real_len = real_len + str1_len;
1383
      MEMCPY(buf, str, i);
1384
      buf = buf + i;
1385
      real_len = real_len + i;
1386
      MEMCPY(buf, str2, str2_len);
1387
      real_len = real_len + str2_len;
1388
      sql_.assign_ptr(dest_str, real_len);
1389
      //extract wildcard
1390
      if (i + 1 < length - 1) {
1391
        wild_str_.assign_ptr(str + i + 1, (length - 1) - (i +1));
1392
      }
1393
    }
1394
  }
1395
  return ret;
1396
}
1397

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.