oceanbase

Форк
0
/
obmp_stmt_prepare.cpp 
707 строк · 29.5 Кб
1
/**
2
 * Copyright (c) 2021 OceanBase
3
 * OceanBase CE is licensed under Mulan PubL v2.
4
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
5
 * You may obtain a copy of Mulan PubL v2 at:
6
 *          http://license.coscl.org.cn/MulanPubL-2.0
7
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
 * See the Mulan PubL v2 for more details.
11
 */
12

13
#define USING_LOG_PREFIX SERVER
14

15
#include "observer/mysql/obmp_stmt_prepare.h"
16

17
#include "lib/worker.h"
18
#include "lib/oblog/ob_log.h"
19
#include "lib/stat/ob_session_stat.h"
20
#include "rpc/ob_request.h"
21
#include "rpc/obmysql/ob_mysql_packet.h"
22
#include "rpc/obmysql/packet/ompk_prepare.h"
23
#include "rpc/obmysql/packet/ompk_field.h"
24
#include "observer/mysql/ob_mysql_result_set.h"
25
#include "observer/mysql/ob_async_plan_driver.h"
26
#include "observer/mysql/ob_sync_cmd_driver.h"
27
#include "observer/mysql/ob_sync_plan_driver.h"
28
#include "rpc/obmysql/obsm_struct.h"
29
#include "observer/omt/ob_tenant.h"
30
#include "share/schema/ob_schema_getter_guard.h"
31
#include "sql/ob_sql_context.h"
32
#include "sql/session/ob_sql_session_info.h"
33
#include "sql/ob_sql.h"
34
#include "observer/ob_req_time_service.h"
35
#include "observer/mysql/obmp_utils.h"
36

