2
* Copyright (c) 2021 OceanBase
3
* OceanBase CE is licensed under Mulan PubL v2.
4
* You can use this software according to the terms and conditions of the Mulan PubL v2.
5
* You may obtain a copy of Mulan PubL v2 at:
6
* http://license.coscl.org.cn/MulanPubL-2.0
7
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
* See the Mulan PubL v2 for more details.
13
#define USING_LOG_PREFIX SERVER
15
#include "observer/mysql/obmp_query.h"
17
#include "lib/utility/ob_macro_utils.h"
18
#include "lib/utility/ob_tracepoint.h"
19
#include "lib/worker.h"
20
#include "lib/stat/ob_session_stat.h"
21
#include "lib/profile/ob_perf_event.h"
22
#include "share/ob_debug_sync.h"
23
#include "share/config/ob_server_config.h"
24
#include "share/schema/ob_multi_version_schema_service.h"
25
#include "share/schema/ob_schema_getter_guard.h"
26
#include "share/client_feedback/ob_feedback_partition_struct.h"
27
#include "share/ob_resource_limit.h"
28
#include "rpc/ob_request.h"
29
#include "rpc/obmysql/ob_mysql_packet.h"
30
#include "rpc/obmysql/ob_mysql_request_utils.h"
31
#include "rpc/obmysql/packet/ompk_ok.h"
32
#include "rpc/obmysql/packet/ompk_error.h"
33
#include "rpc/obmysql/packet/ompk_resheader.h"
34
#include "rpc/obmysql/packet/ompk_field.h"
35
#include "rpc/obmysql/packet/ompk_eof.h"
36
#include "rpc/obmysql/packet/ompk_row.h"
37
#include "sql/ob_sql_context.h"
38
#include "sql/ob_sql.h"
39
#include "sql/ob_sql_trans_util.h"
40
#include "sql/session/ob_sql_session_mgr.h"
41
#include "sql/resolver/cmd/ob_variable_set_stmt.h"
42
#include "sql/engine/px/ob_px_admission.h"
43
#include "observer/mysql/ob_mysql_result_set.h"
44
#include "rpc/obmysql/obsm_struct.h"
45
#include "observer/mysql/ob_sync_plan_driver.h"
46
#include "observer/mysql/ob_sync_cmd_driver.h"
47
#include "observer/mysql/ob_async_cmd_driver.h"
48
#include "observer/mysql/ob_async_plan_driver.h"
49
#include "observer/ob_req_time_service.h"
50
#include "observer/omt/ob_tenant.h"
51
#include "observer/ob_server.h"
52
#include "observer/virtual_table/ob_virtual_table_iterator_factory.h"
53
#include "sql/monitor/ob_phy_plan_monitor_info.h"
54
#include "sql/monitor/ob_security_audit.h"
55
#include "lib/rc/context.h"
56
#include "sql/monitor/ob_security_audit_utils.h"
57
#include "observer/mysql/obmp_utils.h"
58
#include "lib/ash/ob_active_session_guard.h"
59
#include "lib/trace/ob_trace.h"
61
using namespace oceanbase::rpc;
62
using namespace oceanbase::obmysql;
63
using namespace oceanbase::common;
64
using namespace oceanbase::observer;
65
using namespace oceanbase::share;
66
using namespace oceanbase::share::schema;
67
using namespace oceanbase::trace;
68
using namespace oceanbase::sql;
69
ObMPQuery::ObMPQuery(const ObGlobalContext &gctx)
71
single_process_timestamp_(0),
72
exec_start_timestamp_(0),
73
exec_end_timestamp_(0),
74
is_com_filed_list_(false),
78
ctx_.exec_type_ = MpQuery;
82
ObMPQuery::~ObMPQuery()
87
int ObMPQuery::process()
90
int tmp_ret = OB_SUCCESS;
91
ObSQLSessionInfo *sess = NULL;
93
bool need_response_error = true;
94
bool need_disconnect = true;
95
bool async_resp_used = false; // 由事务提交线程异步回复客户端
96
int64_t query_timeout = 0;
97
ObCurTraceId::TraceId *cur_trace_id = ObCurTraceId::get_trace_id();
98
ObSMConnection *conn = get_conn();
99
static int64_t concurrent_count = 0;
100
bool need_dec = false;
101
bool do_ins_batch_opt = false;
103
if (ATOMIC_FAA(&concurrent_count, 1) > RL_CONF.get_max_concurrent_query_count()) {
104
ret = OB_RESOURCE_OUT;
105
LOG_WARN("reach max concurrent limit", K(ret), K(concurrent_count),
106
K(RL_CONF.get_max_concurrent_query_count()));
110
DEFER(if (need_dec) (void)ATOMIC_FAA(&concurrent_count, -1));
113
} else if (OB_ISNULL(req_) || OB_ISNULL(conn) || OB_ISNULL(cur_trace_id)) {
114
ret = OB_ERR_UNEXPECTED;
115
LOG_WARN("null conn ptr", K_(sql), K_(req), K(conn), K(cur_trace_id), K(ret));
116
} else if (OB_UNLIKELY(!conn->is_in_authed_phase())) {
117
ret = OB_ERR_NO_PRIVILEGE;
118
LOG_WARN("receive sql without session", K_(sql), K(ret));
119
} else if (OB_ISNULL(conn->tenant_)) {
120
ret = OB_ERR_UNEXPECTED;
121
LOG_ERROR("invalid tenant", K_(sql), K(conn->tenant_), K(ret));
122
} else if (OB_FAIL(get_session(sess))) {
123
LOG_WARN("get session fail", K_(sql), K(ret));
124
} else if (OB_ISNULL(sess)) {
125
ret = OB_ERR_UNEXPECTED;
126
LOG_WARN("session is NULL or invalid", K_(sql), K(sess), K(ret));
128
lib::CompatModeGuard g(sess->get_compatibility_mode() == ORACLE_MODE ?
129
lib::Worker::CompatMode::ORACLE : lib::Worker::CompatMode::MYSQL);
130
THIS_WORKER.set_session(sess);
131
ObSQLSessionInfo &session = *sess;
132
ObSQLSessionInfo::LockGuard lock_guard(session.get_query_lock());
133
session.set_current_trace_id(ObCurTraceId::get_trace_id());
134
session.init_use_rich_format();
136
const bool check_throttle = !is_root_user(sess->get_user_id());
138
if (check_throttle &&
140
sess->get_raw_audit_record().try_cnt_ == 0 &&
141
lib::Worker::WS_OUT_OF_THROTTLE == THIS_THWORKER.check_rate_limiter()) {
142
ret = OB_KILLED_BY_THROTTLING;
143
LOG_WARN("query is throttled", K(ret), K(sess->get_user_id()));
144
need_disconnect = false;
145
} else if (OB_SUCC(sess->get_sql_throttle_current_priority(val))) {
146
THIS_WORKER.set_sql_throttle_current_priority(check_throttle ? val : -1);
147
if (lib::Worker::WS_OUT_OF_THROTTLE == THIS_THWORKER.check_qtime_throttle()) {
148
ret = OB_KILLED_BY_THROTTLING;
149
LOG_WARN("query is throttled", K(ret));
150
need_disconnect = false;
153
LOG_WARN("get system variable sql_throttle_current_priority fail", K(ret));
154
// reset ret for compatibility.
158
sessid = conn->sessid_;
159
int64_t tenant_version = 0;
160
int64_t sys_version = 0;
161
session.set_thread_id(GETTID());
162
const ObMySQLRawPacket &pkt = reinterpret_cast<const ObMySQLRawPacket&>(req_->get_packet());
163
int64_t packet_len = pkt.get_clen();
164
req_->set_trace_point(ObRequest::OB_EASY_REQUEST_MPQUERY_PROCESS);
165
if (OB_UNLIKELY(!session.is_valid())) {
166
ret = OB_ERR_UNEXPECTED;
167
LOG_ERROR("invalid session", K_(sql), K(ret));
168
} else if (OB_FAIL(process_kill_client_session(session))) {
169
LOG_WARN("client session has been killed", K(ret));
170
} else if (OB_UNLIKELY(session.is_zombie())) {
171
//session has been killed some moment ago
172
ret = OB_ERR_SESSION_INTERRUPTED;
173
LOG_WARN("session has been killed", K(session.get_session_state()), K_(sql),
174
K(session.get_sessid()), "proxy_sessid", session.get_proxy_sessid(), K(ret));
175
} else if (OB_FAIL(session.check_and_init_retry_info(*cur_trace_id, sql_))) {
176
// 注意,retry info和last query trace id的逻辑要写在query lock内,否则会有并发问题
177
LOG_WARN("fail to check and init retry info", K(ret), K(*cur_trace_id), K_(sql));
178
} else if (OB_FAIL(session.get_query_timeout(query_timeout))) {
179
LOG_WARN("fail to get query timeout", K_(sql), K(ret));
180
} else if (OB_FAIL(gctx_.schema_service_->get_tenant_received_broadcast_version(
181
session.get_effective_tenant_id(), tenant_version))) {
182
LOG_WARN("fail get tenant broadcast version", K(ret));
183
} else if (OB_FAIL(gctx_.schema_service_->get_tenant_received_broadcast_version(
184
OB_SYS_TENANT_ID, sys_version))) {
185
LOG_WARN("fail get tenant broadcast version", K(ret));
186
} else if (pkt.exist_trace_info()
187
&& OB_FAIL(session.update_sys_variable(SYS_VAR_OB_TRACE_INFO,
188
pkt.get_trace_info()))) {
189
LOG_WARN("fail to update trace info", K(ret));
190
} else if (FALSE_IT(session.set_txn_free_route(pkt.txn_free_route()))) {
191
} else if (OB_FAIL(process_extra_info(session, pkt, need_response_error))) {
192
LOG_WARN("fail get process extra info", K(ret));
193
} else if (FALSE_IT(session.post_sync_session_info())) {
194
} else if (OB_UNLIKELY(packet_len > session.get_max_packet_size())) {
195
//packet size check with session variable max_allowd_packet or net_buffer_length
196
need_disconnect = false;
197
ret = OB_ERR_NET_PACKET_TOO_LARGE;
198
LOG_WARN("packet too large than allowed for the session", K_(sql), K(ret));
199
} else if (OB_FAIL(sql::ObFLTUtils::init_flt_info(pkt.get_extra_info(), session,
200
conn->proxy_cap_flags_.is_full_link_trace_support()))) {
201
LOG_WARN("failed to update flt extra info", K(ret));
202
} else if (OB_FAIL(session.gen_configs_in_pc_str())) {
203
LOG_WARN("fail to generate configuration strings that can influence execution plan", K(ret));
205
FLTSpanGuard(com_query_process);
206
FLT_SET_TAG(log_trace_id, ObCurTraceId::get_trace_id_str(),
207
receive_ts, get_receive_timestamp(),
208
client_info, session.get_client_info(),
209
module_name, session.get_module_name(),
210
action_name, session.get_action_name(),
211
sess_id, session.get_sessid());
213
THIS_WORKER.set_timeout_ts(get_receive_timestamp() + query_timeout);
214
retry_ctrl_.set_tenant_global_schema_version(tenant_version);
215
retry_ctrl_.set_sys_global_schema_version(sys_version);
216
session.partition_hit().reset();
217
session.set_pl_can_retry(true);
218
ObLockWaitNode &lock_wait_node = req_->get_lock_wait_node();
219
lock_wait_node.set_session_info(session.get_sessid());
221
bool has_more = false;
222
bool force_sync_resp = false;
223
need_response_error = false;
224
ObParser parser(THIS_WORKER.get_sql_arena_allocator(),
225
session.get_sql_mode(), session.get_charsets4parser());
226
//为了性能优化考虑,减少数组长度,降低无用元素的构造和析构开销
227
ObSEArray<ObString, 1> queries;
228
ObSEArray<ObString, 1> ins_queries;
229
ObMPParseStat parse_stat;
230
if (GCONF.enable_record_trace_id) {
231
PreParseResult pre_parse_result;
232
if (OB_FAIL(ObParser::pre_parse(sql_, pre_parse_result))) {
233
LOG_WARN("fail to pre parse", K(ret));
235
session.set_app_trace_id(pre_parse_result.trace_id_);
236
LOG_DEBUG("app trace id", "app_trace_id", pre_parse_result.trace_id_,
237
"sessid", session.get_sessid(), K_(sql));
243
} else if (OB_FAIL(parser.split_multiple_stmt(sql_, queries, parse_stat))) {
244
// 进入本分支,说明push_back出错,OOM,委托外层代码返回错误码
246
need_response_error = true;
247
} else if (OB_UNLIKELY(queries.count() <= 0)) {
248
ret = OB_ERR_UNEXPECTED;
249
need_response_error = true;//进入此分支之后,要断连接,极其严重错误
250
LOG_ERROR("emtpy query count. client would have suspended. never be here!", K_(sql), K(ret));
251
} else if (OB_UNLIKELY(1 == session.get_capability().cap_flags_.OB_CLIENT_MULTI_STATEMENTS)) {
252
// 处理Multiple Statement
253
/* MySQL处理Multi-Stmt出错时候的行为:
254
* 遇到首次运行失败(包括解析或执行)的SQL后,停止读取后继数据
256
* (1) select 1; selct 2; select 3;
257
* select 1执行成功,selct 2报语法错误,select 3不被执行
258
* (2) select 1; drop table not_exists_table; select 3;
259
* select 1执行成功,drop table not_exists_table报表不存在错误,select 3不被执行
262
* split_multiple_stmt是根据分号来分割语句,但有可能遇到“语法错误”,
263
* 这里说的“语法错误”不是说select写成了selct,而是“token”级别的语法错误,例如语句
264
* select 1;`select 2; select 3;
265
* 上面`和'都没有形成闭合的字符串token,token parser会报告语法错误
266
* 上面的例子中,得到的queries.count() 等于 2,分别为select 1和 `select 2; select 3;
268
bool optimization_done = false;
269
const char *p_normal_start = nullptr;
270
if (queries.count() > 1 && session.is_txn_free_route_temp()) {
271
need_disconnect = false;
272
need_response_error = true;
273
ret = OB_TRANS_FREE_ROUTE_NOT_SUPPORTED;
274
LOG_WARN("multi stmt is not supported to be executed on txn temporary node", KR(ret),
275
"tx_free_route_ctx", session.get_txn_free_route_ctx(),
276
"trans_id", session.get_tx_id(), K(session));
277
} else if (queries.count() > 1
278
&& OB_FAIL(try_batched_multi_stmt_optimization(session,
285
LOG_WARN("failed to try multi-stmt-optimization", K(ret));
286
} else if (!optimization_done
287
&& session.is_enable_batched_multi_statement()
288
&& ObSQLUtils::is_enable_explain_batched_multi_statement()
289
&& ObParser::is_explain_stmt(queries.at(0), p_normal_start)) {
290
ret = OB_SUCC(ret) ? OB_NOT_SUPPORTED : ret;
291
need_disconnect = false;
292
need_response_error = true;
293
LOG_WARN("explain batch statement failed", K(ret));
294
} else if (!optimization_done) {
295
ARRAY_FOREACH(queries, i) {
296
// in multistmt sql, audit_record will record multistmt_start_ts_ when count over 1
297
// queries.count()>1 -> batch,(m)sql1,(m)sql2,... | queries.count()=1 -> sql1
299
session.get_raw_audit_record().exec_timestamp_.multistmt_start_ts_
300
= ObTimeUtility::current_time();
301
// before handle multi-stmt's followers, re-calc the txn_free_route's baseline
302
// in order to capture accurate state changed by current stmt
303
session.prep_txn_free_route_baseline();
305
need_disconnect = true;
306
//FIXME qianfu NG_TRACE_EXT(set_disconnect, OB_ID(disconnect), true, OB_ID(pos), "multi stmt begin");
307
if (OB_UNLIKELY(parse_stat.parse_fail_
308
&& (i == parse_stat.fail_query_idx_)
309
&& ObSQLUtils::check_need_disconnect_parser_err(parse_stat.fail_ret_))) {
310
// 进入本分支,说明在multi_query中的某条query parse失败,如果不是语法错,则进入该分支
311
// 如果当前query_count 为1, 则不断连接;如果大于1,
312
// 则需要在发错误包之后断连接,防止客户端一直在等接下来的回包
314
ret = parse_stat.fail_ret_;
315
need_response_error = true;
318
has_more = (queries.count() > i + 1);
319
// 本来可以做成不管queries.count()是多少,最后一个query都可以异步回包的,
320
// 但是目前的代码实现难以在不同的线程处理同一个请求的回包,
321
// 因此这里只允许只有一个query的multi query请求异步回包。
322
force_sync_resp = queries.count() <= 1? false : true;
323
// is_part_of_multi 表示当前sql是 multi stmt 中的一条,
324
// 原来的值默认为true,会影响单条sql的二次路由,现在改为用 queries.count() 判断。
325
bool is_part_of_multi = queries.count() > 1 ? true : false;
326
ret = process_single_stmt(ObMultiStmtItem(is_part_of_multi, i, queries.at(i)),
335
// 以multiple query协议发过来的语句总数
336
EVENT_INC(SQL_MULTI_QUERY_COUNT);
337
// 以multiple query协议发过来,但实际只包含一条SQL的语句的个数
338
if (queries.count() <= 1) {
339
EVENT_INC(SQL_MULTI_ONE_QUERY_COUNT);
341
} else { // OB_CLIENT_MULTI_STATEMENTS not enabled
342
if (OB_UNLIKELY(queries.count() != 1)) {
343
ret = OB_ERR_PARSER_SYNTAX;
344
need_disconnect = false;
345
need_response_error = true;
346
LOG_WARN("unexpected error. multi stmts sql while OB_CLIENT_MULTI_STATEMENTS not enabled.", K(ret), K(sql_));
348
EVENT_INC(SQL_SINGLE_QUERY_COUNT);
349
// 处理普通的Single Statement
350
ret = process_single_stmt(ObMultiStmtItem(false, 0, sql_),
359
FLT_SET_TAG(err_code, ret);
363
// THIS_WORKER.need_retry()是指是否扔回队列重试,包括大查询被扔回队列的情况。
364
session.check_and_reset_retry_info(*cur_trace_id, THIS_WORKER.need_retry());
365
session.set_last_trace_id(ObCurTraceId::get_trace_id());
366
IGNORE_RETURN record_flt_trace(session);
369
if (OB_UNLIKELY(NULL != GCTX.cgroup_ctrl_) && GCTX.cgroup_ctrl_->is_valid() && is_conn_valid()) {
370
int tmp_ret = OB_SUCCESS;
371
// Call setup_user_resource_group no matter OB_SUCC or OB_FAIL
372
// because we have to reset conn.group_id_ according to user_name.
373
// Otherwise, suppose we execute a query with a mapping rule on the column in the query at first,
374
// we switch to the defined consumer group, batch_group for example,
375
// and after that, the next query will also be executed with batch_group.
376
if (OB_UNLIKELY(OB_SUCCESS !=
377
(tmp_ret = setup_user_resource_group(*conn, sess->get_effective_tenant_id(), sess)))) {
378
LOG_WARN("fail setup user resource group", K(tmp_ret), K(ret));
379
ret = OB_SUCC(ret) ? tmp_ret : ret;
383
if (OB_FAIL(ret) && need_response_error && is_conn_valid()) {
384
send_error_packet(ret, NULL);
386
if (OB_FAIL(ret) && OB_UNLIKELY(need_disconnect) && is_conn_valid()) {
388
LOG_WARN("disconnect connection", KR(ret));
391
// 如果已经异步回包,则这部分逻辑在cb中执行,这里跳过flush_buffer()
392
if (!THIS_WORKER.need_retry()) {
393
if (async_resp_used) {
394
async_resp_used_ = true;
395
packet_sender_.disable_response();
396
} else if (OB_UNLIKELY(!is_conn_valid())) {
397
tmp_ret = OB_CONNECT_ERROR;
398
LOG_WARN("connection in error, maybe has disconnected", K(tmp_ret));
399
} else if (OB_UNLIKELY(OB_SUCCESS != (tmp_ret = flush_buffer(true)))) {
400
LOG_WARN("failed to flush_buffer", K(tmp_ret));
407
// 必须总是将 THIS_WORKER 里的指针设置为 null
408
THIS_WORKER.set_session(NULL); // clear session
411
revert_session(sess); //current ignore revert session ret
414
return (OB_SUCCESS != ret) ? ret : tmp_ret;
418
* Try to evaluate multiple update queries as a single query to optimize rpc cost
419
* for details, please ref to
421
int ObMPQuery::try_batched_multi_stmt_optimization(sql::ObSQLSessionInfo &session,
422
common::ObIArray<ObString> &queries,
423
const ObMPParseStat &parse_stat,
424
bool &optimization_done,
425
bool &async_resp_used,
426
bool &need_disconnect,
427
bool is_ins_multi_val_opt)
429
int ret = OB_SUCCESS;
430
bool has_more = false;
431
bool force_sync_resp = true;
432
bool enable_batch_opt = session.is_enable_batched_multi_statement();
433
bool use_plan_cache = session.get_local_ob_enable_plan_cache();
434
optimization_done = false;
435
if (queries.count() <= 1 || parse_stat.parse_fail_) {
437
} else if (!enable_batch_opt) {
439
} else if (!use_plan_cache) {
440
// 不打开plan_cache开关,则优化不支持
441
} else if (OB_FAIL(process_single_stmt(ObMultiStmtItem(false, 0, sql_, &queries, is_ins_multi_val_opt),
448
if (THIS_WORKER.need_retry()) {
449
// fail optimize, is a large query, just go back to large query queue and retry
453
LOG_WARN("failed to process batch stmt, cover the error code and reset retry flag",
454
K(tmp_ret), K(ret), K(THIS_WORKER.need_retry()));
456
optimization_done = true;
459
LOG_TRACE("after to try batched multi-stmt optimization", K(optimization_done),
460
K(queries), K(enable_batch_opt), K(ret), K(THIS_WORKER.need_retry()), K(retry_ctrl_.need_retry()), K(retry_ctrl_.get_retry_type()));
464
int ObMPQuery::process_single_stmt(const ObMultiStmtItem &multi_stmt_item,
465
ObSQLSessionInfo &session,
466
bool has_more_result,
467
bool force_sync_resp,
468
bool &async_resp_used,
469
bool &need_disconnect)
471
int ret = OB_SUCCESS;
472
FLTSpanGuard(mpquery_single_stmt);
473
ctx_.spm_ctx_.reset();
474
bool need_response_error = true;
475
const bool enable_trace_log = lib::is_trace_log_enabled();
476
session.get_raw_audit_record().request_memory_used_ = 0;
477
observer::ObProcessMallocCallback pmcb(0,
478
session.get_raw_audit_record().request_memory_used_);
479
lib::ObMallocCallbackGuard guard(pmcb);
480
// 执行setup_wb后,所有WARNING都会写入到当前session的WARNING BUFFER中
482
// 当新语句开始的时候,将该值归0,因为curr_trans_last_stmt_end_time是用于
483
// 实现事务内部的语句执行间隔过长超时功能的。
484
session.set_curr_trans_last_stmt_end_time(0);
486
//============================ 注意这些变量的生命周期 ================================
487
ObSessionStatEstGuard stat_est_guard(get_conn()->tenant_->id(), session.get_sessid());
488
if (OB_FAIL(init_process_var(ctx_, multi_stmt_item, session))) {
489
LOG_WARN("init process var failed.", K(ret), K(multi_stmt_item));
491
if (enable_trace_log) {
492
//set session log_level.Must use ObThreadLogLevelUtils::clear() in pair
493
ObThreadLogLevelUtils::init(session.get_log_id_level_map());
495
// obproxy may use 'SET @@last_schema_version = xxxx' to set newest schema,
496
// observer will force refresh schema if local_schema_version < last_schema_version;
497
if (OB_FAIL(check_and_refresh_schema(session.get_login_tenant_id(),
498
session.get_effective_tenant_id(),
500
LOG_WARN("failed to check_and_refresh_schema", K(ret));
501
} else if (OB_FAIL(session.update_timezone_info())) {
502
LOG_WARN("fail to update time zone info", K(ret));
504
need_response_error = false;
506
ctx_.self_add_plan_ = false;
507
retry_ctrl_.reset_retry_times();//每个statement单独记录retry times
508
oceanbase::lib::Thread::WaitGuard guard(oceanbase::lib::Thread::WAIT_FOR_LOCAL_RETRY);
510
ret = OB_SUCCESS; //当发生本地重试的时候,需要重置错误码,不然无法推进重试
511
need_disconnect = true;
513
//create a new temporary memory context for executing sql can
514
//avoid the problem the memory cannot be released in time due to too many sql items
515
//but it will drop the sysbench performance about 1~4%
516
//so we execute the first sql with the default memory context and
517
//execute the rest sqls with a temporary memory context to avoid memory dynamic leaks
518
retry_ctrl_.clear_state_before_each_retry(session.get_retry_info_for_update());
519
bool first_exec_sql = session.get_is_in_retry() ? false :
520
(multi_stmt_item.is_part_of_multi_stmt() ? multi_stmt_item.get_seq_num() <= 1 : true);
521
if (OB_LIKELY(first_exec_sql)) {
522
ret = do_process(session,
529
ret = process_with_tmp_context(session,
535
//set session retry state
536
session.set_session_in_retry(retry_ctrl_.need_retry());
537
} while (RETRY_TYPE_LOCAL == retry_ctrl_.get_retry_type());
538
//@notice: after the async packet is responsed,
539
//the easy_buf_ hold by the sql string may have been released.
540
//from here on, we can no longer access multi_stmt_item.sql_,
541
//otherwise there is a risk of coredump
542
//@TODO: need to determine a mechanism to ensure the safety of memory access here
544
ObThreadLogLevelUtils::clear();
545
const int64_t debug_sync_timeout = GCONF.debug_sync_timeout;
546
if (debug_sync_timeout > 0) {
547
// ignore thread local debug sync actions to session actions failed
548
int tmp_ret = OB_SUCCESS;
549
tmp_ret = GDS.collect_result_actions(session.get_debug_sync_actions());
550
if (OB_UNLIKELY(OB_SUCCESS != tmp_ret)) {
551
LOG_WARN("set thread local debug sync actions to session actions failed", K(tmp_ret));
556
//对于tracelog的处理,不影响正常逻辑,错误码无须赋值给ret
557
int tmp_ret = OB_SUCCESS;
559
tmp_ret = do_after_process(session, ctx_, async_resp_used);
561
// 设置上一条语句的结束时间,由于这里只用于实现事务内部的语句之间的执行超时,
562
// 因此,首先,需要判断是否处于事务执行的过程中。然后对于事务提交的时候的异步回包,
563
// 也不需要在这里设置结束时间,因为这已经相当于事务的最后一条语句了。
564
// 最后,需要判断ret错误码,只有成功执行的sql才记录结束时间
565
if (session.get_in_transaction() && !async_resp_used && OB_SUCC(ret)) {
566
session.set_curr_trans_last_stmt_end_time(common::ObTimeUtility::current_time());
569
// need_response_error这个变量保证仅在
570
// do { do_process } while(retry) 之前出错才会
571
// 走到send_error_packet逻辑
572
// 所以无需考虑当前为sync还是async模式
573
if (!OB_SUCC(ret) && need_response_error && is_conn_valid()) {
574
send_error_packet(ret, NULL);
580
OB_NOINLINE int ObMPQuery::process_with_tmp_context(ObSQLSessionInfo &session,
581
bool has_more_result,
582
bool force_sync_resp,
583
bool &async_resp_used,
584
bool &need_disconnect)
586
int ret = OB_SUCCESS;
587
//create a temporary memory context to process retry or the rest sql of multi-query,
588
//avoid memory dynamic leaks caused by query retry or too many multi-query items
589
lib::ContextParam param;
590
param.set_mem_attr(MTL_ID(),
591
ObModIds::OB_SQL_EXECUTOR, ObCtxIds::DEFAULT_CTX_ID)
592
.set_properties(lib::USE_TL_PAGE_OPTIONAL)
593
.set_page_size(OB_MALLOC_REQ_NORMAL_BLOCK_SIZE)
594
.set_ablock_size(lib::INTACT_MIDDLE_AOBJECT_SIZE);
595
CREATE_WITH_TEMP_CONTEXT(param) {
596
ret = do_process(session,
601
ctx_.first_plan_hash_ = 0;
602
ctx_.first_outline_data_.reset();
608
OB_INLINE int ObMPQuery::get_tenant_schema_info_(const uint64_t tenant_id,
609
ObTenantCachedSchemaGuardInfo *cache_info,
610
ObSchemaGetterGuard *&schema_guard,
611
int64_t &tenant_version,
612
int64_t &sys_version)
614
int ret = OB_SUCCESS;
615
ObSchemaGetterGuard &cached_guard = cache_info->get_schema_guard();
616
bool need_refresh = false;
618
if (!cached_guard.is_inited()) {
621
} else if (tenant_id != cached_guard.get_tenant_id()) {
625
int64_t tmp_tenant_version = 0;
626
int64_t tmp_sys_version = 0;
627
if (OB_FAIL(gctx_.schema_service_->get_tenant_refreshed_schema_version(tenant_id, tmp_tenant_version))) {
628
LOG_WARN("get tenant refreshed schema version error", K(ret), K(tenant_id));
629
} else if (OB_FAIL(cached_guard.get_schema_version(tenant_id, tenant_version))) {
630
LOG_WARN("fail get schema version", K(ret), K(tenant_id));
631
} else if (tmp_tenant_version != tenant_version) {
634
} else if (OB_FAIL(gctx_.schema_service_->get_tenant_refreshed_schema_version(OB_SYS_TENANT_ID, tmp_sys_version))) {
635
LOG_WARN("get sys tenant refreshed schema version error", K(ret), "sys_tenant_id", OB_SYS_TENANT_ID);
636
} else if (OB_FAIL(cached_guard.get_schema_version(OB_SYS_TENANT_ID, sys_version))) {
637
LOG_WARN("fail get sys schema version", K(ret));
638
} else if (tmp_sys_version != sys_version) {
647
//获取session上缓存的最新schema guard
648
schema_guard = &(cache_info->get_schema_guard());
649
} else if (OB_FAIL(cache_info->refresh_tenant_schema_guard(tenant_id))) {
650
LOG_WARN("refresh tenant schema guard failed", K(ret), K(tenant_id));
652
//获取session上缓存的最新schema guard
653
schema_guard = &(cache_info->get_schema_guard());
654
if (OB_FAIL(schema_guard->get_schema_version(tenant_id, tenant_version))) {
655
LOG_WARN("fail get schema version", K(ret), K(tenant_id));
656
} else if (OB_FAIL(schema_guard->get_schema_version(OB_SYS_TENANT_ID, sys_version))) {
657
LOG_WARN("fail get sys schema version", K(ret));
667
OB_INLINE int ObMPQuery::do_process(ObSQLSessionInfo &session,
668
bool has_more_result,
669
bool force_sync_resp,
670
bool &async_resp_used,
671
bool &need_disconnect)
673
int ret = OB_SUCCESS;
674
ObAuditRecordData &audit_record = session.get_raw_audit_record();
675
audit_record.try_cnt_++;
676
bool is_diagnostics_stmt = false;
677
bool need_response_error = true;
678
const ObString &sql = ctx_.multi_stmt_item_.get_sql();
679
const bool enable_perf_event = lib::is_diagnose_info_enabled();
680
const bool enable_sql_audit =
681
GCONF.enable_sql_audit && session.get_local_ob_enable_sql_audit();
682
single_process_timestamp_ = ObTimeUtility::current_time();
684
* 注意req_timeinfo_guard一定要放在result前面
687
ObReqTimeGuard req_timeinfo_guard;
688
ObPhysicalPlan *plan = nullptr;
689
ObSchemaGetterGuard* schema_guard = nullptr;
690
ObTenantCachedSchemaGuardInfo &cached_schema_info = session.get_cached_schema_guard_info();
691
int64_t tenant_version = 0;
692
int64_t sys_version = 0;
693
common::ObSqlInfoGuard si_guard(sql);
694
ObSqlFatalErrExtraInfoGuard extra_info_guard;
695
extra_info_guard.set_cur_sql(sql);
696
extra_info_guard.set_tenant_id(session.get_effective_tenant_id());
697
ObIAllocator &allocator = CURRENT_CONTEXT->get_arena_allocator();
698
SMART_VAR(ObMySQLResultSet, result, session, allocator) {
699
if (OB_FAIL(get_tenant_schema_info_(session.get_effective_tenant_id(),
704
LOG_WARN("get tenant schema info error", K(ret), K(session));
705
} else if (OB_FAIL(session.update_query_sensitive_system_variable(*schema_guard))) {
706
LOG_WARN("update query sensitive system vairable in session failed", K(ret));
707
} else if (OB_FAIL(update_transmission_checksum_flag(session))) {
708
LOG_WARN("update transmisson checksum flag failed", K(ret));
709
} else if (OB_ISNULL(gctx_.sql_engine_)) {
710
ret = OB_ERR_UNEXPECTED;
711
LOG_ERROR("invalid sql engine", K(ret), K(gctx_));
713
session.set_current_execution_id(GCTX.sql_engine_->get_execution_id());
714
result.get_exec_context().set_need_disconnect(true);
715
ctx_.schema_guard_ = schema_guard;
716
retry_ctrl_.set_tenant_local_schema_version(tenant_version);
717
retry_ctrl_.set_sys_local_schema_version(sys_version);
718
extra_info_guard.set_exec_context(&(result.get_exec_context()));
721
ObWaitEventStat total_wait_desc;
722
ObDiagnoseSessionInfo *di = NULL;
724
if (enable_perf_event) {
725
di = ObDiagnoseSessionInfo::get_local_diagnose_info();
727
ObMaxWaitGuard max_wait_guard(enable_perf_event ? &audit_record.exec_record_.max_wait_event_ : NULL, di);
728
ObTotalWaitGuard total_wait_guard(enable_perf_event ? &total_wait_desc : NULL, di);
729
if (enable_perf_event) {
730
audit_record.exec_record_.record_start(di);
732
result.set_has_more_result(has_more_result);
733
ObTaskExecutorCtx &task_ctx = result.get_exec_context().get_task_exec_ctx();
734
task_ctx.schema_service_ = gctx_.schema_service_;
735
task_ctx.set_query_tenant_begin_schema_version(retry_ctrl_.get_tenant_local_schema_version());
736
task_ctx.set_query_sys_begin_schema_version(retry_ctrl_.get_sys_local_schema_version());
737
task_ctx.set_min_cluster_version(GET_MIN_CLUSTER_VERSION());
738
ctx_.retry_times_ = retry_ctrl_.get_retry_times();
739
ctx_.enable_sql_resource_manage_ = true;
740
//storage::ObPartitionService* ps = static_cast<storage::ObPartitionService *> (GCTX.par_ser_);
741
//bool is_read_only = false;
744
} else if (OB_ISNULL(ctx_.schema_guard_)) {
745
ret = OB_INVALID_ARGUMENT;
746
LOG_WARN("newest schema is NULL", K(ret));
747
} else if (OB_FAIL(set_session_active(sql, session, single_process_timestamp_))) {
748
LOG_WARN("fail to set session active", K(ret));
749
} else if (OB_FAIL(gctx_.sql_engine_->stmt_query(sql, ctx_, result))) {
750
exec_start_timestamp_ = ObTimeUtility::current_time();
751
if (!THIS_WORKER.need_retry()) {
752
int cli_ret = OB_SUCCESS;
753
retry_ctrl_.test_and_save_retry_state(gctx_, ctx_, result, ret, cli_ret);
754
if (OB_ERR_PROXY_REROUTE == ret) {
755
LOG_DEBUG("run stmt_query failed, check if need retry",
756
K(ret), K(cli_ret), K(retry_ctrl_.need_retry()), K(sql));
758
LOG_WARN("run stmt_query failed, check if need retry",
759
K(ret), K(cli_ret), K(retry_ctrl_.need_retry()),
760
"sql", ctx_.is_sensitive_ ? ObString(OB_MASKED_STR) : sql);
763
if (OB_ERR_PROXY_REROUTE == ret) {
764
// 该错误码在下层是由编译阶段被设置,async_resp_used标志一定为false
765
// 所以此时可以同步回包,设置need_response_error
766
// 向客户端返回一个error包,表示需要二次路由
767
need_response_error = true;
768
} else if (ctx_.multi_stmt_item_.is_batched_multi_stmt()) {
769
// batch execute with error,should not response error packet
770
need_response_error = false;
771
} else if (OB_BATCHED_MULTI_STMT_ROLLBACK == ret) {
772
need_response_error = false;
775
retry_ctrl_.set_packet_retry(ret);
776
session.get_retry_info_for_update().set_last_query_retry_err(ret);
777
session.get_retry_info_for_update().inc_retry_cnt();
781
exec_start_timestamp_ = ObTimeUtility::current_time();
782
result.get_exec_context().set_plan_start_time(exec_start_timestamp_);
783
// 本分支内如果出错,全部会在response_result内部处理妥当
785
need_response_error = false;
786
is_diagnostics_stmt = ObStmt::is_diagnostic_stmt(result.get_literal_stmt_type());
787
ctx_.is_show_trace_stmt_ = ObStmt::is_show_trace_stmt(result.get_literal_stmt_type());
788
plan = result.get_physical_plan();
789
extra_info_guard.set_cur_plan(plan);
791
if (get_is_com_filed_list()) {
792
result.set_is_com_filed_list();
793
result.set_wildcard_string(wild_str_);
798
//TODO shengle, confirm whether 4.0 is required
799
//} else if (OB_FAIL(fill_feedback_session_info(*result, session))) {
800
//need_response_error = true;
801
//LOG_WARN("failed to fill session info", K(ret));
802
} else if (OB_FAIL(response_result(result,
805
ObPhysicalPlanCtx *plan_ctx = result.get_exec_context().get_physical_plan_ctx();
806
if (OB_ISNULL(plan_ctx)) {
807
LOG_ERROR("execute query fail, and plan_ctx is NULL", K(ret));
809
if (OB_TRANSACTION_SET_VIOLATION != ret && OB_REPLICA_NOT_READABLE != ret) {
810
LOG_WARN("execute query fail", K(ret), "timeout_timestamp",
811
plan_ctx->get_timeout_timestamp());
817
int tmp_ret = OB_SUCCESS;
818
tmp_ret = OB_E(EventTable::EN_PRINT_QUERY_SQL) OB_SUCCESS;
819
if (OB_SUCCESS != tmp_ret) {
820
LOG_INFO("query info:", K(sql_),
821
"sess_id", result.get_session().get_sessid(),
822
"trans_id", result.get_session().get_tx_id());
826
exec_end_timestamp_ = ObTimeUtility::current_time();
828
// some statistics must be recorded for plan stat, even though sql audit disabled
829
bool first_record = (1 == audit_record.try_cnt_);
830
ObExecStatUtils::record_exec_timestamp(*this, first_record, audit_record.exec_timestamp_);
831
audit_record.exec_timestamp_.update_stage_time();
833
if (enable_perf_event) {
834
audit_record.exec_record_.record_end(di);
835
record_stat(result.get_stmt_type(), exec_end_timestamp_);
836
audit_record.exec_record_.wait_time_end_ = total_wait_desc.time_waited_;
837
audit_record.exec_record_.wait_count_end_ = total_wait_desc.total_waits_;
838
audit_record.update_event_stage_state();
841
if (enable_perf_event && !THIS_THWORKER.need_retry()
842
&& OB_NOT_NULL(result.get_physical_plan())) {
843
const int64_t time_cost = exec_end_timestamp_ - get_receive_timestamp();
844
ObSQLUtils::record_execute_time(result.get_physical_plan()->get_plan_type(), time_cost);
848
// 2. 没有给客户端返回结果,本次执行没有副作用
849
// 3. need_retry(result, ret):schema 或 location cache 失效
851
if (OB_UNLIKELY(retry_ctrl_.need_retry())) {
852
if (OB_TRANSACTION_SET_VIOLATION != ret && OB_REPLICA_NOT_READABLE != ret && OB_TRY_LOCK_ROW_CONFLICT != ret) {
854
LOG_WARN("try to execute again",
856
N_TYPE, result.get_stmt_type(),
857
"retry_type", retry_ctrl_.get_retry_type(),
858
"timeout_remain", THIS_WORKER.get_timeout_remain());
862
// 首个plan执行完成后立即freeze partition hit
863
// partition_hit一旦freeze后,后继的try_set_bool操作都不生效
864
if (OB_LIKELY(NULL != result.get_physical_plan())) {
865
session.partition_hit().freeze();
868
// store the warning message from the most recent statement in the current session
869
if ((OB_SUCC(ret) && is_diagnostics_stmt) || async_resp_used) {
870
// If diagnostic stmt execute successfully, it dosen't clear the warning message.
871
// Or if it response to client asynchronously, it doesn't clear the warning message here,
872
// but will do it in the callback thread.
873
session.update_show_warnings_buf();
875
session.set_show_warnings_buf(ret); // TODO: 挪个地方性能会更好,减少部分wb拷贝
878
if (OB_FAIL(ret) && !async_resp_used && need_response_error && is_conn_valid() && !THIS_WORKER.need_retry()) {
879
if (OB_ERR_PROXY_REROUTE == ret) {
880
LOG_DEBUG("query should be rerouted", K(ret), K(async_resp_used));
882
LOG_WARN("query failed", K(ret), K(session),
883
"sql", ctx_.is_sensitive_ ? ObString(OB_MASKED_STR) : sql,
884
K(retry_ctrl_.need_retry()));
886
// 当need_retry=false时,可能给客户端回过包了,可能还没有回过任何包。
887
// 不过,可以确定:这个请求出错了,还没处理完。如果不是已经交给异步EndTrans收尾,
888
// 则需要在下面回复一个error_packet作为收尾。否则后面没人帮忙发错误包给客户端了,
890
bool is_partition_hit = session.get_err_final_partition_hit(ret);
891
int err = send_error_packet(ret, NULL, is_partition_hit, (void *)ctx_.get_reroute_info());
892
if (OB_SUCCESS != err) { // 发送error包
893
LOG_WARN("send error packet failed", K(ret), K(err));
899
audit_record.status_ = (0 == ret || OB_ITER_END == ret)
900
? REQUEST_SUCC : (ret);
901
if (enable_sql_audit) {
902
audit_record.seq_ = 0; //don't use now
903
audit_record.execution_id_ = session.get_current_execution_id();
904
audit_record.client_addr_ = session.get_peer_addr();
905
audit_record.user_client_addr_ = session.get_user_client_addr();
906
audit_record.user_group_ = THIS_WORKER.get_group_id();
907
MEMCPY(audit_record.sql_id_, ctx_.sql_id_, (int32_t)sizeof(audit_record.sql_id_));
909
audit_record.plan_type_ = plan->get_plan_type();
910
audit_record.table_scan_ = plan->contain_table_scan();
911
audit_record.plan_id_ = plan->get_plan_id();
912
audit_record.plan_hash_ = plan->get_plan_hash_value();
913
audit_record.rule_name_ = const_cast<char *>(plan->get_rule_name().ptr());
914
audit_record.rule_name_len_ = plan->get_rule_name().length();
915
audit_record.partition_hit_ = session.partition_hit().get_bool();
917
if (OB_FAIL(ret) && audit_record.trans_id_ == 0) {
918
// normally trans_id is set in the `start-stmt` phase,
919
// if `start-stmt` hasn't run, set trans_id from session if an active txn exist
920
audit_record.trans_id_ = session.get_tx_id();
922
audit_record.affected_rows_ = result.get_affected_rows();
923
audit_record.return_rows_ = result.get_return_rows();
924
audit_record.partition_cnt_ = result.get_exec_context()
926
.get_related_tablet_cnt();
927
audit_record.expected_worker_cnt_ = result.get_exec_context()
929
.get_expected_worker_cnt();
930
audit_record.used_worker_cnt_ = result.get_exec_context()
932
.get_admited_worker_cnt();
934
audit_record.is_executor_rpc_ = false;
935
audit_record.is_inner_sql_ = false;
936
audit_record.is_hit_plan_cache_ = result.get_is_from_plan_cache();
937
audit_record.is_multi_stmt_ = session.get_capability().cap_flags_.OB_CLIENT_MULTI_STATEMENTS;
938
audit_record.is_batched_multi_stmt_ = ctx_.multi_stmt_item_.is_batched_multi_stmt();
940
OZ (store_params_value_to_str(allocator, session, result.get_ps_params()));
941
audit_record.params_value_ = params_value_;
942
audit_record.params_value_len_ = params_value_len_;
943
audit_record.is_perf_event_closed_ = !lib::is_diagnose_info_enabled();
945
ObPhysicalPlanCtx *plan_ctx = result.get_exec_context().get_physical_plan_ctx();
946
if (OB_ISNULL(plan_ctx)) {
949
audit_record.consistency_level_ = plan_ctx->get_consistency_level();
952
//update v$sql statistics
953
if (session.get_local_ob_enable_plan_cache()
954
&& !retry_ctrl_.need_retry()) {
955
ObIArray<ObTableRowCount> *table_row_count_list = NULL;
956
ObPhysicalPlanCtx *plan_ctx = result.get_exec_context().get_physical_plan_ctx();
957
if (OB_ISNULL(plan_ctx)) {
960
table_row_count_list = &(plan_ctx->get_table_row_count_list());
961
audit_record.table_scan_stat_ = plan_ctx->get_table_scan_stat();
964
if (!(ctx_.self_add_plan_) && ctx_.plan_cache_hit_) {
965
plan->update_plan_stat(audit_record,
966
false, // false mean not first update plan stat
967
result.get_exec_context().get_is_evolution(),
968
table_row_count_list);
969
plan->update_cache_access_stat(audit_record.table_scan_stat_);
970
} else if (ctx_.self_add_plan_ && !ctx_.plan_cache_hit_) {
971
plan->update_plan_stat(audit_record,
973
result.get_exec_context().get_is_evolution(),
974
table_row_count_list);
975
plan->update_cache_access_stat(audit_record.table_scan_stat_);
976
} else if (ctx_.self_add_plan_ && ctx_.plan_cache_hit_) {
977
// spm evolution plan first execute
978
plan->update_plan_stat(audit_record,
980
result.get_exec_context().get_is_evolution(),
981
table_row_count_list);
982
plan->update_cache_access_stat(audit_record.table_scan_stat_);
986
// reset thread waring buffer in sync mode
987
if (!async_resp_used) {
988
clear_wb_content(session);
991
need_disconnect = (result.get_exec_context().need_disconnect()
992
&& !is_query_killed_return(ret));//明确是kill query时,不应该断连接
993
if (need_disconnect) {
994
LOG_WARN("need disconnect", K(ret), K(need_disconnect));
996
bool is_need_retry = THIS_THWORKER.need_retry() ||
997
RETRY_TYPE_NONE != retry_ctrl_.get_retry_type();
999
if (!is_need_retry) {
1000
(void)ObSQLUtils::handle_plan_baseline(audit_record, plan, ret, ctx_);
1003
(void)ObSQLUtils::handle_audit_record(is_need_retry, EXECUTE_LOCAL, session,
1004
ctx_.is_sensitive_);
1005
#ifdef OB_BUILD_AUDIT_SECURITY
1006
// 对于触发重试的语句不需要进行审计,以免一条语句被审计多次
1007
if (!retry_ctrl_.need_retry() && !async_resp_used) {
1008
(void)ObSecurityAuditUtils::handle_security_audit(result,
1011
ObString::make_empty_string(),
1019
int ObMPQuery::store_params_value_to_str(ObIAllocator &allocator,
1020
sql::ObSQLSessionInfo &session,
1021
common::ParamStore ¶ms)
1023
int ret = OB_SUCCESS;
1025
int64_t length = OB_MAX_SQL_LENGTH;
1026
CK (OB_NOT_NULL(params_value_ = static_cast<char *>(allocator.alloc(OB_MAX_SQL_LENGTH))));
1027
for (int64_t i = 0; OB_SUCC(ret) && i < params.count(); ++i) {
1028
const common::ObObjParam ¶m = params.at(i);
1029
if (param.is_ext()) {
1031
params_value_ = NULL;
1032
params_value_len_ = 0;
1035
OZ (param.print_sql_literal(params_value_, length, pos, allocator, TZ_INFO(&session)));
1036
if (i != params.count() - 1) {
1037
OZ (databuff_printf(params_value_, length, pos, allocator, ","));
1042
params_value_ = NULL;
1043
params_value_len_ = 0;
1046
params_value_len_ = pos;
1051
//int ObMPQuery::fill_feedback_session_info(ObMySQLResultSet &result,
1052
// ObSQLSessionInfo &session)
1054
// int ret = OB_SUCCESS;
1055
// ObPhysicalPlan *temp_plan = NULL;
1056
// ObTaskExecutorCtx *temp_task_ctx = NULL;
1057
// ObSchemaGetterGuard *schema_guard = NULL;
1058
// if (session.is_abundant_feedback_support() &&
1059
// NULL != (temp_plan = result.get_physical_plan()) &&
1060
// NULL != (temp_task_ctx = result.get_exec_context().get_task_executor_ctx()) &&
1061
// NULL != (schema_guard = ctx_.schema_guard_) &&
1062
// temp_plan->get_plan_type() == ObPhyPlanType::OB_PHY_PLAN_REMOTE &&
1063
// temp_plan->get_location_type() != ObPhyPlanType::OB_PHY_PLAN_UNCERTAIN &&
1064
// temp_task_ctx->get_table_locations().count() == 1 &&
1065
// temp_task_ctx->get_table_locations().at(0).get_partition_location_list().count() == 1) {
1066
// bool is_cache_hit = false;
1067
// ObFBPartitionParam param;
1068
// //FIXME: should remove ObPartitionKey
1069
// ObPartitionKey partition_key;
1070
// ObPartitionLocation partition_loc;
1071
// const ObTableSchema *table_schema = NULL;
1072
// ObPartitionReplicaLocationIArray &pl_array =
1073
// temp_task_ctx->get_table_locations().at(0).get_partition_location_list();
1074
// if (OB_FAIL(pl_array.at(0).get_partition_key(partition_key))) {
1075
// LOG_WARN("failed to get partition key", K(ret));
1076
// } else if (OB_FAIL(temp_cache->get(partition_key,
1080
// LOG_WARN("failed to get partition location", K(ret));
1081
// } else if (OB_FAIL(schema_guard->get_table_schema(partition_key.get_tenant_id(),
1082
// partition_key.get_table_id(),
1084
// LOG_WARN("failed to get table schema", K(ret), K(partition_key));
1085
// } else if (OB_ISNULL(table_schema)) {
1086
// ret = OB_ERR_UNEXPECTED;
1087
// LOG_WARN("null table schema", K(ret));
1088
// } else if (OB_FAIL(build_fb_partition_param(*table_schema, partition_loc, param))) {
1089
// LOG_WARN("failed to build fb partition pararm", K(ret));
1090
// } else if (OB_FAIL(session.set_partition_location_feedback(param))) {
1091
// LOG_WARN("failed to set partition location feedback", K(param), K(ret));
1092
// } else { /*do nothing*/ }
1093
// } else { /*do nothing*/}
1097
//int ObMPQuery::build_fb_partition_param(
1098
// const ObTableSchema &table_schema,
1099
// const ObPartitionLocation &partition_loc,
1100
// ObFBPartitionParam ¶m) {
1102
// param.schema_version_ = table_schema.get_schema_version();
1103
// int64_t origin_partition_idx = OB_INVALID_ID;
1104
// if (OB_FAIL(param.pl_.assign(partition_loc))) {
1105
// LOG_WARN("fail to assign pl", K(partition_loc), K(ret));
1107
// // when table partition_id to client, we need convert it to
1108
// // real partition idx(e.g. hash partition split)
1109
// else if (OB_FAIL(table_schema.convert_partition_id_to_idx(
1110
// partition_loc.get_partition_id(), origin_partition_idx))) {
1111
// LOG_WARN("fail to convert partition id", K(partition_loc), K(ret));
1113
// param.original_partition_id_ = origin_partition_idx;
1119
int ObMPQuery::check_readonly_stmt(ObMySQLResultSet &result)
1121
int ret = OB_SUCCESS;
1122
bool is_readonly = false;
1123
//在该阶段,show语句会转换为select语句,
1124
//literal_stmt_type若不为stmt::T_NONE,则表示原有的类型show
1125
const stmt::StmtType type = stmt::T_NONE == result.get_literal_stmt_type() ?
1126
result.get_stmt_type() :
1127
result.get_literal_stmt_type();
1128
ObConsistencyLevel consistency = INVALID_CONSISTENCY;
1129
if (OB_FAIL(is_readonly_stmt(result, is_readonly))) {
1130
LOG_WARN("check stmt is readonly fail", K(ret), K(result));
1131
} else if (!is_readonly) {
1132
ret = OB_ERR_READ_ONLY;
1133
LOG_WARN("stmt is not readonly", K(ret), K(result));
1134
} else if (stmt::T_SELECT == type) {
1136
//通过设置/*+read_consistency()*/ hint
1137
//or 指定session级别ob_read_consistency = 2
1139
const int64_t table_count = DAS_CTX(result.get_exec_context()).get_table_loc_list().size();
1140
if (0 == table_count) {
1141
//此处比较特殊,jdbc在发送查询语句时会带上特殊的语句select @@session.tx_read_only;
1142
//为方便obtest测试,需要放开对无table_locatitons的select语句的限制
1143
} else if (OB_FAIL(result.get_read_consistency(consistency))) {
1144
LOG_WARN("get read consistency fail", K(ret));
1145
} else if (WEAK != consistency) {
1146
ret = OB_ERR_READ_ONLY;
1147
LOG_WARN("strong consistency read is not allowed", K(ret), K(type), K(consistency));
1153
int ObMPQuery::is_readonly_stmt(ObMySQLResultSet &result, bool &is_readonly)
1155
int ret = OB_SUCCESS;
1156
is_readonly = false;
1157
const stmt::StmtType type = stmt::T_NONE == result.get_literal_stmt_type() ?
1158
result.get_stmt_type() :
1159
result.get_literal_stmt_type();
1161
case stmt::T_SELECT: {
1162
//对select...for update语句,也需要禁止
1163
ObPhysicalPlan *physical_plan = result.get_physical_plan();
1164
if (NULL == physical_plan) {
1165
ret = OB_ERR_UNEXPECTED;
1166
LOG_WARN("physical_plan should not be null", K(ret));
1167
} else if (physical_plan->has_for_update()) {
1168
is_readonly = false;
1174
case stmt::T_VARIABLE_SET: {
1175
//禁止set @@global.variable语句
1176
if (result.has_global_variable()) {
1177
is_readonly = false;
1183
case stmt::T_EXPLAIN:
1184
case stmt::T_SHOW_TABLES:
1185
case stmt::T_SHOW_DATABASES:
1186
case stmt::T_SHOW_COLUMNS:
1187
case stmt::T_SHOW_VARIABLES:
1188
case stmt::T_SHOW_TABLE_STATUS:
1189
case stmt::T_SHOW_SCHEMA:
1190
case stmt::T_SHOW_CREATE_DATABASE:
1191
case stmt::T_SHOW_CREATE_TABLE:
1192
case stmt::T_SHOW_CREATE_VIEW:
1193
case stmt::T_SHOW_PARAMETERS:
1194
case stmt::T_SHOW_SERVER_STATUS:
1195
case stmt::T_SHOW_INDEXES:
1196
case stmt::T_SHOW_WARNINGS:
1197
case stmt::T_SHOW_ERRORS:
1198
case stmt::T_SHOW_PROCESSLIST:
1199
case stmt::T_SHOW_CHARSET:
1200
case stmt::T_SHOW_COLLATION:
1201
case stmt::T_SHOW_TABLEGROUPS:
1202
case stmt::T_SHOW_STATUS:
1203
case stmt::T_SHOW_TENANT:
1204
case stmt::T_SHOW_CREATE_TENANT:
1205
case stmt::T_SHOW_TRACE:
1206
case stmt::T_SHOW_TRIGGERS:
1207
case stmt::T_SHOW_ENGINES:
1208
case stmt::T_SHOW_PRIVILEGES:
1209
case stmt::T_SHOW_RESTORE_PREVIEW:
1210
case stmt::T_SHOW_GRANTS:
1211
case stmt::T_SHOW_QUERY_RESPONSE_TIME:
1212
case stmt::T_SHOW_RECYCLEBIN:
1213
case stmt::T_SHOW_SEQUENCES:
1215
case stmt::T_USE_DATABASE:
1216
case stmt::T_SET_NAMES: //read only not restrict it
1217
case stmt::T_START_TRANS:
1218
case stmt::T_END_TRANS: {
1223
is_readonly = false;
1229
int ObMPQuery::deserialize()
1231
int ret = OB_SUCCESS;
1234
//OB_ASSERT(req_->get_type() == ObRequest::OB_MYSQL);
1235
if ( (OB_ISNULL(req_)) || (req_->get_type() != ObRequest::OB_MYSQL)) {
1236
ret = OB_INVALID_ARGUMENT;
1237
LOG_ERROR("invalid request", K(ret), K(req_));
1238
} else if (get_is_com_filed_list()) {
1239
if (OB_FAIL(deserialize_com_field_list())) {
1240
LOG_WARN("failed to deserialize com field list", K(ret));
1243
const ObMySQLRawPacket &pkt = reinterpret_cast<const ObMySQLRawPacket&>(req_->get_packet());
1244
sql_.assign_ptr(const_cast<char *>(pkt.get_cdata()), pkt.get_clen()-1);
1250
// return false only if send packet fail.
1251
OB_INLINE int ObMPQuery::response_result(ObMySQLResultSet &result,
1252
bool force_sync_resp,
1253
bool &async_resp_used)
1255
int ret = OB_SUCCESS;
1256
FLTSpanGuard(sql_execute);
1257
//ac = 1时线程新启事务进行oracle临时表数据清理会和clog回调形成死锁, 这里改成同步方式
1258
ObSQLSessionInfo &session = result.get_session();
1259
CHECK_COMPATIBILITY_MODE(&session);
1262
bool need_trans_cb = result.need_end_trans_callback() && (!force_sync_resp);
1264
bool need_trans_cb = result.need_end_trans_callback() &&
1265
(!force_sync_resp) &&
1266
(!ctx_.spm_ctx_.check_execute_status_);
1269
// 通过判断 plan 是否为 null 来确定是 plan 还是 cmd
1270
// 针对 plan 和 cmd 分开处理,逻辑会较为清晰。
1271
if (OB_LIKELY(NULL != result.get_physical_plan())) {
1272
if (need_trans_cb) {
1273
ObAsyncPlanDriver drv(gctx_, ctx_, session, retry_ctrl_, *this);
1274
// NOTE: sql_end_cb必须在drv.response_result()之前初始化好
1275
ObSqlEndTransCb &sql_end_cb = session.get_mysql_end_trans_cb();
1276
if (OB_FAIL(sql_end_cb.init(packet_sender_, &session))) {
1277
LOG_WARN("failed to init sql end callback", K(ret));
1278
} else if (OB_FAIL(drv.response_result(result))) {
1279
LOG_WARN("fail response async result", K(ret));
1281
async_resp_used = result.is_async_end_trans_submitted();
1283
// 试点ObQuerySyncDriver
1284
ObSyncPlanDriver drv(gctx_, ctx_, session, retry_ctrl_, *this);
1285
ret = drv.response_result(result);
1288
if (need_trans_cb) {
1289
ObSqlEndTransCb &sql_end_cb = session.get_mysql_end_trans_cb();
1290
ObAsyncCmdDriver drv(gctx_, ctx_, session, retry_ctrl_, *this);
1291
if (OB_FAIL(sql_end_cb.init(packet_sender_, &session))) {
1292
LOG_WARN("failed to init sql end callback", K(ret));
1293
} else if (OB_FAIL(drv.response_result(result))) {
1294
LOG_WARN("fail response async result", K(ret));
1296
async_resp_used = result.is_async_end_trans_submitted();
1298
ObSyncCmdDriver drv(gctx_, ctx_, session, retry_ctrl_, *this);
1299
session.set_pl_query_sender(&drv);
1300
session.set_ps_protocol(result.is_ps_protocol());
1301
ret = drv.response_result(result);
1302
session.set_pl_query_sender(NULL);
1309
inline void ObMPQuery::record_stat(const stmt::StmtType type, const int64_t end_time) const
1311
#define ADD_STMT_STAT(type) \
1312
case stmt::T_##type: \
1313
EVENT_INC(SQL_##type##_COUNT); \
1314
EVENT_ADD(SQL_##type##_TIME, time_cost); \
1316
const int64_t time_cost = end_time - get_receive_timestamp();
1317
if (!THIS_THWORKER.need_retry())
1321
ADD_STMT_STAT(SELECT);
1322
ADD_STMT_STAT(INSERT);
1323
ADD_STMT_STAT(REPLACE);
1324
ADD_STMT_STAT(UPDATE);
1325
ADD_STMT_STAT(DELETE);
1328
EVENT_INC(SQL_OTHER_COUNT);
1329
EVENT_ADD(SQL_OTHER_TIME, time_cost);
1336
int ObMPQuery::deserialize_com_field_list()
1338
int ret = OB_SUCCESS;
1339
//如果设置了只需要返回列定义,说明client传来的是COM_FIELD_LIST命令
1340
/* mysql中的COM_FIELD_LIST命令用于获取table中的列定义,其从client to server的packet为:
1341
* 1 [04] COM_FIELD_LIST
1343
* string[EOF] field wildcard
1344
* 首先是CMD类型,然后是表名,最后是匹配条件
1346
* server to client的packet为下面中的其中一个:
1347
* 1. a ERR_Packet(返回一个错误包)
1348
* 2. one or more Column Definition packets and a closing EOF_Packet(返回n个列定义+EOF)
1350
* 由于普通的select的查询结果已经包含了column定义,同时最大限度复用当前的代码逻辑,因此可以将COM_FIELD_LIST的命令
1352
* select * from table limit 0 ==> 获取filed define ==> 根据field wildcard 按需反回 Column Definition
1354
* 参考:https://dev.mysql.com/doc/internals/en/com-field-list.html
1356
ObIAllocator *alloc = &THIS_WORKER.get_sql_arena_allocator();
1357
if (OB_ISNULL(alloc)) {
1358
ret = OB_ERR_UNEXPECTED;
1359
LOG_WARN("get unexpected null", K(alloc), K(ret));
1361
const ObMySQLRawPacket &pkt = reinterpret_cast<const ObMySQLRawPacket&>(req_->get_packet());
1362
const char *str = pkt.get_cdata();
1363
uint32_t length = pkt.get_clen();
1364
const char *str1 = "select * from ";
1365
const char *str2 = " where 0";
1367
const int64_t str1_len = strlen(str1);
1368
const int64_t str2_len = strlen(str2);
1369
const int pre_size = str1_len + str2_len;
1370
//寻找client传过来的table_name和filed wildcard之间的分隔符(table_name [NULL] filed wildcard)
1371
for (; static_cast<int>(str[i]) != 0 && i < length; ++i) {}
1372
char *dest_str = static_cast<char *>(alloc->alloc(length + pre_size));
1373
if (OB_ISNULL(dest_str)) {
1374
ret = OB_ALLOCATE_MEMORY_FAILED;
1375
LOG_WARN("failed to alloc", K(dest_str));
1377
char *buf = dest_str;
1378
uint32_t real_len = 0;
1379
MEMSET(buf, 0, length + pre_size);
1380
MEMCPY(buf, str1, str1_len);
1381
buf = buf + str1_len;
1382
real_len = real_len + str1_len;
1383
MEMCPY(buf, str, i);
1385
real_len = real_len + i;
1386
MEMCPY(buf, str2, str2_len);
1387
real_len = real_len + str2_len;
1388
sql_.assign_ptr(dest_str, real_len);
1390
if (i + 1 < length - 1) {
1391
wild_str_.assign_ptr(str + i + 1, (length - 1) - (i +1));