oceanbase

Форк
0
/
obmp_stmt_execute.cpp 
3118 строк · 125.7 Кб
1
/**
2
 * Copyright (c) 2021 OceanBase
3
 * OceanBase CE is licensed under Mulan PubL v2.
4
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
5
 * You may obtain a copy of Mulan PubL v2 at:
6
 *          http://license.coscl.org.cn/MulanPubL-2.0
7
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
 * See the Mulan PubL v2 for more details.
11
 */
12

13
#define USING_LOG_PREFIX SERVER
14

15
#include "observer/mysql/obmp_stmt_execute.h"
16

17
#include "lib/oblog/ob_log.h"
18
#include "lib/stat/ob_session_stat.h"
19
#include "lib/profile/ob_perf_event.h"
20
#include "lib/timezone/ob_time_convert.h"
21
#include "lib/encode/ob_base64_encode.h"
22
#include "observer/mysql/obsm_utils.h"
23
#include "rpc/ob_request.h"
24
#include "rpc/obmysql/ob_mysql_packet.h"
25
#include "rpc/obmysql/ob_mysql_util.h"
26
#include "rpc/obmysql/packet/ompk_eof.h"
27
#include "rpc/obmysql/packet/ompk_resheader.h"
28
#include "rpc/obmysql/packet/ompk_field.h"
29
#include "rpc/obmysql/packet/ompk_row.h"
30
#include "rpc/obmysql/obsm_struct.h"
31
#include "observer/mysql/obsm_row.h"
32
#include "share/schema/ob_schema_getter_guard.h"
33
#include "share/ob_time_utility2.h"
34
#include "sql/ob_sql.h"
35
#include "sql/ob_sql_context.h"
36
#include "sql/session/ob_sql_session_info.h"
37
#include "sql/plan_cache/ob_prepare_stmt_struct.h"
38
#include "observer/omt/ob_tenant.h"
39
#include "observer/mysql/ob_sync_plan_driver.h"
40
#include "observer/mysql/ob_sync_cmd_driver.h"
41
#include "observer/mysql/ob_async_cmd_driver.h"
42
#include "observer/mysql/ob_async_plan_driver.h"
43
#include "observer/ob_req_time_service.h"
44
#include "pl/ob_pl_user_type.h"
45
#include "pl/ob_pl_package.h"
46
#include "pl/ob_pl_resolver.h"
47
#include "pl/ob_pl_exception_handling.h"
48
#include "sql/plan_cache/ob_prepare_stmt_struct.h"
49
#include "observer/mysql/obmp_stmt_prexecute.h"
50
#include "observer/mysql/obmp_stmt_send_piece_data.h"
51
#include "observer/mysql/obmp_utils.h"
52
#include "share/ob_lob_access_utils.h"
53
#include "sql/plan_cache/ob_ps_cache.h"
54

55
namespace oceanbase
56
{
57

58
using namespace common;
59
using namespace share;
60
using namespace obmysql;
61
using namespace rpc;
62
using namespace sql;
63
using namespace pl;
64
namespace observer
65
{
66
inline int ObPSAnalysisChecker::detection(const int64_t len)
67
{
68
  int ret = OB_SUCCESS;
69
  if (!need_check_) {
70
  } else if (*pos_ + len > end_pos_) {
71
    ret = OB_ERR_MALFORMED_PS_PACKET;
72
    LOG_USER_ERROR(OB_ERR_MALFORMED_PS_PACKET);
73
    LOG_ERROR("malformed ps data packet, please check the number and content of data packet parameters", K(ret), KP(pos_), KP(begin_pos_),
74
    K(end_pos_ - begin_pos_), K(len), K(data_len_), K(remain_len()));
75
  }
76
  return ret;
77
}
78

79
// Constructs an execute-protocol handler bound to the global context.
// Every array-binding member starts out empty/disabled, all timestamps and
// counters start at zero, and the SQL context is tagged as an MpQuery request.
ObMPStmtExecute::ObMPStmtExecute(const ObGlobalContext &gctx)
    : ObMPBase(gctx),
      retry_ctrl_(/*ctx_.retry_info_*/),
      ctx_(),
      // statement identity
      stmt_id_(), stmt_type_(stmt::T_NONE),
      // parameter stores and array-binding state
      params_(NULL), arraybinding_params_(NULL),
      arraybinding_columns_(NULL), arraybinding_row_(NULL),
      is_arraybinding_(false), is_save_exception_(false),
      arraybinding_size_(0), arraybinding_rowcnt_(0),
      ps_cursor_type_(ObNormalType),
      // timing
      single_process_timestamp_(0), exec_start_timestamp_(0), exec_end_timestamp_(0),
      prepare_packet_sent_(false),
      // textual parameter dump
      params_num_(0), params_value_len_(0), params_value_(NULL),
      curr_sql_idx_(0)
{
  ctx_.exec_type_ = MpQuery;
}
105

106
int ObMPStmtExecute::init_arraybinding_field(int64_t column_field_cnt,
107
                                             const ColumnsFieldIArray *column_fields)
108
{
109
  int ret = OB_SUCCESS;
110

111
  ObField sql_no_field, err_no_field, err_msg_field;
112

113
  OX (sql_no_field.charsetnr_ = CS_TYPE_UTF8MB4_GENERAL_CI);
114
  OX (sql_no_field.type_.set_type(ObIntType));
115
  OZ (common::ObField::get_field_mb_length(sql_no_field.type_.get_type(),
116
                                           sql_no_field.accuracy_,
117
                                           common::CS_TYPE_INVALID,
118
                                           sql_no_field.length_));
119
  OX (sql_no_field.cname_ = ObString("sql_no"));
120

121
  OX (err_no_field.charsetnr_ = CS_TYPE_UTF8MB4_GENERAL_CI);
122
  OX (err_no_field.type_.set_type(ObIntType));
123
  OZ (common::ObField::get_field_mb_length(err_no_field.type_.get_type(),
124
                                           err_no_field.accuracy_,
125
                                           common::CS_TYPE_INVALID, err_no_field.length_));
126
  OX (err_no_field.cname_ = ObString("error_code"));
127

128
  OX (err_msg_field.charsetnr_ = CS_TYPE_UTF8MB4_GENERAL_CI);
129
  OX (err_msg_field.type_.set_type(ObVarcharType));
130
  OZ (common::ObField::get_field_mb_length(err_msg_field.type_.get_type(),
131
                                           err_msg_field.accuracy_,
132
                                           common::CS_TYPE_INVALID,
133
                                           err_msg_field.length_));
134
  OX (err_msg_field.cname_ = ObString("error_message"));
135

136
  OZ (arraybinding_columns_->push_back(sql_no_field));
137
  OZ (arraybinding_columns_->push_back(err_no_field));
138
  if (is_prexecute() && column_field_cnt > 3 && OB_NOT_NULL(column_fields)) {
139
    // only for pre_execute
140
    for (int64_t i = 0; OB_SUCC(ret) && i < column_fields->count(); i++) {
141
      if (OB_FAIL(arraybinding_columns_->push_back(column_fields->at(i)))) {
142
        LOG_WARN("fail to push arraybinding_columns_", "field", column_fields->at(i), K(i));
143
      }
144
    }
145
  }
146
  OZ (arraybinding_columns_->push_back(err_msg_field));
147

148
  return ret;
149
}
150

151
// Allocates `array_binding_row_num` ObObj cells from `alloc`, default
// constructs each of them and hands the array to arraybinding_row_.
//
// @param alloc                  allocator providing the cell storage.
// @param array_binding_row_num  number of cells in the row.
// @return OB_SUCCESS, or OB_ALLOCATE_MEMORY_FAILED when the allocation fails.
// NOTE(review): arraybinding_row_ is dereferenced unconditionally; the visible
// callers construct it right before calling this function.
int ObMPStmtExecute::init_row_for_arraybinding(ObIAllocator &alloc, int64_t array_binding_row_num)
{
  int ret = OB_SUCCESS;
  void *cell_buf = alloc.alloc(sizeof(ObObj) * array_binding_row_num);
  if (OB_ISNULL(cell_buf)) {
    ret = OB_ALLOCATE_MEMORY_FAILED;
    LOG_WARN("failed to alloc memory for row", K(ret));
  } else {
    ObObj *cells = static_cast<ObObj*>(cell_buf);
    for (int64_t idx = 0; idx < array_binding_row_num; ++idx) {
      new (cells + idx) ObObj();
    }
    arraybinding_row_->assign(cells, array_binding_row_num);
  }
  return ret;
}
168

169
// Allocates arraybinding_params_ from `alloc` and constructs it in place as a
// ParamStore backed by the same allocator.
//
// @return OB_SUCCESS, or OB_ALLOCATE_MEMORY_FAILED (arraybinding_params_ is
//         NULL in that case).
int ObMPStmtExecute::init_arraybinding_paramstore(ObIAllocator &alloc)
{
  int ret = OB_SUCCESS;
  void *store_buf = alloc.alloc(sizeof(ParamStore));
  arraybinding_params_ = static_cast<ParamStore*>(store_buf);
  if (OB_ISNULL(arraybinding_params_)) {
    ret = OB_ALLOCATE_MEMORY_FAILED;
    LOG_WARN("failed to allocate memory", K(ret));
  } else {
    arraybinding_params_ = new (store_buf) ParamStore((ObWrapperAllocator(alloc)));
  }
  return ret;
}
180

181

182
// only used for pre_execute
183
// Builds the array-binding result layout for the pre-execute protocol: the
// three fixed columns plus one column per field reported by `result`, and a
// row with the matching number of cells.
//
// Changes compared with the previous implementation:
//  - the field-column list is null-checked once (the second check could never
//    fire) and the failure is logged with a message that names the real cause;
//  - arraybinding_columns_ / arraybinding_row_ are only assigned once both
//    allocations succeeded, so they never point at raw, unconstructed memory.
//
// @param result  result set whose field columns are the extra (returning) columns.
// @return OB_SUCCESS;
//         OB_ERR_UNEXPECTED when not running the pre-execute protocol, when
//         `result` has no field-column list, or when no allocator is available;
//         OB_ALLOCATE_MEMORY_FAILED on allocation failure;
//         otherwise the error of init_arraybinding_field / init_row_for_arraybinding.
int ObMPStmtExecute::init_arraybinding_fields_and_row(ObMySQLResultSet &result)
{
  int ret = OB_SUCCESS;
  const ColumnsFieldIArray *returning_fields = NULL;
  ObIAllocator *alloc = NULL;
  void *columns_buf = NULL;
  void *row_buf = NULL;

  if (!is_prexecute()) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("not support execute protocol", K(ret));
  } else if (OB_ISNULL(returning_fields = result.get_field_columns())) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("returning param field is null", K(ret));
  } else if (OB_ISNULL(alloc = static_cast<ObMPStmtPrexecute*>(this)->get_alloc())) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("allocator is null", K(ret));
  } else if (OB_ISNULL(columns_buf = alloc->alloc(sizeof(ColumnsFieldArray)))) {
    ret = OB_ALLOCATE_MEMORY_FAILED;
    LOG_WARN("failed to allocate memory", K(ret));
  } else if (OB_ISNULL(row_buf = alloc->alloc(sizeof(ObNewRow)))) {
    ret = OB_ALLOCATE_MEMORY_FAILED;
    LOG_WARN("failed to allocate memory", K(ret));
  } else {
    // sql_no + error_code + error_message, plus the returning columns.
    const int64_t total_field_cnt = 3 + returning_fields->count();
    arraybinding_columns_ = new (columns_buf) ColumnsFieldArray(*alloc, total_field_cnt);
    arraybinding_row_ = new (row_buf) ObNewRow();
    OZ (init_arraybinding_field(total_field_cnt, returning_fields));
    OZ (init_row_for_arraybinding(*alloc, total_field_cnt));
  }

  return ret;
}
227

228
// Prepares the array-binding state for a plain execute request: always creates
// arraybinding_params_, and, when save-exception mode is on, also creates the
// three-column result layout (arraybinding_columns_) and its row
// (arraybinding_row_).
//
// Changes compared with the previous implementation:
//  - the parameter store is constructed right after it is allocated (through
//    init_arraybinding_paramstore) instead of at the very end. Previously a
//    failure while setting up the columns/row left arraybinding_params_
//    non-null but pointing at unconstructed memory;
//  - arraybinding_columns_ / arraybinding_row_ are only assigned once both
//    allocations succeeded, for the same reason.
//
// @param alloc  allocator that backs every object created here.
// @return OB_SUCCESS, OB_ALLOCATE_MEMORY_FAILED, or the error returned by
//         init_arraybinding_field / init_row_for_arraybinding.
int ObMPStmtExecute::init_for_arraybinding(ObIAllocator &alloc)
{
  int ret = OB_SUCCESS;
  if (OB_FAIL(init_arraybinding_paramstore(alloc))) {
    LOG_WARN("failed to init arraybinding param store", K(ret));
  } else if (is_save_exception_) {
    void *columns_buf = NULL;
    void *row_buf = NULL;
    if (OB_ISNULL(columns_buf = alloc.alloc(sizeof(ColumnsFieldArray)))) {
      ret = OB_ALLOCATE_MEMORY_FAILED;
      LOG_WARN("failed to allocate memory", K(ret));
    } else if (OB_ISNULL(row_buf = alloc.alloc(sizeof(ObNewRow)))) {
      ret = OB_ALLOCATE_MEMORY_FAILED;
      LOG_WARN("failed to allocate memory", K(ret));
    } else {
      // Fixed layout: sql_no, error_code, error_message.
      arraybinding_columns_ = new (columns_buf) ColumnsFieldArray(alloc, 3);
      arraybinding_row_ = new (row_buf) ObNewRow();
    }
    OZ (init_arraybinding_field(3, NULL));
    OZ (init_row_for_arraybinding(alloc, 3));
  }
  return ret;
}
255

256
int ObMPStmtExecute::check_param_type_for_arraybinding(
257
    ObSQLSessionInfo *session_info,
258
    ParamTypeInfoArray &param_type_infos)
259
{
260
  int ret = OB_SUCCESS;
261
  if (!ObStmt::is_dml_write_stmt(stmt_type_)
262
      && stmt::T_ANONYMOUS_BLOCK != stmt_type_
263
      && stmt::T_CALL_PROCEDURE != stmt_type_) {
264
    ret = OB_NOT_SUPPORTED;
265
    LOG_WARN("arraybinding only support write dml", K(ret), K(stmt_type_));
266
    LOG_USER_ERROR(OB_NOT_SUPPORTED, "arraybinding got no write dml");
267
  } else if (session_info->get_local_autocommit()) {
268
    ret = OB_NOT_SUPPORTED;
269
    LOG_WARN("arraybinding must in autocommit off", K(ret));
270
    LOG_USER_ERROR(OB_NOT_SUPPORTED, "arraybinding has autocommit = on");
271
  } else if (OB_UNLIKELY(param_type_infos.count() <= 0)) {
272
    ret = OB_NOT_SUPPORTED;
273
    LOG_WARN("arraybinding must has parameters", K(ret));
274
    LOG_USER_ERROR(OB_NOT_SUPPORTED, "arraybinding has no parameter");
275
  } else {
276
    for (int64_t i = 0; OB_SUCC(ret) && i < param_type_infos.count(); ++i) {
277
      TypeInfo &type_info = param_type_infos.at(i);
278
      if (type_info.is_basic_type_ || !type_info.is_elem_type_) {
279
        ret = OB_NOT_SUPPORTED;
280
        LOG_WARN("arraybinding parameter must be anonymous array", K(ret));
281
        LOG_USER_ERROR(OB_NOT_SUPPORTED,
282
                       "arraybinding parameter is not anonymous array");
283
      }
284
    }
285
  }
286
  return ret;
287
}
288

289
// Checks one decoded array-binding parameter: it must be an extended value
// wrapping a non-null ObPLCollection. While arraybinding_size_ is still 0 it
// is set to the collection's element count; after that every collection must
// have exactly that count.
//
// @return OB_SUCCESS, or the error set by a failed CK check.
int ObMPStmtExecute::check_param_value_for_arraybinding(ObObjParam &param)
{
  int ret = OB_SUCCESS;
  ObPLCollection *collection = NULL;
  CK (param.is_ext());
  CK (OB_NOT_NULL(collection = reinterpret_cast<ObPLCollection*>(param.get_ext())));
  if (OB_SUCC(ret)) {
    if (0 != arraybinding_size_) {
      CK (arraybinding_size_ == collection->get_count());
    } else {
      arraybinding_size_ = collection->get_count();
    }
  }
  return ret;
}
303

304
// Builds the parameter set for one iteration of an array-binding execution:
// for every bound collection in arraybinding_params_, element `pos` is copied
// into the same slot of params_. For numeric elements whose scale/precision is
// still marked as unknown, the mode-specific default accuracy is filled in.
//
// Changes compared with the previous implementation:
//  - the element is only read when every preceding check succeeded. Before,
//    `data[pos]` was evaluated even after a failed check, i.e. with `data`
//    still NULL;
//  - a negative `pos` is rejected instead of indexing before the array.
//
// @param pos  zero-based index of the iteration (element) to extract.
// @return OB_SUCCESS, or the error set by the first failed CK check.
int ObMPStmtExecute::construct_execute_param_for_arraybinding(int64_t pos)
{
  int ret = OB_SUCCESS;
  CK (pos >= 0);
  CK (OB_NOT_NULL(arraybinding_params_));
  CK (OB_NOT_NULL(params_));
  CK (arraybinding_params_->count() == params_->count());
  for (int64_t i = 0; OB_SUCC(ret) && i < arraybinding_params_->count(); ++i) {
    ObObjParam &obj = arraybinding_params_->at(i);
    ObPLCollection *coll = NULL;
    ObObj *data = NULL;
    CK (obj.is_ext());
    CK (OB_NOT_NULL(coll = reinterpret_cast<ObPLCollection*>(obj.get_ext())));
    CK (coll->get_count() > pos);
    CK (1 == coll->get_column_count());
    CK (OB_NOT_NULL(data = reinterpret_cast<ObObj*>(coll->get_data())));
    if (OB_SUCC(ret)) {
      // `data` is known to be non-null and `pos` in range from here on.
      params_->at(i) = *(data + pos);
      if (data[pos].is_numeric_type()) {
        ObAccuracy default_acc =
          ObAccuracy::DDL_DEFAULT_ACCURACY2[lib::is_oracle_mode()][data[pos].get_type()];
        if (params_->at(i).get_scale() == NUMBER_SCALE_UNKNOWN_YET) {
          params_->at(i).set_scale(default_acc.get_scale());
        }
        if (params_->at(i).get_precision() == PRECISION_UNKNOWN_YET) {
          params_->at(i).set_precision(default_acc.get_precision());
        }
      }
    }
  }
  return ret;
}
333

334
// Walks `params` and calls ObUserDefinedType::destruct_obj on every PL-extend
// parameter. A failure for one parameter is logged and the walk continues with
// the next one. A NULL `params` is a no-op.
void ObMPStmtExecute::reset_complex_param_memory(ParamStore *params, ObSQLSessionInfo &session_info)
{
  if (OB_ISNULL(params)) {
    // nothing to release
  } else {
    for (int64_t i = 0; i < params->count(); ++i) {
      ObObjParam &param = params->at(i);
      if (!param.is_pl_extend()) {
        continue;
      }
      int ret = ObUserDefinedType::destruct_obj(param, &session_info);
      if (OB_SUCCESS != ret) {
        LOG_WARN("fail to destruct obj", K(ret), K(i));
      }
    }
  }
}
348

349
// Sends an EOF packet for the array-binding result set. The packet carries the
// readable warning count of the thread's warning buffer (0 when no buffer is
// available) and server status flags derived from the session.
//
// @return OB_SUCCESS, or the error returned by response_packet().
int ObMPStmtExecute::send_eof_packet_for_arraybinding(ObSQLSessionInfo &session_info)
{
  int ret = OB_SUCCESS;

  OMPKEOF eof_pkt;

  // Warning count.
  uint16_t warning_cnt = 0;
  const ObWarningBuffer *warning_buf = common::ob_get_tsi_warning_buffer();
  if (OB_NOT_NULL(warning_buf)) {
    warning_cnt = static_cast<uint16_t>(warning_buf->get_readable_warning_count());
  } else {
    LOG_WARN("can not get thread warnings buffer");
  }
  eof_pkt.set_warning_count(warning_cnt);

  // Server status flags.
  ObServerStatusFlags status = eof_pkt.get_server_status();
  if (session_info.is_server_status_in_transaction()) {
    status.status_flags_.OB_SERVER_STATUS_IN_TRANS = 1;
  } else {
    status.status_flags_.OB_SERVER_STATUS_IN_TRANS = 0;
  }
  if (session_info.get_local_autocommit()) {
    status.status_flags_.OB_SERVER_STATUS_AUTOCOMMIT = 1;
  } else {
    status.status_flags_.OB_SERVER_STATUS_AUTOCOMMIT = 0;
  }
  // MORE_RESULTS need false in prexecute protocol.
  // only is_save_exception_ will use this func in prexecute protocol.
  status.status_flags_.OB_SERVER_MORE_RESULTS_EXISTS = !(is_prexecute() && is_save_exception_);
  if (!session_info.is_obproxy_mode()) {
    // For non-obproxy clients this bit is set to the negated partition-hit flag.
    status.status_flags_.OB_SERVER_QUERY_WAS_SLOW = !session_info.partition_hit().get_bool();
  }
  eof_pkt.set_server_status(status);

  OZ (response_packet(eof_pkt, &session_info));

  return ret;
}
378