37
namespace oceanbase
38
{
39

40
using namespace rpc;
41
using namespace common;
42
using namespace share;
43
using namespace obmysql;
44
using namespace sql;
45

46
namespace observer
47
{
48

49
// Constructs the prepare-statement packet processor on top of ObMPBase.
// The retry controller and SQL text are default-constructed, the three
// timing members start at zero, and the SQL context is tagged with the
// MpQuery execution type.
ObMPStmtPrepare::ObMPStmtPrepare(const ObGlobalContext &gctx)
    : ObMPBase(gctx),
      retry_ctrl_(),
      sql_(),
      sql_len_(),
      single_process_timestamp_(0),
      exec_start_timestamp_(0),
      exec_end_timestamp_(0)
{
  ctx_.exec_type_ = MpQuery;
}
60

61
int ObMPStmtPrepare::deserialize()
62
{
63
  int ret = OB_SUCCESS;
64
  if ((OB_ISNULL(req_)) || (req_->get_type() != ObRequest::OB_MYSQL)) {
65
    ret = OB_INVALID_ARGUMENT;
66
    LOG_ERROR("invalid request", K(ret), K(req_));
67
  } else {
68
    const ObMySQLRawPacket &pkt = reinterpret_cast<const ObMySQLRawPacket&>(req_->get_packet());
69
    sql_.assign_ptr(const_cast<char *>(pkt.get_cdata()), pkt.get_clen()-1);
70
  }
71

72
  return ret;
73
}
74

75
int ObMPStmtPrepare::before_process()
76
{
77
  int ret = OB_SUCCESS;
78
  if (OB_FAIL(ObMPBase::before_process())) {
79
    LOG_WARN("failed to pre processing packet", K(ret));
80
  } else if (0 == sql_.case_compare("call dbms_output.get_line(?, ?)")) {
81
    // do nothing
82
  } else if (!GCONF._ob_enable_prepared_statement) {
83
    ret = OB_NOT_SUPPORTED;
84
    LOG_USER_ERROR(OB_NOT_SUPPORTED,
85
    "while parameter _ob_enable_prepared_statement is disabled, prepared statement");
86
    send_error_packet(ret, NULL);
87
  }
88

89
  return ret;
90
}
91

92
int ObMPStmtPrepare::multiple_query_check(ObSQLSessionInfo &session,
93
                                          ObString &sql,
94
                                          bool &force_sync_resp,
95
                                          bool &need_response_error)
96
{
97
  int ret = OB_SUCCESS;
98
  if (OB_UNLIKELY(1 == session.get_capability().cap_flags_.OB_CLIENT_MULTI_STATEMENTS)) {
99
    ObSEArray<ObString, 1> queries;
100
    ObParser parser(THIS_WORKER.get_allocator(),
101
                    session.get_sql_mode(), session.get_charsets4parser());
102
    bool parse_fail = false;
103
    ObMPParseStat parse_stat;
104
    force_sync_resp = true;
105
    /* MySQL处理Multi-Stmt出错时候的行为:
106
      * 遇到首次运行失败(包括解析或执行)的SQL后,停止读取后继数据
107
      *  例如:
108
      *  (1) select 1; selct 2; select 3;
109
      *  select 1执行成功,selct 2报语法错误,select 3不被执行
110
      *  (2) select 1; drop table not_exists_table; select 3;
111
      *  select 1执行成功,drop table not_exists_table报表不存在错误,select 3不被执行
112
      *
113
      * 特别注意:
114
      * split_multiple_stmt是根据分号来分割语句,但有可能遇到“语法错误”,
115
      * 这里说的“语法错误”不是说select写成了selct,而是“token”级别的语法错误,例如语句
116
      * select 1;`select 2; select 3;
117
      * 上面`和'都没有形成闭合的字符串token,token parser会报告语法错误
118
      * 上面的例子中,得到的queries.count() 等于 2,分别为select 1和 `select 2; select 3;
119
      */
120
    ret = parser.split_multiple_stmt(sql, queries, parse_stat, false, true);
121
    if (OB_SUCC(ret)) { // ret=SUCC,并不意味着parse就一定成功,可能最后一个query是parse失败的
122
      if (OB_UNLIKELY(queries.count() <= 0)) {
123
        LOG_ERROR("emtpy query count. client would have suspended. never be here!",
124
                  K(sql), K(parse_fail));
125
      } else if (queries.count() > 1) {
126
        ret = OB_NOT_SUPPORTED;
127
        need_response_error = true;
128
        LOG_WARN("can't not prepare multi stmt", K(ret), K(queries.count()));
129
      } else {
130
        if (OB_UNLIKELY(parse_stat.parse_fail_ && (0 == parse_stat.fail_query_idx_)
131
                        && ObSQLUtils::check_need_disconnect_parser_err(parse_stat.fail_ret_))) {
132
          // 进入本分支,说明在multi_query中的某条query parse失败,如果不是语法错,则进入该分支
133
          // 如果当前query_count 为1, 则不断连接;如果大于1,
134
          // 则需要在发错误包之后断连接,防止客户端一直在等接下来的回包
135
          // 这个改动是为了解决
136
          ret = parse_stat.fail_ret_;
137
          need_response_error = true;
138
        }
139
      }
140
    } else {
141
      // 进入本分支,说明push_back出错,OOM,委托外层代码返回错误码
142
      // 且进入改分支之后,要断连接
143
      need_response_error = true;
144
      LOG_WARN("need response error", K(ret));
145
    }
146
  }
147
  return ret;
148
}
149

150
int ObMPStmtPrepare::process()
151
{
152
  int ret = OB_SUCCESS;
153
  ObSQLSessionInfo *sess = NULL;
154
  bool need_response_error = true;
155
  bool async_resp_used = false; // 由事务提交线程异步回复客户端
156
  int64_t query_timeout = 0;
157
  ObSMConnection *conn = get_conn();
158
  bool need_disconnect = true;
159

160
  if (OB_ISNULL(req_) || OB_ISNULL(conn)) {
161
    ret = OB_ERR_UNEXPECTED;
162
    LOG_WARN("req or conn is null", K_(req), K(conn), K(ret));
163
  } else if (OB_UNLIKELY(!conn->is_in_authed_phase())) {
164
    ret = OB_ERR_NO_PRIVILEGE;
165
    LOG_WARN("receive sql without session", K_(sql), K(ret));
166
  } else if (OB_ISNULL(conn->tenant_)) {
167
    ret = OB_ERR_UNEXPECTED;
168
    LOG_ERROR("invalid tenant", K_(sql), K(conn->tenant_), K(ret));
169
  } else if (OB_FAIL(get_session(sess))) {
170
    LOG_WARN("get session fail", K_(sql), K(ret));
171
  } else if (OB_ISNULL(sess)) {
172
    ret = OB_ERR_UNEXPECTED;
173
    LOG_WARN("session is NULL or invalid", K_(sql), K(sess), K(ret));
174
  } else if (OB_FAIL(update_transmission_checksum_flag(*sess))) {
175
    LOG_WARN("update transmisson checksum flag failed", K(ret));
176
  } else {
177
    ObSQLSessionInfo &session = *sess;
178
    THIS_WORKER.set_session(sess);
179
    lib::CompatModeGuard g(sess->get_compatibility_mode() == ORACLE_MODE ?
180
                             lib::Worker::CompatMode::ORACLE : lib::Worker::CompatMode::MYSQL);
181
    ObSQLSessionInfo::LockGuard lock_guard(session.get_query_lock());
182
    session.set_current_trace_id(ObCurTraceId::get_trace_id());
183
    session.init_use_rich_format();
184
    session.get_raw_audit_record().request_memory_used_ = 0;
185
    observer::ObProcessMallocCallback pmcb(0,
186
          session.get_raw_audit_record().request_memory_used_);
187
    lib::ObMallocCallbackGuard guard(pmcb);
188
    session.set_proxy_version(get_proxy_version());
189
    int64_t tenant_version = 0;
190
    int64_t sys_version = 0;
191
    const ObMySQLRawPacket &pkt = reinterpret_cast<const ObMySQLRawPacket&>(req_->get_packet());
192
    int64_t packet_len = pkt.get_clen();
193
    if (OB_UNLIKELY(!session.is_valid())) {
194
      ret = OB_ERR_UNEXPECTED;
195
      LOG_ERROR("invalid session", K_(sql), K(ret));
196
    } else if (OB_FAIL(process_kill_client_session(session))) {
197
      LOG_WARN("client session has been killed", K(ret));
198
    } else if (OB_UNLIKELY(session.is_zombie())) {
199
      ret = OB_ERR_SESSION_INTERRUPTED;
200
      LOG_WARN("session has been killed", K(session.get_session_state()), K_(sql),
201
               K(session.get_sessid()), "proxy_sessid", session.get_proxy_sessid(), K(ret));
202
    } else if (OB_FAIL(session.get_query_timeout(query_timeout))) {
203
      LOG_WARN("fail to get query timeout", K_(sql), K(ret));
204
    } else if (OB_FAIL(gctx_.schema_service_->get_tenant_received_broadcast_version(
205
                session.get_effective_tenant_id(), tenant_version))) {
206
      LOG_WARN("fail get tenant broadcast version", K(ret));
207
    } else if (OB_FAIL(gctx_.schema_service_->get_tenant_received_broadcast_version(
208
                OB_SYS_TENANT_ID, sys_version))) {
209
      LOG_WARN("fail get tenant broadcast version", K(ret));
210
    } else if (pkt.exist_trace_info()
211
               && OB_FAIL(session.update_sys_variable(SYS_VAR_OB_TRACE_INFO,
212
                                                      pkt.get_trace_info()))) {
213
      LOG_WARN("fail to update trace info", K(ret));
214
    } else if (FALSE_IT(session.set_txn_free_route(pkt.txn_free_route()))) {
215
    } else if (OB_FAIL(process_extra_info(session, pkt, need_response_error))) {
216
      LOG_WARN("fail get process extra info", K(ret));
217
    } else if (FALSE_IT(session.post_sync_session_info())) {
218
    } else if (OB_UNLIKELY(packet_len > session.get_max_packet_size())) {
219
      ret = OB_ERR_NET_PACKET_TOO_LARGE;
220
      need_disconnect = false;
221
      LOG_WARN("packet too large than allowd for the session", K_(sql), K(ret));
222
    } else if (OB_FAIL(sql::ObFLTUtils::init_flt_info(pkt.get_extra_info(), session,
223
                            conn->proxy_cap_flags_.is_full_link_trace_support()))) {
224
      LOG_WARN("failed to init flt extra info", K(ret));
225
    } else {
226
      FLTSpanGuard(ps_prepare);
227
      FLT_SET_TAG(log_trace_id, ObCurTraceId::get_trace_id_str(),
228
                    receive_ts, get_receive_timestamp(),
229
                    client_info, session.get_client_info(),
230
                    module_name, session.get_module_name(),
231
                    action_name, session.get_action_name(),
232
                    sess_id, session.get_sessid());
233
      THIS_WORKER.set_timeout_ts(get_receive_timestamp() + query_timeout);
234
      retry_ctrl_.set_tenant_global_schema_version(tenant_version);
235
      retry_ctrl_.set_sys_global_schema_version(sys_version);
236
      session.partition_hit().reset();
237
      session.set_pl_can_retry(true);
238

239
      bool has_more = false;
240
      bool force_sync_resp = false;
241
      need_disconnect = false;
242
      need_response_error = false;
243
      if (OB_FAIL(multiple_query_check(session, sql_, force_sync_resp, need_response_error))) {
244
        need_disconnect = OB_NOT_SUPPORTED == ret ? false : true; 
245
        LOG_WARN("check multiple query fail.", K(ret));
246
      } else {
247
        ret = process_prepare_stmt(ObMultiStmtItem(false, 0, sql_), session, has_more, force_sync_resp, async_resp_used);
248
      }
249

250
      if (OB_FAIL(ret)) {
251
        //if (OB_EAGAIN == ret) {
252
          //large query, do nothing
253
        //} else
254
        if (is_conn_valid()) {//The memory of sql sting is invalid if conn_valid_ has ben set false.
255
          LOG_WARN("execute sql failed", "sql_id", ctx_.sql_id_, K_(sql), K(ret));
256
        } else {
257
          LOG_WARN("execute sql failed", K(ret));
258
        }
259
      }
260
    }
261

262
    if (!session.get_in_transaction()) {
263
        // transcation ends, end trace
264
        FLT_END_TRACE();
265
    }
266

267
    if (OB_FAIL(ret) && is_conn_valid()) {
268
      if (need_response_error) {
269
        send_error_packet(ret, NULL);
270
      }
271
      if (need_disconnect) {
272
        force_disconnect();
273
        LOG_WARN("disconnect connection when process query", K(ret));
274
      }
275
    }
276

277
    session.set_last_trace_id(ObCurTraceId::get_trace_id());
278
    THIS_WORKER.set_session(NULL);
279
    revert_session(sess); //current ignore revert session ret
280
  }
281
  return ret;
282
}
283

284
int ObMPStmtPrepare::process_prepare_stmt(const ObMultiStmtItem &multi_stmt_item,
285
                                          ObSQLSessionInfo &session,
286
                                          bool has_more_result,
287
                                          bool force_sync_resp,
288
                                          bool &async_resp_used)
289
{
290
  int ret = OB_SUCCESS;
291
  bool need_response_error = true;
292
  int64_t tenant_version = 0;
293
  int64_t sys_version = 0;
294
  setup_wb(session);
295

296
  ObSessionStatEstGuard stat_est_guard(get_conn()->tenant_->id(), session.get_sessid());
297
  if (OB_FAIL(init_process_var(ctx_, multi_stmt_item, session))) {
298
    LOG_WARN("init process var faield.", K(ret), K(multi_stmt_item));
299
  } else {
300
    const bool enable_trace_log = lib::is_trace_log_enabled();
301
    if (enable_trace_log) {
302
      ObThreadLogLevelUtils::init(session.get_log_id_level_map());
303
    }
304
    if (OB_FAIL(check_and_refresh_schema(session.get_login_tenant_id(),
305
                                         session.get_effective_tenant_id()))) {
306
      LOG_WARN("failed to check_and_refresh_schema", K(ret));
307
    } else if (OB_FAIL(session.update_timezone_info())) {
308
      LOG_WARN("fail to update time zone info", K(ret));
309
    } else {
310
      ctx_.self_add_plan_ = false;
311
      ctx_.is_prepare_protocol_ = true; //set to prepare protocol
312
      ctx_.is_prepare_stage_ = true;
313
      need_response_error = false;
314
      do {
315
        share::schema::ObSchemaGetterGuard schema_guard;
316
        retry_ctrl_.clear_state_before_each_retry(session.get_retry_info_for_update());
317
        if (OB_FAIL(gctx_.schema_service_->get_tenant_schema_guard(
318
                    session.get_effective_tenant_id(), schema_guard))) {
319
          LOG_WARN("get schema guard failed", K(ret));
320
        } else if (OB_FAIL(schema_guard.get_schema_version(
321
                    session.get_effective_tenant_id(), tenant_version))) {
322
          LOG_WARN("fail get schema version", K(ret));
323
        } else if (OB_FAIL(schema_guard.get_schema_version(
324
                    OB_SYS_TENANT_ID, sys_version))) {
325
          LOG_WARN("fail get sys schema version", K(ret));
326
        } else {
327
          ctx_.schema_guard_ = &schema_guard;
328
          retry_ctrl_.set_tenant_local_schema_version(tenant_version);
329
          retry_ctrl_.set_sys_local_schema_version(sys_version);
330
        }
331
        if (OB_SUCC(ret)) {
332
          ret = do_process(session,
333
                           has_more_result,
334
                           force_sync_resp,
335
                           async_resp_used);
336
          session.set_session_in_retry(retry_ctrl_.need_retry());
337
        }
338
      } while (RETRY_TYPE_LOCAL == retry_ctrl_.get_retry_type());
339
      if (OB_SUCC(ret) && retry_ctrl_.get_retry_times() > 0) {
340
        LOG_TRACE("sql retry succeed", K(ret),
341
                  "retry_times", retry_ctrl_.get_retry_times(), K(multi_stmt_item));
342
      }
343
    }
344
    if (enable_trace_log) {
345
      ObThreadLogLevelUtils::clear();
346
    }
347
  }
348

349
  //对于tracelog的处理,不影响正常逻辑,错误码无须赋值给ret
350
  int tmp_ret = OB_SUCCESS;
351
  //清空WARNING BUFFER
352
  tmp_ret = do_after_process(session, ctx_, async_resp_used);
353
  tmp_ret = record_flt_trace(session);
354
  // need_response_error这个变量保证仅在
355
  // do { do_process } while(retry) 之前出错才会
356
  // 走到send_error_packet逻辑
357
  // 所以无需考虑当前为sync还是async模式
358
  if (!OB_SUCC(ret) && need_response_error && is_conn_valid()) {
359
    send_error_packet(ret, NULL);
360
  }
361
  UNUSED(tmp_ret);
362
  return ret;
363
}
364

365
// Requests an asynchronous schema refresh when the locally refreshed schema
// version of the tenant is older than the last schema version recorded in the
// session.
//
// Nothing is done when the login tenant differs from the effective tenant.
//
// @param login_tenant_id      tenant the session logged in with
// @param effective_tenant_id  tenant the statement runs against
// @return OB_SUCCESS, OB_INVALID_ARGUMENT when the schema service or the
//         session info in ctx_ is missing, or the error of the failing
//         schema-service / session call.
int ObMPStmtPrepare::check_and_refresh_schema(uint64_t login_tenant_id,
                                              uint64_t effective_tenant_id)
{
  int ret = OB_SUCCESS;
  int64_t local_version = 0;
  int64_t last_version = 0;

  if (login_tenant_id != effective_tenant_id) {
    // do nothing
  } else if (OB_ISNULL(gctx_.schema_service_)) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("null schema service", K(ret), K(gctx_));
  } else if (OB_ISNULL(ctx_.session_info_)) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid session info", K(ret), K(ctx_.session_info_));
  } else if (OB_FAIL(gctx_.schema_service_->get_tenant_refreshed_schema_version(
                     effective_tenant_id, local_version))) {
    LOG_WARN("fail to get tenant refreshed schema version", K(ret));
  } else if (OB_FAIL(ctx_.session_info_->get_ob_last_schema_version(last_version))) {
    // The error code is part of the log line so the failure can be diagnosed.
    LOG_WARN("failed to get_sys_variable", K(ret), K(OB_SV_LAST_SCHEMA_VERSION));
  } else if (local_version >= last_version) {
    // The local schema is already at least as new as the session expects.
  } else if (OB_FAIL(gctx_.schema_service_->async_refresh_schema(effective_tenant_id,
                                                                 last_version))) {
    LOG_WARN("failed to refresh schema", K(ret), K(effective_tenant_id), K(last_version));
  }
  return ret;
}
394

