2
* Copyright (c) 2021 OceanBase
3
* OceanBase CE is licensed under Mulan PubL v2.
4
* You can use this software according to the terms and conditions of the Mulan PubL v2.
5
* You may obtain a copy of Mulan PubL v2 at:
6
* http://license.coscl.org.cn/MulanPubL-2.0
7
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
* See the Mulan PubL v2 for more details.
13
#define USING_LOG_PREFIX SERVER
15
#include "observer/mysql/obmp_stmt_execute.h"
17
#include "lib/oblog/ob_log.h"
18
#include "lib/stat/ob_session_stat.h"
19
#include "lib/profile/ob_perf_event.h"
20
#include "lib/timezone/ob_time_convert.h"
21
#include "lib/encode/ob_base64_encode.h"
22
#include "observer/mysql/obsm_utils.h"
23
#include "rpc/ob_request.h"
24
#include "rpc/obmysql/ob_mysql_packet.h"
25
#include "rpc/obmysql/ob_mysql_util.h"
26
#include "rpc/obmysql/packet/ompk_eof.h"
27
#include "rpc/obmysql/packet/ompk_resheader.h"
28
#include "rpc/obmysql/packet/ompk_field.h"
29
#include "rpc/obmysql/packet/ompk_row.h"
30
#include "rpc/obmysql/obsm_struct.h"
31
#include "observer/mysql/obsm_row.h"
32
#include "share/schema/ob_schema_getter_guard.h"
33
#include "share/ob_time_utility2.h"
34
#include "sql/ob_sql.h"
35
#include "sql/ob_sql_context.h"
36
#include "sql/session/ob_sql_session_info.h"
37
#include "sql/plan_cache/ob_prepare_stmt_struct.h"
38
#include "observer/omt/ob_tenant.h"
39
#include "observer/mysql/ob_sync_plan_driver.h"
40
#include "observer/mysql/ob_sync_cmd_driver.h"
41
#include "observer/mysql/ob_async_cmd_driver.h"
42
#include "observer/mysql/ob_async_plan_driver.h"
43
#include "observer/ob_req_time_service.h"
44
#include "pl/ob_pl_user_type.h"
45
#include "pl/ob_pl_package.h"
46
#include "pl/ob_pl_resolver.h"
47
#include "pl/ob_pl_exception_handling.h"
48
#include "sql/plan_cache/ob_prepare_stmt_struct.h"
49
#include "observer/mysql/obmp_stmt_prexecute.h"
50
#include "observer/mysql/obmp_stmt_send_piece_data.h"
51
#include "observer/mysql/obmp_utils.h"
52
#include "share/ob_lob_access_utils.h"
53
#include "sql/plan_cache/ob_ps_cache.h"
58
using namespace common;
60
using namespace obmysql;
66
inline int ObPSAnalysisChecker::detection(const int64_t len)
70
} else if (*pos_ + len > end_pos_) {
71
ret = OB_ERR_MALFORMED_PS_PACKET;
72
LOG_USER_ERROR(OB_ERR_MALFORMED_PS_PACKET);
73
LOG_ERROR("malformed ps data packet, please check the number and content of data packet parameters", K(ret), KP(pos_), KP(begin_pos_),
74
K(end_pos_ - begin_pos_), K(len), K(data_len_), K(remain_len()));
79
ObMPStmtExecute::ObMPStmtExecute(const ObGlobalContext &gctx)
81
retry_ctrl_(/*ctx_.retry_info_*/),
84
stmt_type_(stmt::T_NONE),
86
arraybinding_params_(NULL),
87
arraybinding_columns_(NULL),
88
arraybinding_row_(NULL),
89
is_arraybinding_(false),
90
is_save_exception_(false),
91
arraybinding_size_(0),
92
arraybinding_rowcnt_(0),
93
ps_cursor_type_(ObNormalType),
94
single_process_timestamp_(0),
95
exec_start_timestamp_(0),
96
exec_end_timestamp_(0),
97
prepare_packet_sent_(false),
103
ctx_.exec_type_ = MpQuery;
106
int ObMPStmtExecute::init_arraybinding_field(int64_t column_field_cnt,
107
const ColumnsFieldIArray *column_fields)
109
int ret = OB_SUCCESS;
111
ObField sql_no_field, err_no_field, err_msg_field;
113
OX (sql_no_field.charsetnr_ = CS_TYPE_UTF8MB4_GENERAL_CI);
114
OX (sql_no_field.type_.set_type(ObIntType));
115
OZ (common::ObField::get_field_mb_length(sql_no_field.type_.get_type(),
116
sql_no_field.accuracy_,
117
common::CS_TYPE_INVALID,
118
sql_no_field.length_));
119
OX (sql_no_field.cname_ = ObString("sql_no"));
121
OX (err_no_field.charsetnr_ = CS_TYPE_UTF8MB4_GENERAL_CI);
122
OX (err_no_field.type_.set_type(ObIntType));
123
OZ (common::ObField::get_field_mb_length(err_no_field.type_.get_type(),
124
err_no_field.accuracy_,
125
common::CS_TYPE_INVALID, err_no_field.length_));
126
OX (err_no_field.cname_ = ObString("error_code"));
128
OX (err_msg_field.charsetnr_ = CS_TYPE_UTF8MB4_GENERAL_CI);
129
OX (err_msg_field.type_.set_type(ObVarcharType));
130
OZ (common::ObField::get_field_mb_length(err_msg_field.type_.get_type(),
131
err_msg_field.accuracy_,
132
common::CS_TYPE_INVALID,
133
err_msg_field.length_));
134
OX (err_msg_field.cname_ = ObString("error_message"));
136
OZ (arraybinding_columns_->push_back(sql_no_field));
137
OZ (arraybinding_columns_->push_back(err_no_field));
138
if (is_prexecute() && column_field_cnt > 3 && OB_NOT_NULL(column_fields)) {
139
// only for pre_execute
140
for (int64_t i = 0; OB_SUCC(ret) && i < column_fields->count(); i++) {
141
if (OB_FAIL(arraybinding_columns_->push_back(column_fields->at(i)))) {
142
LOG_WARN("fail to push arraybinding_columns_", "field", column_fields->at(i), K(i));
146
OZ (arraybinding_columns_->push_back(err_msg_field));
151
int ObMPStmtExecute::init_row_for_arraybinding(ObIAllocator &alloc, int64_t array_binding_row_num)
153
int ret = OB_SUCCESS;
154
ObObj* obj = static_cast<ObObj*>(alloc.alloc(sizeof(ObObj) * array_binding_row_num));
155
if (OB_ISNULL(obj)) {
156
ret = OB_ALLOCATE_MEMORY_FAILED;
157
LOG_WARN("failed to alloc memory for row", K(ret));
160
for (int64_t i = 0; i < array_binding_row_num; ++i) {
161
ptr = new(ptr)ObObj();
164
arraybinding_row_->assign(obj, array_binding_row_num);
169
int ObMPStmtExecute::init_arraybinding_paramstore(ObIAllocator &alloc)
171
int ret = OB_SUCCESS;
172
if (OB_ISNULL(arraybinding_params_
173
= static_cast<ParamStore*>(alloc.alloc(sizeof(ParamStore))))) {
174
ret = OB_ALLOCATE_MEMORY_FAILED;
175
LOG_WARN("failed to allocate memory", K(ret));
177
OX (arraybinding_params_ = new(arraybinding_params_)ParamStore((ObWrapperAllocator(alloc))));
182
// only used for pre_execute
183
int ObMPStmtExecute::init_arraybinding_fields_and_row(ObMySQLResultSet &result)
185
int ret = OB_SUCCESS;
186
int64_t returning_field_num = 0;
188
if (!is_prexecute()) {
189
ret = OB_ERR_UNEXPECTED;
190
LOG_WARN("not support execute protocol", K(ret));
191
} else if (OB_ISNULL(result.get_field_columns())) {
192
ret = OB_ERR_UNEXPECTED;
193
LOG_WARN("not support execute protocol", K(ret));
195
ObIAllocator *alloc = static_cast<ObMPStmtPrexecute*>(this)->get_alloc();
196
if (OB_ISNULL(alloc)) {
197
ret = OB_ERR_UNEXPECTED;
198
LOG_WARN("allocator is null", K(ret));
199
} else if (OB_ISNULL(result.get_field_columns())) {
200
ret = OB_ERR_UNEXPECTED;
201
LOG_WARN("returning param field is null", K(ret));
203
returning_field_num = result.get_field_columns()->count();
208
} else if (OB_ISNULL(arraybinding_columns_
209
= static_cast<ColumnsFieldArray*>(alloc->alloc(sizeof(ColumnsFieldArray))))) {
210
ret = OB_ALLOCATE_MEMORY_FAILED;
211
LOG_WARN("failed to allocate memory", K(ret));
212
} else if (OB_ISNULL(arraybinding_row_
213
= static_cast<ObNewRow*>(alloc->alloc(sizeof(ObNewRow))))) {
214
ret = OB_ALLOCATE_MEMORY_FAILED;
215
LOG_WARN("failed to allocate memory", K(ret));
217
arraybinding_columns_
218
= new(arraybinding_columns_)ColumnsFieldArray(*alloc, 3 + returning_field_num);
219
arraybinding_row_ = new(arraybinding_row_)ObNewRow();
221
OZ (init_arraybinding_field(returning_field_num + 3, result.get_field_columns()));
222
OZ (init_row_for_arraybinding(*alloc, returning_field_num + 3));
228
int ObMPStmtExecute::init_for_arraybinding(ObIAllocator &alloc)
230
int ret = OB_SUCCESS;
231
if (OB_ISNULL(arraybinding_params_
232
= static_cast<ParamStore*>(alloc.alloc(sizeof(ParamStore))))) {
233
ret = OB_ALLOCATE_MEMORY_FAILED;
234
LOG_WARN("failed to allocate memory", K(ret));
235
} else if (is_save_exception_) {
236
if (OB_ISNULL(arraybinding_columns_
237
= static_cast<ColumnsFieldArray*>(alloc.alloc(sizeof(ColumnsFieldArray))))) {
238
ret = OB_ALLOCATE_MEMORY_FAILED;
239
LOG_WARN("failed to allocate memory", K(ret));
240
} else if (OB_ISNULL(arraybinding_row_
241
= static_cast<ObNewRow*>(alloc.alloc(sizeof(ObNewRow))))) {
242
ret = OB_ALLOCATE_MEMORY_FAILED;
243
LOG_WARN("failed to allocate memory", K(ret));
245
arraybinding_columns_
246
= new(arraybinding_columns_)ColumnsFieldArray(alloc, 3);
247
arraybinding_row_ = new(arraybinding_row_)ObNewRow();
249
OZ (init_arraybinding_field(3, NULL));
250
OZ (init_row_for_arraybinding(alloc, 3));
252
OX (arraybinding_params_ = new(arraybinding_params_)ParamStore((ObWrapperAllocator(alloc))));
256
int ObMPStmtExecute::check_param_type_for_arraybinding(
257
ObSQLSessionInfo *session_info,
258
ParamTypeInfoArray ¶m_type_infos)
260
int ret = OB_SUCCESS;
261
if (!ObStmt::is_dml_write_stmt(stmt_type_)
262
&& stmt::T_ANONYMOUS_BLOCK != stmt_type_
263
&& stmt::T_CALL_PROCEDURE != stmt_type_) {
264
ret = OB_NOT_SUPPORTED;
265
LOG_WARN("arraybinding only support write dml", K(ret), K(stmt_type_));
266
LOG_USER_ERROR(OB_NOT_SUPPORTED, "arraybinding got no write dml");
267
} else if (session_info->get_local_autocommit()) {
268
ret = OB_NOT_SUPPORTED;
269
LOG_WARN("arraybinding must in autocommit off", K(ret));
270
LOG_USER_ERROR(OB_NOT_SUPPORTED, "arraybinding has autocommit = on");
271
} else if (OB_UNLIKELY(param_type_infos.count() <= 0)) {
272
ret = OB_NOT_SUPPORTED;
273
LOG_WARN("arraybinding must has parameters", K(ret));
274
LOG_USER_ERROR(OB_NOT_SUPPORTED, "arraybinding has no parameter");
276
for (int64_t i = 0; OB_SUCC(ret) && i < param_type_infos.count(); ++i) {
277
TypeInfo &type_info = param_type_infos.at(i);
278
if (type_info.is_basic_type_ || !type_info.is_elem_type_) {
279
ret = OB_NOT_SUPPORTED;
280
LOG_WARN("arraybinding parameter must be anonymous array", K(ret));
281
LOG_USER_ERROR(OB_NOT_SUPPORTED,
282
"arraybinding parameter is not anonymous array");
289
int ObMPStmtExecute::check_param_value_for_arraybinding(ObObjParam ¶m)
291
int ret = OB_SUCCESS;
292
ObPLCollection *coll = NULL;
294
CK (OB_NOT_NULL(coll = reinterpret_cast<ObPLCollection*>(param.get_ext())));
296
} else if (0 == arraybinding_size_) {
297
arraybinding_size_ = coll->get_count();
299
CK (arraybinding_size_ == coll->get_count());
304
int ObMPStmtExecute::construct_execute_param_for_arraybinding(int64_t pos)
306
int ret = OB_SUCCESS;
307
CK (OB_NOT_NULL(arraybinding_params_));
308
CK (OB_NOT_NULL(params_));
309
CK (arraybinding_params_->count() == params_->count());
310
for (int64_t i = 0; OB_SUCC(ret) && i < arraybinding_params_->count(); ++i) {
311
ObObjParam &obj = arraybinding_params_->at(i);
312
ObPLCollection *coll = NULL;
315
CK (OB_NOT_NULL(coll = reinterpret_cast<ObPLCollection*>(obj.get_ext())));
316
CK (coll->get_count() > pos);
317
CK (1 == coll->get_column_count());
318
CK (OB_NOT_NULL(data = reinterpret_cast<ObObj*>(coll->get_data())));
319
OX (params_->at(i) = *(data + pos));
320
if (data[pos].is_numeric_type()) {
321
ObAccuracy default_acc =
322
ObAccuracy::DDL_DEFAULT_ACCURACY2[lib::is_oracle_mode()][data[pos].get_type()];
323
if (params_->at(i).get_scale() == NUMBER_SCALE_UNKNOWN_YET) {
324
params_->at(i).set_scale(default_acc.get_scale());
326
if (params_->at(i).get_precision() == PRECISION_UNKNOWN_YET) {
327
params_->at(i).set_precision(default_acc.get_precision());
334
void ObMPStmtExecute::reset_complex_param_memory(ParamStore *params, ObSQLSessionInfo &session_info)
336
if (OB_NOT_NULL(params)) {
337
for (int64_t i = 0; i < params->count(); ++i) {
338
ObObjParam &obj = params->at(i);
339
if (obj.is_pl_extend()) {
340
int ret = ObUserDefinedType::destruct_obj(obj, &session_info);
341
if (OB_SUCCESS != ret) {
342
LOG_WARN("fail to destruct obj", K(ret), K(i));
349
int ObMPStmtExecute::send_eof_packet_for_arraybinding(ObSQLSessionInfo &session_info)
351
int ret = OB_SUCCESS;
354
const ObWarningBuffer *warnings_buf = common::ob_get_tsi_warning_buffer();
355
uint16_t warning_count = 0;
356
if (OB_ISNULL(warnings_buf)) {
357
LOG_WARN("can not get thread warnings buffer");
359
warning_count = static_cast<uint16_t>(warnings_buf->get_readable_warning_count());
361
eofp.set_warning_count(warning_count);
362
ObServerStatusFlags flags = eofp.get_server_status();
363
flags.status_flags_.OB_SERVER_STATUS_IN_TRANS
364
= (session_info.is_server_status_in_transaction() ? 1 : 0);
365
flags.status_flags_.OB_SERVER_STATUS_AUTOCOMMIT = (session_info.get_local_autocommit() ? 1 : 0);
366
// MORE_RESULTS need false in prexecute protocol.
367
// only is_save_exception_ will use this func in prexecute protocol.
368
flags.status_flags_.OB_SERVER_MORE_RESULTS_EXISTS = is_prexecute() && is_save_exception_
370
if (!session_info.is_obproxy_mode()) {
371
flags.status_flags_.OB_SERVER_QUERY_WAS_SLOW = !session_info.partition_hit().get_bool();
373
eofp.set_server_status(flags);
374
OZ (response_packet(eofp, &session_info));
379
int ObMPStmtExecute::response_result_for_arraybinding(
380
ObSQLSessionInfo &session_info,
381
ObIArray<ObSavedException> &exception_array)
383
int ret = OB_SUCCESS;
384
if (exception_array.count() > 0) {
385
if (is_prexecute()) {
389
rhp.set_field_count(3);
390
OZ (response_packet(rhp, &session_info));
392
for (int64_t i = 0; OB_SUCC(ret) && i < arraybinding_columns_->count(); ++i) {
394
OZ (ObMySQLResultSet::to_mysql_field(arraybinding_columns_->at(i), field));
395
ObMySQLResultSet::replace_lob_type(session_info, arraybinding_columns_->at(i), field);
397
OZ (response_packet(fp, &session_info));
400
OZ (send_eof_packet_for_arraybinding(session_info));
402
for (int64_t i = 0; OB_SUCC(ret) && i < exception_array.count(); ++i) {
403
arraybinding_row_->get_cell(0).set_int(exception_array.at(i).pos_);
404
arraybinding_row_->get_cell(1).set_int(exception_array.at(i).error_code_);
405
arraybinding_row_->get_cell(2).set_varchar(exception_array.at(i).error_msg_);
407
const ObDataTypeCastParams dtc_params
408
= ObBasicSessionInfo::create_dtc_params(&session_info);
409
ObSMRow sm_row(BINARY,
412
arraybinding_columns_,
414
session_info.get_effective_tenant_id());
416
OZ (response_packet(rp, &session_info));
418
OZ (send_eof_packet_for_arraybinding(session_info));
423
bool ps_out = ((stmt::T_ANONYMOUS_BLOCK == stmt_type_ || stmt::T_CALL_PROCEDURE == stmt_type_)
424
&& arraybinding_columns_->count() > 3) ? true : false;
426
ok_param.affected_rows_ = arraybinding_rowcnt_;
427
ok_param.is_partition_hit_ = session_info.partition_hit().get_bool();
428
ok_param.has_pl_out_ = ps_out;
429
OZ (send_ok_packet(session_info, ok_param));
434
int ObMPStmtExecute::save_exception_for_arraybinding(
435
int64_t pos, int error_code, ObIArray<ObSavedException> &exception_array)
437
int ret = OB_SUCCESS;
438
ObSavedException exception;
440
const char *errm_result = NULL;
441
int64_t errm_length = 0;
443
exception.pos_ = pos;
444
exception.error_code_ = static_cast<uint16_t>(ob_errpkt_errno(error_code, lib::is_oracle_mode()));
446
ObIAllocator &alloc = CURRENT_CONTEXT->get_arena_allocator();
448
const ObWarningBuffer *wb = common::ob_get_tsi_warning_buffer();
449
if (OB_LIKELY(NULL != wb) && wb->get_err_code() == error_code) {
450
errm_result = wb->get_err_msg();
451
errm_length = strlen(errm_result);
453
errm_result = ob_errpkt_strerror(error_code, true);
454
if (NULL == errm_result) {
455
errm_result = "ORA%ld: Message error_code not found; product=RDBMS; facility=ORA";
457
errm_length = strlen(errm_result);
460
OZ (ob_write_string(alloc, ObString(errm_length, errm_result), exception.error_msg_));
461
OZ (exception_array.push_back(exception));
465
int ObMPStmtExecute::after_do_process_for_arraybinding(ObMySQLResultSet &result)
467
int ret = OB_SUCCESS;
468
if (OB_ISNULL(result.get_physical_plan())) {
470
LOG_WARN("should have set plan to result set", K(ret));
471
} else if (OB_FAIL(result.open())) {
472
int cret = OB_SUCCESS;
473
int cli_ret = OB_SUCCESS;
474
retry_ctrl_.test_and_save_retry_state(gctx_,
479
true/*arraybinding only local retry*/);
480
if (OB_TRANSACTION_SET_VIOLATION != ret && OB_REPLICA_NOT_READABLE != ret) {
481
if (OB_TRY_LOCK_ROW_CONFLICT == ret && retry_ctrl_.need_retry()) {
484
LOG_WARN("result set open failed, check if need retry",
485
K(ret), K(cli_ret), K(retry_ctrl_.need_retry()));
489
cret = result.close();
490
if (cret != OB_SUCCESS &&
491
cret != OB_TRANSACTION_SET_VIOLATION &&
492
OB_TRY_LOCK_ROW_CONFLICT != cret) {
493
LOG_WARN("close result set fail", K(cret));
495
} else if (result.is_with_rows()) {
496
ret = OB_NOT_SUPPORTED;
497
LOG_WARN("in arraybinding, dml with rows is not supported", K(ret));
498
LOG_USER_ERROR(OB_NOT_SUPPORTED, "in arraybinding, dml with rows");
501
OX (arraybinding_rowcnt_ += result.get_affected_rows());
506
int ObMPStmtExecute::before_process()
508
int ret = OB_SUCCESS;
509
if (OB_FAIL(ObMPBase::before_process())) {
510
LOG_WARN("fail to call before process", K(ret));
511
} else if ((OB_ISNULL(req_))) {
512
ret = OB_INVALID_ARGUMENT;
513
LOG_ERROR("request should not be null", K(ret));
514
} else if (req_->get_type() != ObRequest::OB_MYSQL) {
515
ret = OB_INVALID_ARGUMENT;
516
LOG_ERROR("invalid request", K(ret), K_(*req));
518
ObIAllocator &alloc = CURRENT_CONTEXT->get_arena_allocator();
519
if (OB_ISNULL(params_ = static_cast<ParamStore *>(alloc.alloc(sizeof(ParamStore))))) {
520
ret = OB_ALLOCATE_MEMORY_FAILED;
521
LOG_WARN("failed to allocate memory", K(ret));
523
params_ = new(params_)ParamStore( (ObWrapperAllocator(alloc)) );
525
const ObMySQLRawPacket &pkt = reinterpret_cast<const ObMySQLRawPacket&>(req_->get_packet());
526
const char* pos = pkt.get_cdata();
527
analysis_checker_.init(pos, pkt.get_clen());
528
int32_t stmt_id = -1; //INVALID_STMT_ID
529
uint32_t ps_stmt_checksum = 0;
530
ObSQLSessionInfo *session = NULL;
531
PS_DEFENSE_CHECK(9) // stmt_id(4) + flag(1) + checksum(4)
533
ObMySQLUtil::get_int4(pos, stmt_id);
536
// pos += 1; //skip flags
538
ObMySQLUtil::get_int1(pos, flag);
539
const uint8_t ARRAYBINDING_MODE = 8;
540
const uint8_t SAVE_EXCEPTION_MODE = 16;
541
is_arraybinding_ = flag & ARRAYBINDING_MODE;
542
is_save_exception_ = flag & SAVE_EXCEPTION_MODE;
543
ps_cursor_type_ = 0 != (flag & CURSOR_TYPE_READ_ONLY)
544
? ObExecutePsCursorType
547
// 4 bytes, iteration-count, used for checksum
548
ObMySQLUtil::get_uint4(pos, ps_stmt_checksum);
550
if (is_arraybinding_) {
551
OZ (init_for_arraybinding(alloc));
555
} else if (OB_FAIL(get_session(session))) {
556
LOG_WARN("get session failed");
557
} else if (OB_ISNULL(session)) {
558
ret = OB_ERR_UNEXPECTED;
559
LOG_WARN("session is NULL or invalid", K(ret), K(session));
561
const bool enable_sql_audit =
562
GCONF.enable_sql_audit && session->get_local_ob_enable_sql_audit();
563
OZ (request_params(session, pos, ps_stmt_checksum, alloc, -1));
564
if (!is_pl_stmt(stmt_type_) && enable_sql_audit) {
565
OZ (store_params_value_to_str(alloc, *session));
568
if (session != NULL) {
569
revert_session(session);
573
send_error_packet(ret, NULL, (void *)(ctx_.get_reroute_info()));
574
if (OB_ERR_PREPARE_STMT_CHECKSUM == ret) {
576
LOG_WARN("prepare stmt checksum error, disconnect connection", K(ret));
584
int ObMPStmtExecute::store_params_value_to_str(ObIAllocator &alloc, sql::ObSQLSessionInfo &session)
586
return store_params_value_to_str(alloc, session, params_, params_value_, params_value_len_);
589
int ObMPStmtExecute::store_params_value_to_str(ObIAllocator &alloc,
590
sql::ObSQLSessionInfo &session,
593
int64_t ¶ms_value_len)
595
int ret = OB_SUCCESS;
597
int64_t length = OB_MAX_SQL_LENGTH;
598
CK (OB_NOT_NULL(params));
599
CK (OB_ISNULL(params_value));
600
CK (OB_NOT_NULL(params_value = static_cast<char *>(alloc.alloc(OB_MAX_SQL_LENGTH))));
601
for (int i = 0; OB_SUCC(ret) && i < params->count(); ++i) {
602
const common::ObObjParam ¶m = params->at(i);
603
if (param.is_ext()) {
606
params_value_len = 0;
609
OZ (param.print_sql_literal(params_value, length, pos, alloc, TZ_INFO(&session)));
610
if (i != params->count() - 1) {
611
OZ (databuff_printf(params_value, length, pos, alloc, ","));
617
params_value_len = 0;
618
// The failure of store_params_value_to_str does not affect the execution of SQL,
619
// so the error code is ignored here
622
params_value_len = pos;
627
int ObMPStmtExecute::parse_request_type(const char* &pos,
628
int64_t num_of_params,
629
int8_t new_param_bound_flag,
630
ObCollationType cs_type,
631
ObCollationType ncs_type,
632
ParamTypeArray ¶m_types,
633
ParamTypeInfoArray ¶m_type_infos
634
/*ParamCastArray param_cast_infos*/)
636
int ret = OB_SUCCESS;
637
// Step3: get type info
638
if (param_type_infos.count() < num_of_params) {
639
ret = OB_ERR_UNEXPECTED;
640
LOG_WARN("type array length is not normal", K(ret), K(param_types.count()), K(param_type_infos.count()));
642
for (int i = 0; OB_SUCC(ret) && i < num_of_params; ++i) {
645
TypeInfo &type_name_info = param_type_infos.at(i);
646
if (1 == new_param_bound_flag) {
647
PS_DEFENSE_CHECK(2) // type(1) + flag(1)
649
ObMySQLUtil::get_uint1(pos, type);
650
ObMySQLUtil::get_int1(pos, flag);
651
if (OB_FAIL(param_types.push_back(static_cast<EMySQLFieldType>(type)))) {
652
LOG_WARN("fail to push back", K(type), K(i));
653
} else if (EMySQLFieldType::MYSQL_TYPE_COMPLEX != type) {
654
int16_t unsigned_flag = 128;
655
ObObjType ob_elem_type;
656
if (OB_FAIL(ObSMUtils::get_ob_type(ob_elem_type,
657
static_cast<EMySQLFieldType>(type),
658
flag & unsigned_flag ? true : false))) {
659
LOG_WARN("get ob type fail. ", K(type));
661
type_name_info.elem_type_.set_obj_type(ob_elem_type);
666
if (num_of_params != param_types.count()) {
667
ret = OB_ERR_WRONG_DYNAMIC_PARAM;
668
LOG_USER_ERROR(OB_ERR_WRONG_DYNAMIC_PARAM,
669
param_types.count(), num_of_params);
671
type = static_cast<uint8_t>(param_types.at(i));
677
uint8_t elem_type = 0;
678
if (EMySQLFieldType::MYSQL_TYPE_COMPLEX == type) {
679
type_name_info.is_basic_type_ = false;
680
if (OB_FAIL(decode_type_info(pos, type_name_info))) {
681
LOG_WARN("failed to decode type info", K(ret));
682
} else if (type_name_info.type_name_.empty()) {
683
ObObjType ob_elem_type;
684
type_name_info.is_elem_type_ = true;
685
PS_DEFENSE_CHECK(1) // elem_type(1)
687
ObMySQLUtil::get_uint1(pos, elem_type);
689
OZ (ObSMUtils::get_ob_type(
690
ob_elem_type, static_cast<EMySQLFieldType>(elem_type)), elem_type);
691
OX (type_name_info.elem_type_.set_obj_type(ob_elem_type));
694
case MYSQL_TYPE_OB_NVARCHAR2:
695
case MYSQL_TYPE_OB_NCHAR: {
696
type_name_info.elem_type_.set_collation_type(ncs_type);
698
case MYSQL_TYPE_ORA_BLOB: {
699
type_name_info.elem_type_.set_collation_type(CS_TYPE_BINARY);
702
type_name_info.elem_type_.set_collation_type(cs_type);
706
if (OB_SUCC(ret) && EMySQLFieldType::MYSQL_TYPE_COMPLEX == elem_type) {
707
OZ (decode_type_info(pos, type_name_info));
716
int ObMPStmtExecute::parse_request_param_value(ObIAllocator &alloc,
717
sql::ObSQLSessionInfo *session,
720
EMySQLFieldType ¶m_type,
721
TypeInfo ¶m_type_info,
725
int ret = OB_SUCCESS;
726
ObCharsetType charset = CHARSET_INVALID;
727
ObCharsetType ncharset = CHARSET_INVALID;
728
ObCollationType cs_conn = CS_TYPE_INVALID;
729
ObCollationType cs_server = CS_TYPE_INVALID;
730
if (OB_ISNULL(session)) {
731
ret = OB_ERR_UNEXPECTED;
732
LOG_WARN("session is null", K(ret));
733
} else if (OB_FAIL(session->get_character_set_connection(charset))) {
734
LOG_WARN("get charset for client failed", K(ret));
735
} else if (OB_FAIL(session->get_collation_connection(cs_conn))) {
736
LOG_WARN("get charset for client failed", K(ret));
737
} else if (OB_FAIL(session->get_collation_server(cs_server))) {
738
LOG_WARN("get charset for client failed", K(ret));
739
} else if (OB_FAIL(session->get_ncharacter_set_connection(ncharset))) {
740
LOG_WARN("get charset for client failed", K(ret));
742
// Step5: decode value
745
} else if (OB_FAIL(ObSMUtils::get_ob_type(
746
ob_type, static_cast<EMySQLFieldType>(param_type)))) {
747
LOG_WARN("cast ob type from mysql type failed",
748
K(ob_type), K(param_type), K(ret));
750
param.set_type(ob_type);
751
param.set_param_meta();
752
if (OB_FAIL(parse_param_value(alloc,
756
is_oracle_mode() ? cs_server : cs_conn,
757
session->get_nls_collation_nation(),
759
session->get_timezone_info(),
764
LOG_WARN("get param value failed", K(param));
766
LOG_DEBUG("resolve execute with param", K(param));
772
bool ObMPStmtExecute::is_contain_complex_element(const sql::ParamTypeArray ¶m_types) const
775
for (int64_t i = 0; i < param_types.count(); i++) {
776
const obmysql::EMySQLFieldType field_type = param_types.at(i);
777
if (MYSQL_TYPE_COMPLEX == field_type) {
785
int ObMPStmtExecute::request_params(ObSQLSessionInfo *session,
787
uint32_t ps_stmt_checksum,
789
int32_t all_param_num)
791
int ret = OB_SUCCESS;
792
ObPsSessionInfo *ps_session_info = NULL;
793
ObSQLSessionInfo::LockGuard lock_guard(session->get_query_lock());
794
ObCharsetType charset = CHARSET_INVALID;
795
ObCollationType cs_conn = CS_TYPE_INVALID;
796
ObCollationType cs_server = CS_TYPE_INVALID;
797
share::schema::ObSchemaGetterGuard schema_guard;
798
const uint64_t tenant_id = session->get_effective_tenant_id();
800
if (OB_FAIL(gctx_.schema_service_->get_tenant_schema_guard(tenant_id, schema_guard))) {
801
LOG_WARN("get schema guard failed", K(ret));
802
} else if (OB_FAIL(session->get_character_set_connection(charset))) {
803
LOG_WARN("get charset for client failed", K(ret));
804
} else if (OB_FAIL(session->get_collation_connection(cs_conn))) {
805
LOG_WARN("get charset for client failed", K(ret));
806
} else if (OB_FAIL(session->get_collation_server(cs_server))) {
807
LOG_WARN("get charset for client failed", K(ret));
808
} else if (OB_FAIL(session->get_ps_session_info(stmt_id_, ps_session_info))) {
809
LOG_WARN("get_ps_session_info failed", K(ret), K_(stmt_id));
810
} else if (OB_ISNULL(ps_session_info)) {
811
ret = OB_INVALID_ARGUMENT;
812
LOG_WARN("ps_session_info is null", K(ret));
813
} else if (DEFAULT_ITERATION_COUNT == ps_stmt_checksum) {
816
} else if (ps_stmt_checksum != ps_session_info->get_ps_stmt_checksum()) {
817
ret = OB_ERR_PREPARE_STMT_CHECKSUM;
818
LOG_ERROR("ps stmt checksum fail", K(ret), "session_id", session->get_sessid(),
819
K(ps_stmt_checksum), K(*ps_session_info));
822
LOG_TRACE("ps session info",
823
K(ret), "session_id", session->get_sessid(), K(*ps_session_info));
824
share::schema::ObSchemaGetterGuard *old_guard = ctx_.schema_guard_;
825
ObSQLSessionInfo *old_sess_info = ctx_.session_info_;
826
ctx_.schema_guard_ = &schema_guard;
827
ctx_.session_info_ = session;
828
const int64_t input_param_num = ps_session_info->get_param_count();
829
stmt_type_ = ps_session_info->get_stmt_type();
830
int8_t new_param_bound_flag = 0;
831
if (is_pl_stmt(stmt_type_)) {
832
// pl not support save exception
833
is_save_exception_ = 0;
835
// for returning into,
836
// all_param_num = input_param_num + returning_param_num
837
params_num_ = (all_param_num > input_param_num) ? all_param_num : input_param_num;
838
int64_t returning_params_num = all_param_num - input_param_num;
839
if (is_prexecute() && 0 != params_num_) {
840
if (ps_session_info->get_num_of_returning_into() > 0) {
841
// check param_cnt for returning into
842
if (returning_params_num != ps_session_info->get_num_of_returning_into()
843
|| input_param_num != ps_session_info->get_param_count()) {
844
ret = OB_ERR_UNEXPECTED;
845
LOG_WARN("param num is not match ps stmt prama count.", K(is_prexecute()), K(params_num_),
846
K(ps_session_info->get_param_count()));
848
} else if (params_num_ != ps_session_info->get_param_count()) {
850
ret = OB_ERR_UNEXPECTED;
851
LOG_WARN("param num is not match ps stmt prama count.", K(is_prexecute()), K(params_num_),
852
K(ps_session_info->get_param_count()));
855
if (OB_SUCC(ret) && params_num_ > 0) {
857
int64_t bitmap_types = (params_num_ + 7) / 8;
858
const char *bitmap = pos;
860
ParamTypeArray ¶m_types = ps_session_info->get_param_types();
861
ParamTypeInfoArray param_type_infos;
862
ParamCastArray param_cast_infos;
864
ParamTypeArray returning_param_types;
865
ParamTypeInfoArray returning_param_type_infos;
866
int64_t len = bitmap_types + 1/*new_param_bound_flag*/;
867
PS_DEFENSE_CHECK(len) // bitmap_types
869
// Step2: 获取new_param_bound_flag字段
870
ObMySQLUtil::get_int1(pos, new_param_bound_flag);
871
if (new_param_bound_flag == 1) {
877
} else if (OB_FAIL(param_type_infos.prepare_allocate(input_param_num))) {
878
LOG_WARN("array prepare allocate failed", K(ret), K(input_param_num));
879
} else if (OB_FAIL(params_->prepare_allocate(input_param_num))) {
880
LOG_WARN("array prepare allocate failed", K(ret));
881
} else if (OB_FAIL(param_cast_infos.prepare_allocate(input_param_num))) {
882
LOG_WARN("array prepare allocate failed", K(ret));
883
} else if (is_arraybinding_) {
884
CK (OB_NOT_NULL(arraybinding_params_));
885
OZ (arraybinding_params_->prepare_allocate(input_param_num));
888
for (int i = 0; OB_SUCC(ret) && i < input_param_num; ++i) {
889
param_cast_infos.at(i) = false;
894
} else if (params_num_ <= input_param_num) {
895
// not need init returning_param_types and returning_param_type_infos
896
} else if (OB_FAIL(returning_param_type_infos.prepare_allocate(params_num_ - input_param_num))) {
897
LOG_WARN("array prepare allocate failed", K(ret));
901
if (OB_SUCC(ret) && OB_FAIL(parse_request_type(pos,
903
new_param_bound_flag,
907
param_type_infos))) {
908
LOG_WARN("fail to parse input params type", K(ret));
909
} else if (is_contain_complex_element(param_types)) {
910
analysis_checker_.need_check_ = false;
913
// Step3-2: 获取returning into params type信息
914
if (OB_SUCC(ret) && returning_params_num > 0) {
915
if (new_param_bound_flag != 1) {
916
ret = OB_ERR_UNEXPECTED;
917
LOG_WARN("returning into parm must define type", K(ret));
918
} else if (OB_FAIL(parse_request_type(pos,
919
returning_params_num,
920
new_param_bound_flag,
923
returning_param_types,
924
returning_param_type_infos))) {
925
LOG_WARN("fail to parse returning into params type", K(ret));
929
if (OB_SUCC(ret) && is_arraybinding_) {
930
OZ (check_param_type_for_arraybinding(session, param_type_infos));
932
if (OB_SUCC(ret) && stmt::T_CALL_PROCEDURE == ps_session_info->get_stmt_type()) {
933
ctx_.is_execute_call_stmt_ = true;
936
// Step5: decode value
937
for (int64_t i = 0; OB_SUCC(ret) && i < input_param_num; ++i) {
938
ObObjParam ¶m = is_arraybinding_ ? arraybinding_params_->at(i) : params_->at(i);
940
if (OB_SUCC(ret) && OB_FAIL(parse_request_param_value(alloc,
945
param_type_infos.at(i),
948
LOG_WARN("fail to parse request param values", K(ret), K(i));
950
LOG_DEBUG("after parser param", K(param), K(i));
952
if (OB_SUCC(ret) && is_arraybinding_) {
953
OZ (check_param_value_for_arraybinding(param));
957
// Step5-2: decode returning into value
958
// need parse returning into params
959
if (OB_SUCC(ret) && returning_params_num > 0) {
960
CK(returning_param_types.count() == returning_params_num);
961
CK(returning_param_type_infos.count() == returning_params_num);
962
for (int64_t i = 0; OB_SUCC(ret) && i < returning_params_num; ++i) {
964
if (OB_FAIL(parse_request_param_value(alloc,
968
returning_param_types.at(i),
969
returning_param_type_infos.at(i),
972
LOG_WARN("fail to parse request returning into param values", K(ret), K(i));
974
LOG_DEBUG("after parser resolve returning into", K(param), K(i));
979
ctx_.schema_guard_ = old_guard;
980
ctx_.session_info_ = old_sess_info;
985
int ObMPStmtExecute::decode_type_info(const char*& buf, TypeInfo &type_info)
987
int ret = OB_SUCCESS;
990
if (OB_FAIL(ObMySQLUtil::get_length(buf, length))) {
991
LOG_WARN("failed to get length", K(ret));
993
PS_DEFENSE_CHECK(length)
995
type_info.relation_name_.assign_ptr(buf, static_cast<ObString::obstr_size_t>(length));
1001
uint64_t length = 0;
1002
if (OB_FAIL(ObMySQLUtil::get_length(buf, length))) {
1003
LOG_WARN("failed to get length", K(ret));
1005
PS_DEFENSE_CHECK(length)
1007
type_info.type_name_.assign_ptr(buf, static_cast<ObString::obstr_size_t>(length));
1013
uint64_t version = 0;
1014
if (OB_FAIL(ObMySQLUtil::get_length(buf, version))) {
1015
LOG_WARN("failed to get version", K(ret));
1021
int ObMPStmtExecute::set_session_active(ObSQLSessionInfo &session) const
1023
int ret = OB_SUCCESS;
1024
if (OB_FAIL(session.set_session_state(QUERY_ACTIVE))) {
1025
LOG_WARN("fail to set session state", K(ret));
1027
session.set_query_start_time(get_receive_timestamp());
1028
session.set_mysql_cmd(obmysql::COM_STMT_EXECUTE);
1029
session.update_last_active_time();
1034
int ObMPStmtExecute::execute_response(ObSQLSessionInfo &session,
1035
ObMySQLResultSet &result,
1036
const bool enable_perf_event,
1037
bool &need_response_error,
1038
bool &is_diagnostics_stmt,
1039
int64_t &execution_id,
1040
const bool force_sync_resp,
1041
bool &async_resp_used,
1042
ObPsStmtId &inner_stmt_id)
1044
int ret = OB_SUCCESS;
1045
inner_stmt_id = OB_INVALID_ID;
1046
ObIAllocator &alloc = CURRENT_CONTEXT->get_arena_allocator();
1047
if (OB_ISNULL(session.get_ps_cache())) {
1048
ret = OB_ERR_UNEXPECTED;
1049
LOG_WARN("ps : ps cache is null.", K(ret), K(stmt_id_));
1050
} else if (OB_FAIL(session.get_inner_ps_stmt_id(stmt_id_, inner_stmt_id))) {
1051
ret = OB_ERR_UNEXPECTED;
1052
LOG_WARN("ps : get inner stmt id fail.", K(ret), K(stmt_id_));
1054
ObPsStmtInfoGuard guard;
1055
ObPsStmtInfo *ps_info = NULL;
1056
if (OB_FAIL(session.get_ps_cache()->get_stmt_info_guard(inner_stmt_id, guard))) {
1057
LOG_WARN("get stmt info guard failed", K(ret), K(stmt_id_), K(inner_stmt_id));
1058
} else if (OB_ISNULL(ps_info = guard.get_stmt_info())) {
1059
ret = OB_ERR_UNEXPECTED;
1060
LOG_WARN("get stmt info is null", K(ret));
1062
if (is_execute_ps_cursor() && stmt::T_SELECT != ps_info->get_stmt_type()) {
1063
set_ps_cursor_type(ObNormalType);
1065
ctx_.cur_sql_ = ps_info->get_ps_sql();
1070
} else if (is_execute_ps_cursor()) {
1071
ObDbmsCursorInfo *cursor = NULL;
1072
bool use_stream = false;
1074
if (OB_NOT_NULL(session.get_cursor(stmt_id_))) {
1075
if (OB_FAIL(session.close_cursor(stmt_id_))) {
1076
LOG_WARN("fail to close result set", K(ret), K(stmt_id_), K(session.get_sessid()));
1079
OZ (session.make_dbms_cursor(cursor, stmt_id_));
1080
CK (OB_NOT_NULL(cursor));
1081
OX (cursor->set_stmt_type(stmt::T_SELECT));
1082
OX (cursor->set_ps_sql(ctx_.cur_sql_));
1083
OZ (session.ps_use_stream_result_set(use_stream));
1085
OX (cursor->set_streaming());
1087
OZ (cursor->prepare_entity(session));
1088
CK (OB_NOT_NULL(cursor->get_allocator()));
1089
OZ (cursor->init_params(params_->count()));
1090
OZ (cursor->get_exec_params().assign(*params_));
1091
OZ (gctx_.sql_engine_->init_result_set(ctx_, result));
1092
if (OB_SUCCESS != ret || enable_perf_event) {
1093
exec_start_timestamp_ = ObTimeUtility::current_time();
1096
ObPLExecCtx pl_ctx(cursor->get_allocator(), &result.get_exec_context(), NULL/*params*/,
1097
NULL/*result*/, &ret, NULL/*func*/, true);
1098
if (OB_FAIL(ObSPIService::dbms_dynamic_open(&pl_ctx, *cursor))) {
1099
LOG_WARN("open cursor fail. ", K(ret), K(stmt_id_));
1100
if (!THIS_WORKER.need_retry()) {
1101
int cli_ret = OB_SUCCESS;
1102
retry_ctrl_.test_and_save_retry_state(
1103
gctx_, ctx_, result, ret, cli_ret, is_arraybinding_ /*ararybinding only local retry*/);
1104
if (OB_ERR_PROXY_REROUTE == ret) {
1105
LOG_DEBUG("run stmt_query failed, check if need retry",
1106
K(ret), K(cli_ret), K(retry_ctrl_.need_retry()), K_(stmt_id));
1108
LOG_WARN("run stmt_query failed, check if need retry",
1109
K(ret), K(cli_ret), K(retry_ctrl_.need_retry()), K_(stmt_id));
1113
if (OB_ERR_PROXY_REROUTE == ret && !is_arraybinding_) {
1114
need_response_error = true;
1119
* PS模式exec-cursor协议中,
1120
* 不返回 result_set 结果集,只返回包头信息
1121
* 并在EOF包中设置 OB_SERVER_STATUS_CURSOR_EXISTS 状态
1124
OZ (response_query_header(session, *cursor));
1125
if (OB_SUCCESS != ret && OB_NOT_NULL(cursor)) {
1127
if (OB_FAIL(session.close_cursor(cursor->get_id()))) {
1128
LOG_WARN("close cursor failed.", K(ret), K(stmt_id_));
1132
} else if (FALSE_IT(ctx_.enable_sql_resource_manage_ = true)) {
1133
} else if (OB_FAIL(gctx_.sql_engine_->stmt_execute(stmt_id_,
1137
false /* is_inner_sql */))) {
1138
exec_start_timestamp_ = ObTimeUtility::current_time();
1139
if (!THIS_WORKER.need_retry()) {
1140
int cli_ret = OB_SUCCESS;
1141
retry_ctrl_.test_and_save_retry_state(
1142
gctx_, ctx_, result, ret, cli_ret, is_arraybinding_ /*ararybinding only local retry*/);
1143
if (OB_ERR_PROXY_REROUTE == ret) {
1144
LOG_DEBUG("run stmt_query failed, check if need retry",
1145
K(ret), K(cli_ret), K(retry_ctrl_.need_retry()), K_(stmt_id));
1147
LOG_WARN("run stmt_query failed, check if need retry",
1148
K(ret), K(cli_ret), K(retry_ctrl_.need_retry()), K_(stmt_id));
1152
if (OB_ERR_PROXY_REROUTE == ret && !is_arraybinding_) {
1153
need_response_error = true;
1157
exec_start_timestamp_ = ObTimeUtility::current_time();
1158
result.get_exec_context().set_plan_start_time(exec_start_timestamp_);
1159
// 本分支内如果出错,全部会在response_result内部处理妥当
1162
need_response_error = false;
1163
is_diagnostics_stmt = ObStmt::is_diagnostic_stmt(result.get_literal_stmt_type());
1164
ctx_.is_show_trace_stmt_ = ObStmt::is_show_trace_stmt(result.get_literal_stmt_type());
1165
session.set_current_execution_id(execution_id);
1168
} else if (is_arraybinding_) {
1169
if (OB_FAIL(after_do_process_for_arraybinding(result))) {
1170
LOG_WARN("failed to process arraybinding sql", K(ret));
1172
} else if (OB_FAIL(response_result(result,
1175
async_resp_used))) {
1176
ObPhysicalPlanCtx *plan_ctx = result.get_exec_context().get_physical_plan_ctx();
1177
if (OB_ISNULL(plan_ctx)) {
1178
LOG_ERROR("execute query fail, and plan_ctx is NULL", K(ret));
1180
LOG_WARN("execute query fail", K(ret), "timeout_timestamp",
1181
plan_ctx->get_timeout_timestamp());
1187
int ObMPStmtExecute::do_process(ObSQLSessionInfo &session,
1188
ParamStore *param_store,
1189
const bool has_more_result,
1190
const bool force_sync_resp,
1191
bool &async_resp_used)
1193
int ret = OB_SUCCESS;
1194
ObAuditRecordData &audit_record = session.get_raw_audit_record();
1195
audit_record.try_cnt_++;
1196
bool is_diagnostics_stmt = false;
1197
ObPsStmtId inner_stmt_id = OB_INVALID_ID;
1198
bool need_response_error = is_arraybinding_ ? false : true;
1199
const bool enable_perf_event = lib::is_diagnose_info_enabled();
1200
const bool enable_sql_audit =
1201
GCONF.enable_sql_audit && session.get_local_ob_enable_sql_audit();
1203
single_process_timestamp_ = ObTimeUtility::current_time();
1206
* 注意req_timeinfo_guard一定要放在result前面
1209
ObReqTimeGuard req_timeinfo_guard;
1210
SMART_VAR(ObMySQLResultSet, result, session, THIS_WORKER.get_sql_arena_allocator()) {
1212
ObWaitEventStat total_wait_desc;
1213
int64_t execution_id = 0;
1214
ObDiagnoseSessionInfo *di = ObDiagnoseSessionInfo::get_local_diagnose_info();
1216
ObMaxWaitGuard max_wait_guard(enable_perf_event ? &audit_record.exec_record_.max_wait_event_ : NULL, di);
1217
ObTotalWaitGuard total_wait_guard(enable_perf_event ? &total_wait_desc : NULL, di);
1218
if (enable_perf_event) {
1219
audit_record.exec_record_.record_start(di);
1222
result.set_has_more_result(has_more_result);
1223
result.set_ps_protocol();
1224
ObTaskExecutorCtx *task_ctx = result.get_exec_context().get_task_executor_ctx();
1225
if (OB_ISNULL(task_ctx)) {
1226
ret = OB_ERR_UNEXPECTED;
1227
LOG_ERROR("task executor ctx can not be NULL", K(task_ctx), K(ret));
1229
task_ctx->schema_service_ = gctx_.schema_service_;
1230
task_ctx->set_query_tenant_begin_schema_version(retry_ctrl_.get_tenant_local_schema_version());
1231
task_ctx->set_query_sys_begin_schema_version(retry_ctrl_.get_sys_local_schema_version());
1232
task_ctx->set_min_cluster_version(GET_MIN_CLUSTER_VERSION());
1234
ctx_.retry_times_ = retry_ctrl_.get_retry_times();
1235
if (OB_ISNULL(ctx_.schema_guard_)) {
1236
ret = OB_INVALID_ARGUMENT;
1237
LOG_WARN("newest schema is NULL", K(ret));
1238
} else if (OB_FAIL(result.init())) {
1239
LOG_WARN("result set init failed", K(ret));
1240
} else if (OB_ISNULL(gctx_.sql_engine_) || OB_ISNULL(param_store)) {
1241
ret = OB_ERR_UNEXPECTED;
1242
LOG_ERROR("invalid sql engine", K(ret), K(gctx_), K(param_store));
1243
} else if (FALSE_IT(execution_id = gctx_.sql_engine_->get_execution_id())) {
1245
} else if (OB_FAIL(set_session_active(session))) {
1246
LOG_WARN("fail to set session active", K(ret));
1248
if (is_prexecute()) {
1249
ret = static_cast<ObMPStmtPrexecute*>(this)->
1250
execute_response(session,
1256
need_response_error,
1257
is_diagnostics_stmt,
1263
ret = execute_response(session,
1266
need_response_error,
1267
is_diagnostics_stmt,
1273
if ((OB_SUCC(ret) && is_diagnostics_stmt) || async_resp_used) {
1274
// if diagnostic stmt succeed, no need to clear warning buf.
1275
// or async resp is used, it will be cleared in callback thread.
1276
session.update_show_warnings_buf();
1278
session.set_show_warnings_buf(ret);
1283
exec_end_timestamp_ = ObTimeUtility::current_time();
1285
// some statistics must be recorded for plan stat, even though sql audit disabled
1286
bool first_record = (1 == audit_record.try_cnt_);
1287
ObExecStatUtils::record_exec_timestamp(*this, first_record, audit_record.exec_timestamp_);
1288
audit_record.exec_timestamp_.update_stage_time();
1290
if (enable_perf_event) {
1291
audit_record.exec_record_.record_end(di);
1292
record_stat(result.get_stmt_type(), exec_end_timestamp_);
1293
audit_record.exec_record_.wait_time_end_ = total_wait_desc.time_waited_;
1294
audit_record.exec_record_.wait_count_end_ = total_wait_desc.total_waits_;
1295
audit_record.update_event_stage_state();
1298
if (enable_perf_event && !THIS_THWORKER.need_retry()
1299
&& OB_NOT_NULL(result.get_physical_plan())) {
1300
const int64_t time_cost = exec_end_timestamp_ - get_receive_timestamp();
1301
ObSQLUtils::record_execute_time(result.get_physical_plan()->get_plan_type(), time_cost);
1306
&& need_response_error
1308
&& !THIS_WORKER.need_retry()
1309
&& !retry_ctrl_.need_retry()) {
1310
LOG_WARN("query failed", K(ret), K(retry_ctrl_.need_retry()), K_(stmt_id));
1311
// 当need_retry=false时,可能给客户端回过包了,可能还没有回过任何包。
1312
// 不过,可以确定:这个请求出错了,还没处理完。如果不是已经交给异步EndTrans收尾,
1313
// 则需要在下面回复一个error_packet作为收尾。否则后面没人帮忙发错误包给客户端了,
1315
bool is_partition_hit = session.get_err_final_partition_hit(ret);
1316
int err = send_error_packet(ret, NULL, is_partition_hit, (void *)ctx_.get_reroute_info());
1317
if (OB_SUCCESS != err) { // 发送error包
1318
LOG_WARN("send error packet failed", K(ret), K(err));
1323
audit_record.status_ =
1324
(0 == ret || OB_ITER_END == ret) ? REQUEST_SUCC : (ret);
1325
if (enable_sql_audit && !is_ps_cursor()) {
1326
ObPhysicalPlan *plan = result.get_physical_plan();
1327
audit_record.seq_ = 0; //don't use now
1328
audit_record.execution_id_ = execution_id;
1329
audit_record.client_addr_ = session.get_peer_addr();
1330
audit_record.user_client_addr_ = session.get_user_client_addr();
1331
audit_record.user_group_ = THIS_WORKER.get_group_id();
1332
MEMCPY(audit_record.sql_id_, ctx_.sql_id_, (int32_t)sizeof(audit_record.sql_id_));
1334
audit_record.plan_type_ = plan->get_plan_type();
1335
audit_record.table_scan_ = plan->contain_table_scan();
1336
audit_record.plan_id_ = plan->get_plan_id();
1337
audit_record.plan_hash_ = plan->get_plan_hash_value();
1338
audit_record.rule_name_ = const_cast<char *>(plan->get_rule_name().ptr());
1339
audit_record.rule_name_len_ = plan->get_rule_name().length();
1340
audit_record.partition_hit_ = session.partition_hit().get_bool();
1342
audit_record.affected_rows_ = result.get_affected_rows();
1343
audit_record.return_rows_ = result.get_return_rows();
1344
audit_record.partition_cnt_ =
1345
result.get_exec_context().get_das_ctx().get_related_tablet_cnt();
1346
audit_record.expected_worker_cnt_ =
1347
result.get_exec_context().get_task_exec_ctx().get_expected_worker_cnt();
1348
audit_record.used_worker_cnt_ =
1349
result.get_exec_context().get_task_exec_ctx().get_admited_worker_cnt();
1351
audit_record.is_executor_rpc_ = false;
1352
audit_record.is_inner_sql_ = false;
1353
audit_record.is_hit_plan_cache_ = result.get_is_from_plan_cache();
1354
audit_record.sql_ = const_cast<char *>(ctx_.cur_sql_.ptr());
1355
audit_record.sql_len_ = min(ctx_.cur_sql_.length(), OB_MAX_SQL_LENGTH);
1356
audit_record.sql_cs_type_ = session.get_local_collation_connection();
1357
audit_record.ps_stmt_id_ = stmt_id_;
1358
audit_record.ps_inner_stmt_id_ = inner_stmt_id;
1359
audit_record.params_value_ = params_value_;
1360
audit_record.params_value_len_ = params_value_len_;
1361
audit_record.is_perf_event_closed_ = !lib::is_diagnose_info_enabled();
1363
ObPhysicalPlanCtx *plan_ctx = result.get_exec_context().get_physical_plan_ctx();
1364
if (OB_NOT_NULL(plan_ctx)) {
1365
audit_record.consistency_level_ = plan_ctx->get_consistency_level();
1369
//update v$sql statistics
1370
if (session.get_local_ob_enable_plan_cache()
1371
&& !retry_ctrl_.need_retry()
1372
&& !is_ps_cursor()) {
1373
// ps cursor do this in inner open
1374
ObIArray<ObTableRowCount> *table_row_count_list = NULL;
1375
ObPhysicalPlan *plan = result.get_physical_plan();
1376
ObPhysicalPlanCtx *plan_ctx = result.get_exec_context().get_physical_plan_ctx();
1377
if (OB_NOT_NULL(plan_ctx)) {
1378
table_row_count_list = &(plan_ctx->get_table_row_count_list());
1379
audit_record.table_scan_stat_ = plan_ctx->get_table_scan_stat();
1382
if (!(ctx_.self_add_plan_) && ctx_.plan_cache_hit_) {
1383
plan->update_plan_stat(audit_record,
1384
false, // false mean not first update plan stat
1385
result.get_exec_context().get_is_evolution(),
1386
table_row_count_list);
1387
plan->update_cache_access_stat(audit_record.table_scan_stat_);
1388
} else if (ctx_.self_add_plan_ && !ctx_.plan_cache_hit_) {
1389
plan->update_plan_stat(audit_record,
1391
result.get_exec_context().get_is_evolution(),
1392
table_row_count_list);
1393
plan->update_cache_access_stat(audit_record.table_scan_stat_);
1394
} else if (ctx_.self_add_plan_ && ctx_.plan_cache_hit_) {
1395
// spm evolution plan first execute
1396
plan->update_plan_stat(audit_record,
1398
result.get_exec_context().get_is_evolution(),
1399
table_row_count_list);
1400
plan->update_cache_access_stat(audit_record.table_scan_stat_);
1405
// reset thread waring buffer in sync mode
1406
if (!async_resp_used) {
1407
clear_wb_content(session);
1410
bool need_retry = (THIS_THWORKER.need_retry()
1411
|| RETRY_TYPE_NONE != retry_ctrl_.get_retry_type());
1412
if (!is_ps_cursor()) {
1415
(void)ObSQLUtils::handle_plan_baseline(audit_record, result.get_physical_plan(), ret, ctx_);
1418
// ps cursor has already record after inner_open in spi
1419
ObSQLUtils::handle_audit_record(need_retry, EXECUTE_PS_EXECUTE, session, ctx_.is_sensitive_);
1425
// return false only if send packet fail.
1426
int ObMPStmtExecute::response_result(
1427
ObMySQLResultSet &result,
1428
ObSQLSessionInfo &session,
1429
bool force_sync_resp,
1430
bool &async_resp_used)
1432
int ret = OB_SUCCESS;
1434
bool need_trans_cb = result.need_end_trans_callback() && (!force_sync_resp);
1436
bool need_trans_cb = result.need_end_trans_callback() &&
1437
(!force_sync_resp) &&
1438
(!ctx_.spm_ctx_.check_execute_status_);
1441
// NG_TRACE_EXT(exec_begin, ID(arg1), force_sync_resp, ID(end_trans_cb), need_trans_cb);
1443
if (OB_LIKELY(NULL != result.get_physical_plan())) {
1444
if (need_trans_cb) {
1445
ObAsyncPlanDriver drv(gctx_, ctx_, session, retry_ctrl_, *this, is_prexecute());
1446
// NOTE: sql_end_cb必须在drv.response_result()之前初始化好
1447
ObSqlEndTransCb &sql_end_cb = session.get_mysql_end_trans_cb();
1448
if (OB_FAIL(sql_end_cb.init(packet_sender_, &session,
1449
stmt_id_, params_num_,
1450
is_prexecute() ? packet_sender_.get_comp_seq() : 0))) {
1451
LOG_WARN("failed to init sql end callback", K(ret));
1452
} else if (OB_FAIL(drv.response_result(result))) {
1453
LOG_WARN("fail response async result", K(ret));
1455
async_resp_used = result.is_async_end_trans_submitted();
1457
// 试点ObQuerySyncDriver
1458
int32_t iteration_count = OB_INVALID_COUNT;
1459
if (is_prexecute()) {
1460
iteration_count = static_cast<ObMPStmtPrexecute*>(this)->get_iteration_count();
1462
ObSyncPlanDriver drv(gctx_,
1469
ret = drv.response_result(result);
1472
if (need_trans_cb) {
1473
ObSqlEndTransCb &sql_end_cb = session.get_mysql_end_trans_cb();
1474
ObAsyncCmdDriver drv(gctx_, ctx_, session, retry_ctrl_, *this, is_prexecute());
1475
if (OB_FAIL(sql_end_cb.init(packet_sender_, &session,
1476
stmt_id_, params_num_,
1477
is_prexecute() ? packet_sender_.get_comp_seq() : 0))) {
1478
LOG_WARN("failed to init sql end callback", K(ret));
1479
} else if (OB_FAIL(drv.response_result(result))) {
1480
LOG_WARN("fail response async result", K(ret));
1482
LOG_DEBUG("use async cmd driver success!",
1483
K(result.get_stmt_type()), K(session.get_local_autocommit()));
1485
async_resp_used = result.is_async_end_trans_submitted();
1487
ObSyncCmdDriver drv(gctx_, ctx_, session, retry_ctrl_, *this, is_prexecute());
1488
session.set_pl_query_sender(&drv);
1489
session.set_ps_protocol(result.is_ps_protocol());
1490
if (OB_FAIL(drv.response_result(result))) {
1491
LOG_WARN("failed response sync result", K(ret));
1493
LOG_DEBUG("use sync cmd driver success!",
1494
K(result.get_stmt_type()), K(session.get_local_autocommit()));
1496
session.set_pl_query_sender(NULL);
1499
// NG_TRACE(exec_end);
1503
OB_NOINLINE int ObMPStmtExecute::process_retry(ObSQLSessionInfo &session,
1504
ParamStore *param_store,
1505
bool has_more_result,
1506
bool force_sync_resp,
1507
bool &async_resp_used)
1509
int ret = OB_SUCCESS;
1510
//create a temporary memory context to process retry, avoid memory bloat caused by retries
1511
lib::ContextParam param;
1512
param.set_mem_attr(MTL_ID(),
1513
ObModIds::OB_SQL_EXECUTOR, ObCtxIds::DEFAULT_CTX_ID)
1514
.set_properties(lib::USE_TL_PAGE_OPTIONAL)
1515
.set_page_size(!lib::is_mini_mode() ? OB_MALLOC_BIG_BLOCK_SIZE
1516
: OB_MALLOC_MIDDLE_BLOCK_SIZE)
1517
.set_ablock_size(lib::INTACT_MIDDLE_AOBJECT_SIZE);
1518
CREATE_WITH_TEMP_CONTEXT(param) {
1519
ret = do_process(session,
1529
int ObMPStmtExecute::do_process_single(ObSQLSessionInfo &session,
1530
ParamStore *param_store,
1531
bool has_more_result,
1532
bool force_sync_resp,
1533
bool &async_resp_used)
1535
int ret = OB_SUCCESS;
1537
ctx_.self_add_plan_ = false;
1538
oceanbase::lib::Thread::WaitGuard guard(oceanbase::lib::Thread::WAIT_FOR_LOCAL_RETRY);
1540
// 每次都必须设置为OB_SCCESS, 否则可能会因为没有调用do_process()造成死循环
1542
share::schema::ObSchemaGetterGuard schema_guard;
1543
int64_t tenant_version = 0;
1544
int64_t sys_version = 0;
1545
retry_ctrl_.clear_state_before_each_retry(session.get_retry_info_for_update());
1546
OZ (gctx_.schema_service_->get_tenant_schema_guard(session.get_effective_tenant_id(),
1548
OZ (schema_guard.get_schema_version(session.get_effective_tenant_id(), tenant_version));
1549
OZ (schema_guard.get_schema_version(OB_SYS_TENANT_ID, sys_version));
1550
OX (ctx_.schema_guard_ = &schema_guard);
1551
OX (retry_ctrl_.set_tenant_local_schema_version(tenant_version));
1552
OX (retry_ctrl_.set_sys_local_schema_version(sys_version));
1554
if (OB_SUCC(ret) && !is_send_long_data()) {
1555
if (OB_LIKELY(session.get_is_in_retry())
1556
|| (is_arraybinding_ && (prepare_packet_sent_ || !is_prexecute()))) {
1557
ret = process_retry(session,
1563
ret = do_process(session,
1570
session.set_session_in_retry(retry_ctrl_.need_retry());
1572
} while (RETRY_TYPE_LOCAL == retry_ctrl_.get_retry_type());
1574
if (OB_SUCC(ret) && retry_ctrl_.get_retry_times() > 0) {
1575
// 经过重试之后才成功的,把sql打印出来。这里只能覆盖到本地重试的情况,没法覆盖到扔回队列重试的情况。
1576
// 如果需要重试则ret不可能为OB_SUCCESS,因此这里不用判断retry_type。
1577
LOG_TRACE("sql retry",
1578
K(ret), "retry_times", retry_ctrl_.get_retry_times(), "sql", ctx_.cur_sql_);
1583
int ObMPStmtExecute::is_arraybinding_returning(sql::ObSQLSessionInfo &session, bool &is_ab_return)
1585
int ret = OB_SUCCESS;
1586
ObPsCache *ps_cache = NULL;
1587
ObPsStmtId inner_stmt_id = OB_INVALID_ID;
1588
ObPsStmtInfoGuard guard;
1589
ObPsStmtInfo *ps_info = NULL;
1590
is_ab_return = false;
1591
if (OB_ISNULL(ps_cache = session.get_ps_cache())) {
1592
ret = OB_INVALID_ARGUMENT;
1593
LOG_ERROR("physical plan context or ps plan cache is NULL or schema_guard is null",
1594
K(ret), K(ps_cache));
1595
} else if (OB_FAIL(session.get_inner_ps_stmt_id(stmt_id_, inner_stmt_id))) {
1596
LOG_WARN("fail to get inner ps stmt_id", K(ret), K(stmt_id_), K(inner_stmt_id));
1597
} else if (OB_FAIL(session.get_ps_cache()->get_stmt_info_guard(inner_stmt_id, guard))) {
1598
LOG_WARN("get stmt info guard failed", K(ret), K(stmt_id_), K(inner_stmt_id));
1599
} else if (OB_ISNULL(ps_info = guard.get_stmt_info())) {
1600
ret = OB_ERR_UNEXPECTED;
1601
LOG_WARN("get stmt info is null", K(ret));
1602
} else if (ps_info->get_num_of_returning_into() > 0) {
1603
is_ab_return = true;
1604
LOG_TRACE("is arraybinding returning", K(ret), KPC(ps_info));
1609
int ObMPStmtExecute::try_batch_multi_stmt_optimization(ObSQLSessionInfo &session,
1610
bool has_more_result,
1611
bool force_sync_resp,
1612
bool &async_resp_used,
1613
bool &optimization_done)
1615
// 1. save_exception 不能batch
1616
// 2. returning 不能batch
1617
int ret = OB_SUCCESS;
1618
optimization_done = false;
1619
ctx_.multi_stmt_item_.set_ps_mode(true);
1620
ctx_.multi_stmt_item_.set_ab_cnt(arraybinding_size_);
1621
bool is_ab_returning = false;
1622
ParamStore *array_binding_params = NULL;
1623
bool enable_batch_opt = session.is_enable_batched_multi_statement();
1624
bool use_plan_cache = session.get_local_ob_enable_plan_cache();
1625
ObIAllocator &alloc = CURRENT_CONTEXT->get_arena_allocator();
1627
if (!enable_batch_opt) {
1629
LOG_TRACE("not open the batch optimization");
1630
} else if (!use_plan_cache) {
1631
LOG_TRACE("not enable the plan_cache", K(use_plan_cache));
1633
} else if (!is_prexecute()) {
1635
} else if (is_pl_stmt(stmt_type_)) {
1636
LOG_TRACE("is pl execution, can't do the batch optimization");
1637
} else if (1 == arraybinding_size_) {
1638
LOG_TRACE("arraybinding size is 1, not need d batch");
1639
} else if (get_save_exception()) {
1640
LOG_TRACE("is save exception mode, not supported batch optimization");
1641
} else if (OB_FAIL(is_arraybinding_returning(session, is_ab_returning))) {
1642
LOG_WARN("failed to check is arraybinding returning", K(ret));
1643
} else if (is_ab_returning) {
1644
LOG_TRACE("returning not support the batch optimization");
1645
} else if (OB_FAIL(ObSQLUtils::transform_pl_ext_type(*arraybinding_params_,
1648
array_binding_params))) {
1649
LOG_WARN("fail to trans_form extend type params_store", K(ret), K(arraybinding_size_));
1650
} else if (OB_FAIL(do_process_single(session, array_binding_params, has_more_result, force_sync_resp, async_resp_used))) {
1652
if (THIS_WORKER.need_retry()) {
1653
// just go back to large query queue and retry
1654
} else if (OB_BATCHED_MULTI_STMT_ROLLBACK == ret) {
1655
LOG_TRACE("batched multi_stmt needs rollback", K(ret));
1658
// 无论什么报错,都走单行执行一次,用于容错
1661
LOG_WARN("failed to process batch stmt, cover the error code, reset retry flag, then execute with single row",
1662
K(ret_tmp), K(ret), K(THIS_WORKER.need_retry()));
1665
optimization_done = true;
1667
LOG_TRACE("after try batched multi-stmt optimization", K(ret), K(stmt_type_), K(use_plan_cache),
1668
K(optimization_done), K(enable_batch_opt), K(is_ab_returning), K(THIS_WORKER.need_retry()), K(arraybinding_size_));
1672
int ObMPStmtExecute::process_execute_stmt(const ObMultiStmtItem &multi_stmt_item,
1673
ObSQLSessionInfo &session,
1674
bool has_more_result,
1675
bool force_sync_resp,
1676
bool &async_resp_used)
1678
int ret = OB_SUCCESS;
1679
bool need_response_error = true;
1681
// 执行setup_wb后,所有WARNING都会写入到当前session的WARNING BUFFER中
1683
const bool enable_trace_log = lib::is_trace_log_enabled();
1684
//============================ 注意这些变量的生命周期 ================================
1685
ObSMConnection *conn = get_conn();
1686
ObSessionStatEstGuard stat_est_guard(conn->tenant_->id(), session.get_sessid());
1687
if (OB_FAIL(init_process_var(ctx_, multi_stmt_item, session))) {
1688
LOG_WARN("init process var failed.", K(ret), K(multi_stmt_item));
1690
if (enable_trace_log) {
1691
//set session log_level.Must use ObThreadLogLevelUtils::clear() in pair
1692
ObThreadLogLevelUtils::init(session.get_log_id_level_map());
1694
// obproxy may use 'SET @@last_schema_version = xxxx' to set newest schema,
1695
// observer will force refresh schema if local_schema_version < last_schema_version;
1696
if (OB_FAIL(check_and_refresh_schema(session.get_login_tenant_id(),
1697
session.get_effective_tenant_id()))) {
1698
LOG_WARN("failed to check_and_refresh_schema", K(ret));
1699
} else if (OB_FAIL(session.update_timezone_info())) {
1700
LOG_WARN("fail to update time zone info", K(ret));
1701
} else if (is_arraybinding_) {
1702
need_response_error = false;
1703
bool optimization_done = false;
1704
if (ctx_.can_reroute_sql_) {
1705
ctx_.can_reroute_sql_ = false;
1706
LOG_INFO("arraybinding not support reroute sql.");
1708
ObSEArray<ObSavedException, 4> exception_array;
1709
if (OB_UNLIKELY(arraybinding_size_ <= 0)) {
1710
ret = OB_NOT_SUPPORTED;
1711
LOG_WARN("arraybinding has no parameters", K(ret), K(arraybinding_size_));
1712
LOG_USER_ERROR(OB_NOT_SUPPORTED, "oci arraybinding has no parameters");
1713
} else if (OB_FAIL(try_batch_multi_stmt_optimization(session,
1716
async_resp_used, optimization_done))) {
1717
LOG_WARN("fail to try_batch_multi_stmt_optimization", K(ret));
1718
} else if (!optimization_done) {
1719
ctx_.multi_stmt_item_.set_ps_mode(true);
1720
ctx_.multi_stmt_item_.set_ab_cnt(0);
1721
for (int64_t i = 0; OB_SUCC(ret) && i < arraybinding_size_; ++i) {
1722
set_curr_sql_idx(i);
1723
OZ (construct_execute_param_for_arraybinding(i));
1724
OZ (do_process_single(session, params_, has_more_result, force_sync_resp, async_resp_used));
1726
if (is_save_exception_ && !is_prexecute()) {
1727
// The old ps protocol will only collect error information here,
1728
// and the new one has already done fault tolerance in the front
1729
ret = save_exception_for_arraybinding(i, ret, exception_array);
1733
// If there is still an error in the new ps protocol,
1734
// then send an err package,
1735
// indicating that the server has an error that is not expected by the customer
1736
need_response_error = true;
1743
reset_complex_param_memory(arraybinding_params_, session);
1744
OZ (response_result_for_arraybinding(session, exception_array));
1746
need_response_error = false;
1747
if (OB_FAIL(do_process_single(session, params_, has_more_result, force_sync_resp, async_resp_used))) {
1748
LOG_WARN("fail to do process", K(ret), K(ctx_.cur_sql_));
1750
if (OB_UNLIKELY(NULL != GCTX.cgroup_ctrl_) && GCTX.cgroup_ctrl_->is_valid() && is_conn_valid()) {
1752
ObSQLSessionInfo *sess = NULL;
1753
if (OB_FAIL(get_session(sess))) {
1754
LOG_WARN("get session fail", K(ret));
1755
} else if (OB_ISNULL(sess)) {
1756
ret = OB_ERR_UNEXPECTED;
1757
LOG_WARN("session is NULL or invalid", K(ret));
1759
// Call setup_user_resource_group no matter OB_SUCC or OB_FAIL
1760
if (OB_FAIL(setup_user_resource_group(*conn, sess->get_effective_tenant_id(), sess))) {
1761
LOG_WARN("fail setup user resource group", K(ret));
1765
revert_session(sess);
1767
ret = OB_SUCC(bak_ret) ? ret : bak_ret;
1769
reset_complex_param_memory(params_, session);
1771
if (enable_trace_log) {
1772
ObThreadLogLevelUtils::clear();
1774
const int64_t debug_sync_timeout = GCONF.debug_sync_timeout;
1775
if (debug_sync_timeout > 0) {
1776
// ignore thread local debug sync actions to session actions failed
1777
int tmp_ret = OB_SUCCESS;
1778
tmp_ret = GDS.collect_result_actions(session.get_debug_sync_actions());
1779
if (OB_UNLIKELY(OB_SUCCESS != tmp_ret)) {
1780
LOG_WARN("set thread local debug sync actions to session actions failed", K(tmp_ret));
1785
//对于tracelog的处理, 不影响正常逻辑, 错误码无须赋值给ret, 清空WARNING BUFFER
1786
do_after_process(session, ctx_, async_resp_used);
1788
if (OB_FAIL(ret) && need_response_error && is_conn_valid()) {
1789
send_error_packet(ret, NULL, (void *)(ctx_.get_reroute_info()));
1796
int ObMPStmtExecute::process()
1798
int ret = OB_SUCCESS;
1799
int flush_ret = OB_SUCCESS;
1800
trace::UUID ps_execute_span_id;
1801
ObSQLSessionInfo *sess = NULL;
1802
bool need_response_error = true;
1803
bool need_disconnect = true;
1804
bool async_resp_used = false; // 由事务提交线程异步回复客户端
1805
int64_t query_timeout = 0;
1807
ObCurTraceId::TraceId *cur_trace_id = ObCurTraceId::get_trace_id();
1808
ObSMConnection *conn = get_conn();
1809
if (OB_ISNULL(req_) || OB_ISNULL(conn) || OB_ISNULL(cur_trace_id)) {
1810
ret = OB_ERR_UNEXPECTED;
1811
LOG_WARN("null conn ptr", K_(stmt_id), K_(req), K(cur_trace_id), K(ret));
1812
} else if (OB_UNLIKELY(!conn->is_in_authed_phase())) {
1813
ret = OB_ERR_NO_PRIVILEGE;
1814
LOG_WARN("receive sql without session", K_(stmt_id), K(ret));
1815
} else if (OB_ISNULL(conn->tenant_)) {
1816
ret = OB_ERR_UNEXPECTED;
1817
LOG_ERROR("invalid tenant", K_(stmt_id), K(conn->tenant_), K(ret));
1818
} else if (OB_FAIL(get_session(sess))) {
1819
LOG_WARN("get session fail", K_(stmt_id), K(ret));
1820
} else if (OB_ISNULL(sess)) {
1821
ret = OB_ERR_UNEXPECTED;
1822
LOG_WARN("session is NULL or invalid", K_(stmt_id), K(sess), K(ret));
1823
} else if (OB_FAIL(update_transmission_checksum_flag(*sess))) {
1824
LOG_WARN("update transmisson checksum flag failed", K(ret));
1826
ObSQLSessionInfo &session = *sess;
1827
int64_t tenant_version = 0;
1828
int64_t sys_version = 0;
1829
THIS_WORKER.set_session(sess);
1830
lib::CompatModeGuard g(sess->get_compatibility_mode() == ORACLE_MODE ?
1831
lib::Worker::CompatMode::ORACLE : lib::Worker::CompatMode::MYSQL);
1832
ObSQLSessionInfo::LockGuard lock_guard(session.get_query_lock());
1833
session.set_current_trace_id(ObCurTraceId::get_trace_id());
1834
session.init_use_rich_format();
1835
session.get_raw_audit_record().request_memory_used_ = 0;
1836
observer::ObProcessMallocCallback pmcb(0,
1837
session.get_raw_audit_record().request_memory_used_);
1838
lib::ObMallocCallbackGuard guard(pmcb);
1839
session.set_thread_id(GETTID());
1840
const ObMySQLRawPacket &pkt = reinterpret_cast<const ObMySQLRawPacket&>(req_->get_packet());
1841
int64_t packet_len = pkt.get_clen();
1842
if (OB_UNLIKELY(!session.is_valid())) {
1843
ret = OB_ERR_UNEXPECTED;
1844
LOG_ERROR("invalid session", K_(stmt_id), K(ret));
1845
} else if (OB_FAIL(process_kill_client_session(session))) {
1846
LOG_WARN("client session has been killed", K(ret));
1847
} else if (OB_UNLIKELY(session.is_zombie())) {
1848
//session has been killed some moment ago
1849
ret = OB_ERR_SESSION_INTERRUPTED;
1850
LOG_WARN("session has been killed", K(session.get_session_state()), K_(stmt_id),
1851
K(session.get_sessid()), "proxy_sessid", session.get_proxy_sessid(), K(ret));
1852
} else if (OB_FAIL(session.check_and_init_retry_info(*cur_trace_id, ctx_.cur_sql_))) {
1853
LOG_WARN("fail to check and init retry info", K(ret), K(*cur_trace_id), K(ctx_.cur_sql_));
1854
} else if (OB_FAIL(session.get_query_timeout(query_timeout))) {
1855
LOG_WARN("fail to get query timeout", K(ret));
1856
} else if (OB_FAIL(gctx_.schema_service_->get_tenant_received_broadcast_version(
1857
session.get_effective_tenant_id(), tenant_version))) {
1858
LOG_WARN("fail get tenant broadcast version", K(ret));
1859
} else if (OB_FAIL(gctx_.schema_service_->get_tenant_received_broadcast_version(
1860
OB_SYS_TENANT_ID, sys_version))) {
1861
LOG_WARN("fail get tenant broadcast version", K(ret));
1862
} else if (pkt.exist_trace_info()
1863
&& OB_FAIL(session.update_sys_variable(SYS_VAR_OB_TRACE_INFO,
1864
pkt.get_trace_info()))) {
1865
LOG_WARN("fail to update trace info", K(ret));
1866
} else if (FALSE_IT(session.set_txn_free_route(pkt.txn_free_route()))) {
1867
} else if (OB_FAIL(process_extra_info(session, pkt, need_response_error))) {
1868
LOG_WARN("fail get process extra info", K(ret));
1869
} else if (FALSE_IT(session.post_sync_session_info())) {
1870
} else if (OB_UNLIKELY(packet_len > session.get_max_packet_size())) {
1871
//packet size check with session variable max_allowd_packet or net_buffer_length
1872
ret = OB_ERR_NET_PACKET_TOO_LARGE;
1873
LOG_WARN("packet too large than allowed for the session", K_(stmt_id), K(ret));
1874
} else if (OB_FAIL(sql::ObFLTUtils::init_flt_info(pkt.get_extra_info(), session,
1875
conn->proxy_cap_flags_.is_full_link_trace_support()))) {
1876
LOG_WARN("failed to init flt extra info", K(ret));
1877
} else if (OB_FAIL(session.gen_configs_in_pc_str())) {
1878
LOG_WARN("fail to generate configuration string that can influence execution plan", K(ret));
1880
FLTSpanGuard(ps_execute);
1881
FLT_SET_TAG(log_trace_id, ObCurTraceId::get_trace_id_str(),
1882
receive_ts, get_receive_timestamp(),
1883
client_info, session.get_client_info(),
1884
module_name, session.get_module_name(),
1885
action_name, session.get_action_name(),
1886
sess_id, session.get_sessid());
1887
THIS_WORKER.set_timeout_ts(get_receive_timestamp() + query_timeout);
1888
retry_ctrl_.set_tenant_global_schema_version(tenant_version);
1889
retry_ctrl_.set_sys_global_schema_version(sys_version);
1890
session.partition_hit().reset();
1891
session.set_pl_can_retry(true);
1892
ObLockWaitNode &lock_wait_node = req_->get_lock_wait_node();
1893
lock_wait_node.set_session_info(session.get_sessid());
1895
need_response_error = false;
1896
need_disconnect = false;
1897
ret = process_execute_stmt(ObMultiStmtItem(false, 0, ObString()),
1900
false, // force_sync_resp
1903
// 退出前打印出SQL语句,便于定位各种问题
1905
if (OB_EAGAIN == ret) {
1906
//large query, do nothing
1907
} else if (is_conn_valid()) {// The memory of sql string is invalid if conn_valid_ has been set false.
1908
LOG_WARN("fail execute sql", "sql_id", ctx_.sql_id_, K_(stmt_id), K(ret));
1910
LOG_WARN("fail execute sql", K(ret));
1914
session.check_and_reset_retry_info(*cur_trace_id, THIS_WORKER.need_retry());
1915
session.set_last_trace_id(ObCurTraceId::get_trace_id());
1917
if (!retry_ctrl_.need_retry()) {
1918
// if no retry would be performed any more, clear the piece cache
1919
ObPieceCache *piece_cache = nullptr;
1920
int upper_scope_ret = ret;
1922
piece_cache = session.get_piece_cache();
1923
if (OB_NOT_NULL(piece_cache)) {
1924
for (uint64_t i = 0; OB_SUCC(ret) && i < params_num_; i++) {
1925
if (OB_FAIL(piece_cache->remove_piece(
1926
piece_cache->get_piece_key(stmt_id_, i), session))) {
1927
if (OB_HASH_NOT_EXIST == ret) {
1929
LOG_INFO("piece hash not exist", K(ret), K(stmt_id_), K(i));
1931
need_disconnect = true;
1932
LOG_WARN("remove piece fail", K(ret), K(need_disconnect), K(stmt_id_), K(i));
1937
LOG_DEBUG("piece_cache_ is null");
1939
ret = upper_scope_ret;
1942
record_flt_trace(session);
1945
if (OB_NOT_NULL(sess) && !sess->get_in_transaction()) {
1946
// transcation ends, end trace
1950
if (OB_FAIL(ret) && is_conn_valid()) {
1951
if (need_response_error) {
1952
send_error_packet(ret, NULL, (void *)(ctx_.get_reroute_info()));
1954
if (need_disconnect) {
1956
LOG_WARN("disconnect connection when process query", K(ret));
1960
// 如果已经异步回包,则这部分逻辑在cb中执行,这里跳过flush_buffer()
1961
if (!THIS_WORKER.need_retry()) {
1962
if (async_resp_used) {
1963
async_resp_used_ = true;
1964
packet_sender_.disable_response();
1966
flush_ret = flush_buffer(true);
1972
THIS_WORKER.set_session(NULL);
1974
revert_session(sess); //current ignore revert session ret
1977
return (OB_SUCCESS != ret) ? ret : flush_ret;
1980
int ObMPStmtExecute::get_udt_by_name(ObString relation_name,
1982
const share::schema::ObUDTTypeInfo *&udt_info)
1984
int ret = OB_SUCCESS;
1985
ObString new_relation_name;
1986
CK (OB_NOT_NULL(ctx_.schema_guard_));
1987
CK (OB_NOT_NULL(ctx_.session_info_));
1989
if (relation_name.empty()) {
1990
if (ctx_.session_info_->get_database_name().empty()) {
1991
ret = OB_ERR_NO_DB_SELECTED;
1992
LOG_WARN("no select no database", K(ret));
1994
new_relation_name = ctx_.session_info_->get_database_name();
1997
new_relation_name = relation_name;
2001
uint64_t database_id = OB_INVALID_ID;
2002
OZ (ctx_.schema_guard_->get_database_id(ctx_.session_info_->get_effective_tenant_id(),
2005
CK (OB_LIKELY(OB_INVALID_ID != database_id));
2006
OZ (ctx_.schema_guard_->get_udt_info(ctx_.session_info_->get_effective_tenant_id(),
2011
if (OB_ISNULL(udt_info)) {
2013
if (relation_name.empty()
2014
|| relation_name.case_compare("oceanbase")
2015
|| relation_name.case_compare("sys")) {
2016
OZ (ctx_.schema_guard_->get_udt_info(OB_SYS_TENANT_ID,
2023
if (OB_SUCC(ret) && OB_ISNULL(udt_info)) {
2024
ret = OB_ENTRY_NOT_EXIST;
2025
LOG_WARN("udt not exist", K(ret), K(relation_name), K(type_name));
2031
int ObMPStmtExecute::get_package_type_by_name(ObIAllocator &allocator,
2032
const TypeInfo *type_info,
2033
const pl::ObUserDefinedType *&pl_type)
2035
int ret = OB_SUCCESS;
2036
const share::schema::ObPackageInfo *package_info = NULL;
2037
int64_t compatible_mode = lib::is_oracle_mode() ? COMPATIBLE_ORACLE_MODE
2038
: COMPATIBLE_MYSQL_MODE;
2039
ObSchemaChecker schema_checker;
2040
CK (OB_NOT_NULL(type_info));
2041
CK (OB_NOT_NULL(ctx_.schema_guard_));
2042
CK (OB_NOT_NULL(ctx_.session_info_));
2043
CK (OB_NOT_NULL(ctx_.session_info_->get_pl_engine()));
2044
if (OB_SUCC(ret) && OB_ISNULL(pl_type ))
2045
OZ (schema_checker.init(*ctx_.schema_guard_, ctx_.session_info_->get_sessid()));
2046
OZ (schema_checker.get_package_info(ctx_.session_info_->get_effective_tenant_id(),
2047
type_info->relation_name_,
2048
type_info->package_name_,
2049
share::schema::PACKAGE_TYPE,
2052
CK (OB_NOT_NULL(package_info));
2054
pl::ObPLPackageManager &package_manager
2055
= ctx_.session_info_->get_pl_engine()->get_package_manager();
2056
pl::ObPLPackageGuard package_guard(ctx_.session_info_->get_effective_tenant_id());
2057
pl::ObPLResolveCtx resolve_ctx(allocator,
2058
*(ctx_.session_info_),
2059
*(ctx_.schema_guard_),
2063
OZ (package_manager.get_package_type(
2064
resolve_ctx, package_info->get_package_id(), type_info->type_name_, pl_type));
2065
CK (OB_NOT_NULL(pl_type));
2070
int ObMPStmtExecute::get_pl_type_by_type_info(ObIAllocator &allocator,
2071
const TypeInfo *type_info,
2072
const pl::ObUserDefinedType *&pl_type)
2074
int ret = OB_SUCCESS;
2075
#ifndef OB_BUILD_ORACLE_PL
2076
UNUSEDx(allocator, type_info, pl_type);
2077
ret = OB_NOT_SUPPORTED;
2078
LOG_WARN("not support", K(ret));
2079
LOG_USER_ERROR(OB_NOT_SUPPORTED, "Get PL type by type info is not supported in CE version");
2081
const share::schema::ObUDTTypeInfo *udt_info = NULL;
2082
if (OB_ISNULL(type_info)) {
2083
ret = OB_ERR_UNEXPECTED;
2084
LOG_WARN("type info is null", K(ret), K(type_info));
2085
} else if (!type_info->is_elem_type_) {
2086
if (type_info->package_name_.empty()) {
2087
OZ (get_udt_by_name(type_info->relation_name_, type_info->type_name_, udt_info));
2088
OZ (udt_info->transform_to_pl_type(allocator, pl_type));
2090
OZ (get_package_type_by_name(allocator, type_info, pl_type));
2094
pl::ObNestedTableType *table_type = NULL;
2095
pl::ObPLDataType elem_type;
2096
const pl::ObUserDefinedType *elem_type_ptr = NULL;
2097
if (type_info->elem_type_.get_obj_type() != ObExtendType) {
2098
elem_type.set_data_type(type_info->elem_type_);
2099
} else if (OB_FAIL(get_udt_by_name(type_info->relation_name_, type_info->type_name_, udt_info))) {
2100
LOG_WARN("failed to get udt info", K(ret), K(type_info->relation_name_), K(type_info->type_name_));
2101
} else if (OB_FAIL(udt_info->transform_to_pl_type(allocator, elem_type_ptr))) {
2102
LOG_WARN("failed to transform udt to pl type", K(ret));
2103
} else if (OB_ISNULL(elem_type_ptr)) {
2104
ret = OB_ERR_UNEXPECTED;
2105
LOG_WARN("failed to get elem type ptr", K(ret));
2107
elem_type = *(static_cast<const pl::ObPLDataType*>(elem_type_ptr));
2110
} else if (OB_ISNULL(ptr = allocator.alloc(sizeof(pl::ObNestedTableType)))) {
2111
ret = OB_ALLOCATE_MEMORY_FAILED;
2112
LOG_WARN("failed to allocate memory for ObNestedTableType", K(ret));
2114
table_type = new(ptr)pl::ObNestedTableType();
2115
table_type->set_type_from(pl::ObPLTypeFrom::PL_TYPE_LOCAL);
2116
table_type->set_element_type(elem_type);
2117
pl_type = table_type;
2120
CK (OB_NOT_NULL(pl_type));
2125
int ObMPStmtExecute::parse_complex_param_value(ObIAllocator &allocator,
2126
const ObCharsetType charset,
2127
const ObCollationType cs_type,
2128
const ObCollationType ncs_type,
2130
const common::ObTimeZoneInfo *tz_info,
2131
TypeInfo *type_info,
2134
int ret = OB_SUCCESS;
2135
const pl::ObUserDefinedType *pl_type = NULL;
2136
int64_t param_size = 0, param_pos = 0;
2137
CK (OB_NOT_NULL(type_info));
2138
OZ (get_pl_type_by_type_info(allocator, type_info, pl_type));
2139
CK (OB_NOT_NULL(pl_type));
2140
OZ (pl_type->init_obj(*(ctx_.schema_guard_), allocator, param, param_size));
2141
OX (param.set_udt_id(pl_type->get_user_type_id()));
2142
OZ (pl_type->deserialize(*(ctx_.schema_guard_), allocator, charset, cs_type, ncs_type,
2143
tz_info, data, reinterpret_cast<char *>(param.get_ext()), param_size, param_pos));
2144
OX (param.set_need_to_check_extend_type(true));
2148
int ObMPStmtExecute::parse_basic_param_value(ObIAllocator &allocator,
2149
const uint32_t type,
2150
const ObCharsetType charset,
2151
const ObCharsetType ncharset,
2152
const ObCollationType cs_type,
2153
const ObCollationType ncs_type,
2155
const common::ObTimeZoneInfo *tz_info,
2157
bool is_complex_element,
2158
ObPSAnalysisChecker *checker,
2161
int ret = OB_SUCCESS;
2164
case MYSQL_TYPE_TINY:
2165
case MYSQL_TYPE_SHORT:
2166
case MYSQL_TYPE_LONG:
2167
case MYSQL_TYPE_LONGLONG: {
2168
if (OB_FAIL(parse_integer_value(type, data, param, allocator, is_complex_element, checker, is_unsigned))) {
2169
LOG_WARN("parse integer value from client failed", K(ret));
2173
case MYSQL_TYPE_FLOAT: {
2175
PS_STATIC_DEFENSE_CHECK(checker, sizeof(value))
2177
MEMCPY(&value, data, sizeof(value));
2178
data += sizeof(value);
2179
param.set_float(value);
2183
case MYSQL_TYPE_ORA_BINARY_FLOAT: {
2185
PS_STATIC_DEFENSE_CHECK(checker, sizeof(value))
2187
MEMCPY(&value, data, sizeof(value));
2188
data += sizeof(value);
2189
param.set_float(value);
2193
case MYSQL_TYPE_DOUBLE: {
2195
PS_STATIC_DEFENSE_CHECK(checker, sizeof(value))
2197
MEMCPY(&value, data, sizeof(value));
2198
data += sizeof(value);
2199
if (lib::is_mysql_mode()) {
2200
param.set_double(value);
2203
int64_t buf_len = 0;
2204
number::ObNumber nb;
2205
const int64_t alloc_size = OB_MAX_DOUBLE_FLOAT_DISPLAY_WIDTH;
2206
if (OB_ISNULL(buf = static_cast<char *>(allocator.alloc(alloc_size)))) {
2207
ret = OB_ALLOCATE_MEMORY_FAILED;
2208
LOG_WARN("failed to allocate memory", K(ret));
2209
} else if (FALSE_IT(buf_len = ob_gcvt_strict(value, OB_GCVT_ARG_DOUBLE, alloc_size,
2210
buf, NULL, TRUE/*is_oracle_mode*/,
2211
FALSE/*is_binary_double*/, FALSE))) {
2212
} else if (OB_FAIL(nb.from_sci_opt(buf, buf_len, allocator))) {
2213
LOG_WARN("decode double param to number failed", K(ret));
2215
param.set_number(nb);
2221
case MYSQL_TYPE_ORA_BINARY_DOUBLE: {
2223
PS_STATIC_DEFENSE_CHECK(checker, sizeof(value))
2225
MEMCPY(&value, data, sizeof(value));
2226
data += sizeof(value);
2227
param.set_double(value);
2231
case MYSQL_TYPE_YEAR: {
2233
PS_STATIC_DEFENSE_CHECK(checker, 2)
2235
ObMySQLUtil::get_int2(data, value);
2236
param.set_year(static_cast<uint8_t>(value));
2240
case MYSQL_TYPE_DATE:
2241
case MYSQL_TYPE_DATETIME:
2242
case MYSQL_TYPE_TIMESTAMP: {
2243
if (OB_FAIL(parse_mysql_timestamp_value(static_cast<EMySQLFieldType>(type), data,
2244
param, tz_info, checker))) {
2245
LOG_WARN("parse timestamp value from client failed", K(ret));
2249
case MYSQL_TYPE_TIME:{
2250
if (OB_FAIL(parse_mysql_time_value(data, param, checker))) {
2251
LOG_WARN("parse timestamp value from client failed", K(ret));
2255
case MYSQL_TYPE_OB_TIMESTAMP_WITH_TIME_ZONE:
2256
case MYSQL_TYPE_OB_TIMESTAMP_WITH_LOCAL_TIME_ZONE:
2257
case MYSQL_TYPE_OB_TIMESTAMP_NANO: {
2258
ObTimeConvertCtx cvrt_ctx(tz_info, true);
2259
if (OB_FAIL(parse_oracle_timestamp_value(
2260
static_cast<EMySQLFieldType>(type), data, cvrt_ctx, param, checker))) {
2261
LOG_WARN("parse timestamp value from client failed", K(ret));
2265
case MYSQL_TYPE_OB_NVARCHAR2:
2266
case MYSQL_TYPE_OB_NCHAR:
2267
case MYSQL_TYPE_OB_RAW:
2268
case MYSQL_TYPE_TINY_BLOB:
2269
case MYSQL_TYPE_MEDIUM_BLOB:
2270
case MYSQL_TYPE_LONG_BLOB:
2271
case MYSQL_TYPE_BLOB:
2272
case MYSQL_TYPE_STRING:
2273
case MYSQL_TYPE_VARCHAR:
2274
case MYSQL_TYPE_VAR_STRING:
2275
case MYSQL_TYPE_OB_NUMBER_FLOAT:
2276
case MYSQL_TYPE_NEWDECIMAL:
2277
case MYSQL_TYPE_OB_UROWID:
2278
case MYSQL_TYPE_ORA_BLOB:
2279
case MYSQL_TYPE_ORA_CLOB:
2280
case MYSQL_TYPE_JSON:
2281
case MYSQL_TYPE_GEOMETRY: {
2284
uint64_t length = 0;
2285
ObCollationType cur_cs_type = ObCharset::get_default_collation(charset);
2286
ObCollationType cur_ncs_type = ObCollationType::CS_TYPE_INVALID;
2287
if (ncharset == ObCharsetType::CHARSET_INVALID || ncharset == ObCharsetType::CHARSET_BINARY) {
2288
cur_ncs_type = ObCharset::get_default_collation(charset);
2290
cur_ncs_type = ObCharset::get_default_collation(ncharset);
2292
if (OB_FAIL(ObMySQLUtil::get_length(data, length))) {
2293
LOG_ERROR("decode varchar param value failed", K(ret));
2295
PS_STATIC_DEFENSE_CHECK(checker, length)
2297
str.assign_ptr(data, static_cast<ObString::obstr_size_t>(length));
2300
} else if (length > OB_MAX_LONGTEXT_LENGTH) {
2301
ret = OB_ERR_INVALID_INPUT_ARGUMENT;
2302
LOG_WARN("input param len is over size", K(ret), K(length));
2303
} else if (MYSQL_TYPE_OB_NVARCHAR2 == type
2304
|| MYSQL_TYPE_OB_NCHAR == type) {
2305
OZ(copy_or_convert_str(allocator, cur_ncs_type, ncs_type, str, dst));
2307
MYSQL_TYPE_OB_NVARCHAR2 == type ? param.set_nvarchar2(dst)
2308
: param.set_nchar(dst);
2309
param.set_collation_type(ncs_type);
2311
LOG_DEBUG("recieve Nchar param", K(ret), K(str), K(dst));
2312
} else if (ObURowIDType == type) {
2313
// decode bae64 str and get urowid content
2314
ObURowIDData urowid_data;
2315
if (OB_FAIL(ObURowIDData::decode2urowid(str.ptr(), str.length(),
2316
allocator, urowid_data))) {
2317
LOG_WARN("failed to decode to urowid", K(ret));
2318
if (OB_INVALID_ROWID == ret) {
2319
LOG_USER_ERROR(OB_INVALID_ROWID);
2322
param.set_urowid(urowid_data);
2325
bool is_lob_v1 = false;
2326
if (MYSQL_TYPE_STRING == type
2327
|| MYSQL_TYPE_VARCHAR == type
2328
|| MYSQL_TYPE_VAR_STRING == type
2329
|| MYSQL_TYPE_ORA_CLOB == type
2330
|| MYSQL_TYPE_JSON == type
2331
|| MYSQL_TYPE_GEOMETRY == type) {
2332
int64_t extra_len = 0;
2333
if (MYSQL_TYPE_ORA_CLOB == type) {
2334
ObLobLocatorV2 lob(str);
2335
if (lob.is_lob_locator_v1()) {
2337
const ObLobLocator &lobv1 = *(reinterpret_cast<const ObLobLocator *>(str.ptr()));
2338
if (OB_UNLIKELY(! lobv1.is_valid() || lobv1.get_total_size() != length)) {
2339
ret = OB_ERR_UNEXPECTED;
2340
LOG_WARN("got invalid ps lob param", K(length), K(lobv1.magic_code_), K(lobv1.table_id_),
2341
K(lobv1.column_id_), K(lobv1.payload_offset_), K(lobv1.payload_size_),
2342
K(type), K(cs_type), K(lobv1.get_total_size()), K(lobv1.get_data_length()));
2344
extra_len = str.length() - reinterpret_cast<const ObLobLocator *>(str.ptr())->payload_size_;
2347
if (!lob.is_valid()) {
2348
ret = OB_ERR_UNEXPECTED;
2349
LOG_WARN("got invalid ps lob param", K(length), K(lob), K(type), K(cs_type));
2350
} // if INROW, does it need to do copy_or_convert_str?
2353
if (is_lob_v1 || MYSQL_TYPE_ORA_CLOB != type) {
2354
OZ(copy_or_convert_str(allocator,
2357
ObString(str.length() - extra_len, str.ptr() + extra_len),
2361
if (OB_SUCC(ret) && MYSQL_TYPE_ORA_CLOB == type) {
2364
dst.assign_ptr(dst.ptr() - extra_len, dst.length() + extra_len);
2365
MEMCPY(dst.ptr(), str.ptr(), extra_len);
2366
reinterpret_cast<ObLobLocator *>(dst.ptr())->payload_size_ = dst.length() - extra_len;
2368
if (OB_FAIL(ob_write_string(allocator, str, dst))) {
2369
LOG_WARN("Failed to write str", K(ret));
2373
} else if (OB_FAIL(ob_write_string(allocator, str, dst))) {
2374
LOG_WARN("Failed to write str", K(ret));
2377
if (MYSQL_TYPE_NEWDECIMAL == type) {
2378
number::ObNumber nb;
2379
if (OB_FAIL(nb.from(str.ptr(), length, allocator))) {
2380
LOG_WARN("decode varchar param to number failed", K(ret), K(str));
2382
param.set_number(nb);
2384
} else if (MYSQL_TYPE_OB_NUMBER_FLOAT == type) {
2385
number::ObNumber nb;
2386
if (OB_FAIL(nb.from(str.ptr(), length, allocator))) {
2387
LOG_WARN("decode varchar param to number failed", K(ret), K(str));
2389
param.set_number_float(nb);
2391
} else if (MYSQL_TYPE_OB_RAW == type) {
2393
} else if (MYSQL_TYPE_ORA_BLOB == type
2394
|| MYSQL_TYPE_ORA_CLOB == type) {
2395
if (MYSQL_TYPE_ORA_BLOB == type) {
2396
param.set_collation_type(CS_TYPE_BINARY);
2398
param.set_collation_type(cs_type);
2400
ObLobLocatorV2 lobv2(str);
2401
if (lobv2.is_lob_locator_v1()) {
2402
const ObLobLocator &lob = *(reinterpret_cast<const ObLobLocator *>(dst.ptr()));
2403
if (!IS_CLUSTER_VERSION_BEFORE_4_1_0_0 && lob.get_payload_length() == 0) {
2404
// do convert empty lob v1 to v2
2406
if (OB_FAIL(lob.get_payload(payload))) {
2407
LOG_WARN("fail to get payload", K(ret), K(lob));
2409
param.set_lob_value(ObLongTextType, payload.ptr(), payload.length());
2410
if (OB_FAIL(ObTextStringResult::ob_convert_obj_temporay_lob(param, allocator))) {
2411
LOG_WARN("Fail to convert plain lob data to templob",K(ret), K(payload));
2413
LOG_TRACE("convert empty lob v1 to v2", K(lob), K(cs_type), K(type));
2417
param.set_lob_locator(lob);
2418
param.set_has_lob_header();
2419
LOG_TRACE("get lob locator", K(lob), K(cs_type), K(type));
2422
param.set_lob_value(ObLongTextType, dst.ptr(), dst.length());
2423
param.set_has_lob_header();
2424
LOG_TRACE("get lob locator v2", K(lobv2), K(cs_type), K(type));
2426
} else if (MYSQL_TYPE_TINY_BLOB == type
2427
|| MYSQL_TYPE_MEDIUM_BLOB == type
2428
|| MYSQL_TYPE_BLOB == type
2429
|| MYSQL_TYPE_LONG_BLOB == type
2430
|| MYSQL_TYPE_JSON == type
2431
|| MYSQL_TYPE_GEOMETRY == type) {
2433
// Oracle mode: client driver will call hextoraw()
2434
// MySQL mode: no need to call hextoraw
2435
// in text protocol:
2436
// Oracle mode: server will call hextoraw()
2437
// MySQL mode: no need to call hextoraw
2438
// Notice: text tc without lob header here, should not set has_lob_header flag here
2439
param.set_collation_type(cs_type);
2440
if (MYSQL_TYPE_TINY_BLOB == type) {
2441
param.set_lob_value(ObTinyTextType, dst.ptr(), dst.length());
2442
} else if (MYSQL_TYPE_MEDIUM_BLOB == type) {
2443
param.set_lob_value(ObMediumTextType, dst.ptr(), dst.length());
2444
} else if (MYSQL_TYPE_BLOB == type) {
2445
param.set_lob_value(ObTextType, dst.ptr(), dst.length());
2446
} else if (MYSQL_TYPE_LONG_BLOB == type) {
2447
param.set_lob_value(ObLongTextType, dst.ptr(), dst.length());
2448
} else if (MYSQL_TYPE_JSON == type) {
2449
param.set_json_value(ObJsonType, dst.ptr(), dst.length());
2450
} else if (MYSQL_TYPE_GEOMETRY == type) {
2451
param.set_geometry_value(ObGeometryType, dst.ptr(), dst.length());
2453
if (OB_SUCC(ret) && param.is_lob_storage() && dst.length() > 0) {
2454
if (OB_FAIL(ObTextStringResult::ob_convert_obj_temporay_lob(param, allocator))) {
2455
LOG_WARN("Fail to convert plain lob data to templob",K(ret));
2459
param.set_collation_type(cs_type);
2460
if (is_oracle_mode() && !is_complex_element) {
2461
param.set_char(dst);
2463
if (is_complex_element && dst.length()== 0) {
2466
param.set_varchar(dst);
2476
case MYSQL_TYPE_OB_INTERVAL_YM: {
2477
if (OB_FAIL(parse_oracle_interval_ym_value(data, param, checker))) {
2478
LOG_WARN("failed to parse oracle interval year to month value", K(ret));
2482
case MYSQL_TYPE_OB_INTERVAL_DS:{
2483
if (OB_FAIL(parse_oracle_interval_ds_value(data, param, checker))) {
2484
LOG_WARN("failed to parse oracle interval year to month value", K(ret));
2489
LOG_USER_ERROR(OB_ERR_ILLEGAL_TYPE, type);
2490
ret = OB_ERR_ILLEGAL_TYPE;
2494
if (OB_SUCC(ret) && lib::is_mysql_mode()) {
2495
param.set_collation_level(CS_LEVEL_COERCIBLE);
2500
int ObMPStmtExecute::parse_param_value(ObIAllocator &allocator,
2501
const uint32_t type,
2502
const ObCharsetType charset,
2503
const ObCharsetType ncharset,
2504
const ObCollationType cs_type,
2505
const ObCollationType ncs_type,
2507
const common::ObTimeZoneInfo *tz_info,
2508
TypeInfo *type_info,
2513
int ret = OB_SUCCESS;
2514
uint64_t length = 0;
2516
common::ObFixedArray<ObSqlString, ObIAllocator>
2517
str_buf(THIS_WORKER.get_sql_arena_allocator());
2518
ObPieceCache *piece_cache =
2519
NULL == ctx_.session_info_ ? NULL : ctx_.session_info_->get_piece_cache();
2520
ObPiece *piece = NULL;
2521
if (OB_NOT_NULL(piece_cache) && OB_FAIL(piece_cache->get_piece(stmt_id_, param_id, piece))) {
2522
ret = OB_ERR_UNEXPECTED;
2523
LOG_WARN("get piece fail.", K(ret));
2524
} else if (OB_ISNULL(piece_cache) || OB_ISNULL(piece)) {
2525
// send piece data will init piece cache
2526
// if piece cache is null, it must not be send piece protocol
2527
bool is_null = ObSMUtils::update_from_bitmap(param, bitmap, param_id);
2529
LOG_DEBUG("param is null", K(param_id), K(param), K(type));
2530
} else if (OB_UNLIKELY(MYSQL_TYPE_COMPLEX == type)) {
2531
if (OB_FAIL(parse_complex_param_value(allocator, charset, cs_type, ncs_type,
2532
data, tz_info, type_info,
2534
LOG_WARN("failed to parse complex value", K(ret));
2536
} else if (OB_UNLIKELY(MYSQL_TYPE_CURSOR == type)) {
2537
CK (OB_NOT_NULL(ctx_.session_info_));
2539
ObPLCursorInfo *cursor = NULL;
2540
// OZ (ctx_.session_info_->make_cursor(cursor));
2541
OX (param.set_extend(reinterpret_cast<int64_t>(cursor), PL_CURSOR_TYPE));
2542
OX (param.set_param_meta());
2545
bool is_unsigned = NULL == type_info || !type_info->elem_type_.get_meta_type().is_unsigned_integer() ? false : true;
2546
if (OB_FAIL(parse_basic_param_value(allocator, type, charset, ncharset, cs_type, ncs_type,
2547
data, tz_info, param, false, &analysis_checker_, is_unsigned))) {
2548
LOG_WARN("failed to parse basic param value", K(ret));
2550
param.set_param_meta();
2551
param.set_length(param.get_val_len());
2554
} else if (!support_send_long_data(type)) {
2555
ret = OB_ERR_UNEXPECTED;
2556
LOG_WARN("this type is not support send long data.", K(type), K(ret));
2557
} else if (NULL == piece->get_allocator()) {
2558
ret = OB_ERR_UNEXPECTED;
2559
LOG_WARN("piece allocator is null.", K(stmt_id_), K(param_id), K(ret));
2560
} else if (OB_SUCCESS != piece->get_error_ret()) {
2561
ret = piece->get_error_ret();
2562
LOG_WARN("send long data has error. ", K(stmt_id_), K(param_id), K(ret));
2564
if (OB_UNLIKELY(MYSQL_TYPE_COMPLEX == type)) {
2565
// this must be array bounding.
2566
bool is_null = ObSMUtils::update_from_bitmap(param, bitmap, param_id);
2568
LOG_DEBUG("param is null", K(param_id), K(param), K(type));
2571
ObMySQLUtil::get_length(data, count);
2573
int64_t bitmap_bytes = ((count + 7) / 8);
2574
char is_null_map[bitmap_bytes];
2575
MEMSET(is_null_map, 0, bitmap_bytes);
2576
length = piece_cache->get_length_length(count) + bitmap_bytes;
2577
// 3. get string buffer (include lenght + value)
2578
if (OB_FAIL(str_buf.prepare_allocate(count))) {
2579
LOG_WARN("prepare fail.");
2580
} else if (OB_FAIL(piece_cache->get_buffer(stmt_id_,
2586
LOG_WARN("piece get buffer fail.", K(ret), K(stmt_id_), K(param_id));
2588
// 4. merge all this info
2589
char *tmp = static_cast<char*>(piece->get_allocator()->alloc(length));
2591
if (OB_ISNULL(tmp)) {
2592
ret = OB_ALLOCATE_MEMORY_FAILED;
2593
LOG_WARN("failed to alloc memory", K(ret));
2594
} else if (FALSE_IT(MEMSET(tmp, 0, length))) {
2595
} else if (OB_FAIL(ObMySQLUtil::store_length(tmp, length, count, pos))) {
2596
LOG_WARN("store length fail.", K(ret), K(stmt_id_), K(param_id));
2598
MEMCPY(tmp+pos, is_null_map, bitmap_bytes);
2599
pos += bitmap_bytes;
2600
for (int64_t i=0; OB_SUCC(ret) && i<count; i++) {
2601
if (OB_FAIL(ObMySQLUtil::store_obstr(tmp, length, str_buf.at(i).string(), pos))) {
2602
LOG_WARN("store string fail.", K(ret), K(stmt_id_), K(param_id),
2603
K(length), K(pos), K(i), K(str_buf.at(i).string()), K(str_buf.at(i).string().length()),
2604
K(str_buf.at(i).length()));
2611
const char* src = tmp;
2612
if (OB_FAIL(parse_complex_param_value(allocator, charset, cs_type, ncs_type,
2613
src, tz_info, type_info,
2615
LOG_WARN("failed to parse complex value", K(ret));
2618
piece->get_allocator()->free(tmp);
2622
if (OB_FAIL(str_buf.prepare_allocate(count))) {
2623
LOG_WARN("prepare fail.");
2624
} else if (OB_FAIL(piece_cache->get_buffer(stmt_id_,
2630
LOG_WARN("piece get buffer fail.", K(ret), K(stmt_id_), K(param_id));
2632
char *tmp = static_cast<char*>(piece->get_allocator()->alloc(length));
2634
if (OB_ISNULL(tmp)) {
2635
ret = OB_ALLOCATE_MEMORY_FAILED;
2636
LOG_WARN("failed to alloc memory", K(ret));
2637
} else if (FALSE_IT(MEMSET(tmp, 0, length))) {
2638
} else if (OB_FAIL(ObMySQLUtil::store_obstr(tmp, length, str_buf.at(0).string(), pos))) {
2639
LOG_WARN("store string fail.", K(ret), K(stmt_id_), K(param_id));
2641
const char* src = tmp;
2642
bool is_unsigned = NULL == type_info || !type_info->elem_type_.get_meta_type().is_unsigned_integer() ? false : true;
2643
if (OB_FAIL(parse_basic_param_value(allocator, type, charset, ncharset, cs_type, ncs_type,
2644
src, tz_info, param, false, NULL ,is_unsigned))) {
2645
LOG_WARN("failed to parse basic param value", K(ret));
2647
param.set_param_meta();
2648
param.set_length(param.get_val_len());
2651
piece->get_allocator()->free(tmp);
2660
int ObMPStmtExecute::copy_or_convert_str(common::ObIAllocator &allocator,
2661
const ObCollationType src_type,
2662
const ObCollationType dst_type,
2663
const ObString &src,
2665
int64_t extra_buf_len /* = 0 */)
2667
int ret = OB_SUCCESS;
2668
if (!ObCharset::is_valid_collation(src_type) || !ObCharset::is_valid_collation(dst_type)) {
2669
ret = OB_ERR_UNEXPECTED;
2670
LOG_WARN("invalid collation", K(ret), K(dst_type));
2671
} else if (0 == src.length()
2672
|| ObCharset::charset_type_by_coll(src_type)
2673
== ObCharset::charset_type_by_coll(dst_type)) {
2674
int64_t len = src.length() + extra_buf_len;
2676
char *buf = static_cast<char *>(allocator.alloc(len));
2678
ret = OB_ALLOCATE_MEMORY_FAILED;
2679
LOG_WARN("allocate failed", K(ret), K(len));
2681
if (src.length() > 0) {
2682
MEMCPY(buf + extra_buf_len, src.ptr(), src.length());
2684
out.assign_ptr(buf + extra_buf_len, src.length());
2690
int64_t maxmb_len = 0;
2691
OZ(ObCharset::get_mbmaxlen_by_coll(dst_type, maxmb_len));
2692
const int64_t len = maxmb_len * src.length() + 1 + extra_buf_len;
2693
uint32_t res_len = 0;
2695
char *buf = static_cast<char *>(allocator.alloc(len));
2697
ret = OB_ALLOCATE_MEMORY_FAILED;
2698
LOG_WARN("allocate failed", K(ret), K(len));
2700
ObDataBuffer buf_alloc(buf + extra_buf_len, len - extra_buf_len);
2701
if (OB_FAIL(ObCharset::charset_convert(buf_alloc,
2706
ObCharset::REPLACE_UNKNOWN_CHARACTER))) {
2707
LOG_WARN("fail to charset convert", K(ret), K(src_type), K(dst_type),
2708
K(src), K(len), K(extra_buf_len));
2716
int ObMPStmtExecute::parse_integer_value(const uint32_t type,
2719
ObIAllocator &allocator,
2720
bool is_complex_element,
2721
ObPSAnalysisChecker *checker,
2722
bool is_unsigned) // oracle unsigned need
2724
int ret = OB_SUCCESS;
2725
bool cast_to_number = !(lib::is_mysql_mode() || is_complex_element || MYSQL_TYPE_TINY == type);
2726
int64_t res_val = 0;
2728
case MYSQL_TYPE_TINY: {
2729
PS_STATIC_DEFENSE_CHECK(checker, 1)
2732
ObMySQLUtil::get_int1(data, value);
2733
is_unsigned ? param.set_utinyint(value) : param.set_tinyint(value);
2737
case MYSQL_TYPE_SHORT: {
2738
PS_STATIC_DEFENSE_CHECK(checker, 2)
2741
ObMySQLUtil::get_int2(data, value);
2742
if (!cast_to_number) {
2743
is_unsigned ? param.set_usmallint(value) : param.set_smallint(value);
2745
res_val = static_cast<int64_t>(value);
2747
if (((1LL << 16) + res_val) < 1 || res_val > 0xFFFF) {
2748
ret = OB_DECIMAL_OVERFLOW_WARN;
2749
LOG_WARN("param is over flower.", K(res_val), K(type), K(ret));
2751
res_val = res_val < 0 ? ((1LL << 16) + res_val) : res_val;
2758
case MYSQL_TYPE_LONG: {
2759
PS_STATIC_DEFENSE_CHECK(checker, 4)
2762
ObMySQLUtil::get_int4(data, value);
2763
if (!cast_to_number) {
2764
is_unsigned ? param.set_uint32(value) : param.set_int32(value);
2766
res_val = static_cast<int64_t>(value);
2768
if (((1LL << 32) + res_val) < 1 || res_val > 0xFFFFFFFF) {
2769
ret = OB_DECIMAL_OVERFLOW_WARN;
2770
LOG_WARN("param is over flower.", K(res_val), K(type), K(ret));
2772
res_val = res_val < 0 ? ((1LL << 32) + res_val) : res_val;
2779
case MYSQL_TYPE_LONGLONG: {
2780
PS_STATIC_DEFENSE_CHECK(checker, 8)
2783
ObMySQLUtil::get_int8(data, value);
2784
if (!cast_to_number) {
2785
is_unsigned ? param.set_uint(ObUInt64Type, value) : param.set_int(value);
2793
ret = OB_ERR_UNEXPECTED;
2794
LOG_ERROR("unexpected integer type", K(type), K(ret));
2798
if (OB_SUCC(ret) && cast_to_number) {
2799
number::ObNumber nb;
2800
if (is_unsigned && OB_FAIL(nb.from(static_cast<uint64_t>(res_val), allocator))) {
2801
LOG_WARN("decode param to number failed", K(ret), K(res_val));
2802
} else if (!is_unsigned && OB_FAIL(nb.from(static_cast<int64_t>(res_val), allocator))) {
2803
LOG_WARN("decode param to number failed", K(ret), K(res_val));
2805
param.set_number(nb);
2811
int ObMPStmtExecute::parse_mysql_timestamp_value(const EMySQLFieldType field_type,
2814
const common::ObTimeZoneInfo *tz_info,
2815
ObPSAnalysisChecker *checker)
2817
int ret = OB_SUCCESS;
2825
int32_t microsecond = 0;
2826
ObPreciseDateTime value;
2827
PS_STATIC_DEFENSE_CHECK(checker, 1)
2829
ObMySQLUtil::get_int1(data, length);
2832
} else if (4 == length) {
2833
PS_STATIC_DEFENSE_CHECK(checker, 4)
2835
ObMySQLUtil::get_int2(data, year);
2836
ObMySQLUtil::get_int1(data, month);
2837
ObMySQLUtil::get_int1(data, day);
2839
} else if (7 == length) {
2840
PS_STATIC_DEFENSE_CHECK(checker, 7)
2842
ObMySQLUtil::get_int2(data, year);
2843
ObMySQLUtil::get_int1(data, month);
2844
ObMySQLUtil::get_int1(data, day);
2845
ObMySQLUtil::get_int1(data, hour);
2846
ObMySQLUtil::get_int1(data, min);
2847
ObMySQLUtil::get_int1(data, second);
2849
} else if (11 == length) {
2850
PS_STATIC_DEFENSE_CHECK(checker, 11)
2852
ObMySQLUtil::get_int2(data, year);
2853
ObMySQLUtil::get_int1(data, month);
2854
ObMySQLUtil::get_int1(data, day);
2855
ObMySQLUtil::get_int1(data, hour);
2856
ObMySQLUtil::get_int1(data, min);
2857
ObMySQLUtil::get_int1(data, second);
2858
ObMySQLUtil::get_int4(data, microsecond);
2862
LOG_WARN("invalid mysql timestamp value length", K(length));
2869
if (lib::is_oracle_mode()) {
2870
//oracle mode datetime should not has microsecond
2873
ob_time.parts_[DT_YEAR] = year;
2874
ob_time.parts_[DT_MON] = month;
2875
ob_time.parts_[DT_MDAY] = day;
2876
ob_time.parts_[DT_HOUR] = hour;
2877
ob_time.parts_[DT_MIN] = min;
2878
ob_time.parts_[DT_SEC] = second;
2879
ob_time.parts_[DT_USEC] = microsecond;
2880
if (!ObTimeUtility2::is_valid_date(year, month, day)
2881
|| !ObTimeUtility2::is_valid_time(hour, min, second, microsecond)) {
2882
ret = OB_INVALID_DATE_FORMAT;
2883
LOG_WARN("invalid date format", K(ret));
2885
ObTimeConvertCtx cvrt_ctx(NULL, false);
2886
ob_time.parts_[DT_DATE] = ObTimeConverter::ob_time_to_date(ob_time);
2887
if (field_type == MYSQL_TYPE_DATE) {
2888
value = ob_time.parts_[DT_DATE];
2889
} else if (OB_FAIL(ObTimeConverter::ob_time_to_datetime(ob_time, cvrt_ctx, value))){
2890
LOG_WARN("convert obtime to datetime failed", K(value), K(year), K(month),
2891
K(day), K(hour), K(min), K(second));
2897
if (field_type == MYSQL_TYPE_TIMESTAMP) {
2898
int64_t ts_value = 0;
2899
if (OB_FAIL(ObTimeConverter::datetime_to_timestamp(value, tz_info, ts_value))) {
2900
LOG_WARN("datetime to timestamp failed", K(ret));
2902
param.set_timestamp(ts_value);
2904
} else if (field_type == MYSQL_TYPE_DATETIME) {
2905
param.set_datetime(value);
2906
} else if (field_type == MYSQL_TYPE_DATE) {
2907
param.set_date(static_cast<int32_t>(value));
2910
LOG_DEBUG("get datetime", K(length), K(year), K(month), K(day), K(hour), K(min),K(second), K(microsecond), K(value));
2914
int ObMPStmtExecute::parse_oracle_timestamp_value(const obmysql::EMySQLFieldType field_type,
2915
const char *&data, const ObTimeConvertCtx &cvrt_ctx, ObObj ¶m, ObPSAnalysisChecker *checker)
2917
int ret = OB_SUCCESS;
2918
int8_t total_len = 0;
2920
ObOTimestampData ot_data;
2922
PS_STATIC_DEFENSE_CHECK(checker, 1)
2924
ObMySQLUtil::get_int1(data, total_len);
2927
} else if (OB_FAIL(ObSMUtils::get_ob_type(obj_type, field_type))) {
2928
LOG_WARN("failed to get_ob_type", K(ret));
2929
} else if (OB_FAIL(ObTimeConverter::decode_otimestamp(obj_type, data, total_len, cvrt_ctx, ot_data, scale))) {
2930
LOG_WARN("failed to decode_otimestamp", K(ret));
2932
PS_STATIC_DEFENSE_CHECK(checker, total_len)
2935
param.set_otimestamp_value(obj_type, ot_data);
2936
param.set_scale(scale);
2942
int ObMPStmtExecute::parse_mysql_time_value(const char *&data, ObObj ¶m, ObPSAnalysisChecker *checker)
2944
int ret = OB_SUCCESS;
2946
int8_t is_negative = 0;
2953
int32_t microsecond = 0;
2955
MEMSET(&tmval, 0, sizeof(tmval));
2957
PS_STATIC_DEFENSE_CHECK(checker, 1)
2959
ObMySQLUtil::get_int1(data, length);
2962
} else if (8 == length) {
2963
PS_STATIC_DEFENSE_CHECK(checker, 8)
2965
ObMySQLUtil::get_int1(data, is_negative);
2966
ObMySQLUtil::get_int4(data, day);
2967
ObMySQLUtil::get_int1(data, hour);
2968
ObMySQLUtil::get_int1(data, min);
2969
ObMySQLUtil::get_int1(data, second);
2971
} else if (12 == length) {
2972
PS_STATIC_DEFENSE_CHECK(checker, 12)
2974
ObMySQLUtil::get_int1(data, is_negative);
2975
ObMySQLUtil::get_int4(data, day);
2976
ObMySQLUtil::get_int1(data, hour);
2977
ObMySQLUtil::get_int1(data, min);
2978
ObMySQLUtil::get_int1(data, second);
2979
ObMySQLUtil::get_int4(data, microsecond);
2982
ret = OB_ERR_UNEXPECTED;
2983
LOG_ERROR("unexpected time length", K(length), K(ret));
2989
ob_time.parts_[DT_YEAR] = year;
2990
ob_time.parts_[DT_MON] = month;
2991
ob_time.parts_[DT_MDAY] = day;
2992
ob_time.parts_[DT_HOUR] = hour;
2993
ob_time.parts_[DT_MIN] = min;
2994
ob_time.parts_[DT_SEC] = second;
2995
ob_time.parts_[DT_USEC] = microsecond;
2996
if (!ObTimeUtility2::is_valid_time(hour, min, second, microsecond)) {
2997
ret = OB_INVALID_DATE_FORMAT;
2998
LOG_WARN("invalid date format", K(ret));
3000
ob_time.parts_[DT_DATE] = ObTimeConverter::ob_time_to_date(ob_time);
3001
ob_time.parts_[DT_HOUR] += ob_time.parts_[DT_MDAY] * 24;
3002
ob_time.parts_[DT_MDAY] = 0;
3003
value = ObTimeConverter::ob_time_to_time(ob_time);
3012
param.set_time(value);
3014
LOG_INFO("get time", K(length), K(year), K(month), K(day), K(hour), K(min),K(second), K(microsecond), K(value));
3018
int ObMPStmtExecute::parse_oracle_interval_ds_value(const char *&data, ObObj ¶m, ObPSAnalysisChecker *checker)
3020
int ret = OB_SUCCESS;
3023
ObIntervalDSValue value;
3025
ObMySQLUtil::get_int1(data, length);
3026
PS_STATIC_DEFENSE_CHECK(checker, length)
3028
if (OB_FAIL(ObTimeConverter::decode_interval_ds(data, length, value, scale))) {
3029
LOG_WARN("fail to decode interval day to second", K(ret), K(length));
3031
param.set_interval_ds(value);
3032
param.set_scale(scale);
3039
int ObMPStmtExecute::parse_oracle_interval_ym_value(const char *&data, ObObj ¶m, ObPSAnalysisChecker *checker)
3041
int ret = OB_SUCCESS;
3044
ObIntervalYMValue value;
3046
ObMySQLUtil::get_int1(data, length);
3047
PS_STATIC_DEFENSE_CHECK(checker, length)
3049
if (OB_FAIL(ObTimeConverter::decode_interval_ym(data, length, value, scale))) {
3050
LOG_WARN("fail to decode interval year to month", K(ret), K(length));
3052
param.set_interval_ym(value);
3053
param.set_scale(scale);
3060
void ObMPStmtExecute::record_stat(const stmt::StmtType type, const int64_t end_time) const
3062
#define ADD_STMT_STAT(type) \
3063
case stmt::T_##type: \
3064
EVENT_INC(SQL_##type##_COUNT); \
3065
EVENT_ADD(SQL_##type##_TIME, time_cost); \
3067
if (lib::is_diagnose_info_enabled()) {
3068
const int64_t time_cost = end_time - get_receive_timestamp();
3069
if (!THIS_THWORKER.need_retry()) {
3070
EVENT_INC(SQL_PS_EXECUTE_COUNT);
3072
ADD_STMT_STAT(SELECT);
3073
ADD_STMT_STAT(INSERT);
3074
ADD_STMT_STAT(REPLACE);
3075
ADD_STMT_STAT(UPDATE);
3076
ADD_STMT_STAT(DELETE);
3078
EVENT_INC(SQL_OTHER_COUNT);
3079
EVENT_ADD(SQL_OTHER_TIME, time_cost);
3087
int ObMPStmtExecute::response_query_header(ObSQLSessionInfo &session, pl::ObDbmsCursorInfo &cursor)
3089
int ret = OB_SUCCESS;
3090
ObSyncPlanDriver drv(gctx_,
3097
if (0 == cursor.get_field_columns().count()) {
3098
// SELECT * INTO OUTFILE return null field, and only response ok packet
3099
ObOKPParam ok_param;
3100
ok_param.affected_rows_ = 0;
3101
ok_param.is_partition_hit_ = session.partition_hit().get_bool();
3102
ok_param.has_more_result_ = false;
3103
if (OB_FAIL(send_ok_packet(session, ok_param))) {
3104
LOG_WARN("fail to send ok packt", K(ok_param), K(ret));
3107
if (OB_FAIL(drv.response_query_header(cursor.get_field_columns(),
3111
LOG_WARN("fail to get autocommit", K(ret));
3117
} //end of namespace observer
3118
} //end of namespace oceanbase