379
int ObMPStmtExecute::response_result_for_arraybinding(
380
    ObSQLSessionInfo &session_info,
381
    ObIArray<ObSavedException> &exception_array)
382
{
383
  int ret = OB_SUCCESS;
384
  if (exception_array.count() > 0) {
385
    if (is_prexecute()) {
386
      // do nothing
387
    } else {
388
      OMPKResheader rhp;
389
      rhp.set_field_count(3);
390
      OZ (response_packet(rhp, &session_info));
391

392
      for (int64_t i = 0; OB_SUCC(ret) && i < arraybinding_columns_->count(); ++i) {
393
        ObMySQLField field;
394
        OZ (ObMySQLResultSet::to_mysql_field(arraybinding_columns_->at(i), field));
395
        ObMySQLResultSet::replace_lob_type(session_info, arraybinding_columns_->at(i), field);
396
        OMPKField fp(field);
397
        OZ (response_packet(fp, &session_info));
398
      }
399

400
      OZ (send_eof_packet_for_arraybinding(session_info));
401

402
      for (int64_t i = 0; OB_SUCC(ret) && i < exception_array.count(); ++i) {
403
        arraybinding_row_->get_cell(0).set_int(exception_array.at(i).pos_);
404
        arraybinding_row_->get_cell(1).set_int(exception_array.at(i).error_code_);
405
        arraybinding_row_->get_cell(2).set_varchar(exception_array.at(i).error_msg_);
406

407
        const ObDataTypeCastParams dtc_params
408
          = ObBasicSessionInfo::create_dtc_params(&session_info);
409
        ObSMRow sm_row(BINARY,
410
                *arraybinding_row_,
411
                dtc_params,
412
                arraybinding_columns_,
413
                ctx_.schema_guard_,
414
                session_info.get_effective_tenant_id());
415
        OMPKRow rp(sm_row);
416
        OZ (response_packet(rp, &session_info));
417
      }
418
      OZ (send_eof_packet_for_arraybinding(session_info));
419
    }
420
  }
421

422
  if (OB_SUCC(ret)) {
423
    bool ps_out = ((stmt::T_ANONYMOUS_BLOCK == stmt_type_ || stmt::T_CALL_PROCEDURE == stmt_type_)
424
                    && arraybinding_columns_->count() > 3) ? true : false;
425
    ObOKPParam ok_param;
426
    ok_param.affected_rows_ = arraybinding_rowcnt_;
427
    ok_param.is_partition_hit_ = session_info.partition_hit().get_bool();
428
    ok_param.has_pl_out_ = ps_out;
429
    OZ (send_ok_packet(session_info, ok_param));
430
  }
431
  return ret;
432
}
433

434
int ObMPStmtExecute::save_exception_for_arraybinding(
435
  int64_t pos, int error_code, ObIArray<ObSavedException> &exception_array)
436
{
437
  int ret = OB_SUCCESS;
438
  ObSavedException exception;
439

440
  const char *errm_result = NULL;
441
  int64_t errm_length = 0;
442

443
  exception.pos_ = pos;
444
  exception.error_code_ = static_cast<uint16_t>(ob_errpkt_errno(error_code, lib::is_oracle_mode()));
445

446
  ObIAllocator &alloc = CURRENT_CONTEXT->get_arena_allocator();
447

448
  const ObWarningBuffer *wb = common::ob_get_tsi_warning_buffer();
449
  if (OB_LIKELY(NULL != wb) && wb->get_err_code() == error_code) {
450
    errm_result = wb->get_err_msg();
451
    errm_length = strlen(errm_result);
452
  } else {
453
    errm_result = ob_errpkt_strerror(error_code, true);
454
    if (NULL == errm_result) {
455
      errm_result = "ORA%ld: Message error_code not found; product=RDBMS; facility=ORA";
456
    }
457
    errm_length = strlen(errm_result);
458
  }
459

460
  OZ (ob_write_string(alloc, ObString(errm_length, errm_result), exception.error_msg_));
461
  OZ (exception_array.push_back(exception));
462
  return ret;
463
}
464

465
// Runs one array-binding iteration's result set to completion.
//
//  - No physical plan attached: OB_NOT_INIT.
//  - open() fails: the retry controller evaluates the error (local retry
//    only), the client-visible error code replaces `ret`, and the result set
//    is closed.
//  - The statement produces rows: OB_NOT_SUPPORTED.
//    NOTE(review): the result set is not closed on this path here — presumably
//    the caller does it; verify.
//  - Otherwise the result set is closed and its affected-row count is added to
//    arraybinding_rowcnt_.
int ObMPStmtExecute::after_do_process_for_arraybinding(ObMySQLResultSet &result)
{
  int ret = OB_SUCCESS;
  if (OB_ISNULL(result.get_physical_plan())) {
    ret = OB_NOT_INIT;
    LOG_WARN("should have set plan to result set", K(ret));
  } else if (OB_FAIL(result.open())) {
    int cli_ret = OB_SUCCESS;
    retry_ctrl_.test_and_save_retry_state(gctx_,
                                          ctx_,
                                          result,
                                          ret,
                                          cli_ret,
                                          true/*arraybinding only local retry*/);
    // Lock-conflict retries are not logged, to avoid flooding the log; the two
    // error codes listed first are never logged here either.
    const bool skip_log = (OB_TRANSACTION_SET_VIOLATION == ret)
                          || (OB_REPLICA_NOT_READABLE == ret)
                          || (OB_TRY_LOCK_ROW_CONFLICT == ret && retry_ctrl_.need_retry());
    if (!skip_log) {
      LOG_WARN("result set open failed, check if need retry",
               K(ret), K(cli_ret), K(retry_ctrl_.need_retry()));
    }
    ret = cli_ret;

    const int cret = result.close();
    const bool close_failure_is_expected = (OB_SUCCESS == cret)
                                           || (OB_TRANSACTION_SET_VIOLATION == cret)
                                           || (OB_TRY_LOCK_ROW_CONFLICT == cret);
    if (!close_failure_is_expected) {
      LOG_WARN("close result set fail", K(cret));
    }
  } else if (result.is_with_rows()) {
    ret = OB_NOT_SUPPORTED;
    LOG_WARN("in arraybinding, dml with rows is not supported", K(ret));
    LOG_USER_ERROR(OB_NOT_SUPPORTED, "in arraybinding, dml with rows");
  } else {
    OZ (result.close());
    if (OB_SUCC(ret)) {
      arraybinding_rowcnt_ += result.get_affected_rows();
    }
  }
  return ret;
}
505

506
int ObMPStmtExecute::before_process()
507
{
508
  int ret = OB_SUCCESS;
509
  if (OB_FAIL(ObMPBase::before_process())) {
510
    LOG_WARN("fail to call before process", K(ret));
511
  } else if ((OB_ISNULL(req_))) {
512
    ret = OB_INVALID_ARGUMENT;
513
    LOG_ERROR("request should not be null", K(ret));
514
  } else if (req_->get_type() != ObRequest::OB_MYSQL) {
515
    ret = OB_INVALID_ARGUMENT;
516
    LOG_ERROR("invalid request", K(ret), K_(*req));
517
  } else {
518
    ObIAllocator &alloc = CURRENT_CONTEXT->get_arena_allocator();
519
    if (OB_ISNULL(params_ = static_cast<ParamStore *>(alloc.alloc(sizeof(ParamStore))))) {
520
      ret = OB_ALLOCATE_MEMORY_FAILED;
521
      LOG_WARN("failed to allocate memory", K(ret));
522
    } else {
523
      params_ = new(params_)ParamStore( (ObWrapperAllocator(alloc)) );
524
    }
525
    const ObMySQLRawPacket &pkt = reinterpret_cast<const ObMySQLRawPacket&>(req_->get_packet());
526
    const char* pos = pkt.get_cdata();
527
    analysis_checker_.init(pos, pkt.get_clen());
528
    int32_t stmt_id = -1; //INVALID_STMT_ID
529
    uint32_t ps_stmt_checksum = 0;
530
    ObSQLSessionInfo *session = NULL;
531
    PS_DEFENSE_CHECK(9) // stmt_id(4) + flag(1) + checksum(4)
532
    {
533
      ObMySQLUtil::get_int4(pos, stmt_id);
534
      stmt_id_ = stmt_id;
535

536
      // pos += 1; //skip flags
537
      int8_t flag = 0;
538
      ObMySQLUtil::get_int1(pos, flag);
539
      const uint8_t ARRAYBINDING_MODE = 8;
540
      const uint8_t SAVE_EXCEPTION_MODE = 16;
541
      is_arraybinding_ = flag & ARRAYBINDING_MODE;
542
      is_save_exception_ = flag & SAVE_EXCEPTION_MODE;
543
      ps_cursor_type_ = 0 != (flag & CURSOR_TYPE_READ_ONLY)
544
                          ? ObExecutePsCursorType
545
                          : ObNormalType;
546

547
      // 4 bytes, iteration-count, used for checksum
548
      ObMySQLUtil::get_uint4(pos, ps_stmt_checksum);
549

550
      if (is_arraybinding_) {
551
        OZ (init_for_arraybinding(alloc));
552
      }
553
    }
554
    if (OB_FAIL(ret)) {
555
    } else if (OB_FAIL(get_session(session))) {
556
      LOG_WARN("get session failed");
557
    } else if (OB_ISNULL(session)) {
558
      ret = OB_ERR_UNEXPECTED;
559
      LOG_WARN("session is NULL or invalid", K(ret), K(session));
560
    } else {
561
      const bool enable_sql_audit =
562
      GCONF.enable_sql_audit && session->get_local_ob_enable_sql_audit();
563
      OZ (request_params(session, pos, ps_stmt_checksum, alloc, -1));
564
      if (!is_pl_stmt(stmt_type_) && enable_sql_audit) {
565
        OZ (store_params_value_to_str(alloc, *session));
566
      }
567
    }
568
    if (session != NULL) {
569
      revert_session(session);
570
    }
571
  }
572
  if (OB_FAIL(ret)) {
573
    send_error_packet(ret, NULL, (void *)(ctx_.get_reroute_info()));
574
    if (OB_ERR_PREPARE_STMT_CHECKSUM == ret) {
575
      force_disconnect();
576
      LOG_WARN("prepare stmt checksum error, disconnect connection", K(ret));
577
    }
578
    flush_buffer(true);
579
  }
580

581
  return ret;
582
}
583

584
// Convenience overload: renders this request's own parameters (params_) into
// params_value_ / params_value_len_.
int ObMPStmtExecute::store_params_value_to_str(ObIAllocator &alloc, sql::ObSQLSessionInfo &session)
{
  const int ret = store_params_value_to_str(alloc,
                                            session,
                                            params_,
                                            params_value_,
                                            params_value_len_);
  return ret;
}
588

589
int ObMPStmtExecute::store_params_value_to_str(ObIAllocator &alloc,
590
                                               sql::ObSQLSessionInfo &session,
591
                                               ParamStore *params,
592
                                               char *&params_value,
593
                                               int64_t &params_value_len)
594
{
595
  int ret = OB_SUCCESS;
596
  int64_t pos = 0;
597
  int64_t length = OB_MAX_SQL_LENGTH;
598
  CK (OB_NOT_NULL(params));
599
  CK (OB_ISNULL(params_value));
600
  CK (OB_NOT_NULL(params_value = static_cast<char *>(alloc.alloc(OB_MAX_SQL_LENGTH))));
601
  for (int i = 0; OB_SUCC(ret) && i < params->count(); ++i) {
602
    const common::ObObjParam &param = params->at(i);
603
    if (param.is_ext()) {
604
      pos = 0;
605
      params_value = NULL;
606
      params_value_len = 0;
607
      break;
608
    } else {
609
      OZ (param.print_sql_literal(params_value, length, pos, alloc, TZ_INFO(&session)));
610
      if (i != params->count() - 1) {
611
        OZ (databuff_printf(params_value, length, pos, alloc, ","));
612
      }
613
    }
614
  }
615
  if (OB_FAIL(ret)) {
616
    params_value = NULL;
617
    params_value_len = 0;
618
    // The failure of store_params_value_to_str does not affect the execution of SQL,
619
    // so the error code is ignored here
620
    ret = OB_SUCCESS;
621
  } else {
622
    params_value_len = pos;
623
  }
624
  return ret;
625
}
626

627
int ObMPStmtExecute::parse_request_type(const char* &pos,
628
                                       int64_t num_of_params,
629
                                       int8_t new_param_bound_flag,
630
                                       ObCollationType cs_type,
631
                                       ObCollationType ncs_type,
632
                                       ParamTypeArray &param_types,
633
                                       ParamTypeInfoArray &param_type_infos
634
                                       /*ParamCastArray param_cast_infos*/)
635
{
636
  int ret = OB_SUCCESS;
637
  // Step3: get type info
638
  if (param_type_infos.count() < num_of_params) {
639
    ret = OB_ERR_UNEXPECTED;
640
    LOG_WARN("type array length is not normal", K(ret), K(param_types.count()), K(param_type_infos.count()));
641
  }
642
  for (int i = 0; OB_SUCC(ret) && i < num_of_params; ++i) {
643
    uint8_t type = 0;
644
    int8_t flag = 0;
645
    TypeInfo &type_name_info = param_type_infos.at(i);
646
    if (1 == new_param_bound_flag) {
647
      PS_DEFENSE_CHECK(2) // type(1) + flag(1)
648
      {
649
        ObMySQLUtil::get_uint1(pos, type);
650
        ObMySQLUtil::get_int1(pos, flag);
651
        if (OB_FAIL(param_types.push_back(static_cast<EMySQLFieldType>(type)))) {
652
          LOG_WARN("fail to push back", K(type), K(i));
653
        } else if (EMySQLFieldType::MYSQL_TYPE_COMPLEX != type) {
654
          int16_t unsigned_flag = 128;
655
          ObObjType ob_elem_type;
656
          if (OB_FAIL(ObSMUtils::get_ob_type(ob_elem_type,
657
                                    static_cast<EMySQLFieldType>(type),
658
                                    flag & unsigned_flag ? true : false))) {
659
            LOG_WARN("get ob type fail. ", K(type));
660
          } else {
661
            type_name_info.elem_type_.set_obj_type(ob_elem_type);
662
          }
663
        }
664
      }
665
    } else {
666
      if (num_of_params != param_types.count()) {
667
        ret = OB_ERR_WRONG_DYNAMIC_PARAM;
668
        LOG_USER_ERROR(OB_ERR_WRONG_DYNAMIC_PARAM,
669
            param_types.count(), num_of_params);
670
      } else {
671
        type = static_cast<uint8_t>(param_types.at(i));
672
      }
673
    }
674

675
    if (OB_SUCC(ret)) {
676

677
      uint8_t elem_type = 0;
678
      if (EMySQLFieldType::MYSQL_TYPE_COMPLEX == type) {
679
        type_name_info.is_basic_type_ = false;
680
        if (OB_FAIL(decode_type_info(pos, type_name_info))) {
681
          LOG_WARN("failed to decode type info", K(ret));
682
        } else if (type_name_info.type_name_.empty()) {
683
          ObObjType ob_elem_type;
684
          type_name_info.is_elem_type_ = true;
685
          PS_DEFENSE_CHECK(1) // elem_type(1)
686
          {
687
            ObMySQLUtil::get_uint1(pos, elem_type);
688
          }
689
          OZ (ObSMUtils::get_ob_type(
690
            ob_elem_type, static_cast<EMySQLFieldType>(elem_type)), elem_type);
691
          OX (type_name_info.elem_type_.set_obj_type(ob_elem_type));
692
          if (OB_SUCC(ret)) {
693
            switch (elem_type) {
694
              case MYSQL_TYPE_OB_NVARCHAR2:
695
              case MYSQL_TYPE_OB_NCHAR: {
696
                type_name_info.elem_type_.set_collation_type(ncs_type);
697
              } break;
698
              case MYSQL_TYPE_ORA_BLOB: {
699
                type_name_info.elem_type_.set_collation_type(CS_TYPE_BINARY);
700
              } break;
701
              default: {
702
                type_name_info.elem_type_.set_collation_type(cs_type);
703
              } break;
704
            }
705
          }
706
          if (OB_SUCC(ret) && EMySQLFieldType::MYSQL_TYPE_COMPLEX == elem_type) {
707
            OZ (decode_type_info(pos, type_name_info));
708
          }
709
        }
710
      }
711
    }
712
  }
713
  return ret;
714
}
715

716
int ObMPStmtExecute::parse_request_param_value(ObIAllocator &alloc,
717
                                             sql::ObSQLSessionInfo *session,
718
                                             const char* &pos,
719
                                             int64_t idx,
720
                                             EMySQLFieldType &param_type,
721
                                             TypeInfo &param_type_info,
722
                                             ObObjParam &param,
723
                                             const char *bitmap)
724
{
725
  int ret = OB_SUCCESS;
726
  ObCharsetType charset = CHARSET_INVALID;
727
  ObCharsetType ncharset = CHARSET_INVALID;
728
  ObCollationType cs_conn = CS_TYPE_INVALID;
729
  ObCollationType cs_server = CS_TYPE_INVALID;
730
  if (OB_ISNULL(session)) {
731
    ret = OB_ERR_UNEXPECTED;
732
    LOG_WARN("session is null", K(ret));
733
  } else if (OB_FAIL(session->get_character_set_connection(charset))) {
734
    LOG_WARN("get charset for client failed", K(ret));
735
  } else if (OB_FAIL(session->get_collation_connection(cs_conn))) {
736
    LOG_WARN("get charset for client failed", K(ret));
737
  } else if (OB_FAIL(session->get_collation_server(cs_server))) {
738
    LOG_WARN("get charset for client failed", K(ret));
739
  } else if (OB_FAIL(session->get_ncharacter_set_connection(ncharset))) {
740
    LOG_WARN("get charset for client failed", K(ret));
741
  }
742
  // Step5: decode value
743
  ObObjType ob_type;
744
  if (OB_FAIL(ret)) {
745
  } else if (OB_FAIL(ObSMUtils::get_ob_type(
746
        ob_type, static_cast<EMySQLFieldType>(param_type)))) {
747
    LOG_WARN("cast ob type from mysql type failed",
748
              K(ob_type), K(param_type), K(ret));
749
  } else {
750
    param.set_type(ob_type);
751
    param.set_param_meta();
752
    if (OB_FAIL(parse_param_value(alloc,
753
                                         param_type,
754
                                         charset,
755
                                         ncharset,
756
                                         is_oracle_mode() ? cs_server : cs_conn,
757
                                         session->get_nls_collation_nation(),
758
                                         pos,
759
                                         session->get_timezone_info(),
760
                                         &param_type_info,
761
                                         param,
762
                                         bitmap,
763
                                         idx))) {
764
      LOG_WARN("get param value failed", K(param));
765
    } else {
766
      LOG_DEBUG("resolve execute with param", K(param));
767
    }
768
  }
769
  return ret;
770
}
771

772
// Reports whether at least one entry of param_types equals MYSQL_TYPE_COMPLEX.
// The scan stops at the first match; an empty array yields false.
bool ObMPStmtExecute::is_contain_complex_element(const sql::ParamTypeArray &param_types) const
{
  bool found_complex = false;
  int64_t idx = 0;
  while (!found_complex && idx < param_types.count()) {
    found_complex = (MYSQL_TYPE_COMPLEX == param_types.at(idx));
    ++idx;
  }
  return found_complex;
}
784

785
int ObMPStmtExecute::request_params(ObSQLSessionInfo *session,
786
                                    const char* &pos,
787
                                    uint32_t ps_stmt_checksum,
788
                                    ObIAllocator &alloc,
789
                                    int32_t all_param_num)