395
int ObMPStmtPrepare::do_process(ObSQLSessionInfo &session,
396
                                const bool has_more_result,
397
                                const bool force_sync_resp,
398
                                bool &async_resp_used)
399
{
400
  int ret = OB_SUCCESS;
401
  ObAuditRecordData &audit_record = session.get_raw_audit_record();
402
  audit_record.try_cnt_++;
403
  const bool enable_perf_event = lib::is_diagnose_info_enabled();
404
  const bool enable_sql_audit = GCONF.enable_sql_audit
405
                                && session.get_local_ob_enable_sql_audit();
406
  single_process_timestamp_ = ObTimeUtility::current_time();
407
  bool is_diagnostics_stmt = false;
408
  bool need_response_error = true;
409
  const ObString &sql = ctx_.multi_stmt_item_.get_sql();
410
  ObPsStmtId inner_stmt_id = OB_INVALID_ID;
411

412
  /* !!!
413
   * 注意req_timeinfo_guard一定要放在result前面
414
   * !!!
415
   */
416
  ObReqTimeGuard req_timeinfo_guard;
417
  SMART_VAR(ObMySQLResultSet, result, session, THIS_WORKER.get_allocator()) {
418
    ObWaitEventStat total_wait_desc;
419
    ObDiagnoseSessionInfo *di = ObDiagnoseSessionInfo::get_local_diagnose_info();
420
    {
421
      ObMaxWaitGuard max_wait_guard(enable_perf_event ? &audit_record.exec_record_.max_wait_event_ : NULL, di);
422
      ObTotalWaitGuard total_wait_guard(enable_perf_event ? &total_wait_desc : NULL, di);
423
      if (enable_perf_event) {
424
        audit_record.exec_record_.record_start(di);
425
      }
426
      result.set_has_more_result(has_more_result);
427
      ObTaskExecutorCtx *task_ctx = result.get_exec_context().get_task_executor_ctx();
428
      int64_t execution_id = 0;
429
      if (OB_ISNULL(task_ctx)) {
430
        ret = OB_ERR_UNEXPECTED;
431
        LOG_ERROR("task executor ctx can not be NULL", K(task_ctx), K(ret));
432
      } else {
433
        task_ctx->set_query_tenant_begin_schema_version(retry_ctrl_.get_tenant_global_schema_version());
434
        task_ctx->set_query_sys_begin_schema_version(retry_ctrl_.get_sys_global_schema_version());
435
        task_ctx->set_min_cluster_version(GET_MIN_CLUSTER_VERSION());
436
        ctx_.retry_times_ = retry_ctrl_.get_retry_times();
437
        if (OB_ISNULL(ctx_.schema_guard_)) {
438
          ret = OB_INVALID_ARGUMENT;
439
          LOG_WARN("newest schema is NULL", K(ret));
440
        } else if (OB_FAIL(result.init())) {
441
          LOG_WARN("result set init failed", K(ret));
442
        } else if (OB_ISNULL(gctx_.sql_engine_)) {
443
          ret = OB_ERR_UNEXPECTED;
444
          LOG_ERROR("invalid sql engine", K(ret), K(gctx_));
445
        } else if (FALSE_IT(execution_id = gctx_.sql_engine_->get_execution_id())) {
446
          //nothing to do
447
        } else if (OB_FAIL(set_session_active(sql, session, ObTimeUtil::current_time(), obmysql::ObMySQLCmd::COM_STMT_PREPARE))) {
448
          LOG_WARN("fail to set session active", K(ret));
449
        } else if (OB_FAIL(gctx_.sql_engine_->stmt_prepare(sql, ctx_, result, false/*is_inner_sql*/))) {
450
          exec_start_timestamp_ = ObTimeUtility::current_time();
451
          int cli_ret = OB_SUCCESS;
452
          retry_ctrl_.test_and_save_retry_state(gctx_, ctx_, result, ret, cli_ret);
453
          LOG_WARN("run stmt_query failed, check if need retry",
454
                   K(ret), K(cli_ret), K(retry_ctrl_.need_retry()), K(sql));
455
          ret = cli_ret;
456
        } else if (common::OB_INVALID_ID != result.get_statement_id()
457
                   && OB_FAIL(session.get_inner_ps_stmt_id(result.get_statement_id(), inner_stmt_id))) {
458
          ret = OB_ERR_UNEXPECTED;
459
          LOG_WARN("ps : get inner stmt id fail.", K(ret), K(result.get_statement_id()));
460
        } else {
461
          //监控项统计开始
462
          exec_start_timestamp_ = ObTimeUtility::current_time();
463

464
          // 本分支内如果出错,全部会在response_result内部处理妥当
465
          // 无需再额外处理回复错误包
466
          need_response_error = false;
467
          is_diagnostics_stmt = ObStmt::is_diagnostic_stmt(result.get_literal_stmt_type());
468
          ctx_.is_show_trace_stmt_ = ObStmt::is_show_trace_stmt(result.get_literal_stmt_type());
469
          session.set_current_execution_id(execution_id);
470

471
          //response_result
472
          if (OB_SUCC(ret) && OB_FAIL(response_result(result,
473
                                                      session,
474
                                                      force_sync_resp,
475
                                                      async_resp_used))) {
476
            ObPhysicalPlanCtx *plan_ctx = result.get_exec_context().get_physical_plan_ctx();
477
            if (OB_ISNULL(plan_ctx)) {
478
              LOG_ERROR("execute query fail, and plan_ctx is NULL", K(ret));
479
            } else {
480
              LOG_WARN("execute query fail", K(ret), "timeout_timestamp",
481
                      plan_ctx->get_timeout_timestamp());
482
            }
483
          }
484
          //监控项统计结束
485
          exec_end_timestamp_ = ObTimeUtility::current_time();
486

487
          // some statistics must be recorded for plan stat, even though sql audit disabled
488
          bool first_record = (1 == audit_record.try_cnt_);
489
          ObExecStatUtils::record_exec_timestamp(*this, first_record, audit_record.exec_timestamp_);
490
          audit_record.exec_timestamp_.update_stage_time();
491

492
          if (enable_perf_event) {
493
            audit_record.exec_record_.record_end(di);
494
            audit_record.exec_record_.wait_time_end_ = total_wait_desc.time_waited_;
495
            audit_record.exec_record_.wait_count_end_ = total_wait_desc.total_waits_;
496
            audit_record.update_event_stage_state();
497
            if (!THIS_THWORKER.need_retry()) {
498
              const int64_t time_cost = exec_end_timestamp_ - get_receive_timestamp();
499
              EVENT_INC(SQL_PS_PREPARE_COUNT);
500
              EVENT_ADD(SQL_PS_PREPARE_TIME, time_cost);
501
            }
502
          }
503
        }
504
      }
505
    } // diagnose end
506

507
    // 重试需要满足一下条件:
508
    // 1. rs.open 执行失败
509
    // 2. 没有给客户端返回结果,本次执行没有副作用
510
    // 3. need_retry(result, ret):schema 或 location cache 失效
511
    // 4. 小于重试次数限制
512
    if (OB_UNLIKELY(retry_ctrl_.need_retry())) {
513
      LOG_WARN("try to execute again",
514
              K(ret),
515
              N_TYPE, result.get_stmt_type(),
516
              "retry_type", retry_ctrl_.get_retry_type(),
517
              "timeout_remain", THIS_WORKER.get_timeout_remain());
518
    } else {
519
      // 首个plan执行完成后立即freeze partition hit
520
      // partition_hit一旦freeze后,后继的try_set_bool操作都不生效
521
      if (OB_LIKELY(NULL != result.get_physical_plan())) {
522
        session.partition_hit().freeze();
523
      }
524

525
      // store the warning message from the most recent statement in the current session
526
      if (OB_SUCC(ret) && is_diagnostics_stmt) {
527
        // if diagnostic stmt execute successfully, it dosen't clear the warning message
528
        session.update_show_warnings_buf();
529
      } else {
530
        session.set_show_warnings_buf(ret); // TODO: 挪个地方性能会更好,减少部分wb拷贝
531
      }
532

533
      if (!OB_SUCC(ret) && !async_resp_used && need_response_error && is_conn_valid() && !THIS_WORKER.need_retry()) {
534
        LOG_WARN("query failed", K(ret), K(retry_ctrl_.need_retry()), K_(sql));
535
        // 当need_retry=false时,可能给客户端回过包了,可能还没有回过任何包。
536
        // 不过,可以确定:这个请求出错了,还没处理完。如果不是已经交给异步EndTrans收尾,
537
        // 则需要在下面回复一个error_packet作为收尾。否则后面没人帮忙发错误包给客户端了,
538
        // 可能会导致客户端挂起等回包。
539
        bool is_partition_hit = session.get_err_final_partition_hit(ret);
540
        int err = send_error_packet(ret, NULL, is_partition_hit);
541
        if (OB_SUCCESS != err) {  // 发送error包
542
          LOG_WARN("send error packet failed", K(ret), K(err));
543
        }
544
      }
545
    }
546
    if (enable_sql_audit) {
547
      audit_record.status_ = ret;
548
      audit_record.client_addr_ = session.get_peer_addr();
549
      audit_record.user_client_addr_ = session.get_user_client_addr();
550
      audit_record.user_group_ = THIS_WORKER.get_group_id();
551
      audit_record.ps_stmt_id_ = result.get_statement_id();
552
      audit_record.ps_inner_stmt_id_ = inner_stmt_id;
553
      audit_record.is_perf_event_closed_ = !lib::is_diagnose_info_enabled();
554
    }
555
    bool need_retry = (THIS_THWORKER.need_retry()
556
                       || RETRY_TYPE_NONE != retry_ctrl_.get_retry_type());
557
    ObSQLUtils::handle_audit_record(need_retry, EXECUTE_PS_PREPARE, session, ctx_.is_sensitive_);
558
  }
559

560
  // reset thread waring buffer in sync mode
561
  if (!async_resp_used) {
562
    clear_wb_content(session);
563
  }
564
  return ret;
565
}
566

