2
* Copyright (c) 2021 OceanBase
3
* OceanBase CE is licensed under Mulan PubL v2.
4
* You can use this software according to the terms and conditions of the Mulan PubL v2.
5
* You may obtain a copy of Mulan PubL v2 at:
6
* http://license.coscl.org.cn/MulanPubL-2.0
7
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
* See the Mulan PubL v2 for more details.
13
#define USING_LOG_PREFIX SERVER
15
#include "observer/mysql/obmp_stmt_prepare.h"
17
#include "lib/worker.h"
18
#include "lib/oblog/ob_log.h"
19
#include "lib/stat/ob_session_stat.h"
20
#include "rpc/ob_request.h"
21
#include "rpc/obmysql/ob_mysql_packet.h"
22
#include "rpc/obmysql/packet/ompk_prepare.h"
23
#include "rpc/obmysql/packet/ompk_field.h"
24
#include "observer/mysql/ob_mysql_result_set.h"
25
#include "observer/mysql/ob_async_plan_driver.h"
26
#include "observer/mysql/ob_sync_cmd_driver.h"
27
#include "observer/mysql/ob_sync_plan_driver.h"
28
#include "rpc/obmysql/obsm_struct.h"
29
#include "observer/omt/ob_tenant.h"
30
#include "share/schema/ob_schema_getter_guard.h"
31
#include "sql/ob_sql_context.h"
32
#include "sql/session/ob_sql_session_info.h"
33
#include "sql/ob_sql.h"
34
#include "observer/ob_req_time_service.h"
35
#include "observer/mysql/obmp_utils.h"
41
using namespace common;
43
using namespace obmysql;
49
// Constructor of the prepare-statement packet processor.
// The visible initializers default-construct retry_ctrl_ and zero the three
// timing fields; the body then tags the SQL context's exec type as MpQuery.
// NOTE(review): part of the initializer list is not visible in this view.
ObMPStmtPrepare::ObMPStmtPrepare(const ObGlobalContext &gctx)
51
retry_ctrl_(/*ctx_.retry_info_*/),
54
single_process_timestamp_(0),
55
exec_start_timestamp_(0),
56
exec_end_timestamp_(0)
58
ctx_.exec_type_ = MpQuery;
61
// Extracts the SQL text of the request into sql_.
// Fails with OB_INVALID_ARGUMENT when req_ is null or is not an OB_MYSQL request.
int ObMPStmtPrepare::deserialize()
64
if ((OB_ISNULL(req_)) || (req_->get_type() != ObRequest::OB_MYSQL)) {
65
ret = OB_INVALID_ARGUMENT;
66
LOG_ERROR("invalid request", K(ret), K(req_));
68
const ObMySQLRawPacket &pkt = reinterpret_cast<const ObMySQLRawPacket&>(req_->get_packet());
69
// sql_ only points into the packet buffer (assign_ptr), it does not copy the text.
// NOTE(review): the length is get_clen() - 1; presumably clen also counts the
// command byte -- confirm, and confirm clen can never be 0 here.
sql_.assign_ptr(const_cast<char *>(pkt.get_cdata()), pkt.get_clen()-1);
75
// Pre-processing hook: runs the base-class before_process(), then rejects the
// request with OB_NOT_SUPPORTED (and sends an error packet) when the
// _ob_enable_prepared_statement config is off.
int ObMPStmtPrepare::before_process()
78
if (OB_FAIL(ObMPBase::before_process())) {
79
LOG_WARN("failed to pre processing packet", K(ret));
80
// "call dbms_output.get_line(?, ?)" (case-insensitive match) is tested before the
// config check below, so it is not rejected when prepared statements are disabled.
} else if (0 == sql_.case_compare("call dbms_output.get_line(?, ?)")) {
82
} else if (!GCONF._ob_enable_prepared_statement) {
83
ret = OB_NOT_SUPPORTED;
84
LOG_USER_ERROR(OB_NOT_SUPPORTED,
85
"while parameter _ob_enable_prepared_statement is disabled, prepared statement");
86
send_error_packet(ret, NULL);
92
// Checks whether the SQL text to prepare contains more than one statement.
// Only runs when the session has the OB_CLIENT_MULTI_STATEMENTS capability flag set.
//   session             - session whose capability flags, SQL mode and charsets are read
//   force_sync_resp     - set to true whenever the multi-statement check runs
//   need_response_error - set to true on the failure paths visible below
// Sets ret to OB_NOT_SUPPORTED when more than one statement is found.
// NOTE(review): one parameter line (the SQL text, used below as `sql`) is not
// visible in this view.
int ObMPStmtPrepare::multiple_query_check(ObSQLSessionInfo &session,
94
bool &force_sync_resp,
95
bool &need_response_error)
98
if (OB_UNLIKELY(1 == session.get_capability().cap_flags_.OB_CLIENT_MULTI_STATEMENTS)) {
99
ObSEArray<ObString, 1> queries;
100
ObParser parser(THIS_WORKER.get_allocator(),
101
session.get_sql_mode(), session.get_charsets4parser());
102
bool parse_fail = false;
103
ObMPParseStat parse_stat;
104
force_sync_resp = true;
105
/* How MySQL behaves when a Multi-Stmt fails:
106
* after the first SQL that fails (at parse time or at execution time), it stops reading the following data
108
* (1) select 1; selct 2; select 3;
109
* select 1 succeeds, selct 2 reports a syntax error, select 3 is not executed
110
* (2) select 1; drop table not_exists_table; select 3;
111
* select 1 succeeds, drop table not_exists_table reports a table-not-exists error, select 3 is not executed
114
* split_multiple_stmt splits the statements on semicolons, but it may run into a "syntax error";
115
* the "syntax error" meant here is not select misspelled as selct, but a token-level syntax error, e.g. the statement
116
* select 1;`select 2; select 3;
117
* above, ` and ' do not form a closed string token, so the token parser reports a syntax error
118
* in the example above, queries.count() equals 2: select 1 and `select 2; select 3;
120
ret = parser.split_multiple_stmt(sql, queries, parse_stat, false, true);
121
if (OB_SUCC(ret)) { // ret=SUCC does not mean that parsing succeeded; the last query may have failed to parse
122
if (OB_UNLIKELY(queries.count() <= 0)) {
123
LOG_ERROR("emtpy query count. client would have suspended. never be here!",
124
K(sql), K(parse_fail));
125
} else if (queries.count() > 1) {
126
ret = OB_NOT_SUPPORTED;
127
need_response_error = true;
128
LOG_WARN("can't not prepare multi stmt", K(ret), K(queries.count()));
130
if (OB_UNLIKELY(parse_stat.parse_fail_ && (0 == parse_stat.fail_query_idx_)
131
&& ObSQLUtils::check_need_disconnect_parser_err(parse_stat.fail_ret_))) {
132
// Reaching this branch means a query in the multi_query failed to parse, and the failure is not a syntax error.
133
// If the current query_count is 1, the connection is kept; if it is greater than 1,
134
// the connection must be dropped after the error packet is sent, so that the client does not keep waiting for further replies.
136
ret = parse_stat.fail_ret_;
137
need_response_error = true;
141
// Reaching this branch means push_back failed (OOM); the outer code is left to return the error code.
143
need_response_error = true;
144
LOG_WARN("need response error", K(ret));
150
// Entry point for handling one prepare request.
// Steps visible in this view:
//   1. validate req_, the connection (authed phase, tenant) and fetch the session;
//   2. set up per-request session state (trace id, memory accounting, proxy version);
//   3. run a chain of pre-checks (session validity/kill state, query timeout,
//      schema broadcast versions, trace info, extra info, packet size, full-link trace);
//   4. call multiple_query_check() and then process_prepare_stmt();
//   5. on failure with a still-valid connection, send an error packet when
//      need_response_error is set and log a disconnect when need_disconnect is set;
//   6. detach the session from the worker and revert it.
int ObMPStmtPrepare::process()
152
int ret = OB_SUCCESS;
153
ObSQLSessionInfo *sess = NULL;
154
bool need_response_error = true;
155
bool async_resp_used = false; // the client is replied to asynchronously by the transaction commit thread
156
int64_t query_timeout = 0;
157
ObSMConnection *conn = get_conn();
158
bool need_disconnect = true;
160
if (OB_ISNULL(req_) || OB_ISNULL(conn)) {
161
ret = OB_ERR_UNEXPECTED;
162
LOG_WARN("req or conn is null", K_(req), K(conn), K(ret));
163
} else if (OB_UNLIKELY(!conn->is_in_authed_phase())) {
164
ret = OB_ERR_NO_PRIVILEGE;
165
LOG_WARN("receive sql without session", K_(sql), K(ret));
166
} else if (OB_ISNULL(conn->tenant_)) {
167
ret = OB_ERR_UNEXPECTED;
168
LOG_ERROR("invalid tenant", K_(sql), K(conn->tenant_), K(ret));
169
} else if (OB_FAIL(get_session(sess))) {
170
LOG_WARN("get session fail", K_(sql), K(ret));
171
} else if (OB_ISNULL(sess)) {
172
ret = OB_ERR_UNEXPECTED;
173
LOG_WARN("session is NULL or invalid", K_(sql), K(sess), K(ret));
174
} else if (OB_FAIL(update_transmission_checksum_flag(*sess))) {
175
LOG_WARN("update transmisson checksum flag failed", K(ret));
177
ObSQLSessionInfo &session = *sess;
178
THIS_WORKER.set_session(sess);
179
// Compat-mode guard is chosen from the session's compatibility mode (Oracle vs MySQL).
lib::CompatModeGuard g(sess->get_compatibility_mode() == ORACLE_MODE ?
180
lib::Worker::CompatMode::ORACLE : lib::Worker::CompatMode::MYSQL);
181
// Guard on the session's query lock, held for the rest of this scope.
ObSQLSessionInfo::LockGuard lock_guard(session.get_query_lock());
182
session.set_current_trace_id(ObCurTraceId::get_trace_id());
183
session.init_use_rich_format();
184
// Reset the audit record's request memory counter; the malloc callback below
// is constructed with a reference to that same counter.
session.get_raw_audit_record().request_memory_used_ = 0;
185
observer::ObProcessMallocCallback pmcb(0,
186
session.get_raw_audit_record().request_memory_used_);
187
lib::ObMallocCallbackGuard guard(pmcb);
188
session.set_proxy_version(get_proxy_version());
189
int64_t tenant_version = 0;
190
int64_t sys_version = 0;
191
const ObMySQLRawPacket &pkt = reinterpret_cast<const ObMySQLRawPacket&>(req_->get_packet());
192
int64_t packet_len = pkt.get_clen();
193
if (OB_UNLIKELY(!session.is_valid())) {
194
ret = OB_ERR_UNEXPECTED;
195
LOG_ERROR("invalid session", K_(sql), K(ret));
196
} else if (OB_FAIL(process_kill_client_session(session))) {
197
LOG_WARN("client session has been killed", K(ret));
198
} else if (OB_UNLIKELY(session.is_zombie())) {
199
ret = OB_ERR_SESSION_INTERRUPTED;
200
LOG_WARN("session has been killed", K(session.get_session_state()), K_(sql),
201
K(session.get_sessid()), "proxy_sessid", session.get_proxy_sessid(), K(ret));
202
} else if (OB_FAIL(session.get_query_timeout(query_timeout))) {
203
LOG_WARN("fail to get query timeout", K_(sql), K(ret));
204
// Broadcast schema versions of the effective tenant and of the sys tenant;
// both are handed to retry_ctrl_ further below.
} else if (OB_FAIL(gctx_.schema_service_->get_tenant_received_broadcast_version(
205
session.get_effective_tenant_id(), tenant_version))) {
206
LOG_WARN("fail get tenant broadcast version", K(ret));
207
} else if (OB_FAIL(gctx_.schema_service_->get_tenant_received_broadcast_version(
208
OB_SYS_TENANT_ID, sys_version))) {
209
LOG_WARN("fail get tenant broadcast version", K(ret));
210
} else if (pkt.exist_trace_info()
211
&& OB_FAIL(session.update_sys_variable(SYS_VAR_OB_TRACE_INFO,
212
pkt.get_trace_info()))) {
213
LOG_WARN("fail to update trace info", K(ret));
214
} else if (FALSE_IT(session.set_txn_free_route(pkt.txn_free_route()))) {
215
} else if (OB_FAIL(process_extra_info(session, pkt, need_response_error))) {
216
LOG_WARN("fail get process extra info", K(ret));
217
} else if (FALSE_IT(session.post_sync_session_info())) {
218
} else if (OB_UNLIKELY(packet_len > session.get_max_packet_size())) {
219
// An oversized packet is reported as an error but does not request a disconnect.
ret = OB_ERR_NET_PACKET_TOO_LARGE;
220
need_disconnect = false;
221
LOG_WARN("packet too large than allowd for the session", K_(sql), K(ret));
222
} else if (OB_FAIL(sql::ObFLTUtils::init_flt_info(pkt.get_extra_info(), session,
223
conn->proxy_cap_flags_.is_full_link_trace_support()))) {
224
LOG_WARN("failed to init flt extra info", K(ret));
226
FLTSpanGuard(ps_prepare);
227
FLT_SET_TAG(log_trace_id, ObCurTraceId::get_trace_id_str(),
228
receive_ts, get_receive_timestamp(),
229
client_info, session.get_client_info(),
230
module_name, session.get_module_name(),
231
action_name, session.get_action_name(),
232
sess_id, session.get_sessid());
233
// Worker deadline = receive timestamp + the session's query timeout.
THIS_WORKER.set_timeout_ts(get_receive_timestamp() + query_timeout);
234
retry_ctrl_.set_tenant_global_schema_version(tenant_version);
235
retry_ctrl_.set_sys_global_schema_version(sys_version);
236
session.partition_hit().reset();
237
session.set_pl_can_retry(true);
239
bool has_more = false;
240
bool force_sync_resp = false;
241
// From here on both flags default to false; multiple_query_check() may set
// need_response_error, and need_disconnect is recomputed if that check fails.
need_disconnect = false;
242
need_response_error = false;
243
if (OB_FAIL(multiple_query_check(session, sql_, force_sync_resp, need_response_error))) {
244
// A rejected multi-statement prepare (OB_NOT_SUPPORTED) keeps the connection;
// any other failure of the check requests a disconnect.
need_disconnect = OB_NOT_SUPPORTED == ret ? false : true;
245
LOG_WARN("check multiple query fail.", K(ret));
247
ret = process_prepare_stmt(ObMultiStmtItem(false, 0, sql_), session, has_more, force_sync_resp, async_resp_used);
251
//if (OB_EAGAIN == ret) {
252
//large query, do nothing
254
if (is_conn_valid()) {// The memory of the sql string is invalid if conn_valid_ has been set to false.
255
LOG_WARN("execute sql failed", "sql_id", ctx_.sql_id_, K_(sql), K(ret));
257
LOG_WARN("execute sql failed", K(ret));
262
if (!session.get_in_transaction()) {
263
// transaction ends, end trace
267
if (OB_FAIL(ret) && is_conn_valid()) {
268
if (need_response_error) {
269
send_error_packet(ret, NULL);
271
if (need_disconnect) {
273
// NOTE(review): the disconnect call itself is not visible in this view.
LOG_WARN("disconnect connection when process query", K(ret));
277
session.set_last_trace_id(ObCurTraceId::get_trace_id());
278
THIS_WORKER.set_session(NULL);
279
revert_session(sess); //current ignore revert session ret
284
// Prepares one statement, retrying locally while retry_ctrl_ asks for it.
//   multi_stmt_item - the statement item handed to init_process_var()
//   session         - current session
//   has_more_result, force_sync_resp - NOTE(review): presumably forwarded to
//                     do_process(); its argument list is truncated in this view
//   async_resp_used - passed on to do_after_process()
// An error packet is sent from here only for failures that happen before the
// retry loop (need_response_error is cleared right before it).
int ObMPStmtPrepare::process_prepare_stmt(const ObMultiStmtItem &multi_stmt_item,
285
ObSQLSessionInfo &session,
286
bool has_more_result,
287
bool force_sync_resp,
288
bool &async_resp_used)
290
int ret = OB_SUCCESS;
291
bool need_response_error = true;
292
int64_t tenant_version = 0;
293
int64_t sys_version = 0;
296
ObSessionStatEstGuard stat_est_guard(get_conn()->tenant_->id(), session.get_sessid());
297
if (OB_FAIL(init_process_var(ctx_, multi_stmt_item, session))) {
298
LOG_WARN("init process var faield.", K(ret), K(multi_stmt_item));
300
// Thread log levels are initialized only when trace logging is enabled, and
// cleared again after the retry loop under the same condition.
const bool enable_trace_log = lib::is_trace_log_enabled();
301
if (enable_trace_log) {
302
ObThreadLogLevelUtils::init(session.get_log_id_level_map());
304
if (OB_FAIL(check_and_refresh_schema(session.get_login_tenant_id(),
305
session.get_effective_tenant_id()))) {
306
LOG_WARN("failed to check_and_refresh_schema", K(ret));
307
} else if (OB_FAIL(session.update_timezone_info())) {
308
LOG_WARN("fail to update time zone info", K(ret));
310
ctx_.self_add_plan_ = false;
311
ctx_.is_prepare_protocol_ = true; //set to prepare protocol
312
ctx_.is_prepare_stage_ = true;
313
need_response_error = false;
315
// Each iteration of the retry loop takes a schema guard for the effective tenant
// and records the tenant/sys schema versions read from it in retry_ctrl_.
share::schema::ObSchemaGetterGuard schema_guard;
316
retry_ctrl_.clear_state_before_each_retry(session.get_retry_info_for_update());
317
if (OB_FAIL(gctx_.schema_service_->get_tenant_schema_guard(
318
session.get_effective_tenant_id(), schema_guard))) {
319
LOG_WARN("get schema guard failed", K(ret));
320
} else if (OB_FAIL(schema_guard.get_schema_version(
321
session.get_effective_tenant_id(), tenant_version))) {
322
LOG_WARN("fail get schema version", K(ret));
323
} else if (OB_FAIL(schema_guard.get_schema_version(
324
OB_SYS_TENANT_ID, sys_version))) {
325
LOG_WARN("fail get sys schema version", K(ret));
327
ctx_.schema_guard_ = &schema_guard;
328
retry_ctrl_.set_tenant_local_schema_version(tenant_version);
329
retry_ctrl_.set_sys_local_schema_version(sys_version);
332
ret = do_process(session,
336
session.set_session_in_retry(retry_ctrl_.need_retry());
338
// Loop again only for a local retry.
} while (RETRY_TYPE_LOCAL == retry_ctrl_.get_retry_type());
339
if (OB_SUCC(ret) && retry_ctrl_.get_retry_times() > 0) {
340
LOG_TRACE("sql retry succeed", K(ret),
341
"retry_times", retry_ctrl_.get_retry_times(), K(multi_stmt_item));
344
if (enable_trace_log) {
345
ObThreadLogLevelUtils::clear();
349
// Trace-log handling does not affect the normal logic, so its error code need not be assigned to ret.
350
int tmp_ret = OB_SUCCESS;
352
// NOTE(review): the result of do_after_process() is overwritten by the next
// assignment, and tmp_ret is not read in the visible lines.
tmp_ret = do_after_process(session, ctx_, async_resp_used);
353
tmp_ret = record_flt_trace(session);
354
// The need_response_error variable guarantees that the send_error_packet logic
355
// is reached only when the error happened before
356
// do { do_process } while(retry),
357
// so there is no need to consider whether the current mode is sync or async.
358
if (!OB_SUCC(ret) && need_response_error && is_conn_valid()) {
359
send_error_packet(ret, NULL);
365
// Requests an asynchronous schema refresh when the locally refreshed schema
// version of the tenant is older than the session's last schema version.
//   login_tenant_id     - tenant the session logged in to
//   effective_tenant_id - tenant whose schema version is checked / refreshed
// Fails with OB_INVALID_ARGUMENT when the schema service or the session info is null.
int ObMPStmtPrepare::check_and_refresh_schema(uint64_t login_tenant_id,
366
uint64_t effective_tenant_id)
368
int ret = OB_SUCCESS;
369
int64_t local_version = 0;
370
int64_t last_version = 0;
372
// The version check below runs only when login and effective tenant are the same.
// NOTE(review): the body of this first branch is not visible in this view.
if (login_tenant_id != effective_tenant_id) {
375
} else if (OB_ISNULL(gctx_.schema_service_)) {
376
ret = OB_INVALID_ARGUMENT;
377
LOG_WARN("null schema service", K(ret), K(gctx_));
379
if (OB_ISNULL(ctx_.session_info_)) {
380
ret = OB_INVALID_ARGUMENT;
381
LOG_WARN("invalid session info", K(ret), K(ctx_.session_info_));
382
} else if (OB_FAIL(gctx_.schema_service_->get_tenant_refreshed_schema_version(effective_tenant_id, local_version))) {
383
LOG_WARN("fail to get tenant refreshed schema version", K(ret));
384
} else if (OB_FAIL(ctx_.session_info_->get_ob_last_schema_version(last_version))) {
385
// NOTE(review): unlike the neighbouring log calls, this one does not include K(ret).
LOG_WARN("failed to get_sys_variable", K(OB_SV_LAST_SCHEMA_VERSION));
386
// Local schema is already at least as new as the session's last version: no refresh call.
} else if (local_version >= last_version) {
388
} else if (OB_FAIL(gctx_.schema_service_->async_refresh_schema(effective_tenant_id, last_version))) {
389
LOG_WARN("failed to refresh schema", K(ret), K(effective_tenant_id), K(last_version));
395
// Runs one prepare attempt: builds a result set, calls the SQL engine's
// stmt_prepare(), sends the response, and records timing / audit information.
//   session          - current session (audit record, warnings buffer, partition hit)
//   has_more_result  - stored on the result set
//   force_sync_resp  - NOTE(review): presumably forwarded to response_result();
//                      that call is truncated in this view
//   async_resp_used  - checked before sending an error packet and before
//                      clearing the warning buffer
// On a stmt_prepare() failure the retry state is saved in retry_ctrl_.
int ObMPStmtPrepare::do_process(ObSQLSessionInfo &session,
396
const bool has_more_result,
397
const bool force_sync_resp,
398
bool &async_resp_used)
400
int ret = OB_SUCCESS;
401
ObAuditRecordData &audit_record = session.get_raw_audit_record();
402
// try_cnt_ counts attempts; it is compared against 1 below to detect the first one.
audit_record.try_cnt_++;
403
const bool enable_perf_event = lib::is_diagnose_info_enabled();
404
// SQL audit needs both the global config and the session-local switch.
const bool enable_sql_audit = GCONF.enable_sql_audit
405
&& session.get_local_ob_enable_sql_audit();
406
single_process_timestamp_ = ObTimeUtility::current_time();
407
bool is_diagnostics_stmt = false;
408
bool need_response_error = true;
409
const ObString &sql = ctx_.multi_stmt_item_.get_sql();
410
ObPsStmtId inner_stmt_id = OB_INVALID_ID;
413
* 注意req_timeinfo_guard一定要放在result前面
// (translation of the note above) req_timeinfo_guard must be placed before result.
416
ObReqTimeGuard req_timeinfo_guard;
417
SMART_VAR(ObMySQLResultSet, result, session, THIS_WORKER.get_allocator()) {
418
ObWaitEventStat total_wait_desc;
419
ObDiagnoseSessionInfo *di = ObDiagnoseSessionInfo::get_local_diagnose_info();
421
// Both wait guards receive NULL targets when perf events are disabled.
ObMaxWaitGuard max_wait_guard(enable_perf_event ? &audit_record.exec_record_.max_wait_event_ : NULL, di);
422
ObTotalWaitGuard total_wait_guard(enable_perf_event ? &total_wait_desc : NULL, di);
423
if (enable_perf_event) {
424
audit_record.exec_record_.record_start(di);
426
result.set_has_more_result(has_more_result);
427
ObTaskExecutorCtx *task_ctx = result.get_exec_context().get_task_executor_ctx();
428
int64_t execution_id = 0;
429
if (OB_ISNULL(task_ctx)) {
430
ret = OB_ERR_UNEXPECTED;
431
LOG_ERROR("task executor ctx can not be NULL", K(task_ctx), K(ret));
433
// Pass the global schema versions held by retry_ctrl_ to the task executor context.
task_ctx->set_query_tenant_begin_schema_version(retry_ctrl_.get_tenant_global_schema_version());
434
task_ctx->set_query_sys_begin_schema_version(retry_ctrl_.get_sys_global_schema_version());
435
task_ctx->set_min_cluster_version(GET_MIN_CLUSTER_VERSION());
436
ctx_.retry_times_ = retry_ctrl_.get_retry_times();
437
if (OB_ISNULL(ctx_.schema_guard_)) {
438
ret = OB_INVALID_ARGUMENT;
439
LOG_WARN("newest schema is NULL", K(ret));
440
} else if (OB_FAIL(result.init())) {
441
LOG_WARN("result set init failed", K(ret));
442
} else if (OB_ISNULL(gctx_.sql_engine_)) {
443
ret = OB_ERR_UNEXPECTED;
444
LOG_ERROR("invalid sql engine", K(ret), K(gctx_));
445
} else if (FALSE_IT(execution_id = gctx_.sql_engine_->get_execution_id())) {
447
} else if (OB_FAIL(set_session_active(sql, session, ObTimeUtil::current_time(), obmysql::ObMySQLCmd::COM_STMT_PREPARE))) {
448
LOG_WARN("fail to set session active", K(ret));
449
} else if (OB_FAIL(gctx_.sql_engine_->stmt_prepare(sql, ctx_, result, false/*is_inner_sql*/))) {
450
// Prepare failed: stamp the start time and let retry_ctrl_ record whether a retry is needed.
exec_start_timestamp_ = ObTimeUtility::current_time();
451
int cli_ret = OB_SUCCESS;
452
retry_ctrl_.test_and_save_retry_state(gctx_, ctx_, result, ret, cli_ret);
453
LOG_WARN("run stmt_query failed, check if need retry",
454
K(ret), K(cli_ret), K(retry_ctrl_.need_retry()), K(sql));
456
} else if (common::OB_INVALID_ID != result.get_statement_id()
457
&& OB_FAIL(session.get_inner_ps_stmt_id(result.get_statement_id(), inner_stmt_id))) {
458
// NOTE(review): the error code returned by get_inner_ps_stmt_id() is replaced
// with OB_ERR_UNEXPECTED here, so the original code is not logged.
ret = OB_ERR_UNEXPECTED;
459
LOG_WARN("ps : get inner stmt id fail.", K(ret), K(result.get_statement_id()));
462
exec_start_timestamp_ = ObTimeUtility::current_time();
464
// Any error inside this branch is fully handled inside response_result.
466
need_response_error = false;
467
is_diagnostics_stmt = ObStmt::is_diagnostic_stmt(result.get_literal_stmt_type());
468
ctx_.is_show_trace_stmt_ = ObStmt::is_show_trace_stmt(result.get_literal_stmt_type());
469
session.set_current_execution_id(execution_id);
472
if (OB_SUCC(ret) && OB_FAIL(response_result(result,
476
ObPhysicalPlanCtx *plan_ctx = result.get_exec_context().get_physical_plan_ctx();
477
if (OB_ISNULL(plan_ctx)) {
478
LOG_ERROR("execute query fail, and plan_ctx is NULL", K(ret));
480
LOG_WARN("execute query fail", K(ret), "timeout_timestamp",
481
plan_ctx->get_timeout_timestamp());
485
exec_end_timestamp_ = ObTimeUtility::current_time();
487
// some statistics must be recorded for plan stat, even though sql audit disabled
488
bool first_record = (1 == audit_record.try_cnt_);
489
ObExecStatUtils::record_exec_timestamp(*this, first_record, audit_record.exec_timestamp_);
490
audit_record.exec_timestamp_.update_stage_time();
492
if (enable_perf_event) {
493
audit_record.exec_record_.record_end(di);
494
audit_record.exec_record_.wait_time_end_ = total_wait_desc.time_waited_;
495
audit_record.exec_record_.wait_count_end_ = total_wait_desc.total_waits_;
496
audit_record.update_event_stage_state();
497
// The prepare count/time events are added only when the worker does not need a retry.
if (!THIS_THWORKER.need_retry()) {
498
const int64_t time_cost = exec_end_timestamp_ - get_receive_timestamp();
499
EVENT_INC(SQL_PS_PREPARE_COUNT);
500
EVENT_ADD(SQL_PS_PREPARE_TIME, time_cost);
509
// 2. No result has been returned to the client, so this execution had no side effect
510
// 3. need_retry(result, ret): the schema or the location cache is stale
512
if (OB_UNLIKELY(retry_ctrl_.need_retry())) {
513
LOG_WARN("try to execute again",
515
N_TYPE, result.get_stmt_type(),
516
"retry_type", retry_ctrl_.get_retry_type(),
517
"timeout_remain", THIS_WORKER.get_timeout_remain());
519
// Freeze partition hit right after the first plan finishes executing;
520
// once partition_hit is frozen, later try_set_bool operations have no effect.
521
if (OB_LIKELY(NULL != result.get_physical_plan())) {
522
session.partition_hit().freeze();
525
// store the warning message from the most recent statement in the current session
526
if (OB_SUCC(ret) && is_diagnostics_stmt) {
527
// if diagnostic stmt execute successfully, it doesn't clear the warning message
528
session.update_show_warnings_buf();
530
session.set_show_warnings_buf(ret); // TODO: moving this elsewhere would perform better by avoiding some wb copies
533
if (!OB_SUCC(ret) && !async_resp_used && need_response_error && is_conn_valid() && !THIS_WORKER.need_retry()) {
534
LOG_WARN("query failed", K(ret), K(retry_ctrl_.need_retry()), K_(sql));
535
// When need_retry=false, a packet may or may not already have been sent to the client.
536
// What is certain: this request failed and is not finished. Unless it was already handed to the asynchronous EndTrans to wrap up,
537
// an error_packet must be sent below as the final reply; otherwise nobody else will send the error packet to the client.
539
bool is_partition_hit = session.get_err_final_partition_hit(ret);
540
int err = send_error_packet(ret, NULL, is_partition_hit);
541
if (OB_SUCCESS != err) { // send error packet
542
LOG_WARN("send error packet failed", K(ret), K(err));
546
if (enable_sql_audit) {
547
audit_record.status_ = ret;
548
audit_record.client_addr_ = session.get_peer_addr();
549
audit_record.user_client_addr_ = session.get_user_client_addr();
550
audit_record.user_group_ = THIS_WORKER.get_group_id();
551
audit_record.ps_stmt_id_ = result.get_statement_id();
552
audit_record.ps_inner_stmt_id_ = inner_stmt_id;
553
audit_record.is_perf_event_closed_ = !lib::is_diagnose_info_enabled();
555
// The audit record is handled as a retry if either the worker or retry_ctrl_ reports one.
bool need_retry = (THIS_THWORKER.need_retry()
556
|| RETRY_TYPE_NONE != retry_ctrl_.get_retry_type());
557
ObSQLUtils::handle_audit_record(need_retry, EXECUTE_PS_PREPARE, session, ctx_.is_sensitive_);
560
// reset thread warning buffer in sync mode
561
if (!async_resp_used) {
562
clear_wb_content(session);
567
// Sends the prepare response to the client in three steps: the prepare packet,
// then the parameter packets, then the column packets. The chain stops at the
// first step that fails, and ret carries that step's error code.
//   result          - result set describing the prepared statement
//   session         - session passed to the param/column senders
//   force_sync_resp, async_resp_used - unused here (marked with UNUSED)
int ObMPStmtPrepare::response_result(
569
ObMySQLResultSet &result,
570
ObSQLSessionInfo &session,
571
bool force_sync_resp,
572
bool &async_resp_used)
574
int ret = OB_SUCCESS;
575
UNUSED(force_sync_resp);
576
UNUSED(async_resp_used);
577
// const ObMySQLRawPacket &packet = reinterpret_cast<const ObMySQLRawPacket&>(req_->get_packet());
578
if (OB_FAIL(send_prepare_packet(result))) {
579
LOG_WARN("send prepare packet failed", K(ret));
580
} else if (OB_FAIL(send_param_packet(session, result))) {
581
LOG_WARN("send param packet failed", K(ret));
582
} else if (OB_FAIL(send_column_packet(session, result))) {
583
LOG_WARN("send column packet failed", K(ret));
588
// Builds and sends the OMPKPrepare packet for a prepared statement.
// The packet carries the statement id, column count, warning count and
// parameter count taken from `result`.
// Fails with OB_INVALID_ARGUMENT when the param fields or the column fields are null.
int ObMPStmtPrepare::send_prepare_packet(const ObMySQLResultSet &result)
590
int ret = OB_SUCCESS;
591
OMPKPrepare prepare_packet;
592
const ParamsFieldIArray *params = result.get_param_fields();
593
const ColumnsFieldIArray *columns = result.get_field_columns();
594
if (OB_ISNULL(params) || OB_ISNULL(columns)) {
595
ret = OB_INVALID_ARGUMENT;
596
LOG_WARN("invalid argument", K(ret), K(columns), K(params));
598
// The id and counts are narrowed to the packet's field widths (uint32_t / uint16_t).
prepare_packet.set_statement_id(static_cast<uint32_t>(result.get_statement_id()));
599
prepare_packet.set_column_num(static_cast<uint16_t>(result.get_field_cnt()));
600
prepare_packet.set_warning_count(static_cast<uint16_t>(result.get_warning_count()));
601
// NOTE(review): get_param_fields() was already null-checked above through `params`.
if (OB_ISNULL(result.get_param_fields())) {
602
ret = OB_INVALID_ARGUMENT;
603
LOG_WARN("invalid argument", K(ret), K(result.get_param_fields()));
605
prepare_packet.set_param_num(
606
static_cast<uint16_t>(result.get_param_fields()->count()));
610
if (OB_SUCC(ret) && OB_FAIL(response_packet(prepare_packet, const_cast<ObSQLSessionInfo *>(&result.get_session())))) {
611
LOG_WARN("response packet failed", K(ret));
614
// With neither columns nor params, the param/column senders emit nothing, so the
// extra OK packet (when required) is sent from here.
// NOTE(review): the declaration of ok_param is not visible in this view.
if (OB_SUCC(ret) && need_send_extra_ok_packet() && columns->count() == 0 && params->count() == 0) {
616
if (OB_FAIL(send_ok_packet(*(const_cast<ObSQLSessionInfo *>(&result.get_session())), ok_param))) {
617
LOG_WARN("fail to send ok packet", K(ret));
623
// Sends one field packet per result column, followed by an EOF packet.
// Nothing is sent when the statement has no columns.
//   session - session passed to response_packet() / send_eof_packet()
//   result  - result set iterated with next_field()
// Fails with OB_INVALID_ARGUMENT when the column field array is null.
// NOTE(review): the declarations of `field`, `fp` and `ok_param` are not visible
// in this view.
int ObMPStmtPrepare::send_column_packet(const ObSQLSessionInfo &session,
624
ObMySQLResultSet &result)
626
int ret = OB_SUCCESS;
627
const ColumnsFieldIArray *columns = result.get_field_columns();
628
if (OB_ISNULL(columns)) {
629
ret = OB_INVALID_ARGUMENT;
630
LOG_WARN("invalid argument", K(ret), K(columns));
631
} else if (columns->count() > 0) {
633
// Fetch the first field, then keep sending and fetching until next_field() fails.
ret = result.next_field(field);
634
while (OB_SUCC(ret)) {
636
if (OB_FAIL(response_packet(fp, const_cast<ObSQLSessionInfo *>(&session)))) {
637
LOG_WARN("response packet fail", K(ret));
639
LOG_DEBUG("response field succ", K(field));
640
ret = result.next_field(field);
643
// OB_ITER_END marks the normal end of the field iteration.
if (OB_ITER_END == ret) {
647
if (OB_FAIL(packet_sender_.update_last_pkt_pos())) {
648
LOG_WARN("failed to update last packet pos", K(ret));
650
// The EOF packet is sent with ok_param attached when an extra OK packet is needed.
if (need_send_extra_ok_packet()) {
652
if (OB_FAIL(send_eof_packet(session, result, &ok_param))) {
653
LOG_WARN("send eof field failed", K(ret));
656
if (OB_FAIL(send_eof_packet(session, result))) {
657
LOG_WARN("send eof field failed", K(ret));
666
// Sends one field packet per statement parameter, followed by an EOF packet.
// Nothing is sent when the statement has no parameters.
//   session - session passed to response_packet() / send_eof_packet()
//   result  - result set iterated with next_param()
// Fails with OB_INVALID_ARGUMENT when the param fields or the column fields are null.
// NOTE(review): the declarations of `field`, `fp` and `ok_param` are not visible
// in this view.
int ObMPStmtPrepare::send_param_packet(const ObSQLSessionInfo &session,
667
ObMySQLResultSet &result)
669
int ret = OB_SUCCESS;
670
const ParamsFieldIArray *params = result.get_param_fields();
671
const ColumnsFieldIArray *columns = result.get_field_columns();
672
if (OB_ISNULL(params) || OB_ISNULL(columns)) {
673
ret = OB_INVALID_ARGUMENT;
674
LOG_WARN("invalid argument", K(ret), K(columns), K(params));
675
} else if (params->count() > 0) {
677
// Fetch the first param field, then keep sending and fetching until next_param() fails.
ret = result.next_param(field);
678
while (OB_SUCC(ret)) {
680
if (OB_FAIL(response_packet(fp, const_cast<ObSQLSessionInfo *>(&session)))) {
681
// NOTE(review): this failure is logged at DEBUG level, whereas the same failure
// in send_column_packet is logged with LOG_WARN -- consider aligning the two.
LOG_DEBUG("response packet fail", K(ret));
683
// LOG_INFO("response field succ", K(field));
684
ret = result.next_param(field);
687
// OB_ITER_END marks the normal end of the param iteration.
if (OB_ITER_END == ret) {
691
// ok_param is attached to the EOF packet only when an extra OK packet is needed
// and the statement has no columns.
if (need_send_extra_ok_packet() && columns->count() == 0) {
693
if (OB_FAIL(send_eof_packet(session, result, &ok_param))) {
694
LOG_WARN("send eof field failed", K(ret));
697
if (OB_FAIL(send_eof_packet(session, result))) {
698
LOG_WARN("send eof field failed", K(ret));
706
} //end of namespace observer
707
} //end of namespace oceanbase