790
{
791
  int ret = OB_SUCCESS;
792
  ObPsSessionInfo *ps_session_info = NULL;
793
  ObSQLSessionInfo::LockGuard lock_guard(session->get_query_lock());
794
  ObCharsetType charset = CHARSET_INVALID;
795
  ObCollationType cs_conn = CS_TYPE_INVALID;
796
  ObCollationType cs_server = CS_TYPE_INVALID;
797
  share::schema::ObSchemaGetterGuard schema_guard;
798
  const uint64_t tenant_id = session->get_effective_tenant_id();
799

800
  if (OB_FAIL(gctx_.schema_service_->get_tenant_schema_guard(tenant_id, schema_guard))) {
801
    LOG_WARN("get schema guard failed", K(ret));
802
  } else if (OB_FAIL(session->get_character_set_connection(charset))) {
803
    LOG_WARN("get charset for client failed", K(ret));
804
  } else if (OB_FAIL(session->get_collation_connection(cs_conn))) {
805
    LOG_WARN("get charset for client failed", K(ret));
806
  } else if (OB_FAIL(session->get_collation_server(cs_server))) {
807
    LOG_WARN("get charset for client failed", K(ret));
808
  } else if (OB_FAIL(session->get_ps_session_info(stmt_id_, ps_session_info))) {
809
      LOG_WARN("get_ps_session_info failed", K(ret), K_(stmt_id));
810
  } else if (OB_ISNULL(ps_session_info)) {
811
    ret = OB_INVALID_ARGUMENT;
812
    LOG_WARN("ps_session_info is null", K(ret));
813
  } else if (DEFAULT_ITERATION_COUNT == ps_stmt_checksum) {
814
    // do nothing
815
    // 新协议不在这里做
816
  } else if (ps_stmt_checksum != ps_session_info->get_ps_stmt_checksum()) {
817
    ret = OB_ERR_PREPARE_STMT_CHECKSUM;
818
    LOG_ERROR("ps stmt checksum fail", K(ret), "session_id", session->get_sessid(),
819
                                        K(ps_stmt_checksum), K(*ps_session_info));
820
  }
821
  if (OB_SUCC(ret)) {
822
    LOG_TRACE("ps session info",
823
              K(ret), "session_id", session->get_sessid(), K(*ps_session_info));
824
    share::schema::ObSchemaGetterGuard *old_guard = ctx_.schema_guard_;
825
    ObSQLSessionInfo *old_sess_info = ctx_.session_info_;
826
    ctx_.schema_guard_ = &schema_guard;
827
    ctx_.session_info_ = session;
828
    const int64_t input_param_num = ps_session_info->get_param_count();
829
    stmt_type_ = ps_session_info->get_stmt_type();
830
    int8_t new_param_bound_flag = 0;
831
    if (is_pl_stmt(stmt_type_)) {
832
      // pl not support save exception
833
      is_save_exception_ = 0;
834
    }
835
    // for returning into,
836
    // all_param_num  = input_param_num + returning_param_num
837
    params_num_ = (all_param_num > input_param_num) ? all_param_num : input_param_num;
838
    int64_t returning_params_num = all_param_num - input_param_num;
839
    if (is_prexecute() && 0 != params_num_) {
840
      if (ps_session_info->get_num_of_returning_into() > 0) {
841
        // check param_cnt for returning into
842
        if (returning_params_num != ps_session_info->get_num_of_returning_into()
843
            || input_param_num != ps_session_info->get_param_count()) {
844
          ret = OB_ERR_UNEXPECTED;
845
          LOG_WARN("param num is not match ps stmt prama count.", K(is_prexecute()), K(params_num_),
846
                    K(ps_session_info->get_param_count()));
847
        }
848
      } else if (params_num_ != ps_session_info->get_param_count()) {
849
        // check param_cnt
850
        ret = OB_ERR_UNEXPECTED;
851
        LOG_WARN("param num is not match ps stmt prama count.", K(is_prexecute()), K(params_num_),
852
                 K(ps_session_info->get_param_count()));
853
      }
854
    }
855
    if (OB_SUCC(ret) && params_num_ > 0) {
856
      // Step1: 处理空值位图
857
      int64_t bitmap_types = (params_num_ + 7) / 8;
858
      const char *bitmap = pos;
859
      pos += bitmap_types;
860
      ParamTypeArray &param_types = ps_session_info->get_param_types();
861
      ParamTypeInfoArray param_type_infos;
862
      ParamCastArray param_cast_infos;
863

864
      ParamTypeArray returning_param_types;
865
      ParamTypeInfoArray returning_param_type_infos;
866
      int64_t len = bitmap_types + 1/*new_param_bound_flag*/;
867
      PS_DEFENSE_CHECK(len) // bitmap_types
868
      {
869
        // Step2: 获取new_param_bound_flag字段
870
        ObMySQLUtil::get_int1(pos, new_param_bound_flag);
871
        if (new_param_bound_flag == 1) {
872
          param_types.reuse();
873
        }
874
      }
875
      if (OB_FAIL(ret)) {
876
        // do nothing
877
      } else if (OB_FAIL(param_type_infos.prepare_allocate(input_param_num))) {
878
        LOG_WARN("array prepare allocate failed", K(ret), K(input_param_num));
879
      } else if (OB_FAIL(params_->prepare_allocate(input_param_num))) {
880
        LOG_WARN("array prepare allocate failed", K(ret));
881
      } else if (OB_FAIL(param_cast_infos.prepare_allocate(input_param_num))) {
882
        LOG_WARN("array prepare allocate failed", K(ret));
883
      } else if (is_arraybinding_) {
884
        CK (OB_NOT_NULL(arraybinding_params_));
885
        OZ (arraybinding_params_->prepare_allocate(input_param_num));
886
      }
887

888
      for (int i = 0; OB_SUCC(ret) && i < input_param_num; ++i) {
889
        param_cast_infos.at(i) = false;
890
      }
891

892
      if (OB_FAIL(ret)) {
893

894
      } else if (params_num_ <= input_param_num) {
895
        // not need init returning_param_types and returning_param_type_infos
896
      } else if (OB_FAIL(returning_param_type_infos.prepare_allocate(params_num_ - input_param_num))) {
897
        LOG_WARN("array prepare allocate failed", K(ret));
898
      }
899

900
      // Step3: 获取type信息
901
      if (OB_SUCC(ret) && OB_FAIL(parse_request_type(pos,
902
                                                     input_param_num,
903
                                                     new_param_bound_flag,
904
                                                     cs_conn,
905
                                                     cs_server,
906
                                                     param_types,
907
                                                     param_type_infos))) {
908
        LOG_WARN("fail to parse input params type", K(ret));
909
      } else if (is_contain_complex_element(param_types)) {
910
        analysis_checker_.need_check_ = false;
911
      }
912

913
      // Step3-2: 获取returning into params type信息
914
      if (OB_SUCC(ret) && returning_params_num > 0) {
915
        if (new_param_bound_flag != 1) {
916
          ret = OB_ERR_UNEXPECTED;
917
          LOG_WARN("returning into parm must define type", K(ret));
918
        } else if (OB_FAIL(parse_request_type(pos,
919
                                              returning_params_num,
920
                                              new_param_bound_flag,
921
                                              cs_conn,
922
                                              cs_server,
923
                                              returning_param_types,
924
                                              returning_param_type_infos))) {
925
          LOG_WARN("fail to parse returning into params type", K(ret));
926
        }
927
      }
928

929
      if (OB_SUCC(ret) && is_arraybinding_) {
930
        OZ (check_param_type_for_arraybinding(session, param_type_infos));
931
      }
932
      if (OB_SUCC(ret) && stmt::T_CALL_PROCEDURE == ps_session_info->get_stmt_type()) {
933
        ctx_.is_execute_call_stmt_ = true;
934
      }
935

936
      // Step5: decode value
937
      for (int64_t i = 0; OB_SUCC(ret) && i < input_param_num; ++i) {
938
        ObObjParam &param = is_arraybinding_ ? arraybinding_params_->at(i) : params_->at(i);
939
        param.reset();
940
        if (OB_SUCC(ret) && OB_FAIL(parse_request_param_value(alloc,
941
                                                              session,
942
                                                              pos,
943
                                                              i,
944
                                                              param_types.at(i),
945
                                                              param_type_infos.at(i),
946
                                                              param,
947
                                                              bitmap))) {
948
          LOG_WARN("fail to parse request param values", K(ret), K(i));
949
        } else {
950
          LOG_DEBUG("after parser param", K(param), K(i));
951
        }
952
        if (OB_SUCC(ret) && is_arraybinding_) {
953
          OZ (check_param_value_for_arraybinding(param));
954
        }
955
      }
956

957
      // Step5-2: decode returning into value
958
      // need parse returning into params
959
      if (OB_SUCC(ret) && returning_params_num > 0) {
960
        CK(returning_param_types.count() == returning_params_num);
961
        CK(returning_param_type_infos.count() == returning_params_num);
962
        for (int64_t i = 0; OB_SUCC(ret) && i < returning_params_num; ++i) {
963
          ObObjParam param;
964
          if (OB_FAIL(parse_request_param_value(alloc,
965
                                                session,
966
                                                pos,
967
                                                i + input_param_num,
968
                                                returning_param_types.at(i),
969
                                                returning_param_type_infos.at(i),
970
                                                param,
971
                                                bitmap))) {
972
            LOG_WARN("fail to parse request returning into param values", K(ret), K(i));
973
          } else {
974
            LOG_DEBUG("after parser resolve returning into", K(param), K(i));
975
          }
976
        }
977
      }
978
    }
979
    ctx_.schema_guard_ = old_guard;
980
    ctx_.session_info_ = old_sess_info;
981
  }
982
  return ret;
983
}
984

985
int ObMPStmtExecute::decode_type_info(const char*& buf, TypeInfo &type_info)
986
{
987
  int ret = OB_SUCCESS;
988
  if (OB_SUCC(ret)) {
989
    uint64_t length = 0;
990
    if (OB_FAIL(ObMySQLUtil::get_length(buf, length))) {
991
      LOG_WARN("failed to get length", K(ret));
992
    } else {
993
      PS_DEFENSE_CHECK(length)
994
      {
995
        type_info.relation_name_.assign_ptr(buf, static_cast<ObString::obstr_size_t>(length));
996
        buf += length;
997
      }
998
    }
999
  }
1000
  if (OB_SUCC(ret)) {
1001
    uint64_t length = 0;
1002
    if (OB_FAIL(ObMySQLUtil::get_length(buf, length))) {
1003
      LOG_WARN("failed to get length", K(ret));
1004
    } else {
1005
      PS_DEFENSE_CHECK(length)
1006
      {
1007
        type_info.type_name_.assign_ptr(buf, static_cast<ObString::obstr_size_t>(length));
1008
        buf += length;
1009
      }
1010
    }
1011
  }
1012
  if (OB_SUCC(ret)) {
1013
    uint64_t version = 0;
1014
    if (OB_FAIL(ObMySQLUtil::get_length(buf, version))) {
1015
      LOG_WARN("failed to get version", K(ret));
1016
    }
1017
  }
1018
  return ret;
1019
}
1020

1021
// Switches the session to QUERY_ACTIVE and, when that succeeds, stamps it with
// this request's receive timestamp as query start time, COM_STMT_EXECUTE as
// the current command, and a refreshed last-active time.
//
// @return OB_SUCCESS, or the error returned by set_session_state.
int ObMPStmtExecute::set_session_active(ObSQLSessionInfo &session) const
{
  int ret = OB_SUCCESS;
  if (OB_SUCC(session.set_session_state(QUERY_ACTIVE))) {
    session.set_query_start_time(get_receive_timestamp());
    session.set_mysql_cmd(obmysql::COM_STMT_EXECUTE);
    session.update_last_active_time();
  } else {
    LOG_WARN("fail to set session state", K(ret));
  }
  return ret;
}
1033

1034
int ObMPStmtExecute::execute_response(ObSQLSessionInfo &session,
1035
                                      ObMySQLResultSet &result,
1036
                                      const bool enable_perf_event,
1037
                                      bool &need_response_error,
1038
                                      bool &is_diagnostics_stmt,
1039
                                      int64_t &execution_id,
1040
                                      const bool force_sync_resp,
1041
                                      bool &async_resp_used,
1042
                                      ObPsStmtId &inner_stmt_id)
1043
{
1044
  int ret = OB_SUCCESS;
1045
  inner_stmt_id = OB_INVALID_ID;
1046
  ObIAllocator &alloc = CURRENT_CONTEXT->get_arena_allocator();
1047
  if (OB_ISNULL(session.get_ps_cache())) {
1048
    ret = OB_ERR_UNEXPECTED;
1049
    LOG_WARN("ps : ps cache is null.", K(ret), K(stmt_id_));
1050
  } else if (OB_FAIL(session.get_inner_ps_stmt_id(stmt_id_, inner_stmt_id))) {
1051
    ret = OB_ERR_UNEXPECTED;
1052
    LOG_WARN("ps : get inner stmt id fail.", K(ret), K(stmt_id_));
1053
  } else {
1054
    ObPsStmtInfoGuard guard;
1055
    ObPsStmtInfo *ps_info = NULL;
1056
    if (OB_FAIL(session.get_ps_cache()->get_stmt_info_guard(inner_stmt_id, guard))) {
1057
      LOG_WARN("get stmt info guard failed", K(ret), K(stmt_id_), K(inner_stmt_id));
1058
    } else if (OB_ISNULL(ps_info = guard.get_stmt_info())) {
1059
      ret = OB_ERR_UNEXPECTED;
1060
      LOG_WARN("get stmt info is null", K(ret));
1061
    } else {
1062
      if (is_execute_ps_cursor() && stmt::T_SELECT != ps_info->get_stmt_type()) {
1063
        set_ps_cursor_type(ObNormalType);
1064
      }
1065
      ctx_.cur_sql_ = ps_info->get_ps_sql();
1066
    }
1067
  }
1068
  if OB_FAIL(ret) {
1069
    // do nothing
1070
  } else if (is_execute_ps_cursor()) {
1071
    ObDbmsCursorInfo *cursor = NULL;
1072
    bool use_stream = false;
1073
    // 1.创建cursor
1074
    if (OB_NOT_NULL(session.get_cursor(stmt_id_))) {
1075
      if (OB_FAIL(session.close_cursor(stmt_id_))) {
1076
        LOG_WARN("fail to close result set", K(ret), K(stmt_id_), K(session.get_sessid()));
1077
      }
1078
    }
1079
    OZ (session.make_dbms_cursor(cursor, stmt_id_));
1080
    CK (OB_NOT_NULL(cursor));
1081
    OX (cursor->set_stmt_type(stmt::T_SELECT));
1082
    OX (cursor->set_ps_sql(ctx_.cur_sql_));
1083
    OZ (session.ps_use_stream_result_set(use_stream));
1084
    if (use_stream) {
1085
      OX (cursor->set_streaming());
1086
    }
1087
    OZ (cursor->prepare_entity(session));
1088
    CK (OB_NOT_NULL(cursor->get_allocator()));
1089
    OZ (cursor->init_params(params_->count()));
1090
    OZ (cursor->get_exec_params().assign(*params_));
1091
    OZ (gctx_.sql_engine_->init_result_set(ctx_, result));
1092
    if (OB_SUCCESS != ret || enable_perf_event) {
1093
      exec_start_timestamp_ = ObTimeUtility::current_time();
1094
    }
1095
    if (OB_SUCC(ret)) {
1096
      ObPLExecCtx pl_ctx(cursor->get_allocator(), &result.get_exec_context(), NULL/*params*/,
1097
                        NULL/*result*/, &ret, NULL/*func*/, true);
1098
      if (OB_FAIL(ObSPIService::dbms_dynamic_open(&pl_ctx, *cursor))) {
1099
        LOG_WARN("open cursor fail. ", K(ret), K(stmt_id_));
1100
        if (!THIS_WORKER.need_retry()) {
1101
          int cli_ret = OB_SUCCESS;
1102
          retry_ctrl_.test_and_save_retry_state(
1103
            gctx_, ctx_, result, ret, cli_ret, is_arraybinding_ /*ararybinding only local retry*/);
1104
          if (OB_ERR_PROXY_REROUTE == ret) {
1105
            LOG_DEBUG("run stmt_query failed, check if need retry",
1106
                      K(ret), K(cli_ret), K(retry_ctrl_.need_retry()), K_(stmt_id));
1107
          } else {
1108
            LOG_WARN("run stmt_query failed, check if need retry",
1109
                      K(ret), K(cli_ret), K(retry_ctrl_.need_retry()), K_(stmt_id));
1110
          }
1111
          ret = cli_ret;
1112
        }
1113
        if (OB_ERR_PROXY_REROUTE == ret && !is_arraybinding_) {
1114
          need_response_error = true;
1115
        }
1116
      }
1117
    }
1118
    /*
1119
    * PS模式exec-cursor协议中,
1120
    * 不返回 result_set 结果集,只返回包头信息
1121
    * 并在EOF包中设置 OB_SERVER_STATUS_CURSOR_EXISTS 状态
1122
    * 提示驱动发送fetch协议
1123
    */
1124
    OZ (response_query_header(session, *cursor));
1125
    if (OB_SUCCESS != ret && OB_NOT_NULL(cursor)) {
1126
      int tmp_ret = ret;
1127
      if (OB_FAIL(session.close_cursor(cursor->get_id()))) {
1128
        LOG_WARN("close cursor failed.", K(ret), K(stmt_id_));
1129
      }
1130
      ret = tmp_ret;
1131
    }
1132
  } else if (FALSE_IT(ctx_.enable_sql_resource_manage_ = true)) {
1133
  } else if (OB_FAIL(gctx_.sql_engine_->stmt_execute(stmt_id_,
1134
                                                      stmt_type_,
1135
                                                      *params_,
1136
                                                      ctx_, result,
1137
                                                      false /* is_inner_sql */))) {
1138
    exec_start_timestamp_ = ObTimeUtility::current_time();
1139
    if (!THIS_WORKER.need_retry()) {
1140
      int cli_ret = OB_SUCCESS;
1141
      retry_ctrl_.test_and_save_retry_state(
1142
        gctx_, ctx_, result, ret, cli_ret, is_arraybinding_ /*ararybinding only local retry*/);
1143
      if (OB_ERR_PROXY_REROUTE == ret) {
1144
        LOG_DEBUG("run stmt_query failed, check if need retry",
1145
                  K(ret), K(cli_ret), K(retry_ctrl_.need_retry()), K_(stmt_id));
1146
      } else {
1147
        LOG_WARN("run stmt_query failed, check if need retry",
1148
                  K(ret), K(cli_ret), K(retry_ctrl_.need_retry()), K_(stmt_id));
1149
      }
1150
      ret = cli_ret;
1151
    }
1152
    if (OB_ERR_PROXY_REROUTE == ret && !is_arraybinding_) {
1153
      need_response_error = true;
1154
    }
1155
  } else {
1156
    //监控项统计开始
1157
    exec_start_timestamp_ = ObTimeUtility::current_time();
1158
    result.get_exec_context().set_plan_start_time(exec_start_timestamp_);
1159
    // 本分支内如果出错,全部会在response_result内部处理妥当
1160
    // 无需再额外处理回复错误包
1161

1162
    need_response_error = false;
1163
    is_diagnostics_stmt = ObStmt::is_diagnostic_stmt(result.get_literal_stmt_type());
1164
    ctx_.is_show_trace_stmt_ = ObStmt::is_show_trace_stmt(result.get_literal_stmt_type());
1165
    session.set_current_execution_id(execution_id);
1166

1167
    if (OB_FAIL(ret)) {
1168
    } else if (is_arraybinding_) {
1169
      if (OB_FAIL(after_do_process_for_arraybinding(result))) {
1170
        LOG_WARN("failed to process arraybinding sql", K(ret));
1171
      }
1172
    } else if (OB_FAIL(response_result(result,
1173
                                        session,
1174
                                        force_sync_resp,
1175
                                        async_resp_used))) {
1176
      ObPhysicalPlanCtx *plan_ctx = result.get_exec_context().get_physical_plan_ctx();
1177
      if (OB_ISNULL(plan_ctx)) {
1178
        LOG_ERROR("execute query fail, and plan_ctx is NULL", K(ret));
1179
      } else {
1180
        LOG_WARN("execute query fail", K(ret), "timeout_timestamp",
1181
                  plan_ctx->get_timeout_timestamp());
1182
      }
1183
    }
1184
  }
1185
  return ret;
1186
}
1187
int ObMPStmtExecute::do_process(ObSQLSessionInfo &session,
1188
                                 ParamStore *param_store,
1189
                                 const bool has_more_result,
1190
                                 const bool force_sync_resp,
1191
                                 bool &async_resp_used)