567
// return false only if send packet fail.
568
int ObMPStmtPrepare::response_result(
569
    ObMySQLResultSet &result,
570
    ObSQLSessionInfo &session,
571
    bool force_sync_resp,
572
    bool &async_resp_used)
573
{
574
  int ret = OB_SUCCESS;
575
  UNUSED(force_sync_resp);
576
  UNUSED(async_resp_used);
577
//  const ObMySQLRawPacket &packet = reinterpret_cast<const ObMySQLRawPacket&>(req_->get_packet());
578
  if (OB_FAIL(send_prepare_packet(result))) {
579
    LOG_WARN("send prepare packet failed", K(ret));
580
  } else if (OB_FAIL(send_param_packet(session, result))) {
581
    LOG_WARN("send param packet failed", K(ret));
582
  } else if (OB_FAIL(send_column_packet(session, result))) {
583
    LOG_WARN("send column packet failed", K(ret));
584
  }
585
  return ret;
586
}
587

588
int ObMPStmtPrepare::send_prepare_packet(const ObMySQLResultSet &result)
589
{
590
  int ret = OB_SUCCESS;
591
  OMPKPrepare prepare_packet;
592
  const ParamsFieldIArray *params = result.get_param_fields();
593
  const ColumnsFieldIArray *columns = result.get_field_columns();
594
  if (OB_ISNULL(params) || OB_ISNULL(columns)) {
595
    ret = OB_INVALID_ARGUMENT;
596
    LOG_WARN("invalid argument", K(ret), K(columns), K(params));
597
  } else {
598
    prepare_packet.set_statement_id(static_cast<uint32_t>(result.get_statement_id()));
599
    prepare_packet.set_column_num(static_cast<uint16_t>(result.get_field_cnt()));
600
    prepare_packet.set_warning_count(static_cast<uint16_t>(result.get_warning_count()));
601
    if (OB_ISNULL(result.get_param_fields())) {
602
      ret = OB_INVALID_ARGUMENT;
603
      LOG_WARN("invalid argument", K(ret), K(result.get_param_fields()));
604
    } else {
605
      prepare_packet.set_param_num(
606
        static_cast<uint16_t>(result.get_param_fields()->count()));
607
    }
608
  }
609

610
  if (OB_SUCC(ret) && OB_FAIL(response_packet(prepare_packet, const_cast<ObSQLSessionInfo *>(&result.get_session())))) {
611
    LOG_WARN("response packet failed", K(ret));
612
  }
613

614
  if (OB_SUCC(ret) && need_send_extra_ok_packet() && columns->count() == 0 && params->count() == 0) {
615
    ObOKPParam ok_param;
616
    if (OB_FAIL(send_ok_packet(*(const_cast<ObSQLSessionInfo *>(&result.get_session())), ok_param))) {
617
      LOG_WARN("fail to send ok packet", K(ret));
618
    }
619
  }
620
  return ret;
621
}
622