1192
{
1193
  int ret = OB_SUCCESS;
1194
  ObAuditRecordData &audit_record = session.get_raw_audit_record();
1195
  audit_record.try_cnt_++;
1196
  bool is_diagnostics_stmt = false;
1197
  ObPsStmtId inner_stmt_id = OB_INVALID_ID;
1198
  bool need_response_error = is_arraybinding_ ? false : true;
1199
  const bool enable_perf_event = lib::is_diagnose_info_enabled();
1200
  const bool enable_sql_audit =
1201
    GCONF.enable_sql_audit && session.get_local_ob_enable_sql_audit();
1202

1203
  single_process_timestamp_ = ObTimeUtility::current_time();
1204

1205
  /* !!!
1206
   * 注意req_timeinfo_guard一定要放在result前面
1207
   * !!!
1208
   */
1209
  ObReqTimeGuard req_timeinfo_guard;
1210
  SMART_VAR(ObMySQLResultSet, result, session, THIS_WORKER.get_sql_arena_allocator()) {
1211

1212
    ObWaitEventStat total_wait_desc;
1213
    int64_t execution_id = 0;
1214
    ObDiagnoseSessionInfo *di = ObDiagnoseSessionInfo::get_local_diagnose_info();
1215
    {
1216
      ObMaxWaitGuard max_wait_guard(enable_perf_event ? &audit_record.exec_record_.max_wait_event_ : NULL, di);
1217
      ObTotalWaitGuard total_wait_guard(enable_perf_event ? &total_wait_desc : NULL, di);
1218
      if (enable_perf_event) {
1219
        audit_record.exec_record_.record_start(di);
1220
      }
1221

1222
      result.set_has_more_result(has_more_result);
1223
      result.set_ps_protocol();
1224
      ObTaskExecutorCtx *task_ctx = result.get_exec_context().get_task_executor_ctx();
1225
      if (OB_ISNULL(task_ctx)) {
1226
        ret = OB_ERR_UNEXPECTED;
1227
        LOG_ERROR("task executor ctx can not be NULL", K(task_ctx), K(ret));
1228
      } else {
1229
        task_ctx->schema_service_ = gctx_.schema_service_;
1230
        task_ctx->set_query_tenant_begin_schema_version(retry_ctrl_.get_tenant_local_schema_version());
1231
        task_ctx->set_query_sys_begin_schema_version(retry_ctrl_.get_sys_local_schema_version());
1232
        task_ctx->set_min_cluster_version(GET_MIN_CLUSTER_VERSION());
1233

1234
        ctx_.retry_times_ = retry_ctrl_.get_retry_times();
1235
        if (OB_ISNULL(ctx_.schema_guard_)) {
1236
          ret = OB_INVALID_ARGUMENT;
1237
          LOG_WARN("newest schema is NULL", K(ret));
1238
        } else if (OB_FAIL(result.init())) {
1239
          LOG_WARN("result set init failed", K(ret));
1240
        } else if (OB_ISNULL(gctx_.sql_engine_) || OB_ISNULL(param_store)) {
1241
          ret = OB_ERR_UNEXPECTED;
1242
          LOG_ERROR("invalid sql engine", K(ret), K(gctx_), K(param_store));
1243
        } else if (FALSE_IT(execution_id = gctx_.sql_engine_->get_execution_id())) {
1244
          // do nothing ...
1245
        } else if (OB_FAIL(set_session_active(session))) {
1246
          LOG_WARN("fail to set session active", K(ret));
1247
        } else {
1248
          if (is_prexecute()) {
1249
            ret = static_cast<ObMPStmtPrexecute*>(this)->
1250
                      execute_response(session,
1251
                                        *param_store,
1252
                                        ctx_,
1253
                                        result,
1254
                                        retry_ctrl_,
1255
                                        enable_perf_event,
1256
                                        need_response_error,
1257
                                        is_diagnostics_stmt,
1258
                                        execution_id,
1259
                                        force_sync_resp,
1260
                                        async_resp_used,
1261
                                        inner_stmt_id);
1262
          } else {
1263
            ret = execute_response(session,
1264
                                    result,
1265
                                    enable_perf_event,
1266
                                    need_response_error,
1267
                                    is_diagnostics_stmt,
1268
                                    execution_id,
1269
                                    force_sync_resp,
1270
                                    async_resp_used,
1271
                                    inner_stmt_id);
1272
          }
1273
          if ((OB_SUCC(ret) && is_diagnostics_stmt) || async_resp_used) {
1274
            // if diagnostic stmt succeed, no need to clear warning buf.
1275
            // or async resp is used, it will be cleared in callback thread.
1276
            session.update_show_warnings_buf();
1277
          } else {
1278
            session.set_show_warnings_buf(ret);
1279
          }
1280
        }
1281
      }
1282
      //监控项统计结束
1283
      exec_end_timestamp_ = ObTimeUtility::current_time();
1284

1285
      // some statistics must be recorded for plan stat, even though sql audit disabled
1286
      bool first_record = (1 == audit_record.try_cnt_);
1287
      ObExecStatUtils::record_exec_timestamp(*this, first_record, audit_record.exec_timestamp_);
1288
      audit_record.exec_timestamp_.update_stage_time();
1289

1290
      if (enable_perf_event) {
1291
        audit_record.exec_record_.record_end(di);
1292
        record_stat(result.get_stmt_type(), exec_end_timestamp_);
1293
        audit_record.exec_record_.wait_time_end_ = total_wait_desc.time_waited_;
1294
        audit_record.exec_record_.wait_count_end_ = total_wait_desc.total_waits_;
1295
        audit_record.update_event_stage_state();
1296
      }
1297

1298
      if (enable_perf_event && !THIS_THWORKER.need_retry()
1299
        && OB_NOT_NULL(result.get_physical_plan())) {
1300
        const int64_t time_cost = exec_end_timestamp_ - get_receive_timestamp();
1301
        ObSQLUtils::record_execute_time(result.get_physical_plan()->get_plan_type(), time_cost);
1302
      }
1303

1304
      if (OB_FAIL(ret)
1305
          && !async_resp_used
1306
          && need_response_error
1307
          && is_conn_valid()
1308
          && !THIS_WORKER.need_retry()
1309
          && !retry_ctrl_.need_retry()) {
1310
        LOG_WARN("query failed", K(ret), K(retry_ctrl_.need_retry()), K_(stmt_id));
1311
        // 当need_retry=false时,可能给客户端回过包了,可能还没有回过任何包。
1312
        // 不过,可以确定:这个请求出错了,还没处理完。如果不是已经交给异步EndTrans收尾,
1313
        // 则需要在下面回复一个error_packet作为收尾。否则后面没人帮忙发错误包给客户端了,
1314
        // 可能会导致客户端挂起等回包。
1315
        bool is_partition_hit = session.get_err_final_partition_hit(ret);
1316
        int err = send_error_packet(ret, NULL, is_partition_hit, (void *)ctx_.get_reroute_info());
1317
        if (OB_SUCCESS != err) {  // 发送error包
1318
          LOG_WARN("send error packet failed", K(ret), K(err));
1319
        }
1320
      }
1321
    }
1322

1323
    audit_record.status_ =
1324
      (0 == ret || OB_ITER_END == ret) ? REQUEST_SUCC : (ret);
1325
    if (enable_sql_audit && !is_ps_cursor()) {
1326
      ObPhysicalPlan *plan = result.get_physical_plan();
1327
      audit_record.seq_ = 0;  //don't use now
1328
      audit_record.execution_id_ = execution_id;
1329
      audit_record.client_addr_ = session.get_peer_addr();
1330
      audit_record.user_client_addr_ = session.get_user_client_addr();
1331
      audit_record.user_group_ = THIS_WORKER.get_group_id();
1332
      MEMCPY(audit_record.sql_id_, ctx_.sql_id_, (int32_t)sizeof(audit_record.sql_id_));
1333
      if (NULL != plan) {
1334
        audit_record.plan_type_ = plan->get_plan_type();
1335
        audit_record.table_scan_ = plan->contain_table_scan();
1336
        audit_record.plan_id_ = plan->get_plan_id();
1337
        audit_record.plan_hash_ = plan->get_plan_hash_value();
1338
        audit_record.rule_name_ = const_cast<char *>(plan->get_rule_name().ptr());
1339
        audit_record.rule_name_len_ = plan->get_rule_name().length();
1340
        audit_record.partition_hit_ = session.partition_hit().get_bool();
1341
      }
1342
      audit_record.affected_rows_ = result.get_affected_rows();
1343
      audit_record.return_rows_ = result.get_return_rows();
1344
      audit_record.partition_cnt_ =
1345
        result.get_exec_context().get_das_ctx().get_related_tablet_cnt();
1346
      audit_record.expected_worker_cnt_ =
1347
        result.get_exec_context().get_task_exec_ctx().get_expected_worker_cnt();
1348
      audit_record.used_worker_cnt_ =
1349
        result.get_exec_context().get_task_exec_ctx().get_admited_worker_cnt();
1350

1351
      audit_record.is_executor_rpc_ = false;
1352
      audit_record.is_inner_sql_ = false;
1353
      audit_record.is_hit_plan_cache_ = result.get_is_from_plan_cache();
1354
      audit_record.sql_ = const_cast<char *>(ctx_.cur_sql_.ptr());
1355
      audit_record.sql_len_ = min(ctx_.cur_sql_.length(), OB_MAX_SQL_LENGTH);
1356
      audit_record.sql_cs_type_ = session.get_local_collation_connection();
1357
      audit_record.ps_stmt_id_ = stmt_id_;
1358
      audit_record.ps_inner_stmt_id_ = inner_stmt_id;
1359
      audit_record.params_value_ = params_value_;
1360
      audit_record.params_value_len_ = params_value_len_;
1361
      audit_record.is_perf_event_closed_ = !lib::is_diagnose_info_enabled();
1362

1363
      ObPhysicalPlanCtx *plan_ctx = result.get_exec_context().get_physical_plan_ctx();
1364
      if (OB_NOT_NULL(plan_ctx)) {
1365
        audit_record.consistency_level_ = plan_ctx->get_consistency_level();
1366
      }
1367
    }
1368

1369
    //update v$sql statistics
1370
    if (session.get_local_ob_enable_plan_cache()
1371
        && !retry_ctrl_.need_retry()
1372
        && !is_ps_cursor()) {
1373
      // ps cursor do this in inner open
1374
      ObIArray<ObTableRowCount> *table_row_count_list = NULL;
1375
      ObPhysicalPlan *plan = result.get_physical_plan();
1376
      ObPhysicalPlanCtx *plan_ctx = result.get_exec_context().get_physical_plan_ctx();
1377
      if (OB_NOT_NULL(plan_ctx)) {
1378
        table_row_count_list = &(plan_ctx->get_table_row_count_list());
1379
        audit_record.table_scan_stat_ = plan_ctx->get_table_scan_stat();
1380
      }
1381
      if (NULL != plan) {
1382
        if (!(ctx_.self_add_plan_) && ctx_.plan_cache_hit_) {
1383
          plan->update_plan_stat(audit_record,
1384
              false, // false mean not first update plan stat
1385
              result.get_exec_context().get_is_evolution(),
1386
              table_row_count_list);
1387
          plan->update_cache_access_stat(audit_record.table_scan_stat_);
1388
        } else if (ctx_.self_add_plan_ && !ctx_.plan_cache_hit_) {
1389
          plan->update_plan_stat(audit_record,
1390
              true,
1391
              result.get_exec_context().get_is_evolution(),
1392
              table_row_count_list);
1393
          plan->update_cache_access_stat(audit_record.table_scan_stat_);
1394
        } else if (ctx_.self_add_plan_ && ctx_.plan_cache_hit_) {
1395
          // spm evolution plan first execute
1396
          plan->update_plan_stat(audit_record,
1397
              true,
1398
              result.get_exec_context().get_is_evolution(),
1399
              table_row_count_list);
1400
          plan->update_cache_access_stat(audit_record.table_scan_stat_);
1401
        }
1402
      }
1403
    }
1404

1405
    // reset thread waring buffer in sync mode
1406
    if (!async_resp_used) {
1407
      clear_wb_content(session);
1408
    }
1409

1410
    bool need_retry = (THIS_THWORKER.need_retry()
1411
                       || RETRY_TYPE_NONE != retry_ctrl_.get_retry_type());
1412
    if (!is_ps_cursor()) {
1413
#ifdef OB_BUILD_SPM
1414
      if (!need_retry) {
1415
        (void)ObSQLUtils::handle_plan_baseline(audit_record, result.get_physical_plan(), ret, ctx_);
1416
      }
1417
#endif
1418
      // ps cursor has already record after inner_open in spi
1419
      ObSQLUtils::handle_audit_record(need_retry, EXECUTE_PS_EXECUTE, session, ctx_.is_sensitive_);
1420
    }
1421
  }
1422
  return ret;
1423
}
1424

1425
// return false only if send packet fail.
1426
int ObMPStmtExecute::response_result(
1427
    ObMySQLResultSet &result,
1428
    ObSQLSessionInfo &session,
1429
    bool force_sync_resp,
1430
    bool &async_resp_used)
1431
{
1432
  int ret = OB_SUCCESS;
1433
#ifndef OB_BUILD_SPM
1434
  bool need_trans_cb  = result.need_end_trans_callback() && (!force_sync_resp);
1435
#else
1436
  bool need_trans_cb  = result.need_end_trans_callback() &&
1437
                        (!force_sync_resp) &&
1438
                        (!ctx_.spm_ctx_.check_execute_status_);
1439
#endif
1440

1441
  // NG_TRACE_EXT(exec_begin, ID(arg1), force_sync_resp, ID(end_trans_cb), need_trans_cb);
1442

1443
  if (OB_LIKELY(NULL != result.get_physical_plan())) {
1444
    if (need_trans_cb) {
1445
      ObAsyncPlanDriver drv(gctx_, ctx_, session, retry_ctrl_, *this, is_prexecute());
1446
      // NOTE: sql_end_cb必须在drv.response_result()之前初始化好
1447
      ObSqlEndTransCb &sql_end_cb = session.get_mysql_end_trans_cb();
1448
      if (OB_FAIL(sql_end_cb.init(packet_sender_, &session,
1449
                                    stmt_id_, params_num_,
1450
                                    is_prexecute() ? packet_sender_.get_comp_seq() : 0))) {
1451
        LOG_WARN("failed to init sql end callback", K(ret));
1452
      } else if (OB_FAIL(drv.response_result(result))) {
1453
        LOG_WARN("fail response async result", K(ret));
1454
      }
1455
      async_resp_used = result.is_async_end_trans_submitted();
1456
    } else {
1457
      // 试点ObQuerySyncDriver
1458
      int32_t iteration_count = OB_INVALID_COUNT;
1459
      if (is_prexecute()) {
1460
        iteration_count = static_cast<ObMPStmtPrexecute*>(this)->get_iteration_count();
1461
      }
1462
      ObSyncPlanDriver drv(gctx_,
1463
                           ctx_,
1464
                           session,
1465
                           retry_ctrl_,
1466
                           *this,
1467
                           is_prexecute(),
1468
                           iteration_count);
1469
      ret = drv.response_result(result);
1470
    }
1471
  } else {
1472
    if (need_trans_cb) {
1473
      ObSqlEndTransCb &sql_end_cb = session.get_mysql_end_trans_cb();
1474
      ObAsyncCmdDriver drv(gctx_, ctx_, session, retry_ctrl_, *this, is_prexecute());
1475
      if (OB_FAIL(sql_end_cb.init(packet_sender_, &session,
1476
                                    stmt_id_, params_num_,
1477
                                    is_prexecute() ? packet_sender_.get_comp_seq() : 0))) {
1478
        LOG_WARN("failed to init sql end callback", K(ret));
1479
      } else if (OB_FAIL(drv.response_result(result))) {
1480
        LOG_WARN("fail response async result", K(ret));
1481
      } else {
1482
        LOG_DEBUG("use async cmd driver success!",
1483
                  K(result.get_stmt_type()), K(session.get_local_autocommit()));
1484
      }
1485
      async_resp_used = result.is_async_end_trans_submitted();
1486
    } else {
1487
      ObSyncCmdDriver drv(gctx_, ctx_, session, retry_ctrl_, *this, is_prexecute());
1488
      session.set_pl_query_sender(&drv);
1489
      session.set_ps_protocol(result.is_ps_protocol());
1490
      if (OB_FAIL(drv.response_result(result))) {
1491
        LOG_WARN("failed response sync result", K(ret));
1492
      } else {
1493
        LOG_DEBUG("use sync cmd driver success!",
1494
                  K(result.get_stmt_type()), K(session.get_local_autocommit()));
1495
      }
1496
      session.set_pl_query_sender(NULL);
1497
    }
1498
  }
1499
//  NG_TRACE(exec_end);
1500
  return ret;
1501
}
1502

1503
// Runs do_process() inside a temporary memory context, so that memory allocated
// by an attempt does not pile up across retries, and clears ctx_ afterwards.
//
// All parameters are forwarded unchanged to do_process().
// @return the return code of do_process()
OB_NOINLINE int ObMPStmtExecute::process_retry(ObSQLSessionInfo &session,
                                               ParamStore *param_store,
                                               bool has_more_result,
                                               bool force_sync_resp,
                                               bool &async_resp_used)
{
  int ret = OB_SUCCESS;
  // Mini mode uses the middle block size as page size, otherwise the big one.
  const auto page_size = lib::is_mini_mode() ? OB_MALLOC_MIDDLE_BLOCK_SIZE
                                             : OB_MALLOC_BIG_BLOCK_SIZE;
  lib::ContextParam ctx_param;
  ctx_param.set_mem_attr(MTL_ID(), ObModIds::OB_SQL_EXECUTOR, ObCtxIds::DEFAULT_CTX_ID)
           .set_properties(lib::USE_TL_PAGE_OPTIONAL)
           .set_page_size(page_size)
           .set_ablock_size(lib::INTACT_MIDDLE_AOBJECT_SIZE);
  CREATE_WITH_TEMP_CONTEXT(ctx_param) {
    ret = do_process(session, param_store, has_more_result, force_sync_resp, async_resp_used);
    ctx_.clear();
  }
  return ret;
}
1528

1529
int ObMPStmtExecute::do_process_single(ObSQLSessionInfo &session,
1530
                                       ParamStore *param_store,
1531
                                       bool has_more_result,
1532
                                       bool force_sync_resp,
1533
                                       bool &async_resp_used)
1534
{
1535
  int ret = OB_SUCCESS;
1536
  // 每次执行不同sql都需要更新
1537
  ctx_.self_add_plan_ = false;
1538
  oceanbase::lib::Thread::WaitGuard guard(oceanbase::lib::Thread::WAIT_FOR_LOCAL_RETRY);
1539
  do {
1540
    // 每次都必须设置为OB_SCCESS, 否则可能会因为没有调用do_process()造成死循环
1541
    ret = OB_SUCCESS;
1542
    share::schema::ObSchemaGetterGuard schema_guard;
1543
    int64_t tenant_version = 0;
1544
    int64_t sys_version = 0;
1545
    retry_ctrl_.clear_state_before_each_retry(session.get_retry_info_for_update());
1546
    OZ (gctx_.schema_service_->get_tenant_schema_guard(session.get_effective_tenant_id(),
1547
                                                       schema_guard));
1548
    OZ (schema_guard.get_schema_version(session.get_effective_tenant_id(), tenant_version));
1549
    OZ (schema_guard.get_schema_version(OB_SYS_TENANT_ID, sys_version));
1550
    OX (ctx_.schema_guard_ = &schema_guard);
1551
    OX (retry_ctrl_.set_tenant_local_schema_version(tenant_version));
1552
    OX (retry_ctrl_.set_sys_local_schema_version(sys_version));
1553

1554
    if (OB_SUCC(ret) && !is_send_long_data()) {
1555
      if (OB_LIKELY(session.get_is_in_retry()) 
1556
            || (is_arraybinding_ && (prepare_packet_sent_ || !is_prexecute()))) {
1557
        ret = process_retry(session,
1558
				                    param_store,
1559
                            has_more_result,
1560
                            force_sync_resp,
1561
                            async_resp_used);
1562
      } else {
1563
        ret = do_process(session,
1564
						             param_store,
1565
                         has_more_result,
1566
                         force_sync_resp,
1567
                         async_resp_used);
1568
        ctx_.clear();
1569
      }
1570
      session.set_session_in_retry(retry_ctrl_.need_retry());
1571
    }
1572
  } while (RETRY_TYPE_LOCAL == retry_ctrl_.get_retry_type());
1573

1574
  if (OB_SUCC(ret) && retry_ctrl_.get_retry_times() > 0) {
1575
    // 经过重试之后才成功的,把sql打印出来。这里只能覆盖到本地重试的情况,没法覆盖到扔回队列重试的情况。
1576
    // 如果需要重试则ret不可能为OB_SUCCESS,因此这里不用判断retry_type。
1577
    LOG_TRACE("sql retry",
1578
              K(ret), "retry_times", retry_ctrl_.get_retry_times(), "sql", ctx_.cur_sql_);
1579
  }
1580
  return ret;
1581
}
1582

1583
// Tells whether the prepared statement identified by stmt_id_ has RETURNING INTO
// items, i.e. whether ps_info->get_num_of_returning_into() is greater than zero.
//
// @param session       session owning the ps cache and the stmt id mapping
// @param is_ab_return  output; reset to false first, set to true only on a positive answer
// @return OB_SUCCESS; OB_INVALID_ARGUMENT when the session has no ps cache;
//         OB_ERR_UNEXPECTED when the stmt info is null; otherwise the lookup error
int ObMPStmtExecute::is_arraybinding_returning(sql::ObSQLSessionInfo &session, bool &is_ab_return)
{
  int ret = OB_SUCCESS;
  ObPsCache *ps_cache = NULL;
  ObPsStmtId inner_stmt_id = OB_INVALID_ID;
  ObPsStmtInfoGuard guard;
  ObPsStmtInfo *ps_info = NULL;
  is_ab_return = false;

  // step 1: the session must own a ps cache
  ps_cache = session.get_ps_cache();
  if (OB_ISNULL(ps_cache)) {
    ret = OB_INVALID_ARGUMENT;
    LOG_ERROR("physical plan context or ps plan cache is NULL or schema_guard is null",
              K(ret), K(ps_cache));
  }

  // step 2: map the client-visible stmt id to the inner one
  if (OB_SUCC(ret) && OB_FAIL(session.get_inner_ps_stmt_id(stmt_id_, inner_stmt_id))) {
    LOG_WARN("fail to get inner ps stmt_id", K(ret), K(stmt_id_), K(inner_stmt_id));
  }

  // step 3: fetch the stmt info through a guard
  if (OB_SUCC(ret)
      && OB_FAIL(session.get_ps_cache()->get_stmt_info_guard(inner_stmt_id, guard))) {
    LOG_WARN("get stmt info guard failed", K(ret), K(stmt_id_), K(inner_stmt_id));
  }

  // step 4: inspect the stmt info
  if (OB_SUCC(ret)) {
    ps_info = guard.get_stmt_info();
    if (OB_ISNULL(ps_info)) {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("get stmt info is null", K(ret));
    } else if (ps_info->get_num_of_returning_into() > 0) {
      is_ab_return = true;
      LOG_TRACE("is arraybinding returning", K(ret), KPC(ps_info));
    }
  }
  return ret;
}
1608

1609
int ObMPStmtExecute::try_batch_multi_stmt_optimization(ObSQLSessionInfo &session,
1610
                                                       bool has_more_result,
1611
                                                       bool force_sync_resp,
1612
                                                       bool &async_resp_used,
1613
                                                       bool &optimization_done)
1614
{
1615
  // 1. save_exception 不能batch
1616
  // 2. returning 不能batch
1617
  int ret = OB_SUCCESS;
1618
  optimization_done = false;
1619
  ctx_.multi_stmt_item_.set_ps_mode(true);
1620
  ctx_.multi_stmt_item_.set_ab_cnt(arraybinding_size_);
1621
  bool is_ab_returning = false;
1622
  ParamStore *array_binding_params = NULL;
1623
  bool enable_batch_opt = session.is_enable_batched_multi_statement();
1624
  bool use_plan_cache = session.get_local_ob_enable_plan_cache();
1625
  ObIAllocator &alloc = CURRENT_CONTEXT->get_arena_allocator();
1626

1627
  if (!enable_batch_opt) {
1628
    // 不支持做batch执行
1629
    LOG_TRACE("not open the batch optimization");
1630
  } else if (!use_plan_cache) {
1631
    LOG_TRACE("not enable the plan_cache", K(use_plan_cache));
1632
    // plan_cache开关没打开
1633
  } else if (!is_prexecute()) {
1634
    // 只对二合一协议开启batch优化
1635
  } else if (is_pl_stmt(stmt_type_)) {
1636
    LOG_TRACE("is pl execution, can't do the batch optimization");
1637
  } else if (1 == arraybinding_size_) {
1638
    LOG_TRACE("arraybinding size is 1, not need d batch");
1639
  } else if (get_save_exception()) {
1640
    LOG_TRACE("is save exception mode, not supported batch optimization");
1641
  } else if (OB_FAIL(is_arraybinding_returning(session, is_ab_returning))) {
1642
    LOG_WARN("failed to check is arraybinding returning", K(ret));
1643
  } else if (is_ab_returning) {
1644
    LOG_TRACE("returning not support the batch optimization");
1645
  } else if (OB_FAIL(ObSQLUtils::transform_pl_ext_type(*arraybinding_params_,
1646
                                                       arraybinding_size_,
1647
                                                       alloc,
1648
                                                       array_binding_params))) {
1649
    LOG_WARN("fail to trans_form extend type params_store", K(ret), K(arraybinding_size_));
1650
  } else if (OB_FAIL(do_process_single(session, array_binding_params, has_more_result, force_sync_resp, async_resp_used))) {
1651
    // 调用do_single接口
1652
    if (THIS_WORKER.need_retry()) {
1653
      // just go back to large query queue and retry
1654
    } else if (OB_BATCHED_MULTI_STMT_ROLLBACK == ret) {
1655
      LOG_TRACE("batched multi_stmt needs rollback", K(ret));
1656
      ret = OB_SUCCESS;
1657
    } else {
1658
      // 无论什么报错,都走单行执行一次,用于容错
1659
      int ret_tmp = ret;
1660
      ret = OB_SUCCESS;
1661
      LOG_WARN("failed to process batch stmt, cover the error code, reset retry flag, then execute with single row",
1662
          K(ret_tmp), K(ret), K(THIS_WORKER.need_retry()));
1663
    }
1664
  } else {
1665
    optimization_done = true;
1666
  }
1667
  LOG_TRACE("after try batched multi-stmt optimization", K(ret), K(stmt_type_), K(use_plan_cache),
1668
      K(optimization_done), K(enable_batch_opt), K(is_ab_returning), K(THIS_WORKER.need_retry()), K(arraybinding_size_));
1669
  return ret;
1670
}
1671

1672
int ObMPStmtExecute::process_execute_stmt(const ObMultiStmtItem &multi_stmt_item,
1673
                                          ObSQLSessionInfo &session,
1674
                                          bool has_more_result,
1675
                                          bool force_sync_resp,
1676
                                          bool &async_resp_used)
1677
{
1678
  int ret = OB_SUCCESS;
1679
  bool need_response_error = true;
1680

1681
  // 执行setup_wb后,所有WARNING都会写入到当前session的WARNING BUFFER中
1682
  setup_wb(session);
1683
  const bool enable_trace_log = lib::is_trace_log_enabled();
1684
  //============================ 注意这些变量的生命周期 ================================
1685
  ObSMConnection *conn = get_conn();
1686
  ObSessionStatEstGuard stat_est_guard(conn->tenant_->id(), session.get_sessid());
1687
  if (OB_FAIL(init_process_var(ctx_, multi_stmt_item, session))) {
1688
    LOG_WARN("init process var failed.", K(ret), K(multi_stmt_item));
1689
  } else {
1690
    if (enable_trace_log) {
1691
      //set session log_level.Must use ObThreadLogLevelUtils::clear() in pair
1692
      ObThreadLogLevelUtils::init(session.get_log_id_level_map());
1693
    }
1694
    // obproxy may use 'SET @@last_schema_version = xxxx' to set newest schema,
1695
    // observer will force refresh schema if local_schema_version < last_schema_version;
1696
    if (OB_FAIL(check_and_refresh_schema(session.get_login_tenant_id(),
1697
                                         session.get_effective_tenant_id()))) {
1698
      LOG_WARN("failed to check_and_refresh_schema", K(ret));
1699
    } else if (OB_FAIL(session.update_timezone_info())) {
1700
      LOG_WARN("fail to update time zone info", K(ret));
1701
    } else if (is_arraybinding_) {
1702
      need_response_error = false;
1703
      bool optimization_done = false;
1704
      if (ctx_.can_reroute_sql_) {
1705
        ctx_.can_reroute_sql_ = false;
1706
        LOG_INFO("arraybinding not support reroute sql.");
1707
      }
1708
      ObSEArray<ObSavedException, 4> exception_array;
1709
      if (OB_UNLIKELY(arraybinding_size_ <= 0)) {
1710
        ret = OB_NOT_SUPPORTED;
1711
        LOG_WARN("arraybinding has no parameters", K(ret), K(arraybinding_size_));
1712
        LOG_USER_ERROR(OB_NOT_SUPPORTED, "oci arraybinding has no parameters");
1713
      } else if (OB_FAIL(try_batch_multi_stmt_optimization(session,
1714
                                                           has_more_result,
1715
                                                           force_sync_resp,
1716
                                                           async_resp_used, optimization_done))) {
1717
        LOG_WARN("fail to try_batch_multi_stmt_optimization", K(ret));
1718
      } else if (!optimization_done) {
1719
        ctx_.multi_stmt_item_.set_ps_mode(true);
1720
        ctx_.multi_stmt_item_.set_ab_cnt(0);
1721
        for (int64_t i = 0; OB_SUCC(ret) && i < arraybinding_size_; ++i) {
1722
          set_curr_sql_idx(i);
1723
          OZ (construct_execute_param_for_arraybinding(i));
1724
          OZ (do_process_single(session, params_, has_more_result, force_sync_resp, async_resp_used));
1725
          if (OB_FAIL(ret)) {
1726
            if (is_save_exception_ && !is_prexecute()) {
1727
              // The old ps protocol will only collect error information here,
1728
              // and the new one has already done fault tolerance in the front
1729
              ret = save_exception_for_arraybinding(i, ret, exception_array);
1730
              ret = OB_SUCCESS;
1731
            }
1732
            if (OB_FAIL(ret)) {
1733
              // If there is still an error in the new ps protocol,
1734
              // then send an err package,
1735
              // indicating that the server has an error that is not expected by the customer
1736
              need_response_error = true;
1737
              break;
1738
            }
1739
          }
1740
        }
1741
      }
1742
      // 释放数组内存避免内存泄漏
1743
      reset_complex_param_memory(arraybinding_params_, session);
1744
      OZ (response_result_for_arraybinding(session, exception_array));
1745
    } else {
1746
      need_response_error = false;
1747
      if (OB_FAIL(do_process_single(session, params_, has_more_result, force_sync_resp, async_resp_used))) {
1748
        LOG_WARN("fail to do process", K(ret), K(ctx_.cur_sql_));
1749
      }
1750
      if (OB_UNLIKELY(NULL != GCTX.cgroup_ctrl_) && GCTX.cgroup_ctrl_->is_valid() && is_conn_valid()) {
1751
        int bak_ret = ret;
1752
        ObSQLSessionInfo *sess = NULL;
1753
        if (OB_FAIL(get_session(sess))) {
1754
          LOG_WARN("get session fail", K(ret));
1755
        } else if (OB_ISNULL(sess)) {
1756
          ret = OB_ERR_UNEXPECTED;
1757
          LOG_WARN("session is NULL or invalid", K(ret));
1758
        } else {
1759
          // Call setup_user_resource_group no matter OB_SUCC or OB_FAIL
1760
          if (OB_FAIL(setup_user_resource_group(*conn, sess->get_effective_tenant_id(), sess))) {
1761
            LOG_WARN("fail setup user resource group", K(ret));
1762
          }
1763
        }
1764
        if (sess != NULL) {
1765
          revert_session(sess);
1766
        }
1767
        ret = OB_SUCC(bak_ret) ? ret : bak_ret;
1768
      }
1769
      reset_complex_param_memory(params_, session);
1770
    }
1771
    if (enable_trace_log) {
1772
      ObThreadLogLevelUtils::clear();
1773
    }
1774
    const int64_t debug_sync_timeout = GCONF.debug_sync_timeout;
1775
    if (debug_sync_timeout > 0) {
1776
      // ignore thread local debug sync actions to session actions failed
1777
      int tmp_ret = OB_SUCCESS;
1778
      tmp_ret = GDS.collect_result_actions(session.get_debug_sync_actions());
1779
      if (OB_UNLIKELY(OB_SUCCESS != tmp_ret)) {
1780
        LOG_WARN("set thread local debug sync actions to session actions failed", K(tmp_ret));
1781
      }
1782
    }
1783
  }
1784

1785
  //对于tracelog的处理, 不影响正常逻辑, 错误码无须赋值给ret, 清空WARNING BUFFER
1786
  do_after_process(session, ctx_, async_resp_used);
1787

1788
  if (OB_FAIL(ret) && need_response_error && is_conn_valid()) {
1789
    send_error_packet(ret, NULL, (void *)(ctx_.get_reroute_info()));
1790
  }
1791

1792
  return ret;
1793
}
1794

1795

1796
int ObMPStmtExecute::process()
1797
{
1798
  int ret = OB_SUCCESS;
1799
  int flush_ret = OB_SUCCESS;
1800
  trace::UUID ps_execute_span_id;
1801
  ObSQLSessionInfo *sess = NULL;
1802
  bool need_response_error = true;
1803
  bool need_disconnect = true;
1804
  bool async_resp_used = false; // 由事务提交线程异步回复客户端
1805
  int64_t query_timeout = 0;
1806

1807
  ObCurTraceId::TraceId *cur_trace_id = ObCurTraceId::get_trace_id();
1808
  ObSMConnection *conn = get_conn();
1809
  if (OB_ISNULL(req_) || OB_ISNULL(conn) || OB_ISNULL(cur_trace_id)) {
1810
    ret = OB_ERR_UNEXPECTED;
1811
    LOG_WARN("null conn ptr", K_(stmt_id), K_(req), K(cur_trace_id), K(ret));
1812
  } else if (OB_UNLIKELY(!conn->is_in_authed_phase())) {
1813
    ret = OB_ERR_NO_PRIVILEGE;
1814
    LOG_WARN("receive sql without session", K_(stmt_id), K(ret));
1815
  } else if (OB_ISNULL(conn->tenant_)) {
1816
    ret = OB_ERR_UNEXPECTED;
1817
    LOG_ERROR("invalid tenant", K_(stmt_id), K(conn->tenant_), K(ret));
1818
  } else if (OB_FAIL(get_session(sess))) {
1819
    LOG_WARN("get session fail", K_(stmt_id), K(ret));
1820
  } else if (OB_ISNULL(sess)) {
1821
    ret = OB_ERR_UNEXPECTED;
1822
    LOG_WARN("session is NULL or invalid", K_(stmt_id), K(sess), K(ret));
1823
  } else if (OB_FAIL(update_transmission_checksum_flag(*sess))) {
1824
    LOG_WARN("update transmisson checksum flag failed", K(ret));
1825
  } else {
1826
    ObSQLSessionInfo &session = *sess;
1827
    int64_t tenant_version = 0;
1828
    int64_t sys_version = 0;
1829
    THIS_WORKER.set_session(sess);
1830
    lib::CompatModeGuard g(sess->get_compatibility_mode() == ORACLE_MODE ?
1831
                             lib::Worker::CompatMode::ORACLE : lib::Worker::CompatMode::MYSQL);
1832
    ObSQLSessionInfo::LockGuard lock_guard(session.get_query_lock());
1833
    session.set_current_trace_id(ObCurTraceId::get_trace_id());
1834
    session.init_use_rich_format();
1835
    session.get_raw_audit_record().request_memory_used_ = 0;
1836
    observer::ObProcessMallocCallback pmcb(0,
1837
          session.get_raw_audit_record().request_memory_used_);
1838
    lib::ObMallocCallbackGuard guard(pmcb);
1839
    session.set_thread_id(GETTID());
1840
    const ObMySQLRawPacket &pkt = reinterpret_cast<const ObMySQLRawPacket&>(req_->get_packet());
1841
    int64_t packet_len = pkt.get_clen();
1842
    if (OB_UNLIKELY(!session.is_valid())) {
1843
      ret = OB_ERR_UNEXPECTED;
1844
      LOG_ERROR("invalid session", K_(stmt_id), K(ret));
1845
    } else if (OB_FAIL(process_kill_client_session(session))) {
1846
      LOG_WARN("client session has been killed", K(ret));
1847
    } else if (OB_UNLIKELY(session.is_zombie())) {
1848
      //session has been killed some moment ago
1849
      ret = OB_ERR_SESSION_INTERRUPTED;
1850
      LOG_WARN("session has been killed", K(session.get_session_state()), K_(stmt_id),
1851
               K(session.get_sessid()), "proxy_sessid", session.get_proxy_sessid(), K(ret));
1852
    } else if (OB_FAIL(session.check_and_init_retry_info(*cur_trace_id, ctx_.cur_sql_))) {
1853
      LOG_WARN("fail to check and init retry info", K(ret), K(*cur_trace_id), K(ctx_.cur_sql_));
1854
    } else if (OB_FAIL(session.get_query_timeout(query_timeout))) {
1855
      LOG_WARN("fail to get query timeout", K(ret));
1856
    } else if (OB_FAIL(gctx_.schema_service_->get_tenant_received_broadcast_version(
1857
                session.get_effective_tenant_id(), tenant_version))) {
1858
      LOG_WARN("fail get tenant broadcast version", K(ret));
1859
    } else if (OB_FAIL(gctx_.schema_service_->get_tenant_received_broadcast_version(
1860
                OB_SYS_TENANT_ID, sys_version))) {
1861
      LOG_WARN("fail get tenant broadcast version", K(ret));
1862
    } else if (pkt.exist_trace_info()
1863
               && OB_FAIL(session.update_sys_variable(SYS_VAR_OB_TRACE_INFO,
1864
                                                      pkt.get_trace_info()))) {
1865
      LOG_WARN("fail to update trace info", K(ret));
1866
    } else if (FALSE_IT(session.set_txn_free_route(pkt.txn_free_route()))) {
1867
    } else if (OB_FAIL(process_extra_info(session, pkt, need_response_error))) {
1868
      LOG_WARN("fail get process extra info", K(ret));
1869
    } else if (FALSE_IT(session.post_sync_session_info())) {
1870
    } else if (OB_UNLIKELY(packet_len > session.get_max_packet_size())) {
1871
      //packet size check with session variable max_allowd_packet or net_buffer_length
1872
      ret = OB_ERR_NET_PACKET_TOO_LARGE;
1873
      LOG_WARN("packet too large than allowed for the session", K_(stmt_id), K(ret));
1874
    } else if (OB_FAIL(sql::ObFLTUtils::init_flt_info(pkt.get_extra_info(), session,
1875
                            conn->proxy_cap_flags_.is_full_link_trace_support()))) {
1876
      LOG_WARN("failed to init flt extra info", K(ret));
1877
    } else if (OB_FAIL(session.gen_configs_in_pc_str())) {
1878
      LOG_WARN("fail to generate configuration string that can influence execution plan", K(ret));
1879
    } else {
1880
      FLTSpanGuard(ps_execute);
1881
      FLT_SET_TAG(log_trace_id, ObCurTraceId::get_trace_id_str(),
1882
                    receive_ts, get_receive_timestamp(),
1883
                    client_info, session.get_client_info(),
1884
                    module_name, session.get_module_name(),
1885
                    action_name, session.get_action_name(),
1886
                    sess_id, session.get_sessid());
1887
      THIS_WORKER.set_timeout_ts(get_receive_timestamp() + query_timeout);
1888
      retry_ctrl_.set_tenant_global_schema_version(tenant_version);
1889
      retry_ctrl_.set_sys_global_schema_version(sys_version);
1890
      session.partition_hit().reset();
1891
      session.set_pl_can_retry(true);
1892
      ObLockWaitNode &lock_wait_node  = req_->get_lock_wait_node();
1893
      lock_wait_node.set_session_info(session.get_sessid());
1894

1895
      need_response_error = false;
1896
      need_disconnect = false;
1897
      ret = process_execute_stmt(ObMultiStmtItem(false, 0, ObString()),
1898
                                 session,
1899
                                 false, // has_mode
1900
                                 false, // force_sync_resp
1901
                                 async_resp_used);
1902

1903
      // 退出前打印出SQL语句,便于定位各种问题
1904
      if (OB_FAIL(ret)) {
1905
        if (OB_EAGAIN == ret) {
1906
          //large query, do nothing
1907
        } else if (is_conn_valid()) {// The memory of sql string is invalid if conn_valid_ has been set false.
1908
          LOG_WARN("fail execute sql", "sql_id", ctx_.sql_id_, K_(stmt_id), K(ret));
1909
        } else {
1910
          LOG_WARN("fail execute sql", K(ret));
1911
        }
1912
      }
1913
    }
1914
    session.check_and_reset_retry_info(*cur_trace_id, THIS_WORKER.need_retry());
1915
    session.set_last_trace_id(ObCurTraceId::get_trace_id());
1916

1917
    if (!retry_ctrl_.need_retry()) {
1918
      // if no retry would be performed any more, clear the piece cache
1919
      ObPieceCache *piece_cache = nullptr;
1920
      int upper_scope_ret = ret;
1921
      ret = OB_SUCCESS;
1922
      piece_cache = session.get_piece_cache();
1923
      if (OB_NOT_NULL(piece_cache)) {
1924
        for (uint64_t i = 0; OB_SUCC(ret) && i < params_num_; i++) {
1925
          if (OB_FAIL(piece_cache->remove_piece(
1926
                  piece_cache->get_piece_key(stmt_id_, i), session))) {
1927
            if (OB_HASH_NOT_EXIST == ret) {
1928
              ret = OB_SUCCESS;
1929
              LOG_INFO("piece hash not exist", K(ret), K(stmt_id_), K(i));
1930
            } else {
1931
              need_disconnect = true;
1932
              LOG_WARN("remove piece fail", K(ret), K(need_disconnect), K(stmt_id_), K(i));
1933
            }
1934
          }
1935
        }
1936
      } else {
1937
        LOG_DEBUG("piece_cache_ is null");
1938
      }
1939
      ret = upper_scope_ret;
1940
    }
1941

1942
    record_flt_trace(session);
1943
  }
1944

1945
  if (OB_NOT_NULL(sess) && !sess->get_in_transaction()) {
1946
    // transcation ends, end trace
1947
    FLT_END_TRACE();
1948
  }
1949

1950
  if (OB_FAIL(ret) && is_conn_valid()) {
1951
    if (need_response_error) {
1952
      send_error_packet(ret, NULL, (void *)(ctx_.get_reroute_info()));
1953
    }
1954
    if (need_disconnect) {
1955
      force_disconnect();
1956
      LOG_WARN("disconnect connection when process query", K(ret));
1957
    }
1958
  }
1959

1960
  // 如果已经异步回包,则这部分逻辑在cb中执行,这里跳过flush_buffer()
1961
  if (!THIS_WORKER.need_retry()) {
1962
    if (async_resp_used) {
1963
      async_resp_used_ = true;
1964
      packet_sender_.disable_response();
1965
    } else {
1966
      flush_ret = flush_buffer(true);
1967
    }
1968
  } else {
1969
    need_retry_ = true;
1970
  }
1971

1972
  THIS_WORKER.set_session(NULL);
1973
  if (sess != NULL) {
1974
    revert_session(sess); //current ignore revert session ret
1975
  }
1976

1977
  return (OB_SUCCESS != ret) ? ret : flush_ret;
1978
}
1979