623
int ObMPStmtPrepare::send_column_packet(const ObSQLSessionInfo &session,
624
                                        ObMySQLResultSet &result)
625
{
626
  int ret = OB_SUCCESS;
627
  const ColumnsFieldIArray *columns = result.get_field_columns();
628
  if (OB_ISNULL(columns)) {
629
    ret = OB_INVALID_ARGUMENT;
630
    LOG_WARN("invalid argument", K(ret), K(columns));
631
  } else if (columns->count() > 0) {
632
    ObMySQLField field;
633
    ret = result.next_field(field);
634
    while (OB_SUCC(ret)) {
635
      OMPKField fp(field);
636
      if (OB_FAIL(response_packet(fp, const_cast<ObSQLSessionInfo *>(&session)))) {
637
        LOG_WARN("response packet fail", K(ret));
638
      } else {
639
        LOG_DEBUG("response field succ", K(field));
640
        ret = result.next_field(field);
641
      }
642
    }
643
    if (OB_ITER_END == ret) {
644
      ret = OB_SUCCESS;
645
    }
646
    if (OB_SUCC(ret)) {
647
      if (OB_FAIL(packet_sender_.update_last_pkt_pos())) {
648
        LOG_WARN("failed to update last packet pos", K(ret));
649
      } else {
650
        if (need_send_extra_ok_packet()) {
651
          ObOKPParam ok_param;
652
          if (OB_FAIL(send_eof_packet(session, result, &ok_param))) {
653
            LOG_WARN("send eof field failed", K(ret));
654
          }
655
        } else {
656
          if (OB_FAIL(send_eof_packet(session, result))) {
657
            LOG_WARN("send eof field failed", K(ret));
658
          }
659
        }
660
      }
661
    }
662
  }
663
  return ret;
664
}
665

666
int ObMPStmtPrepare::send_param_packet(const ObSQLSessionInfo &session,
667
                                       ObMySQLResultSet &result)
668
{
669
  int ret = OB_SUCCESS;
670
  const ParamsFieldIArray *params = result.get_param_fields();
671
  const ColumnsFieldIArray *columns = result.get_field_columns();
672
  if (OB_ISNULL(params) || OB_ISNULL(columns)) {
673
    ret = OB_INVALID_ARGUMENT;
674
    LOG_WARN("invalid argument", K(ret), K(columns), K(params));
675
  } else if (params->count() > 0) {
676
    ObMySQLField field;
677
    ret = result.next_param(field);
678
    while (OB_SUCC(ret)) {
679
      OMPKField fp(field);
680
      if (OB_FAIL(response_packet(fp, const_cast<ObSQLSessionInfo *>(&session)))) {
681
        LOG_DEBUG("response packet fail", K(ret));
682
      } else {
683
//        LOG_INFO("response field succ", K(field));
684
        ret = result.next_param(field);
685
      }
686
    }
687
    if (OB_ITER_END == ret) {
688
      ret = OB_SUCCESS;
689
    }
690
    if (OB_SUCC(ret)) {
691
      if (need_send_extra_ok_packet() && columns->count() == 0) {
692
        ObOKPParam ok_param;
693
        if (OB_FAIL(send_eof_packet(session, result, &ok_param))) {
694
          LOG_WARN("send eof field failed", K(ret));
695
        }
696
      } else {
697
        if (OB_FAIL(send_eof_packet(session, result))) {
698
          LOG_WARN("send eof field failed", K(ret));
699
        }
700
      }
701
    }
702
  }
703
  return ret;
704
}
705

706
} //end of namespace observer
707
} //end of namespace oceanbase
708

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.