1980
int ObMPStmtExecute::get_udt_by_name(ObString relation_name,
1981
                                     ObString type_name,
1982
                                     const share::schema::ObUDTTypeInfo *&udt_info)
1983
{
1984
  int ret = OB_SUCCESS;
1985
  ObString new_relation_name;
1986
  CK (OB_NOT_NULL(ctx_.schema_guard_));
1987
  CK (OB_NOT_NULL(ctx_.session_info_));
1988
  if (OB_SUCC(ret)) {
1989
    if (relation_name.empty()) {
1990
      if (ctx_.session_info_->get_database_name().empty()) {
1991
        ret = OB_ERR_NO_DB_SELECTED;
1992
        LOG_WARN("no select no database", K(ret));
1993
      } else {
1994
        new_relation_name = ctx_.session_info_->get_database_name();
1995
      }
1996
    } else {
1997
      new_relation_name = relation_name;
1998
    }
1999
  }
2000
  if (OB_SUCC(ret)) {
2001
    uint64_t database_id = OB_INVALID_ID;
2002
    OZ (ctx_.schema_guard_->get_database_id(ctx_.session_info_->get_effective_tenant_id(),
2003
                                            new_relation_name,
2004
                                            database_id));
2005
    CK (OB_LIKELY(OB_INVALID_ID != database_id));
2006
    OZ (ctx_.schema_guard_->get_udt_info(ctx_.session_info_->get_effective_tenant_id(),
2007
                                         database_id,
2008
                                         OB_INVALID_ID,
2009
                                         type_name,
2010
                                         udt_info));
2011
    if (OB_ISNULL(udt_info)) {
2012
      // 尝试下是不是系统type
2013
      if (relation_name.empty()
2014
          || relation_name.case_compare("oceanbase")
2015
          || relation_name.case_compare("sys")) {
2016
        OZ (ctx_.schema_guard_->get_udt_info(OB_SYS_TENANT_ID,
2017
                                             OB_SYS_DATABASE_ID,
2018
                                             OB_INVALID_ID,
2019
                                             type_name,
2020
                                             udt_info));
2021
      }
2022
    }
2023
    if (OB_SUCC(ret) && OB_ISNULL(udt_info)) {
2024
      ret = OB_ENTRY_NOT_EXIST;
2025
      LOG_WARN("udt not exist", K(ret), K(relation_name), K(type_name));
2026
    }
2027
  }
2028
  return ret;
2029
}
2030

2031
int ObMPStmtExecute::get_package_type_by_name(ObIAllocator &allocator,
2032
                                              const TypeInfo *type_info,
2033
                                              const pl::ObUserDefinedType *&pl_type)
2034
{
2035
  int ret = OB_SUCCESS;
2036
  const share::schema::ObPackageInfo *package_info = NULL;
2037
  int64_t compatible_mode = lib::is_oracle_mode() ? COMPATIBLE_ORACLE_MODE
2038
                                                  : COMPATIBLE_MYSQL_MODE;
2039
  ObSchemaChecker schema_checker;
2040
  CK (OB_NOT_NULL(type_info));
2041
  CK (OB_NOT_NULL(ctx_.schema_guard_));
2042
  CK (OB_NOT_NULL(ctx_.session_info_));
2043
  CK (OB_NOT_NULL(ctx_.session_info_->get_pl_engine()));
2044
  if (OB_SUCC(ret) && OB_ISNULL(pl_type ))
2045
  OZ (schema_checker.init(*ctx_.schema_guard_, ctx_.session_info_->get_sessid()));
2046
  OZ (schema_checker.get_package_info(ctx_.session_info_->get_effective_tenant_id(),
2047
                                      type_info->relation_name_,
2048
                                      type_info->package_name_,
2049
                                      share::schema::PACKAGE_TYPE,
2050
                                      compatible_mode,
2051
                                      package_info));
2052
  CK (OB_NOT_NULL(package_info));
2053
  if (OB_SUCC(ret)) {
2054
    pl::ObPLPackageManager &package_manager
2055
      = ctx_.session_info_->get_pl_engine()->get_package_manager();
2056
    pl::ObPLPackageGuard package_guard(ctx_.session_info_->get_effective_tenant_id());
2057
    pl::ObPLResolveCtx resolve_ctx(allocator,
2058
                                   *(ctx_.session_info_),
2059
                                   *(ctx_.schema_guard_),
2060
                                   package_guard,
2061
                                   *(GCTX.sql_proxy_),
2062
                                   false);
2063
    OZ (package_manager.get_package_type(
2064
      resolve_ctx, package_info->get_package_id(), type_info->type_name_, pl_type));
2065
    CK (OB_NOT_NULL(pl_type));
2066
  }
2067
  return ret;
2068
}
2069

2070
int ObMPStmtExecute::get_pl_type_by_type_info(ObIAllocator &allocator,
2071
                                              const TypeInfo *type_info,
2072
                                              const pl::ObUserDefinedType *&pl_type)
2073
{
2074
  int ret = OB_SUCCESS;
2075
#ifndef OB_BUILD_ORACLE_PL
2076
  UNUSEDx(allocator, type_info, pl_type);
2077
  ret = OB_NOT_SUPPORTED;
2078
  LOG_WARN("not support", K(ret));
2079
  LOG_USER_ERROR(OB_NOT_SUPPORTED, "Get PL type by type info is not supported in CE version");
2080
#else
2081
  const share::schema::ObUDTTypeInfo *udt_info = NULL;
2082
  if (OB_ISNULL(type_info)) {
2083
    ret = OB_ERR_UNEXPECTED;
2084
    LOG_WARN("type info is null", K(ret), K(type_info));
2085
  } else if (!type_info->is_elem_type_) {
2086
    if (type_info->package_name_.empty()) {
2087
      OZ (get_udt_by_name(type_info->relation_name_, type_info->type_name_, udt_info));
2088
      OZ (udt_info->transform_to_pl_type(allocator, pl_type));
2089
    } else {
2090
      OZ (get_package_type_by_name(allocator, type_info, pl_type));
2091
    }
2092
  } else {
2093
    void *ptr = NULL;
2094
    pl::ObNestedTableType *table_type = NULL;
2095
    pl::ObPLDataType elem_type;
2096
    const pl::ObUserDefinedType *elem_type_ptr = NULL;
2097
    if (type_info->elem_type_.get_obj_type() != ObExtendType) {
2098
      elem_type.set_data_type(type_info->elem_type_);
2099
    } else if (OB_FAIL(get_udt_by_name(type_info->relation_name_, type_info->type_name_, udt_info))) {
2100
      LOG_WARN("failed to get udt info", K(ret), K(type_info->relation_name_), K(type_info->type_name_));
2101
    } else if (OB_FAIL(udt_info->transform_to_pl_type(allocator, elem_type_ptr))) {
2102
      LOG_WARN("failed to transform udt to pl type", K(ret));
2103
    } else if (OB_ISNULL(elem_type_ptr)) {
2104
      ret = OB_ERR_UNEXPECTED;
2105
      LOG_WARN("failed to get elem type ptr", K(ret));
2106
    } else {
2107
      elem_type = *(static_cast<const pl::ObPLDataType*>(elem_type_ptr));
2108
    }
2109
    if (OB_FAIL(ret)) {
2110
    } else if (OB_ISNULL(ptr = allocator.alloc(sizeof(pl::ObNestedTableType)))) {
2111
      ret = OB_ALLOCATE_MEMORY_FAILED;
2112
      LOG_WARN("failed to allocate memory for ObNestedTableType", K(ret));
2113
    } else {
2114
      table_type = new(ptr)pl::ObNestedTableType();
2115
      table_type->set_type_from(pl::ObPLTypeFrom::PL_TYPE_LOCAL);
2116
      table_type->set_element_type(elem_type);
2117
      pl_type = table_type;
2118
    }
2119
  }
2120
  CK (OB_NOT_NULL(pl_type));
2121
#endif
2122
  return ret;
2123
}
2124

2125
int ObMPStmtExecute::parse_complex_param_value(ObIAllocator &allocator,
2126
                                               const ObCharsetType charset,
2127
                                               const ObCollationType cs_type,
2128
                                               const ObCollationType ncs_type,
2129
                                               const char *&data,
2130
                                               const common::ObTimeZoneInfo *tz_info,
2131
                                               TypeInfo *type_info,
2132
                                               ObObjParam &param)
2133
{
2134
  int ret = OB_SUCCESS;
2135
  const pl::ObUserDefinedType *pl_type = NULL;
2136
  int64_t param_size = 0, param_pos = 0;
2137
  CK (OB_NOT_NULL(type_info));
2138
  OZ (get_pl_type_by_type_info(allocator, type_info, pl_type));
2139
  CK (OB_NOT_NULL(pl_type));
2140
  OZ (pl_type->init_obj(*(ctx_.schema_guard_), allocator, param, param_size));
2141
  OX (param.set_udt_id(pl_type->get_user_type_id()));
2142
  OZ (pl_type->deserialize(*(ctx_.schema_guard_), allocator, charset, cs_type, ncs_type,
2143
        tz_info, data, reinterpret_cast<char *>(param.get_ext()), param_size, param_pos));
2144
  OX (param.set_need_to_check_extend_type(true));
2145
  return ret;
2146
}
2147

2148
int ObMPStmtExecute::parse_basic_param_value(ObIAllocator &allocator,
2149
                                             const uint32_t type,
2150
                                             const ObCharsetType charset,
2151
                                             const ObCharsetType ncharset,
2152
                                             const ObCollationType cs_type,
2153
                                             const ObCollationType ncs_type,
2154
                                             const char *& data,
2155
                                             const common::ObTimeZoneInfo *tz_info,
2156
                                             ObObj &param,
2157
                                             bool is_complex_element,
2158
                                             ObPSAnalysisChecker *checker,
2159
                                             bool is_unsigned)
2160
{
2161
  int ret = OB_SUCCESS;
2162
  UNUSED(charset);
2163
  switch(type) {
2164
    case MYSQL_TYPE_TINY:
2165
    case MYSQL_TYPE_SHORT:
2166
    case MYSQL_TYPE_LONG:
2167
    case MYSQL_TYPE_LONGLONG: {
2168
      if (OB_FAIL(parse_integer_value(type, data, param, allocator, is_complex_element, checker, is_unsigned))) {
2169
        LOG_WARN("parse integer value from client failed", K(ret));
2170
      }
2171
      break;
2172
    }
2173
    case MYSQL_TYPE_FLOAT: {
2174
      float value = 0;
2175
      PS_STATIC_DEFENSE_CHECK(checker, sizeof(value))
2176
      {
2177
        MEMCPY(&value, data, sizeof(value));
2178
        data += sizeof(value);
2179
        param.set_float(value);
2180
      }
2181
      break;
2182
    }
2183
    case MYSQL_TYPE_ORA_BINARY_FLOAT: {
2184
      float value = 0;
2185
      PS_STATIC_DEFENSE_CHECK(checker, sizeof(value))
2186
      {
2187
        MEMCPY(&value, data, sizeof(value));
2188
        data += sizeof(value);
2189
        param.set_float(value);
2190
      }
2191
      break;
2192
    }
2193
    case MYSQL_TYPE_DOUBLE: {
2194
      double value = 0;
2195
      PS_STATIC_DEFENSE_CHECK(checker, sizeof(value))
2196
      {
2197
        MEMCPY(&value, data, sizeof(value));
2198
        data += sizeof(value);
2199
        if (lib::is_mysql_mode()) {
2200
          param.set_double(value);
2201
        } else {
2202
          char *buf = NULL;
2203
          int64_t buf_len = 0;
2204
          number::ObNumber nb;
2205
          const int64_t alloc_size = OB_MAX_DOUBLE_FLOAT_DISPLAY_WIDTH;
2206
          if (OB_ISNULL(buf = static_cast<char *>(allocator.alloc(alloc_size)))) {
2207
            ret = OB_ALLOCATE_MEMORY_FAILED;
2208
            LOG_WARN("failed to allocate memory", K(ret));
2209
          } else if (FALSE_IT(buf_len = ob_gcvt_strict(value, OB_GCVT_ARG_DOUBLE, alloc_size,
2210
                                                      buf, NULL, TRUE/*is_oracle_mode*/,
2211
                                                      FALSE/*is_binary_double*/, FALSE))) {
2212
          } else if (OB_FAIL(nb.from_sci_opt(buf, buf_len, allocator))) {
2213
            LOG_WARN("decode double param to number failed", K(ret));
2214
          } else {
2215
            param.set_number(nb);
2216
          }
2217
        }
2218
      }
2219
      break;
2220
    }
2221
    case MYSQL_TYPE_ORA_BINARY_DOUBLE: {
2222
      double value = 0;
2223
      PS_STATIC_DEFENSE_CHECK(checker, sizeof(value))
2224
      {
2225
        MEMCPY(&value, data, sizeof(value));
2226
        data += sizeof(value);
2227
        param.set_double(value);
2228
      }
2229
      break;
2230
    }
2231
    case MYSQL_TYPE_YEAR: {
2232
      int16_t value = 0;
2233
      PS_STATIC_DEFENSE_CHECK(checker, 2)
2234
      {
2235
        ObMySQLUtil::get_int2(data, value);
2236
        param.set_year(static_cast<uint8_t>(value));
2237
      }
2238
      break;
2239
    }
2240
    case MYSQL_TYPE_DATE:
2241
    case MYSQL_TYPE_DATETIME:
2242
    case MYSQL_TYPE_TIMESTAMP: {
2243
      if (OB_FAIL(parse_mysql_timestamp_value(static_cast<EMySQLFieldType>(type), data,
2244
                                              param, tz_info, checker))) {
2245
        LOG_WARN("parse timestamp value from client failed", K(ret));
2246
      }
2247
      break;
2248
    }
2249
    case MYSQL_TYPE_TIME:{
2250
      if (OB_FAIL(parse_mysql_time_value(data, param, checker))) {
2251
        LOG_WARN("parse timestamp value from client failed", K(ret));
2252
      }
2253
      break;
2254
    }
2255
    case MYSQL_TYPE_OB_TIMESTAMP_WITH_TIME_ZONE:
2256
    case MYSQL_TYPE_OB_TIMESTAMP_WITH_LOCAL_TIME_ZONE:
2257
    case MYSQL_TYPE_OB_TIMESTAMP_NANO: {
2258
      ObTimeConvertCtx cvrt_ctx(tz_info, true);
2259
      if (OB_FAIL(parse_oracle_timestamp_value(
2260
                            static_cast<EMySQLFieldType>(type), data, cvrt_ctx, param, checker))) {
2261
        LOG_WARN("parse timestamp value from client failed", K(ret));
2262
      }
2263
      break;
2264
    }
2265
    case MYSQL_TYPE_OB_NVARCHAR2:
2266
    case MYSQL_TYPE_OB_NCHAR:
2267
    case MYSQL_TYPE_OB_RAW:
2268
    case MYSQL_TYPE_TINY_BLOB:
2269
    case MYSQL_TYPE_MEDIUM_BLOB:
2270
    case MYSQL_TYPE_LONG_BLOB:
2271
    case MYSQL_TYPE_BLOB:
2272
    case MYSQL_TYPE_STRING:
2273
    case MYSQL_TYPE_VARCHAR:
2274
    case MYSQL_TYPE_VAR_STRING:
2275
    case MYSQL_TYPE_OB_NUMBER_FLOAT:
2276
    case MYSQL_TYPE_NEWDECIMAL:
2277
    case MYSQL_TYPE_OB_UROWID:
2278
    case MYSQL_TYPE_ORA_BLOB:
2279
    case MYSQL_TYPE_ORA_CLOB:
2280
    case MYSQL_TYPE_JSON:
2281
    case MYSQL_TYPE_GEOMETRY: {
2282
      ObString str;
2283
      ObString dst;
2284
      uint64_t length = 0;
2285
      ObCollationType cur_cs_type = ObCharset::get_default_collation(charset);
2286
      ObCollationType cur_ncs_type = ObCollationType::CS_TYPE_INVALID;
2287
      if (ncharset == ObCharsetType::CHARSET_INVALID || ncharset == ObCharsetType::CHARSET_BINARY) {
2288
        cur_ncs_type = ObCharset::get_default_collation(charset);
2289
      } else {
2290
        cur_ncs_type = ObCharset::get_default_collation(ncharset);
2291
      }
2292
      if (OB_FAIL(ObMySQLUtil::get_length(data, length))) {
2293
        LOG_ERROR("decode varchar param value failed", K(ret));
2294
      } else {
2295
        PS_STATIC_DEFENSE_CHECK(checker, length)
2296
        {
2297
          str.assign_ptr(data, static_cast<ObString::obstr_size_t>(length));
2298
        }
2299
        if (OB_FAIL(ret)) {
2300
        } else if (length > OB_MAX_LONGTEXT_LENGTH) {
2301
          ret = OB_ERR_INVALID_INPUT_ARGUMENT;
2302
          LOG_WARN("input param len is over size", K(ret), K(length));
2303
        } else if (MYSQL_TYPE_OB_NVARCHAR2 == type
2304
                  || MYSQL_TYPE_OB_NCHAR == type) {
2305
          OZ(copy_or_convert_str(allocator, cur_ncs_type, ncs_type, str, dst));
2306
          if (OB_SUCC(ret)) {
2307
            MYSQL_TYPE_OB_NVARCHAR2 == type ? param.set_nvarchar2(dst)
2308
                                            : param.set_nchar(dst);
2309
            param.set_collation_type(ncs_type);
2310
          }
2311
          LOG_DEBUG("recieve Nchar param", K(ret), K(str), K(dst));
2312
        } else if (ObURowIDType == type) {
2313
          // decode bae64 str and get urowid content
2314
          ObURowIDData urowid_data;
2315
          if (OB_FAIL(ObURowIDData::decode2urowid(str.ptr(), str.length(),
2316
                                                  allocator, urowid_data))) {
2317
            LOG_WARN("failed to decode to urowid", K(ret));
2318
            if (OB_INVALID_ROWID == ret) {
2319
              LOG_USER_ERROR(OB_INVALID_ROWID);
2320
            }
2321
          } else {
2322
            param.set_urowid(urowid_data);
2323
          }
2324
        } else {
2325
          bool is_lob_v1 = false;
2326
          if (MYSQL_TYPE_STRING == type
2327
              || MYSQL_TYPE_VARCHAR == type
2328
              || MYSQL_TYPE_VAR_STRING == type
2329
              || MYSQL_TYPE_ORA_CLOB == type
2330
              || MYSQL_TYPE_JSON == type
2331
              || MYSQL_TYPE_GEOMETRY == type) {
2332
            int64_t extra_len = 0;
2333
            if (MYSQL_TYPE_ORA_CLOB == type) {
2334
              ObLobLocatorV2 lob(str);
2335
              if (lob.is_lob_locator_v1()) {
2336
                is_lob_v1 = true;
2337
                const ObLobLocator &lobv1 = *(reinterpret_cast<const ObLobLocator *>(str.ptr()));
2338
                if (OB_UNLIKELY(! lobv1.is_valid() || lobv1.get_total_size() != length)) {
2339
                  ret = OB_ERR_UNEXPECTED;
2340
                  LOG_WARN("got invalid ps lob param", K(length), K(lobv1.magic_code_), K(lobv1.table_id_),
2341
                            K(lobv1.column_id_), K(lobv1.payload_offset_), K(lobv1.payload_size_),
2342
                            K(type), K(cs_type), K(lobv1.get_total_size()), K(lobv1.get_data_length()));
2343
                } else {
2344
                  extra_len = str.length() - reinterpret_cast<const ObLobLocator *>(str.ptr())->payload_size_;
2345
                }
2346
              } else {
2347
                if (!lob.is_valid()) {
2348
                  ret = OB_ERR_UNEXPECTED;
2349
                  LOG_WARN("got invalid ps lob param", K(length), K(lob), K(type), K(cs_type));
2350
                } // if INROW, does it need to do copy_or_convert_str?
2351
              }
2352
            }
2353
            if (is_lob_v1 || MYSQL_TYPE_ORA_CLOB != type) {
2354
              OZ(copy_or_convert_str(allocator,
2355
                                  cur_cs_type,
2356
                                  cs_type,
2357
                                  ObString(str.length() - extra_len, str.ptr() + extra_len),
2358
                                  dst,
2359
                                  extra_len));
2360
            }
2361
            if (OB_SUCC(ret) && MYSQL_TYPE_ORA_CLOB == type) {
2362
              if (is_lob_v1) {
2363
                // copy lob header
2364
                dst.assign_ptr(dst.ptr() - extra_len, dst.length() + extra_len);
2365
                MEMCPY(dst.ptr(), str.ptr(), extra_len);
2366
                reinterpret_cast<ObLobLocator *>(dst.ptr())->payload_size_ = dst.length() - extra_len;
2367
              } else {
2368
                if (OB_FAIL(ob_write_string(allocator, str, dst))) {
2369
                  LOG_WARN("Failed to write str", K(ret));
2370
                }
2371
              }
2372
            }
2373
          } else if (OB_FAIL(ob_write_string(allocator, str, dst))) {
2374
            LOG_WARN("Failed to write str", K(ret));
2375
          }
2376
          if (OB_SUCC(ret)) {
2377
            if (MYSQL_TYPE_NEWDECIMAL == type) {
2378
              number::ObNumber nb;
2379
              if (OB_FAIL(nb.from(str.ptr(), length, allocator))) {
2380
                LOG_WARN("decode varchar param to number failed", K(ret), K(str));
2381
              } else {
2382
                param.set_number(nb);
2383
              }
2384
            } else if (MYSQL_TYPE_OB_NUMBER_FLOAT == type) {
2385
              number::ObNumber nb;
2386
              if (OB_FAIL(nb.from(str.ptr(), length, allocator))) {
2387
                LOG_WARN("decode varchar param to number failed", K(ret), K(str));
2388
              } else {
2389
                param.set_number_float(nb);
2390
              }
2391
            } else if (MYSQL_TYPE_OB_RAW == type) {
2392
              param.set_raw(dst);
2393
            } else if (MYSQL_TYPE_ORA_BLOB == type
2394
                      || MYSQL_TYPE_ORA_CLOB == type) {
2395
              if (MYSQL_TYPE_ORA_BLOB == type) {
2396
                param.set_collation_type(CS_TYPE_BINARY);
2397
              } else {
2398
                param.set_collation_type(cs_type);
2399
              }
2400
              ObLobLocatorV2 lobv2(str);
2401
              if (lobv2.is_lob_locator_v1()) {
2402
                const ObLobLocator &lob = *(reinterpret_cast<const ObLobLocator *>(dst.ptr()));
2403
                if (!IS_CLUSTER_VERSION_BEFORE_4_1_0_0 && lob.get_payload_length() == 0) {
2404
                  // do convert empty lob v1 to v2
2405
                  ObString payload;
2406
                  if (OB_FAIL(lob.get_payload(payload))) {
2407
                    LOG_WARN("fail to get payload", K(ret), K(lob));
2408
                  } else {
2409
                    param.set_lob_value(ObLongTextType, payload.ptr(), payload.length());
2410
                    if (OB_FAIL(ObTextStringResult::ob_convert_obj_temporay_lob(param, allocator))) {
2411
                      LOG_WARN("Fail to convert plain lob data to templob",K(ret), K(payload));
2412
                    } else {
2413
                      LOG_TRACE("convert empty lob v1 to v2", K(lob), K(cs_type), K(type));
2414
                    }
2415
                  }
2416
                } else {
2417
                  param.set_lob_locator(lob);
2418
                  param.set_has_lob_header();
2419
                  LOG_TRACE("get lob locator", K(lob), K(cs_type), K(type));
2420
                }
2421
              } else {
2422
                param.set_lob_value(ObLongTextType, dst.ptr(), dst.length());
2423
                param.set_has_lob_header();
2424
                LOG_TRACE("get lob locator v2", K(lobv2), K(cs_type), K(type));
2425
              }
2426
            } else if (MYSQL_TYPE_TINY_BLOB == type
2427
                      || MYSQL_TYPE_MEDIUM_BLOB == type
2428
                      || MYSQL_TYPE_BLOB == type
2429
                      || MYSQL_TYPE_LONG_BLOB == type
2430
                      || MYSQL_TYPE_JSON == type
2431
                      || MYSQL_TYPE_GEOMETRY == type) {
2432
              // in ps protocol:
2433
              //    Oracle mode: client driver will call hextoraw()
2434
              //    MySQL mode: no need to call hextoraw
2435
              // in text protocol:
2436
              //    Oracle mode: server will call hextoraw()
2437
              //    MySQL mode: no need to call hextoraw
2438
              // Notice: text tc without lob header here, should not set has_lob_header flag here
2439
              param.set_collation_type(cs_type);
2440
              if (MYSQL_TYPE_TINY_BLOB == type) {
2441
                param.set_lob_value(ObTinyTextType, dst.ptr(), dst.length());
2442
              } else if (MYSQL_TYPE_MEDIUM_BLOB == type) {
2443
                param.set_lob_value(ObMediumTextType, dst.ptr(), dst.length());
2444
              } else if (MYSQL_TYPE_BLOB == type) {
2445
                param.set_lob_value(ObTextType, dst.ptr(), dst.length());
2446
              } else if (MYSQL_TYPE_LONG_BLOB == type) {
2447
                param.set_lob_value(ObLongTextType, dst.ptr(), dst.length());
2448
              } else if (MYSQL_TYPE_JSON == type) {
2449
                param.set_json_value(ObJsonType, dst.ptr(), dst.length());
2450
              } else if (MYSQL_TYPE_GEOMETRY == type) {
2451
                param.set_geometry_value(ObGeometryType, dst.ptr(), dst.length());
2452
              }
2453
              if (OB_SUCC(ret) && param.is_lob_storage() && dst.length() > 0) {
2454
                if (OB_FAIL(ObTextStringResult::ob_convert_obj_temporay_lob(param, allocator))) {
2455
                  LOG_WARN("Fail to convert plain lob data to templob",K(ret));
2456
                }
2457
              }
2458
            } else {
2459
              param.set_collation_type(cs_type);
2460
              if (is_oracle_mode() && !is_complex_element) {
2461
                param.set_char(dst);
2462
              } else {
2463
                if (is_complex_element && dst.length()== 0) {
2464
                  param.set_null();
2465
                } else {
2466
                  param.set_varchar(dst);
2467
                }
2468
              }
2469
            }
2470
          }
2471
        }
2472
      }
2473
      data += length;
2474
      break;
2475
    }
2476
    case MYSQL_TYPE_OB_INTERVAL_YM: {
2477
      if (OB_FAIL(parse_oracle_interval_ym_value(data, param, checker))) {
2478
        LOG_WARN("failed to parse oracle interval year to month value", K(ret));
2479
      }
2480
      break;
2481
    }
2482
    case MYSQL_TYPE_OB_INTERVAL_DS:{
2483
      if (OB_FAIL(parse_oracle_interval_ds_value(data, param, checker))) {
2484
        LOG_WARN("failed to parse oracle interval year to month value", K(ret));
2485
      }
2486
      break;
2487
    }
2488
    default: {
2489
      LOG_USER_ERROR(OB_ERR_ILLEGAL_TYPE, type);
2490
      ret = OB_ERR_ILLEGAL_TYPE;
2491
      break;
2492
    }
2493
  }
2494
  if (OB_SUCC(ret) && lib::is_mysql_mode()) {
2495
    param.set_collation_level(CS_LEVEL_COERCIBLE);
2496
  }
2497
  return ret;
2498
}
2499

2500
int ObMPStmtExecute::parse_param_value(ObIAllocator &allocator,
2501
                                       const uint32_t type,
2502
                                       const ObCharsetType charset,
2503
                                       const ObCharsetType ncharset,
2504
                                       const ObCollationType cs_type,
2505
                                       const ObCollationType ncs_type,
2506
                                       const char *&data,
2507
                                       const common::ObTimeZoneInfo *tz_info,
2508
                                       TypeInfo *type_info,
2509
                                       ObObjParam &param,
2510
                                       const char *bitmap,
2511
                                       int64_t param_id)
2512
{
2513
  int ret = OB_SUCCESS;
2514
  uint64_t length = 0;
2515
  uint64_t count = 1;
2516
  common::ObFixedArray<ObSqlString, ObIAllocator>
2517
                str_buf(THIS_WORKER.get_sql_arena_allocator());
2518
  ObPieceCache *piece_cache =
2519
      NULL == ctx_.session_info_ ? NULL : ctx_.session_info_->get_piece_cache();
2520
  ObPiece *piece = NULL;
2521
  if (OB_NOT_NULL(piece_cache) && OB_FAIL(piece_cache->get_piece(stmt_id_, param_id, piece))) {
2522
    ret = OB_ERR_UNEXPECTED;
2523
    LOG_WARN("get piece fail.", K(ret));
2524
  } else if (OB_ISNULL(piece_cache) || OB_ISNULL(piece)) {
2525
    // send piece data will init piece cache
2526
    // if piece cache is null, it must not be send piece protocol
2527
    bool is_null = ObSMUtils::update_from_bitmap(param, bitmap, param_id);
2528
    if (is_null) {
2529
      LOG_DEBUG("param is null", K(param_id), K(param), K(type));
2530
    } else if (OB_UNLIKELY(MYSQL_TYPE_COMPLEX == type)) {
2531
      if (OB_FAIL(parse_complex_param_value(allocator, charset, cs_type, ncs_type,
2532
                                            data, tz_info, type_info,
2533
                                            param))) {
2534
        LOG_WARN("failed to parse complex value", K(ret));
2535
      }
2536
    } else if (OB_UNLIKELY(MYSQL_TYPE_CURSOR == type)) {
2537
      CK (OB_NOT_NULL(ctx_.session_info_));
2538
      if (OB_SUCC(ret)) {
2539
        ObPLCursorInfo *cursor = NULL;
2540
        // OZ (ctx_.session_info_->make_cursor(cursor));
2541
        OX (param.set_extend(reinterpret_cast<int64_t>(cursor), PL_CURSOR_TYPE));
2542
        OX (param.set_param_meta());
2543
      }
2544
    } else {
2545
      bool is_unsigned = NULL == type_info || !type_info->elem_type_.get_meta_type().is_unsigned_integer() ? false : true;
2546
      if (OB_FAIL(parse_basic_param_value(allocator, type, charset, ncharset, cs_type, ncs_type,
2547
                                          data, tz_info, param, false, &analysis_checker_, is_unsigned))) {
2548
        LOG_WARN("failed to parse basic param value", K(ret));
2549
      } else {
2550
        param.set_param_meta();
2551
        param.set_length(param.get_val_len());
2552
      }
2553
    }
2554
  } else if (!support_send_long_data(type)) {
2555
    ret = OB_ERR_UNEXPECTED;
2556
    LOG_WARN("this type is not support send long data.", K(type), K(ret));
2557
  } else if (NULL == piece->get_allocator()) {
2558
    ret = OB_ERR_UNEXPECTED;
2559
    LOG_WARN("piece allocator is null.", K(stmt_id_), K(param_id), K(ret));
2560
  } else if (OB_SUCCESS != piece->get_error_ret()) {
2561
    ret = piece->get_error_ret();
2562
    LOG_WARN("send long data has error. ", K(stmt_id_), K(param_id), K(ret));
2563
  } else {
2564
    if (OB_UNLIKELY(MYSQL_TYPE_COMPLEX == type)) {
2565
      // this must be array bounding.
2566
      bool is_null = ObSMUtils::update_from_bitmap(param, bitmap, param_id);
2567
      if (is_null) {
2568
        LOG_DEBUG("param is null", K(param_id), K(param), K(type));
2569
      } else {
2570
        // 1. read count
2571
        ObMySQLUtil::get_length(data, count);
2572
        // 2. make null map
2573
        int64_t bitmap_bytes = ((count + 7) / 8);
2574
        char is_null_map[bitmap_bytes];
2575
        MEMSET(is_null_map, 0, bitmap_bytes);
2576
        length = piece_cache->get_length_length(count) + bitmap_bytes;
2577
        // 3. get string buffer (include lenght + value)
2578
        if (OB_FAIL(str_buf.prepare_allocate(count))) {
2579
          LOG_WARN("prepare fail.");
2580
        } else if (OB_FAIL(piece_cache->get_buffer(stmt_id_,
2581
                                                  param_id,
2582
                                                  count,
2583
                                                  length,
2584
                                                  str_buf,
2585
                                                  is_null_map))) {
2586
          LOG_WARN("piece get buffer fail.", K(ret), K(stmt_id_), K(param_id));
2587
        } else {
2588
          // 4. merge all this info
2589
          char *tmp = static_cast<char*>(piece->get_allocator()->alloc(length));
2590
          int64_t pos = 0;
2591
          if (OB_ISNULL(tmp)) {
2592
            ret = OB_ALLOCATE_MEMORY_FAILED;
2593
            LOG_WARN("failed to alloc memory", K(ret));
2594
          } else if (FALSE_IT(MEMSET(tmp, 0, length))) {
2595
          } else if (OB_FAIL(ObMySQLUtil::store_length(tmp, length, count, pos))) {
2596
            LOG_WARN("store length fail.", K(ret), K(stmt_id_), K(param_id));
2597
          } else {
2598
            MEMCPY(tmp+pos, is_null_map, bitmap_bytes);
2599
            pos += bitmap_bytes;
2600
            for (int64_t i=0; OB_SUCC(ret) && i<count; i++) {
2601
              if (OB_FAIL(ObMySQLUtil::store_obstr(tmp, length, str_buf.at(i).string(), pos))) {
2602
                LOG_WARN("store string fail.", K(ret), K(stmt_id_), K(param_id),
2603
                        K(length), K(pos), K(i), K(str_buf.at(i).string()), K(str_buf.at(i).string().length()),
2604
                        K(str_buf.at(i).length()));
2605
              }
2606
            }
2607
          }
2608
          if (OB_FAIL(ret)) {
2609
            // do nothing.
2610
          } else {
2611
            const char* src = tmp;
2612
            if (OB_FAIL(parse_complex_param_value(allocator, charset, cs_type, ncs_type,
2613
                                                  src, tz_info, type_info,
2614
                                                  param))) {
2615
              LOG_WARN("failed to parse complex value", K(ret));
2616
            }
2617
          }
2618
          piece->get_allocator()->free(tmp);
2619
        }
2620
      }
2621
    } else {
2622
      if (OB_FAIL(str_buf.prepare_allocate(count))) {
2623
        LOG_WARN("prepare fail.");
2624
      } else if (OB_FAIL(piece_cache->get_buffer(stmt_id_,
2625
                                                  param_id,
2626
                                                  count,
2627
                                                  length,
2628
                                                  str_buf,
2629
                                                  NULL))) {
2630
        LOG_WARN("piece get buffer fail.", K(ret), K(stmt_id_), K(param_id));
2631
      } else {
2632
        char *tmp = static_cast<char*>(piece->get_allocator()->alloc(length));
2633
        int64_t pos = 0;
2634
        if (OB_ISNULL(tmp)) {
2635
          ret = OB_ALLOCATE_MEMORY_FAILED;
2636
          LOG_WARN("failed to alloc memory", K(ret));
2637
        } else if (FALSE_IT(MEMSET(tmp, 0, length))) {
2638
        } else if (OB_FAIL(ObMySQLUtil::store_obstr(tmp, length, str_buf.at(0).string(), pos))) {
2639
          LOG_WARN("store string fail.", K(ret), K(stmt_id_), K(param_id));
2640
        } else {
2641
          const char* src = tmp;
2642
          bool is_unsigned = NULL == type_info || !type_info->elem_type_.get_meta_type().is_unsigned_integer() ? false : true;
2643
          if (OB_FAIL(parse_basic_param_value(allocator, type, charset, ncharset, cs_type, ncs_type,
2644
                                              src, tz_info, param, false, NULL ,is_unsigned))) {
2645
            LOG_WARN("failed to parse basic param value", K(ret));
2646
          } else {
2647
            param.set_param_meta();
2648
            param.set_length(param.get_val_len());
2649
          }
2650
        }
2651
        piece->get_allocator()->free(tmp);
2652
      }
2653
    }
2654
  }
2655
  return ret;
2656
}
2657

2658

2659

2660
int ObMPStmtExecute::copy_or_convert_str(common::ObIAllocator &allocator,
2661
                                         const ObCollationType src_type,
2662
                                         const ObCollationType dst_type,
2663
                                         const ObString &src,
2664
                                         ObString &out,
2665
                                         int64_t extra_buf_len /* = 0 */)
2666
{
2667
  int ret = OB_SUCCESS;
2668
  if (!ObCharset::is_valid_collation(src_type) || !ObCharset::is_valid_collation(dst_type)) {
2669
    ret = OB_ERR_UNEXPECTED;
2670
    LOG_WARN("invalid collation", K(ret), K(dst_type));
2671
  } else if (0 == src.length()
2672
             || ObCharset::charset_type_by_coll(src_type)
2673
                == ObCharset::charset_type_by_coll(dst_type)) {
2674
    int64_t len = src.length() + extra_buf_len;
2675
    if (len > 0) {
2676
      char *buf = static_cast<char *>(allocator.alloc(len));
2677
      if (NULL == buf) {
2678
        ret = OB_ALLOCATE_MEMORY_FAILED;
2679
        LOG_WARN("allocate failed", K(ret), K(len));
2680
      } else {
2681
        if (src.length() > 0) {
2682
          MEMCPY(buf + extra_buf_len, src.ptr(), src.length());
2683
        }
2684
        out.assign_ptr(buf + extra_buf_len, src.length());
2685
      }
2686
    } else {
2687
      out.reset();
2688
    }
2689
  } else {
2690
    int64_t maxmb_len = 0;
2691
    OZ(ObCharset::get_mbmaxlen_by_coll(dst_type, maxmb_len));
2692
    const int64_t len = maxmb_len * src.length() + 1 + extra_buf_len;
2693
    uint32_t res_len = 0;
2694
    if (OB_SUCC(ret)) {
2695
      char *buf = static_cast<char *>(allocator.alloc(len));
2696
      if (NULL == buf) {
2697
        ret = OB_ALLOCATE_MEMORY_FAILED;
2698
        LOG_WARN("allocate failed", K(ret), K(len));
2699
      } else {
2700
        ObDataBuffer buf_alloc(buf + extra_buf_len, len - extra_buf_len);
2701
        if (OB_FAIL(ObCharset::charset_convert(buf_alloc,
2702
                                               src,
2703
                                               src_type,
2704
                                               dst_type,
2705
                                               out,
2706
                                               ObCharset::REPLACE_UNKNOWN_CHARACTER))) {
2707
          LOG_WARN("fail to charset convert", K(ret), K(src_type), K(dst_type),
2708
          K(src), K(len), K(extra_buf_len));
2709
        }
2710
      }
2711
    }
2712
  }
2713
  return ret;
2714
}
2715

2716
int ObMPStmtExecute::parse_integer_value(const uint32_t type,
2717
                                         const char *&data,
2718
                                         ObObj &param,
2719
                                         ObIAllocator &allocator,
2720
                                         bool is_complex_element,
2721
                                         ObPSAnalysisChecker *checker,
2722
                                         bool is_unsigned) // oracle unsigned need
2723
{
2724
  int ret = OB_SUCCESS;
2725
  bool cast_to_number = !(lib::is_mysql_mode() || is_complex_element || MYSQL_TYPE_TINY == type);
2726
  int64_t res_val = 0;
2727
  switch(type) {
2728
    case MYSQL_TYPE_TINY: {
2729
      PS_STATIC_DEFENSE_CHECK(checker, 1)
2730
      {
2731
        int8_t value;
2732
        ObMySQLUtil::get_int1(data, value);
2733
        is_unsigned ? param.set_utinyint(value) : param.set_tinyint(value);
2734
      }
2735
      break;
2736
    }
2737
    case MYSQL_TYPE_SHORT: {
2738
      PS_STATIC_DEFENSE_CHECK(checker, 2)
2739
      {
2740
        int16_t value = 0;
2741
        ObMySQLUtil::get_int2(data, value);
2742
        if (!cast_to_number) {
2743
          is_unsigned ? param.set_usmallint(value) : param.set_smallint(value);
2744
        } else {
2745
          res_val = static_cast<int64_t>(value);
2746
          if (is_unsigned) {
2747
            if (((1LL << 16) + res_val) < 1 || res_val > 0xFFFF) {
2748
              ret = OB_DECIMAL_OVERFLOW_WARN;
2749
              LOG_WARN("param is over flower.", K(res_val), K(type), K(ret));
2750
            } else {
2751
              res_val = res_val < 0 ? ((1LL << 16) + res_val) : res_val;
2752
            }
2753
          }
2754
        }
2755
      }
2756
      break;
2757
    }
2758
    case MYSQL_TYPE_LONG: {
2759
      PS_STATIC_DEFENSE_CHECK(checker, 4)
2760
      {
2761
        int32_t value = 0;
2762
        ObMySQLUtil::get_int4(data, value);
2763
        if (!cast_to_number) {
2764
          is_unsigned ? param.set_uint32(value) : param.set_int32(value);
2765
        } else {
2766
          res_val = static_cast<int64_t>(value);
2767
          if (is_unsigned) {
2768
            if (((1LL << 32) + res_val) < 1 || res_val > 0xFFFFFFFF) {
2769
              ret = OB_DECIMAL_OVERFLOW_WARN;
2770
              LOG_WARN("param is over flower.", K(res_val), K(type), K(ret));
2771
            } else {
2772
              res_val = res_val < 0 ? ((1LL << 32) + res_val) : res_val;
2773
            }
2774
          }
2775
        }
2776
      }
2777
      break;
2778
    }
2779
    case MYSQL_TYPE_LONGLONG: {
2780
      PS_STATIC_DEFENSE_CHECK(checker, 8)
2781
      {
2782
        int64_t value = 0;
2783
        ObMySQLUtil::get_int8(data, value);
2784
        if (!cast_to_number) {
2785
          is_unsigned ? param.set_uint(ObUInt64Type, value) : param.set_int(value);
2786
        } else {
2787
          res_val = value;
2788
        }
2789
      }
2790
      break;
2791
    }
2792
    default: {
2793
      ret = OB_ERR_UNEXPECTED;
2794
      LOG_ERROR("unexpected integer type", K(type), K(ret));
2795
      break;
2796
    }
2797
  }
2798
  if (OB_SUCC(ret) && cast_to_number) {
2799
    number::ObNumber nb;
2800
    if (is_unsigned && OB_FAIL(nb.from(static_cast<uint64_t>(res_val), allocator))) {
2801
      LOG_WARN("decode param to number failed", K(ret), K(res_val));
2802
    } else if (!is_unsigned && OB_FAIL(nb.from(static_cast<int64_t>(res_val), allocator))) {
2803
      LOG_WARN("decode param to number failed", K(ret), K(res_val));
2804
    } else {
2805
      param.set_number(nb);
2806
    }
2807
  }
2808
  return ret;
2809
}
2810

2811
int ObMPStmtExecute::parse_mysql_timestamp_value(const EMySQLFieldType field_type,
2812
                                                 const char *&data,
2813
                                                 ObObj &param,
2814
                                                 const common::ObTimeZoneInfo *tz_info,
2815
                                                 ObPSAnalysisChecker *checker)
2816
{
2817
  int ret = OB_SUCCESS;
2818
  int8_t length = 0;
2819
  int16_t year = 0;
2820
  int8_t month = 0;
2821
  int8_t day = 0;
2822
  int8_t hour = 0;
2823
  int8_t min = 0;
2824
  int8_t second = 0;
2825
  int32_t microsecond = 0;
2826
  ObPreciseDateTime value;
2827
  PS_STATIC_DEFENSE_CHECK(checker, 1)
2828
  {
2829
    ObMySQLUtil::get_int1(data, length);
2830
    if (0 == length) {
2831
      value = 0;
2832
    } else if (4 == length) {
2833
      PS_STATIC_DEFENSE_CHECK(checker, 4)
2834
      {
2835
        ObMySQLUtil::get_int2(data, year);
2836
        ObMySQLUtil::get_int1(data, month);
2837
        ObMySQLUtil::get_int1(data, day);
2838
      }
2839
    } else if (7 == length) {
2840
      PS_STATIC_DEFENSE_CHECK(checker, 7)
2841
      {
2842
        ObMySQLUtil::get_int2(data, year);
2843
        ObMySQLUtil::get_int1(data, month);
2844
        ObMySQLUtil::get_int1(data, day);
2845
        ObMySQLUtil::get_int1(data, hour);
2846
        ObMySQLUtil::get_int1(data, min);
2847
        ObMySQLUtil::get_int1(data, second);
2848
      }
2849
    } else if (11 == length) {
2850
      PS_STATIC_DEFENSE_CHECK(checker, 11)
2851
      {
2852
        ObMySQLUtil::get_int2(data, year);
2853
        ObMySQLUtil::get_int1(data, month);
2854
        ObMySQLUtil::get_int1(data, day);
2855
        ObMySQLUtil::get_int1(data, hour);
2856
        ObMySQLUtil::get_int1(data, min);
2857
        ObMySQLUtil::get_int1(data, second);
2858
        ObMySQLUtil::get_int4(data, microsecond);
2859
      }
2860
    } else {
2861
      ret = OB_ERROR;
2862
      LOG_WARN("invalid mysql timestamp value length", K(length));
2863
    }
2864
  }
2865

2866
  if (OB_SUCC(ret)) {
2867
    ObTime ob_time;
2868
    if (0 != length) {
2869
      if (lib::is_oracle_mode()) {
2870
        //oracle mode datetime should not has microsecond
2871
        microsecond = 0;
2872
      }
2873
      ob_time.parts_[DT_YEAR] = year;
2874
      ob_time.parts_[DT_MON] = month;
2875
      ob_time.parts_[DT_MDAY] = day;
2876
      ob_time.parts_[DT_HOUR] = hour;
2877
      ob_time.parts_[DT_MIN] = min;
2878
      ob_time.parts_[DT_SEC] = second;
2879
      ob_time.parts_[DT_USEC] = microsecond;
2880
      if (!ObTimeUtility2::is_valid_date(year, month, day)
2881
          || !ObTimeUtility2::is_valid_time(hour, min, second, microsecond)) {
2882
        ret = OB_INVALID_DATE_FORMAT;
2883
        LOG_WARN("invalid date format", K(ret));
2884
      } else {
2885
        ObTimeConvertCtx cvrt_ctx(NULL, false);
2886
        ob_time.parts_[DT_DATE] = ObTimeConverter::ob_time_to_date(ob_time);
2887
        if (field_type == MYSQL_TYPE_DATE) {
2888
          value = ob_time.parts_[DT_DATE];
2889
        } else if (OB_FAIL(ObTimeConverter::ob_time_to_datetime(ob_time, cvrt_ctx, value))){
2890
          LOG_WARN("convert obtime to datetime failed", K(value), K(year), K(month),
2891
                   K(day), K(hour), K(min), K(second));
2892
        }
2893
      }
2894
    }
2895
  }
2896
  if (OB_SUCC(ret)) {
2897
    if (field_type == MYSQL_TYPE_TIMESTAMP) {
2898
      int64_t ts_value = 0;
2899
      if (OB_FAIL(ObTimeConverter::datetime_to_timestamp(value, tz_info, ts_value))) {
2900
        LOG_WARN("datetime to timestamp failed", K(ret));
2901
      } else {
2902
        param.set_timestamp(ts_value);
2903
      }
2904
    } else if (field_type == MYSQL_TYPE_DATETIME) {
2905
      param.set_datetime(value);
2906
    } else if (field_type == MYSQL_TYPE_DATE) {
2907
      param.set_date(static_cast<int32_t>(value));
2908
    }
2909
  }
2910
  LOG_DEBUG("get datetime", K(length), K(year), K(month), K(day), K(hour), K(min),K(second),  K(microsecond), K(value));
2911
  return ret;
2912
}
2913

2914
/**
 * Decode an Oracle-mode timestamp bind value from `data` into `param`.
 *
 * The payload is a one-byte length followed by that many bytes, which are
 * decoded by ObTimeConverter::decode_otimestamp. On success `data` is advanced
 * past the payload and `param` receives the timestamp value and its scale.
 *
 * The payload bytes are bounds-checked with PS_STATIC_DEFENSE_CHECK *before*
 * decode_otimestamp reads them, so a truncated packet is rejected without
 * touching memory beyond the checked range.
 *
 * @param field_type  MySQL field type, mapped to an ObObjType via ObSMUtils::get_ob_type
 * @param data        read position in the packet
 * @param cvrt_ctx    conversion context forwarded to decode_otimestamp
 * @param param       receives the decoded value and scale
 * @param checker     optional bounds checker handed to PS_STATIC_DEFENSE_CHECK
 * @return OB_SUCCESS or the error of the defense check, type mapping or decoding
 */
int ObMPStmtExecute::parse_oracle_timestamp_value(const obmysql::EMySQLFieldType field_type,
    const char *&data, const ObTimeConvertCtx &cvrt_ctx, ObObj &param, ObPSAnalysisChecker *checker)
{
  int ret = OB_SUCCESS;
  int8_t total_len = 0;
  ObObjType obj_type;
  ObOTimestampData ot_data;
  int8_t scale = -1;
  PS_STATIC_DEFENSE_CHECK(checker, 1)
  {
    ObMySQLUtil::get_int1(data, total_len);
  }
  if (OB_FAIL(ret)) {
    // the length byte could not be read; nothing more to do
  } else if (OB_FAIL(ObSMUtils::get_ob_type(obj_type, field_type))) {
    LOG_WARN("failed to get_ob_type", K(ret));
  } else {
    // Validate the payload range first, then decode it.
    PS_STATIC_DEFENSE_CHECK(checker, total_len)
    {
      if (OB_FAIL(ObTimeConverter::decode_otimestamp(obj_type, data, total_len, cvrt_ctx, ot_data, scale))) {
        LOG_WARN("failed to decode_otimestamp", K(ret));
      } else {
        data += total_len;
        param.set_otimestamp_value(obj_type, ot_data);
        param.set_scale(scale);
      }
    }
  }
  return ret;
}
2941

2942
int ObMPStmtExecute::parse_mysql_time_value(const char *&data, ObObj &param, ObPSAnalysisChecker *checker)
2943
{
2944
  int ret = OB_SUCCESS;
2945
  int8_t length = 0;
2946
  int8_t is_negative = 0;
2947
  int16_t year = 0;
2948
  int8_t month = 0;
2949
  int32_t day = 0;
2950
  int8_t hour = 0;
2951
  int8_t min = 0;
2952
  int8_t second = 0;
2953
  int32_t microsecond = 0;
2954
  struct tm tmval;
2955
  MEMSET(&tmval, 0, sizeof(tmval));
2956
  int64_t value;
2957
  PS_STATIC_DEFENSE_CHECK(checker, 1)
2958
  {
2959
    ObMySQLUtil::get_int1(data, length);
2960
    if (0 == length) {
2961
      value = 0;
2962
    } else if (8 == length) {
2963
      PS_STATIC_DEFENSE_CHECK(checker, 8)
2964
      {
2965
        ObMySQLUtil::get_int1(data, is_negative);
2966
        ObMySQLUtil::get_int4(data, day);
2967
        ObMySQLUtil::get_int1(data, hour);
2968
        ObMySQLUtil::get_int1(data, min);
2969
        ObMySQLUtil::get_int1(data, second);
2970
      }
2971
    } else if (12 == length) {
2972
      PS_STATIC_DEFENSE_CHECK(checker, 12)
2973
      {
2974
        ObMySQLUtil::get_int1(data, is_negative);
2975
        ObMySQLUtil::get_int4(data, day);
2976
        ObMySQLUtil::get_int1(data, hour);
2977
        ObMySQLUtil::get_int1(data, min);
2978
        ObMySQLUtil::get_int1(data, second);
2979
        ObMySQLUtil::get_int4(data, microsecond);
2980
      }
2981
    } else {
2982
      ret = OB_ERR_UNEXPECTED;
2983
      LOG_ERROR("unexpected time length", K(length), K(ret));
2984
    }
2985

2986
    if (OB_SUCC(ret)) {
2987
      ObTime ob_time;
2988
      if (0 != length) {
2989
        ob_time.parts_[DT_YEAR] = year;
2990
        ob_time.parts_[DT_MON] = month;
2991
        ob_time.parts_[DT_MDAY] = day;
2992
        ob_time.parts_[DT_HOUR] = hour;
2993
        ob_time.parts_[DT_MIN] = min;
2994
        ob_time.parts_[DT_SEC] = second;
2995
        ob_time.parts_[DT_USEC] = microsecond;
2996
        if (!ObTimeUtility2::is_valid_time(hour, min, second, microsecond)) {
2997
          ret = OB_INVALID_DATE_FORMAT;
2998
          LOG_WARN("invalid date format", K(ret));
2999
        } else {
3000
          ob_time.parts_[DT_DATE] = ObTimeConverter::ob_time_to_date(ob_time);
3001
          ob_time.parts_[DT_HOUR] += ob_time.parts_[DT_MDAY] * 24;
3002
          ob_time.parts_[DT_MDAY] = 0;
3003
          value = ObTimeConverter::ob_time_to_time(ob_time);
3004
          if(is_negative) {
3005
            value = -value;
3006
          }
3007
        }
3008
      }
3009
    }
3010
  }
3011
  if (OB_SUCC(ret)) {
3012
    param.set_time(value);
3013
  }
3014
  LOG_INFO("get time", K(length), K(year), K(month), K(day), K(hour), K(min),K(second),  K(microsecond), K(value));
3015
  return ret;
3016
}
3017

3018
int ObMPStmtExecute::parse_oracle_interval_ds_value(const char *&data, ObObj &param, ObPSAnalysisChecker *checker)
3019
{
3020
  int ret = OB_SUCCESS;
3021
  int8_t length = 0;
3022
  ObScale scale = 0;
3023
  ObIntervalDSValue value;
3024

3025
  ObMySQLUtil::get_int1(data, length);
3026
  PS_STATIC_DEFENSE_CHECK(checker, length)
3027
  {
3028
    if (OB_FAIL(ObTimeConverter::decode_interval_ds(data, length, value, scale))) {
3029
      LOG_WARN("fail to decode interval day to second", K(ret), K(length));
3030
    } else {
3031
      param.set_interval_ds(value);
3032
      param.set_scale(scale);
3033
    }
3034
  }
3035

3036
  return ret;
3037
}
3038

3039
int ObMPStmtExecute::parse_oracle_interval_ym_value(const char *&data, ObObj &param, ObPSAnalysisChecker *checker)
3040
{
3041
  int ret = OB_SUCCESS;
3042
  int8_t length = 0;
3043
  ObScale scale = 0;
3044
  ObIntervalYMValue value;
3045

3046
  ObMySQLUtil::get_int1(data, length);
3047
  PS_STATIC_DEFENSE_CHECK(checker, length)
3048
  {
3049
    if (OB_FAIL(ObTimeConverter::decode_interval_ym(data, length, value, scale))) {
3050
      LOG_WARN("fail to decode interval year to month", K(ret), K(length));
3051
    } else {
3052
      param.set_interval_ym(value);
3053
      param.set_scale(scale);
3054
    }
3055
  }
3056

3057
  return ret;
3058
}
3059

3060
/**
 * Update the PS-execute statistics events for one finished statement.
 *
 * Nothing is recorded when diagnose info is disabled or when
 * THIS_THWORKER.need_retry() is true. Otherwise SQL_PS_EXECUTE_COUNT is
 * incremented and the per-statement COUNT/TIME event pair is updated for
 * SELECT, INSERT, REPLACE, UPDATE and DELETE; every other statement type is
 * accounted under SQL_OTHER_COUNT / SQL_OTHER_TIME.
 *
 * @param type      statement type that was executed
 * @param end_time  end timestamp; the recorded cost is end_time minus get_receive_timestamp()
 */
void ObMPStmtExecute::record_stat(const stmt::StmtType type, const int64_t end_time) const
{
// Expands to one `case` that bumps the COUNT event and adds `cost` to the TIME event.
#define RECORD_STMT_STAT(stmt_name)              \
  case stmt::T_##stmt_name:                      \
    EVENT_INC(SQL_##stmt_name##_COUNT);          \
    EVENT_ADD(SQL_##stmt_name##_TIME, cost);     \
    break
  if (!lib::is_diagnose_info_enabled()) {
    return;
  }
  const int64_t cost = end_time - get_receive_timestamp();
  if (THIS_THWORKER.need_retry()) {
    return;
  }
  EVENT_INC(SQL_PS_EXECUTE_COUNT);
  switch (type) {
    RECORD_STMT_STAT(SELECT);
    RECORD_STMT_STAT(INSERT);
    RECORD_STMT_STAT(REPLACE);
    RECORD_STMT_STAT(UPDATE);
    RECORD_STMT_STAT(DELETE);
    default: {
      EVENT_INC(SQL_OTHER_COUNT);
      EVENT_ADD(SQL_OTHER_TIME, cost);
      break;
    }
  }
#undef RECORD_STMT_STAT
}
3086

3087
/**
 * Send the result-set header for `cursor` to the client.
 *
 * When the cursor has no field columns only an OK packet is sent (affected rows
 * 0, no more results, partition-hit flag taken from `session`). Otherwise the
 * field columns are sent through ObSyncPlanDriver::response_query_header.
 *
 * @param session  session used for the OK packet and the plan driver
 * @param cursor   cursor whose field columns describe the result set
 * @return OB_SUCCESS or the error of sending the OK packet / the header
 */
int ObMPStmtExecute::response_query_header(ObSQLSessionInfo &session, pl::ObDbmsCursorInfo &cursor)
{
  int ret = OB_SUCCESS;
  ObSyncPlanDriver drv(gctx_,
                       ctx_,
                       session,
                       retry_ctrl_,
                       *this,
                       false,
                       OB_INVALID_COUNT);
  if (0 == cursor.get_field_columns().count()) {
    // SELECT * INTO OUTFILE return null field, and only response ok packet
    ObOKPParam ok_param;
    ok_param.affected_rows_ = 0;
    ok_param.is_partition_hit_ = session.partition_hit().get_bool();
    ok_param.has_more_result_ = false;
    if (OB_FAIL(send_ok_packet(session, ok_param))) {
      LOG_WARN("fail to send ok packt", K(ok_param), K(ret));
    }
  } else if (OB_FAIL(drv.response_query_header(cursor.get_field_columns(),
                                               false,
                                               false,
                                               true))) {
    // The log text names the call that actually failed.
    LOG_WARN("fail to response query header", K(ret));
  }
  return ret;
}
3116

3117
} //end of namespace observer
3118
} //end of namespace oceanbase
3119

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.