2
* Copyright (c) 2021 OceanBase
3
* OceanBase CE is licensed under Mulan PubL v2.
4
* You can use this software according to the terms and conditions of the Mulan PubL v2.
5
* You may obtain a copy of Mulan PubL v2 at:
6
* http://license.coscl.org.cn/MulanPubL-2.0
7
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
* See the Mulan PubL v2 for more details.
13
#define USING_LOG_PREFIX SERVER
15
#include "observer/mysql/ob_query_retry_ctrl.h"
16
#include "sql/ob_sql_context.h"
17
#include "sql/resolver/ob_stmt.h"
19
#include "storage/tx/ob_trans_define.h"
20
#include "observer/mysql/ob_mysql_result_set.h"
21
#include "observer/ob_server_struct.h"
22
#include "observer/mysql/obmp_query.h"
26
using namespace common;
28
using namespace share::schema;
29
using namespace oceanbase::transaction;
35
// Registry of per-error-code retry handlers: each RetryFuncs bundles the handler for
// obmp* queries, for inner-connection queries and for DAS tasks. Filled by
// ObQueryRetryCtrl::init() through the ERR_RETRY_FUNC macro (map_.set_refactored).
common::hash::ObHashMap<int, ObQueryRetryCtrl::RetryFuncs, common::hash::NoPthreadDefendMode> ObQueryRetryCtrl::map_;
37
// Choose between a local retry and a packet retry (throwing the request back to the queue).
// A local retry is chosen when:
//  - local retry is forced,
//  - the statement belongs to a batched multi-stmt, or
//  - it is a non-first statement of a multi-stmt, or
//  - THIS_WORKER.can_retry() is false.
// The trailing statements set RETRY_TYPE_PACKET and call THIS_WORKER.set_need_retry().
// They are presumably the final else-branch; the `else` line is missing from this
// extract, so confirm against the full source.
void ObRetryPolicy::try_packet_retry(ObRetryParam &v) const
39
const ObMultiStmtItem &multi_stmt_item = v.ctx_.multi_stmt_item_;
40
if (v.force_local_retry_) {
41
v.retry_type_ = RETRY_TYPE_LOCAL;
42
} else if (multi_stmt_item.is_batched_multi_stmt()) {
43
// in batch optimization, can't do packet retry
44
v.retry_type_ = RETRY_TYPE_LOCAL;
45
} else if (multi_stmt_item.is_part_of_multi_stmt() && multi_stmt_item.get_seq_num() > 0) {
46
// multi-stmt and not the first statement: it cannot be thrown back to the queue for
// retry, because the preceding statements cannot be rolled back
47
v.retry_type_ = RETRY_TYPE_LOCAL;
48
} else if (!THIS_WORKER.can_retry()) {
49
// false == THIS_WORKER.can_retry() means throw back to queue disabled by SOME logic
50
v.retry_type_ = RETRY_TYPE_LOCAL;
52
// Packet retry: mark the worker so the request is re-queued.
v.retry_type_ = RETRY_TYPE_PACKET;
53
THIS_WORKER.set_need_retry();
57
// Sleep before a local retry. The sleep length depends on retry_sleep_type:
//   RETRY_SLEEP_TYPE_LINEAR: base_sleep_us * linear_timeout_factor(v.stmt_retry_times_)
//   RETRY_SLEEP_TYPE_INDEX:  base_sleep_us * index_timeout_factor(v.stmt_retry_times_)
//   RETRY_SLEEP_TYPE_NONE:   no sleep (the sleep section below is skipped for this type)
// Any other type is logged as OB_ERR_UNEXPECTED. This is presumably the default label;
// the case breaks and labels are missing from this extract.
// The remaining time is measured against timeout_timestamp. The sleep itself is wrapped in
// THIS_WORKER.sched_wait()/sched_run(). If the worker has timed out after sleeping, v is
// switched to "no retry" with client_ret_ = OB_TIMEOUT and no further policies are tested.
// NOTE(review): the local declarations (ret, sleep_us) and the body of
// `if (sleep_us > remain_us)` are missing here. Presumably sleep_us is clamped to
// remain_us there — confirm against the full source.
void ObRetryPolicy::sleep_before_local_retry(ObRetryParam &v,
58
RetrySleepType retry_sleep_type,
59
int64_t base_sleep_us,
60
int64_t timeout_timestamp) const
64
switch(retry_sleep_type) {
65
case RETRY_SLEEP_TYPE_LINEAR: {
66
sleep_us = base_sleep_us * linear_timeout_factor(v.stmt_retry_times_);
69
case RETRY_SLEEP_TYPE_INDEX: {
70
sleep_us = base_sleep_us * index_timeout_factor(v.stmt_retry_times_);
73
case RETRY_SLEEP_TYPE_NONE: {
78
ret = OB_ERR_UNEXPECTED;
79
LOG_ERROR("unexpected retry sleep type", K(ret), K(base_sleep_us),
80
K(retry_sleep_type), K(v.stmt_retry_times_), K(timeout_timestamp));
84
if (RETRY_SLEEP_TYPE_NONE != retry_sleep_type && OB_SUCC(ret)) {
85
int64_t remain_us = timeout_timestamp - ObTimeUtility::current_time();
86
if (sleep_us > remain_us) {
90
LOG_INFO("will sleep", K(sleep_us), K(remain_us), K(base_sleep_us),
91
K(retry_sleep_type), K(v.stmt_retry_times_), K(v.err_), K(timeout_timestamp));
92
THIS_WORKER.sched_wait();
93
ob_usleep(static_cast<uint32_t>(sleep_us));
94
THIS_WORKER.sched_run();
95
if (THIS_WORKER.is_timeout()) {
96
v.client_ret_ = OB_TIMEOUT;
97
v.retry_type_ = RETRY_TYPE_NONE;
98
v.no_more_test_ = true;
99
LOG_WARN("this worker is timeout after retry sleep. no more retry", K(v));
102
// Presumably the branch taken when no time is left to sleep; its guarding line is
// missing from this extract.
LOG_INFO("already timeout, do not need sleep", K(sleep_us), K(remain_us), K(base_sleep_us),
103
K(retry_sleep_type), K(v.stmt_retry_times_), K(timeout_timestamp));
108
// Retry policy that only refreshes the location cache for v.err_, via
// v.result_.force_refresh_location_cache(is_async, v.err_). The visible body does not set
// v.retry_type_. The *_proc handlers chain it after the policies that decide the retry type.
template<bool is_async>
109
class ObRefreshLocationCachePolicy : public ObRetryPolicy
112
ObRefreshLocationCachePolicy() = default;
113
~ObRefreshLocationCachePolicy() = default;
114
virtual void test(ObRetryParam &v) const override
116
v.result_.force_refresh_location_cache(is_async, v.err_);
120
// is_async == true: non-blocking refresh.
typedef ObRefreshLocationCachePolicy<true> ObRefreshLocationCacheNonblockPolicy;
121
// is_async == false: blocking refresh.
typedef ObRefreshLocationCachePolicy<false> ObRefreshLocationCacheBlockPolicy;
124
// Generic retry policy parameterized by a sleep strategy (SleepType) and a base wait (WaitUs).
// When v.retry_type_ is RETRY_TYPE_LOCAL, it sleeps through sleep_before_local_retry(),
// bounded by THIS_WORKER.get_timeout_ts().
// NOTE(review): several lines are missing from this extract: the statement(s) before the
// RETRY_TYPE_LOCAL check and the middle arguments of sleep_before_local_retry(). They
// presumably choose the retry type and pass SleepType/WaitUs — confirm.
template<RetrySleepType SleepType, int64_t WaitUs>
125
class ObCommonRetryPolicy : public ObRetryPolicy
128
ObCommonRetryPolicy() = default;
129
~ObCommonRetryPolicy() = default;
130
virtual void test(ObRetryParam &v) const override
133
if (RETRY_TYPE_LOCAL == v.retry_type_) {
134
sleep_before_local_retry(v,
137
THIS_WORKER.get_timeout_ts());
142
// Ready-made instantiations: no wait, or linear / index backoff with a long or short
// base wait (ObRetryPolicy::WAIT_RETRY_LONG_US / WAIT_RETRY_SHORT_US).
typedef ObCommonRetryPolicy<RETRY_SLEEP_TYPE_NONE, 0> ObCommonRetryNoWaitPolicy;
143
typedef ObCommonRetryPolicy<RETRY_SLEEP_TYPE_LINEAR, ObRetryPolicy::WAIT_RETRY_LONG_US> ObCommonRetryLinearLongWaitPolicy;
144
typedef ObCommonRetryPolicy<RETRY_SLEEP_TYPE_LINEAR, ObRetryPolicy::WAIT_RETRY_SHORT_US> ObCommonRetryLinearShortWaitPolicy;
145
typedef ObCommonRetryPolicy<RETRY_SLEEP_TYPE_INDEX, ObRetryPolicy::WAIT_RETRY_LONG_US> ObCommonRetryIndexLongWaitPolicy;
146
typedef ObCommonRetryPolicy<RETRY_SLEEP_TYPE_INDEX, ObRetryPolicy::WAIT_RETRY_SHORT_US> ObCommonRetryIndexShortWaitPolicy;
149
// Fail fast: when the session's retry info reports should_fast_fail() for the effective
// tenant, stop retrying, return v.err_ to the client and skip the remaining policies.
class ObFastFailRetryPolicy : public ObRetryPolicy
152
ObFastFailRetryPolicy() = default;
153
~ObFastFailRetryPolicy() = default;
154
virtual void test(ObRetryParam &v) const override
156
if (v.session_.get_retry_info_for_update()
157
.should_fast_fail(v.session_.get_effective_tenant_id())) {
158
v.client_ret_ = v.err_;
159
v.retry_type_ = RETRY_TYPE_NONE;
160
v.no_more_test_ = true;
161
LOG_WARN_RET(v.err_, "server down error, fast fail", K(v));
166
// Unconditionally requests a local retry (RETRY_TYPE_LOCAL).
class ObForceLocalRetryPolicy : public ObRetryPolicy
169
ObForceLocalRetryPolicy() = default;
170
~ObForceLocalRetryPolicy() = default;
171
virtual void test(ObRetryParam &v) const override
173
v.retry_type_ = RETRY_TYPE_LOCAL;
177
// Batched-insert optimization retry: when v.ctx_.is_do_insert_batch_opt() is set, retry
// locally. The RETRY_TYPE_NONE assignment is presumably the else-branch; its `else` line is
// missing from this extract.
class ObBatchExecOptRetryPolicy : public ObRetryPolicy
180
ObBatchExecOptRetryPolicy() = default;
181
~ObBatchExecOptRetryPolicy() = default;
182
virtual void test(ObRetryParam &v) const override
184
if (v.ctx_.is_do_insert_batch_opt()) {
185
v.retry_type_ = RETRY_TYPE_LOCAL;
187
v.retry_type_ = RETRY_TYPE_NONE;
192
// Policy used by switch_consumer_group_retry_proc. If the retry ends up local, it:
//  - logs the multi-stmt position,
//  - marks the session's group id as not expected, and
//  - clears need_disconnect on the exec context.
// NOTE(review): the statement(s) before the RETRY_TYPE_LOCAL check are missing from this
// extract. They presumably choose the retry type (e.g. try_packet_retry) — confirm.
class ObSwitchConsumerGroupRetryPolicy : public ObRetryPolicy
195
ObSwitchConsumerGroupRetryPolicy() = default;
196
~ObSwitchConsumerGroupRetryPolicy() = default;
197
virtual void test(ObRetryParam &v) const override
200
if (RETRY_TYPE_LOCAL == v.retry_type_) {
201
LOG_WARN_RET(v.err_, "set retry packet failed, retry at local",
202
K(v.ctx_.multi_stmt_item_.is_part_of_multi_stmt()),
203
K(v.ctx_.multi_stmt_item_.get_seq_num()));
204
v.session_.set_group_id_not_expected(true);
205
v.result_.get_exec_context().set_need_disconnect(false);
210
// Pre-check for user SQL, applied by before_func ahead of ObStmtTypeRetryPolicy.
//  - Session terminated: no retry. The client gets OB_ERR_KILL_CLIENT_SESSION when that is
//    v.err_ and the session reports OB_ERR_SESSION_INTERRUPTED; otherwise (presumably the
//    else-branch, whose line is missing here) it gets the termination code.
//  - Worker timed out: no retry. PX-worker and exclusive-lock errors are returned as-is. A
//    timeout following a try-lock-row error becomes OB_ERR_EXCLUSIVE_LOCK_CONFLICT.
//    Otherwise the result is OB_TIMEOUT.
//    On an SQL or transaction RPC timeout, the location cache is also refreshed
//    asynchronously.
class ObBeforeRetryCheckPolicy : public ObRetryPolicy
213
ObBeforeRetryCheckPolicy() = default;
214
~ObBeforeRetryCheckPolicy() = default;
215
virtual void test(ObRetryParam &v) const override
217
int ret = OB_SUCCESS;
218
if (v.session_.is_terminate(ret)) {
219
v.no_more_test_ = true;
220
v.retry_type_ = RETRY_TYPE_NONE;
221
// In the kill client session scenario, the server session will be marked
222
// with the SESSION_KILLED mark. In the retry scenario, there will be an error
223
// code covering 5066, so the judgment logic is added here.
224
if (ret == OB_ERR_SESSION_INTERRUPTED && v.err_ == OB_ERR_KILL_CLIENT_SESSION) {
225
v.client_ret_ = v.err_;
227
v.client_ret_ = ret; // session terminated
229
LOG_WARN("execution was terminated", K(ret), K(v.client_ret_), K(v.err_));
230
} else if (THIS_WORKER.is_timeout()) {
231
v.no_more_test_ = true;
232
v.retry_type_ = RETRY_TYPE_NONE;
233
if (OB_ERR_INSUFFICIENT_PX_WORKER == v.err_ ||
234
OB_ERR_EXCLUSIVE_LOCK_CONFLICT == v.err_ ||
235
OB_ERR_EXCLUSIVE_LOCK_CONFLICT_NOWAIT == v.err_) {
236
v.client_ret_ = v.err_;
237
} else if (is_try_lock_row_err(v.session_.get_retry_info().get_last_query_retry_err())) {
238
// timeout caused by locking, should return OB_ERR_EXCLUSIVE_LOCK_CONFLICT
239
v.client_ret_ = OB_ERR_EXCLUSIVE_LOCK_CONFLICT;
241
v.client_ret_ = OB_TIMEOUT;
243
LOG_WARN("this worker is timeout, do not need retry", K(v),
244
K(THIS_WORKER.get_timeout_ts()), K(v.result_.get_stmt_type()),
245
K(v.session_.get_retry_info().get_last_query_retry_err()));
246
if (v.session_.get_retry_info().is_rpc_timeout() || is_transaction_rpc_timeout_err(v.err_)) {
247
// RPC timed out; the location cache may be stale, so refresh it asynchronously
248
v.result_.force_refresh_location_cache(true, v.err_); // non-blocking
249
LOG_WARN("sql rpc timeout, or trans rpc timeout, maybe location is changed, "
250
"refresh location cache non blockly", K(v),
251
K(v.session_.get_retry_info().is_rpc_timeout()));
257
// Statement-type based retry checks for user SQL. The visible paths end retrying
// (RETRY_TYPE_NONE + no_more_test_) for:
//  - PL statements whose session reports !get_pl_can_retry() (a commit may have happened);
//  - statements for which ObStmt::force_skip_retry_stmt() holds;
//  - DDL statements, which get a special path when is_ddl_stmt_packet_retry_err(err);
//  - LOAD DATA reading from the client disk (is_load_local);
//  - direct-load statements, which get a special path when is_direct_load_retry_err(err).
// NOTE(review): many original lines are missing from this extract, including the
// declarations of `err` and `bret` and the DDL / direct-load retry branches. The exact
// outcome of those branches cannot be confirmed here.
class ObStmtTypeRetryPolicy : public ObRetryPolicy
260
ObStmtTypeRetryPolicy() = default;
261
~ObStmtTypeRetryPolicy() = default;
263
// True when the exec context's table direct-insert ctx reports get_is_direct().
bool is_direct_load(ObRetryParam &v) const
265
ObExecContext &exec_ctx = v.result_.get_exec_context();
266
return exec_ctx.get_table_direct_insert_ctx().get_is_direct();
269
// True when the command is LOAD DATA whose file storage is ObLoadFileLocation::CLIENT_DISK.
bool is_load_local(ObRetryParam &v) const
272
const ObICmd *cmd = v.result_.get_cmd();
273
if (OB_NOT_NULL(cmd) && cmd->get_cmd_type() == stmt::T_LOAD_DATA) {
274
const ObLoadDataStmt *load_data_stmt = static_cast<const ObLoadDataStmt *>(cmd);
275
bret = load_data_stmt->get_load_arguments().load_file_storage_ == ObLoadFileLocation::CLIENT_DISK;
280
virtual void test(ObRetryParam &v) const override
283
if (v.result_.is_pl_stmt(v.result_.get_stmt_type()) && !v.session_.get_pl_can_retry()) {
284
LOG_WARN_RET(err, "current pl can not retry, commit may have occurred",
285
K(v), K(v.result_.get_stmt_type()));
287
v.retry_type_ = RETRY_TYPE_NONE;
288
v.no_more_test_ = true;
289
} else if (ObStmt::force_skip_retry_stmt(v.result_.get_stmt_type())) {
291
v.retry_type_ = RETRY_TYPE_NONE;
292
v.no_more_test_ = true;
293
} else if (ObStmt::is_ddl_stmt(v.result_.get_stmt_type(), v.result_.has_global_variable())) {
294
if (is_ddl_stmt_packet_retry_err(err)) {
298
v.retry_type_ = RETRY_TYPE_NONE;
300
v.no_more_test_ = true;
301
} else if (is_load_local(v)) {
303
v.retry_type_ = RETRY_TYPE_NONE;
304
v.no_more_test_ = true;
305
} else if (is_direct_load(v)) {
306
if (is_direct_load_retry_err(err)) {
310
v.retry_type_ = RETRY_TYPE_NONE;
312
v.no_more_test_ = true;
318
// Schema-error policy for user SQL.
// Retrying ends (RETRY_TYPE_NONE + no_more_test_) when any of these is unavailable:
//  - GCTX.schema_service_,
//  - the tenant schema guard, or
//  - the latest local tenant/sys schema versions.
// Otherwise the conditions (c1)-(c7) below are evaluated:
//  - If one holds and v.stmt_retry_times_ < MAX_SCHEMA_ERROR_LOCAL_RETRY_TIMES, a local retry
//    is chosen; a local retry sleeps with linear backoff.
//  - If none holds, v.err_ is returned and retrying ends.
// NOTE(review): some original lines are missing from this extract: the client_ret_
// assignments in the schema-guard/version error branches, the else lines, and the
// base-sleep argument.
class ObCheckSchemaUpdatePolicy : public ObRetryPolicy
321
ObCheckSchemaUpdatePolicy() = default;
322
~ObCheckSchemaUpdatePolicy() = default;
323
virtual void test(ObRetryParam &v) const override
325
int ret = OB_SUCCESS;
327
if (NULL == GCTX.schema_service_) {
328
v.client_ret_ = OB_INVALID_ARGUMENT;
329
v.retry_type_ = RETRY_TYPE_NONE;
330
v.no_more_test_ = true;
331
LOG_WARN("invalid argument", K(v));
333
ObSchemaGetterGuard schema_guard;
334
int64_t local_tenant_version_latest = 0;
335
int64_t local_sys_version_latest = 0;
336
if (OB_FAIL(GCTX.schema_service_->get_tenant_schema_guard(
337
v.session_.get_effective_tenant_id(), schema_guard))) {
338
// No more retry. Return the get_schema_guard error code, because that failure is what
// prevents the retry.
339
LOG_WARN("get schema guard failed", K(v), K(ret));
341
v.retry_type_ = RETRY_TYPE_NONE;
342
v.no_more_test_ = true;
343
} else if (OB_FAIL(schema_guard.get_schema_version(
344
v.session_.get_effective_tenant_id(), local_tenant_version_latest))) {
345
LOG_WARN("fail get tenant schema version", K(v), K(ret));
347
v.retry_type_ = RETRY_TYPE_NONE;
348
v.no_more_test_ = true;
349
} else if (OB_FAIL(schema_guard.get_schema_version(
350
OB_SYS_TENANT_ID, local_sys_version_latest))) {
351
LOG_WARN("fail get sys schema version", K(v), K(ret));
353
v.retry_type_ = RETRY_TYPE_NONE;
354
v.no_more_test_ = true;
356
bool local_schema_not_full = GCTX.schema_service_->is_schema_error_need_retry(
357
&schema_guard, v.session_.get_effective_tenant_id());
358
int64_t local_tenant_version_start = v.curr_query_tenant_local_schema_version_;
359
int64_t global_tenant_version_start = v.curr_query_tenant_global_schema_version_;
360
int64_t local_sys_version_start = v.curr_query_sys_local_schema_version_;
361
int64_t global_sys_version_start = v.curr_query_sys_global_schema_version_;
362
// (c1) Consider the case where a remote server's schema lags behind the local one and the
// remote server raises a schema error.
363
// When the remote side raises a schema error, all schema errors are forcibly converted to
// OB_ERR_WAIT_REMOTE_SCHEMA_REFRESH.
364
// Insufficient privilege also triggers this rule: a remote schema that was not refreshed in
// time may falsely report insufficient privilege, and a retry is needed in that case.
365
// (c4) For weak-consistency reads, the schema version is checked to be >= the data's
// schema version;
366
// if the schema version is older, a retry is required;
367
// the goal is to always interpret old data with the newer schema.
368
// (c5) Wherever OB_SCHEMA_EAGAIN is raised on the main path, an SQL retry must be triggered.
369
// (c2) Table / database / user exists or does not exist, and the local and global
// versions differ: retry.
370
// (c3) Any other schema error caused by the local version at SQL start being smaller than
// the current local version.
371
// (c6) For local server, related tenant's schema maybe not refreshed yet when observer restarts or create tenant.
372
// (c7) For remote server, related tenant's schema maybe not refreshed yet when observer restarts or create tenant.
373
if ((OB_ERR_WAIT_REMOTE_SCHEMA_REFRESH == v.err_) || // (c1)
374
(OB_SCHEMA_NOT_UPTODATE == v.err_) || // (c4)
375
(OB_SCHEMA_EAGAIN == v.err_) || // (c5)
376
(global_tenant_version_start > local_tenant_version_start) || // (c2)
377
(global_sys_version_start > local_sys_version_start) || // (c2)
378
(local_tenant_version_latest > local_tenant_version_start) || // (c3)
379
(local_sys_version_latest > local_sys_version_start) || // (c3)
380
(local_schema_not_full) || // (c6)
381
(OB_ERR_REMOTE_SCHEMA_NOT_FULL == v.err_) // (c7)
383
if (v.stmt_retry_times_ < ObQueryRetryCtrl::MAX_SCHEMA_ERROR_LOCAL_RETRY_TIMES) {
384
v.retry_type_ = RETRY_TYPE_LOCAL;
388
if (RETRY_TYPE_LOCAL == v.retry_type_) {
390
sleep_before_local_retry(v,
391
RETRY_SLEEP_TYPE_LINEAR,
393
THIS_WORKER.get_timeout_ts());
396
// It is hard to decide client_ret here, so keep returning err.
397
v.client_ret_ = v.err_;
398
v.retry_type_ = RETRY_TYPE_NONE;
399
v.no_more_test_ = true;
406
// if tenant status is abnormal, do not retry sql
407
// Ends retrying (returning v.err_) when the effective tenant's schema is missing or the
// tenant is not in normal status. In the visible code, failing to obtain the schema
// service, the guard or the tenant info is only traced and does not modify v.
class ObCheckTenantStatusPolicy : public ObRetryPolicy
410
ObCheckTenantStatusPolicy() = default;
411
~ObCheckTenantStatusPolicy() = default;
412
virtual void test(ObRetryParam &v) const override
414
int ret = OB_SUCCESS;
415
if (OB_ISNULL(GCTX.schema_service_)) {
416
ret = OB_INVALID_ARGUMENT;
417
LOG_TRACE("invalid schema_service", KR(ret), K(v));
419
ObSchemaGetterGuard schema_guard;
420
const ObTenantSchema *tenant_schema = NULL;
421
if (OB_FAIL(GCTX.schema_service_->get_tenant_schema_guard(
424
LOG_TRACE("get sys tenant schema guard failed", KR(ret), K(v));
425
} else if (OB_FAIL(schema_guard.get_tenant_info(
426
v.session_.get_effective_tenant_id(),
428
LOG_TRACE("fail get tenant info", KR(ret),
429
"tenant_id", v.session_.get_effective_tenant_id(), K(v));
430
} else if (OB_ISNULL(tenant_schema) || !tenant_schema->is_normal()) {
431
// use LOG_TRACE to prevent too much warning during creating tenant
432
LOG_TRACE("tenant status is abnormal, do not retry",
433
"tenant_id", v.session_.get_effective_tenant_id(), KPC(tenant_schema), K(v));
434
// tenant status is abnormal, do not retry and return v.err_
435
v.client_ret_ = v.err_;
436
v.retry_type_ = RETRY_TYPE_NONE;
437
v.no_more_test_ = true;
439
// tenant status is normal, check passed
445
// A DML write statement running as a remote transaction (per
// ObSqlTransUtil::is_remote_trans) is not retried here: v.err_ is returned and no further
// policies are tested. Statements without a physical plan pass through unchanged.
class ObDMLPeerServerStateUncertainPolicy : public ObRetryPolicy
448
ObDMLPeerServerStateUncertainPolicy() = default;
449
~ObDMLPeerServerStateUncertainPolicy() = default;
450
virtual void test(ObRetryParam &v) const override
452
if (OB_ISNULL(v.result_.get_physical_plan())) {
453
// issue#43741246, plan not generated, won't be a remote trans
454
// safe to continue with other retry test
455
} else if (ObStmt::is_dml_write_stmt(v.result_.get_stmt_type())) {
458
bool autocommit = v.session_.get_local_autocommit();
459
ObPhyPlanType plan_type = v.result_.get_physical_plan()->get_plan_type();
460
bool in_transaction = v.session_.is_in_transaction();
461
if (ObSqlTransUtil::is_remote_trans(autocommit, in_transaction, plan_type)) {
462
// The retry cannot be performed inside the current observer
463
// err is OB_RPC_CONNECT_ERROR
464
v.client_ret_ = v.err_;
465
v.retry_type_ = RETRY_TYPE_NONE;
466
v.no_more_test_ = true;
467
LOG_WARN_RET(v.err_, "server down error, the write dml is remote, don't retry",
468
K(autocommit), K(plan_type), K(in_transaction), K(v));
475
// Lock-row conflict policy for user SQL. A local retry is chosen when local retry is
// forced, or when a non-PL statement has done at most one local retry so far.
// NOTE(review): the else-branch is largely missing from this extract. Only the declaration
// of `multi_stmr_item` (sic) survives, and it is unused in the visible code.
class ObLockRowConflictRetryPolicy : public ObRetryPolicy
478
ObLockRowConflictRetryPolicy() = default;
479
~ObLockRowConflictRetryPolicy() = default;
480
virtual void test(ObRetryParam &v) const override
482
// sql which in pl will local retry first. see ObInnerSQLConnection::process_retry.
483
// sql which not in pl use the same strategy to avoid never getting the lock.
484
if (v.force_local_retry_ || (v.local_retry_times_ <= 1 && !v.result_.is_pl_stmt(v.result_.get_stmt_type()))) {
485
v.retry_type_ = RETRY_TYPE_LOCAL;
487
const ObMultiStmtItem &multi_stmr_item = v.ctx_.multi_stmt_item_;
494
// Transaction set violation. Under RR or serializable isolation (is_isolation_RR_or_SE), the
// statement is not retried and OB_TRANS_CANNOT_SERIALIZE is returned.
// NOTE(review): the other isolation branch is missing from this extract.
class ObTrxSetViolationRetryPolicy : public ObRetryPolicy
497
ObTrxSetViolationRetryPolicy() = default;
498
~ObTrxSetViolationRetryPolicy() = default;
499
virtual void test(ObRetryParam &v) const override
501
if (ObQueryRetryCtrl::is_isolation_RR_or_SE(v.session_.get_tx_isolation())) {
502
v.client_ret_ = OB_TRANS_CANNOT_SERIALIZE;
503
v.retry_type_ = RETRY_TYPE_NONE;
504
v.no_more_test_ = true;
505
LOG_WARN_RET(v.client_ret_, "transaction cannot serialize", K(v));
510
// Never retries: always returns OB_TRANS_CANNOT_SERIALIZE and stops further policy tests.
class ObTrxCannotSerializeRetryPolicy : public ObRetryPolicy
513
ObTrxCannotSerializeRetryPolicy() = default;
514
~ObTrxCannotSerializeRetryPolicy() = default;
515
virtual void test(ObRetryParam &v) const override
517
v.client_ret_ = OB_TRANS_CANNOT_SERIALIZE;
518
v.retry_type_ = RETRY_TYPE_NONE;
519
v.no_more_test_ = true;
520
LOG_WARN_RET(v.client_ret_, "transaction cannot serialize", K(v));
524
// Insufficient PX threads. A forced local retry is honoured. Otherwise a resulting
// RETRY_TYPE_LOCAL is turned into "no retry" returning v.err_; the log message cites
// thread-resource deadlock prevention.
// NOTE(review): the non-forced branch is missing from this extract. The RETRY_TYPE_LOCAL
// check may be nested inside that branch, so confirm whether a forced local retry is also
// cancelled.
class ObPxThreadNotEnoughRetryPolicy : public ObRetryPolicy
527
ObPxThreadNotEnoughRetryPolicy() = default;
528
~ObPxThreadNotEnoughRetryPolicy() = default;
529
virtual void test(ObRetryParam &v) const override
531
if (v.force_local_retry_) {
532
v.retry_type_ = RETRY_TYPE_LOCAL;
535
if (RETRY_TYPE_LOCAL == v.retry_type_) {
536
v.client_ret_ = v.err_;
537
v.retry_type_ = RETRY_TYPE_NONE;
538
v.no_more_test_ = true;
539
LOG_WARN_RET(v.client_ret_, "can not retry local. need to terminate to prevent thread resouce deadlock", K(v));
545
////////// special inner retry policy for inner connection ////////////
547
// Inner SQL: when the local schema for the effective tenant is not full
// (is_schema_error_need_retry), or the remote side reports OB_ERR_REMOTE_SCHEMA_NOT_FULL,
// retry locally after a linear-backoff sleep and stop further policy tests.
// NOTE(review): the base-sleep argument line is missing from this extract. `ret` is unused
// in the visible code.
class ObInnerCommonCheckSchemaPolicy : public ObRetryPolicy
550
ObInnerCommonCheckSchemaPolicy() = default;
551
~ObInnerCommonCheckSchemaPolicy() = default;
552
virtual void test(ObRetryParam &v) const override
554
int ret = OB_SUCCESS;
555
bool local_schema_not_full = GSCHEMASERVICE.is_schema_error_need_retry(
556
NULL, v.session_.get_effective_tenant_id());
557
if (local_schema_not_full || OB_ERR_REMOTE_SCHEMA_NOT_FULL == v.err_) {
558
v.no_more_test_ = true;
559
v.retry_type_ = RETRY_TYPE_LOCAL;
560
sleep_before_local_retry(v,
561
RETRY_SLEEP_TYPE_LINEAR,
563
THIS_WORKER.get_timeout_ts());
568
// Inner SQL schema error. User sessions and DDL sessions retry locally. Otherwise the
// assignments that follow (presumably the else-branch, whose line is missing here) end
// retrying and return v.err_.
class ObInnerCheckSchemaPolicy : public ObRetryPolicy
571
ObInnerCheckSchemaPolicy() = default;
572
~ObInnerCheckSchemaPolicy() = default;
573
virtual void test(ObRetryParam &v) const override
575
int ret = OB_SUCCESS;
576
// As DDL is not reentrant in OceanBase, we have to retry those SQL issued by DDL in place
577
// is_user_session=true: create table t1 as select ...
578
// is_ddl=true: create index idx1 on ...
579
if (v.session_.is_user_session() || v.session_.get_ddl_info().is_ddl()) {
580
v.no_more_test_ = true;
581
v.retry_type_ = RETRY_TYPE_LOCAL;
583
v.no_more_test_ = true;
584
v.retry_type_ = RETRY_TYPE_NONE;
585
v.client_ret_ = v.err_;
590
// Inner SQL lock-row conflict.
// Inside the guarded branch, a local retry is chosen when any of these holds:
//  - at most one local retry has been done,
//  - the session's PL cannot retry, or
//  - the statement runs in an autonomous block.
// Otherwise retrying ends and v.err_ is returned.
// NOTE(review): the outer condition that selects this branch, and the else line before the
// trailing RETRY_TYPE_LOCAL assignment, are missing from this extract — confirm against the
// full source.
class ObInnerLockRowConflictRetryPolicy : public ObRetryPolicy
593
ObInnerLockRowConflictRetryPolicy() = default;
594
~ObInnerLockRowConflictRetryPolicy() = default;
595
virtual void test(ObRetryParam &v) const override
597
// sql which in pl will local retry first. see ObInnerSQLConnection::process_retry.
598
// sql which not in pl use the same strategy to avoid never getting the lock.
600
if (v.local_retry_times_ <= 1 ||
601
!v.session_.get_pl_can_retry() ||
602
ObSQLUtils::is_in_autonomous_block(v.session_.get_cur_exec_ctx())) {
603
v.no_more_test_ = true;
604
v.retry_type_ = RETRY_TYPE_LOCAL;
606
v.no_more_test_ = true;
607
v.retry_type_ = RETRY_TYPE_NONE;
608
v.client_ret_ = v.err_;
612
v.no_more_test_ = true;
613
v.retry_type_ = RETRY_TYPE_LOCAL;
618
// Pre-check for inner SQL, applied by before_func. Retrying ends when:
//  - the connection is nested (PL / foreign-key cascade / online stats gathering), unless
//    the error is a static-engine retry error or the SQL comes from PL; v.err_ is returned;
//  - the session was terminated; the client gets the same codes as ObBeforeRetryCheckPolicy;
//  - the worker timed out; the client gets OB_TIMEOUT.
class ObInnerBeforeRetryCheckPolicy: public ObRetryPolicy
621
ObInnerBeforeRetryCheckPolicy() = default;
622
~ObInnerBeforeRetryCheckPolicy() = default;
623
virtual void test(ObRetryParam &v) const override
625
int ret = OB_SUCCESS;
626
// nested transaction already supported In 32x and can only rollback nested sql.
627
// for foreign key, we keep old logic and do not retry. for pl will retry current nested sql.
628
if (is_nested_conn(v) && !is_static_engine_retry(v.err_) && !v.is_from_pl_) {
629
// right now, top session will retry, but we can do something here like refresh XXX cache.
630
// in future, nested session can retry if nested transaction is supported.
631
v.no_more_test_ = true;
632
v.retry_type_ = RETRY_TYPE_NONE;
633
v.client_ret_ = v.err_;
634
} else if (v.session_.is_terminate(ret)) {
635
v.no_more_test_ = true;
636
v.retry_type_ = RETRY_TYPE_NONE;
637
// In the kill client session scenario, the server session will be marked
638
// with the SESSION_KILLED mark. In the retry scenario, there will be an error
639
// code covering 5066, so the judgment logic is added here.
640
if (ret == OB_ERR_SESSION_INTERRUPTED && v.err_ == OB_ERR_KILL_CLIENT_SESSION) {
641
v.client_ret_ = v.err_;
643
v.client_ret_ = ret; // session terminated
645
LOG_WARN("execution was terminated", K(ret), K(v.client_ret_), K(v.err_));
646
} else if (THIS_WORKER.is_timeout()) {
647
v.no_more_test_ = true;
648
v.retry_type_ = RETRY_TYPE_NONE;
649
v.client_ret_ = OB_TIMEOUT;
653
//is_nested_conn means this connection is triggered by a foreign key or PL object
654
//because session and ObExecContext are linked in SQL engine,
655
//in the inner sql connection stage,
656
//the ObExecContext belonging to the current inner connection has not been linked to session.
657
//for nested SQL, the ObExecContext on the current session belongs to the parent statement
658
//so session.cur_exec_ctx_ is the parent ctx of the nested SQL
// Returns true for a PL-nested DML (parent has a non-autonomous PL stack ctx), a
// foreign-key cascade, or online stats gathering.
659
bool is_nested_conn(ObRetryParam &v) const
661
ObExecContext *parent_ctx = v.session_.get_cur_exec_ctx();
662
bool is_pl_nested = (parent_ctx != nullptr
663
&& ObStmt::is_dml_stmt(parent_ctx->get_sql_ctx()->stmt_type_)
664
&& parent_ctx->get_pl_stack_ctx() != nullptr
665
&& !parent_ctx->get_pl_stack_ctx()->in_autonomous());
666
bool is_fk_nested = (parent_ctx != nullptr && parent_ctx->get_das_ctx().is_fk_cascading_);
667
bool is_online_stat_gathering_nested = (parent_ctx != nullptr && parent_ctx->is_online_stats_gathering());
668
return is_pl_nested || is_fk_nested || is_online_stat_gathering_nested;
672
// Auto-increment cache mismatch: retry locally while v.stmt_retry_times_ is below
// ObQueryRetryCtrl::MAX_SCHEMA_ERROR_LOCAL_RETRY_TIMES.
// NOTE(review): the branch taken once the limit is reached is missing from this extract.
class ObAutoincCacheNotEqualRetryPolicy: public ObRetryPolicy
675
ObAutoincCacheNotEqualRetryPolicy() = default;
676
~ObAutoincCacheNotEqualRetryPolicy() = default;
677
virtual void test(ObRetryParam &v) const override
679
if (v.stmt_retry_times_ < ObQueryRetryCtrl::MAX_SCHEMA_ERROR_LOCAL_RETRY_TIMES) {
680
v.retry_type_ = RETRY_TYPE_LOCAL;
688
////////// end of policies ////////////
692
// Handler for insufficient PX threads: applies ObPxThreadNotEnoughRetryPolicy only.
void ObQueryRetryCtrl::px_thread_not_enough_proc(ObRetryParam &v)
694
ObRetryObject retry_obj(v);
695
ObPxThreadNotEnoughRetryPolicy thread_not_enough;
696
retry_obj.test(thread_not_enough);
699
// Handler for transaction set violations: the isolation check first, then a common retry
// with linear short wait.
void ObQueryRetryCtrl::trx_set_violation_proc(ObRetryParam &v)
701
ObRetryObject retry_obj(v);
702
ObTrxSetViolationRetryPolicy trx_violation;
703
ObCommonRetryLinearShortWaitPolicy retry_short_wait;
704
retry_obj.test(trx_violation).test(retry_short_wait);
707
// Handler for "transaction cannot serialize": never retried.
void ObQueryRetryCtrl::trx_can_not_serialize_proc(ObRetryParam &v)
709
ObRetryObject retry_obj(v);
710
ObTrxCannotSerializeRetryPolicy trx_cannot_serialize;
711
retry_obj.test(trx_cannot_serialize);
714
// Handler for lock-row conflicts in user SQL: applies ObLockRowConflictRetryPolicy.
void ObQueryRetryCtrl::try_lock_row_conflict_proc(ObRetryParam &v)
716
ObRetryObject retry_obj(v);
717
ObLockRowConflictRetryPolicy lock_conflict;
718
retry_obj.test(lock_conflict);
722
// Handler for location errors: fast-fail check, then a common retry with index long wait.
// A local retry refreshes the location cache in blocking mode. The non-blocking refresh
// that follows is presumably the else-branch; its `else` line is missing from this extract.
void ObQueryRetryCtrl::location_error_proc(ObRetryParam &v)
724
ObRetryObject retry_obj(v);
725
ObFastFailRetryPolicy fast_fail;
726
ObCommonRetryIndexLongWaitPolicy retry_long_wait;
727
retry_obj.test(fast_fail).test(retry_long_wait);
729
if (RETRY_TYPE_LOCAL == v.retry_type_) {
730
ObRefreshLocationCacheBlockPolicy block_refresh; // FIXME: why block?
731
retry_obj.test(block_refresh);
733
ObRefreshLocationCacheNonblockPolicy nonblock_refresh;
734
retry_obj.test(nonblock_refresh);
738
// Handler for location errors that always refresh the location cache without blocking:
// fast-fail check, then index long wait, then a non-blocking refresh.
void ObQueryRetryCtrl::nonblock_location_error_proc(ObRetryParam &v)
740
ObRetryObject retry_obj(v);
741
ObFastFailRetryPolicy fast_fail;
742
ObCommonRetryIndexLongWaitPolicy retry_long_wait;
743
ObRefreshLocationCacheNonblockPolicy nonblock_refresh;
744
retry_obj.test(fast_fail).test(retry_long_wait).test(nonblock_refresh);
747
// Handler for "nothing readable" location errors: clears the session retry info, then
// delegates to location_error_proc().
void ObQueryRetryCtrl::location_error_nothing_readable_proc(ObRetryParam &v)
749
// Strong-consistency read: the leader is not readable, possibly because the invalid
// servers filtered the leader out.
750
// Weak-consistency read: no replica is left to choose, possibly because the invalid
// servers filtered out all replicas.
751
// To better handle a brief network outage of the leader, clear the retry info (mainly the
// invalid servers;
752
// it must stay inited to pass defensive checks, so call clear() rather than reset()),
// then retry.
753
v.session_.get_retry_info_for_update().clear();
754
location_error_proc(v);
757
// Handler for an uncertain peer-server state. The chain is:
//   fast-fail check -> non-blocking location refresh -> remote DML-write check -> index long wait.
void ObQueryRetryCtrl::peer_server_status_uncertain_proc(ObRetryParam &v)
759
ObRetryObject retry_obj(v);
760
ObFastFailRetryPolicy fast_fail;
761
ObRefreshLocationCacheNonblockPolicy nonblock_refresh;
762
ObDMLPeerServerStateUncertainPolicy check_dml; // will abort check if dml has remote trans
763
ObCommonRetryIndexLongWaitPolicy retry_long_wait;
764
retry_obj.test(fast_fail).test(nonblock_refresh).test(check_dml).test(retry_long_wait);
767
// Handler for schema errors in user SQL: applies ObCheckSchemaUpdatePolicy.
void ObQueryRetryCtrl::schema_error_proc(ObRetryParam &v)
769
ObRetryObject retry_obj(v);
770
ObCheckSchemaUpdatePolicy schema_update_policy;
771
retry_obj.test(schema_update_policy);
774
// Handler for auto-increment cache mismatches: bounded local retry, then a common retry
// with linear short wait.
void ObQueryRetryCtrl::autoinc_cache_not_equal_retry_proc(ObRetryParam &v)
776
ObRetryObject retry_obj(v);
777
ObAutoincCacheNotEqualRetryPolicy autoinc_retry_policy;
778
ObCommonRetryLinearShortWaitPolicy retry_short_wait;
779
retry_obj.test(autoinc_retry_policy).test(retry_short_wait);
782
// Handler for discarded snapshots.
// Under RR/serializable isolation, v.err_ is returned without retry. Otherwise the first
// statement retry is a local retry. The no-wait common policy that follows is presumably
// the else-branch for later retries; the separating lines are missing from this extract.
void ObQueryRetryCtrl::snapshot_discard_proc(ObRetryParam &v)
784
if (ObQueryRetryCtrl::is_isolation_RR_or_SE(v.session_.get_tx_isolation())) {
786
v.client_ret_ = v.err_;
787
v.retry_type_ = RETRY_TYPE_NONE;
788
LOG_WARN_RET(v.client_ret_, "snapshot discarded in serializable isolation should not retry", K(v));
790
// The read hit a standby that lags too far behind, or a replica that is still replaying logs.
791
// Replica-not-readable errors are retried locally in this thread at most once.
792
const int64_t MAX_DATA_NOT_READABLE_ERROR_LOCAL_RETRY_TIMES = 1;
793
if (v.stmt_retry_times_ < MAX_DATA_NOT_READABLE_ERROR_LOCAL_RETRY_TIMES) {
794
v.retry_type_ = RETRY_TYPE_LOCAL;
796
ObRetryObject retry_obj(v);
797
ObCommonRetryNoWaitPolicy no_wait_retry;
798
retry_obj.test(no_wait_retry);
803
// Generic handler: common retry with index backoff and the long base wait.
void ObQueryRetryCtrl::long_wait_retry_proc(ObRetryParam &v)
805
ObRetryObject retry_obj(v);
806
ObCommonRetryIndexLongWaitPolicy long_wait_retry;
807
retry_obj.test(long_wait_retry);
810
// Generic handler: common retry with linear backoff and the short base wait.
void ObQueryRetryCtrl::short_wait_retry_proc(ObRetryParam &v)
812
ObRetryObject retry_obj(v);
813
ObCommonRetryLinearShortWaitPolicy short_wait_retry;
814
retry_obj.test(short_wait_retry);
817
// Generic handler: always retry locally.
void ObQueryRetryCtrl::force_local_retry_proc(ObRetryParam &v)
819
ObRetryObject retry_obj(v);
820
ObForceLocalRetryPolicy force_local_retry;
821
retry_obj.test(force_local_retry);
824
// Handler for the batched-insert optimization: applies ObBatchExecOptRetryPolicy.
void ObQueryRetryCtrl::batch_execute_opt_retry_proc(ObRetryParam &v)
826
ObRetryObject retry_obj(v);
827
ObBatchExecOptRetryPolicy batch_opt_retry;
828
retry_obj.test(batch_opt_retry);
831
// Handler for consumer-group switches: applies ObSwitchConsumerGroupRetryPolicy.
void ObQueryRetryCtrl::switch_consumer_group_retry_proc(ObRetryParam &v)
833
ObRetryObject retry_obj(v);
834
ObSwitchConsumerGroupRetryPolicy switch_group_retry;
835
retry_obj.test(switch_group_retry);
838
// Handler for timeouts.
// On OB_TIMEOUT during the first SPM plan execution with need_spm_timeout_ set, the SPM
// context is switched to the fallback plan and a local retry is forced. Otherwise, a timeout
// following a try-lock-row error is reported as OB_ERR_EXCLUSIVE_LOCK_CONFLICT without retry.
// NOTE(review): the second, identical lock check appears to be an alternative code path.
// The separating lines are missing from this extract, possibly a preprocessor #else around
// the SPM logic — confirm.
void ObQueryRetryCtrl::timeout_proc(ObRetryParam &v)
841
if (OB_UNLIKELY(v.err_ == OB_TIMEOUT &&
842
ObSpmCacheCtx::STAT_FIRST_EXECUTE_PLAN == v.ctx_.spm_ctx_.spm_stat_ &&
843
v.ctx_.spm_ctx_.need_spm_timeout_)) {
844
const_cast<ObSqlCtx &>(v.ctx_).spm_ctx_.spm_stat_ = ObSpmCacheCtx::STAT_FALLBACK_EXECUTE_PLAN;
845
const_cast<ObSqlCtx &>(v.ctx_).spm_ctx_.need_spm_timeout_ = false;
846
ObRetryObject retry_obj(v);
847
ObForceLocalRetryPolicy force_local_retry;
848
retry_obj.test(force_local_retry);
849
} else if (is_try_lock_row_err(v.session_.get_retry_info().get_last_query_retry_err())) {
850
v.client_ret_ = OB_ERR_EXCLUSIVE_LOCK_CONFLICT;
851
v.retry_type_ = RETRY_TYPE_NONE;
854
if (is_try_lock_row_err(v.session_.get_retry_info().get_last_query_retry_err())) {
855
v.client_ret_ = OB_ERR_EXCLUSIVE_LOCK_CONFLICT;
856
v.retry_type_ = RETRY_TYPE_NONE;
861
/////// For inner SQL only ///////////////
862
// Inner SQL handler for lock-row conflicts: applies ObInnerLockRowConflictRetryPolicy.
void ObQueryRetryCtrl::inner_try_lock_row_conflict_proc(ObRetryParam &v)
864
ObRetryObject retry_obj(v);
865
ObInnerLockRowConflictRetryPolicy lock_conflict;
866
retry_obj.test(lock_conflict);
869
// Inner SQL handler for inner-table location errors: index long-wait retry, with no
// location cache refresh.
void ObQueryRetryCtrl::inner_table_location_error_proc(ObRetryParam &v)
871
// This usually means inner SQL could not get a location, possibly because a server is down.
872
// Only inner tables are involved here, so refreshing the location cache of the tables this
// SQL queries is pointless; hence no refresh.
873
ObRetryObject retry_obj(v);
874
ObCommonRetryIndexLongWaitPolicy retry_long_wait;
875
retry_obj.test(retry_long_wait);
878
// Inner SQL handler for location errors. The tenant-status check runs first; after that:
//  - DDL inner SQL: index long-wait retry plus a blocking location refresh;
//  - SQL triggered by a user request (non-zero trace id): fast-fail check, linear short
//    wait and a blocking refresh;
//  - other inner SQL: nothing is done here.
void ObQueryRetryCtrl::inner_location_error_proc(ObRetryParam &v)
880
const uint64_t *trace_id = ObCurTraceId::get();
881
// Treated as user-triggered when both trace-id words are non-zero.
bool sql_trigger_by_user_req = (NULL != trace_id && 0 != trace_id[0] && 0 != trace_id[1]);
882
ObRetryObject retry_obj(v);
883
ObCheckTenantStatusPolicy check_tenant;
884
ObRefreshLocationCacheBlockPolicy block_refresh;
885
retry_obj.test(check_tenant);
886
if (true == v.no_more_test_) {
887
// case1: tenant status is abnormal, do not retry
888
} else if (v.session_.get_ddl_info().is_ddl()) {
889
// case2: inner sql ddl need retry (add by shuangcan.yjw)
890
ObCommonRetryIndexLongWaitPolicy retry_long_wait;
891
retry_obj.test(retry_long_wait).test(block_refresh);
892
} else if (sql_trigger_by_user_req) {
893
// case3: sql trigger by user request, e.g. PL
894
ObFastFailRetryPolicy fast_fail; // only enable fast fail for user triggered req
895
ObCommonRetryLinearShortWaitPolicy short_wait_retry;
896
retry_obj.test(fast_fail).test(short_wait_retry).test(block_refresh);
898
// case 4: do nothing for other inner sql
903
// Inner SQL handler for "nothing readable" location errors: clears the session retry
// info, then delegates to inner_location_error_proc().
void ObQueryRetryCtrl::inner_location_error_nothing_readable_proc(ObRetryParam &v)
905
// Strong-consistency read: the leader is not readable, possibly because the invalid
// servers filtered the leader out.
906
// Weak-consistency read: no replica is left to choose, possibly because the invalid
// servers filtered out all replicas.
907
// To better handle a brief network outage of the leader, clear the retry info (mainly the
// invalid servers;
908
// it must stay inited to pass defensive checks, so call clear() rather than reset()),
// then retry.
909
v.session_.get_retry_info_for_update().clear();
910
inner_location_error_proc(v);
913
// Inner SQL handler for schema errors that only need the common "schema not full" check.
void ObQueryRetryCtrl::inner_common_schema_error_proc(ObRetryParam &v)
915
ObRetryObject retry_obj(v);
916
ObInnerCommonCheckSchemaPolicy common_schema_policy;
917
retry_obj.test(common_schema_policy);
921
// Inner SQL handler for schema errors: the common "schema not full" check, then the
// user-session/DDL check.
void ObQueryRetryCtrl::inner_schema_error_proc(ObRetryParam &v)
923
ObRetryObject retry_obj(v);
924
ObInnerCommonCheckSchemaPolicy common_schema_policy;
925
ObInnerCheckSchemaPolicy schema_policy;
926
retry_obj.test(common_schema_policy).test(schema_policy);
929
// Inner SQL handler for an uncertain peer-server state. DDL inner SQL runs the fast-fail
// check followed by a linear short-wait retry.
// NOTE(review): the non-DDL branch is missing from this extract.
void ObQueryRetryCtrl::inner_peer_server_status_uncertain_proc(ObRetryParam &v)
931
ObRetryObject retry_obj(v);
932
if (v.session_.get_ddl_info().is_ddl()) {
933
ObFastFailRetryPolicy fast_fail;
934
ObCommonRetryLinearShortWaitPolicy short_wait_retry;
935
retry_obj.test(fast_fail).test(short_wait_retry);
941
/////// system defined common func /////////////
943
// Default handler for errors with no retry handler: no retry, and v.err_ is returned.
// The debug log is skipped for OB_ERR_PROXY_REROUTE.
void ObQueryRetryCtrl::empty_proc(ObRetryParam &v)
945
// Following the principle of "return to the user the last error code that caused the
// retry to stop",
946
// this is the case where err is not in the retryable error-code list, so client_ret must
// be set accordingly.
947
v.client_ret_ = v.err_;
948
v.retry_type_ = RETRY_TYPE_NONE;
949
if (OB_ERR_PROXY_REROUTE != v.client_ret_) {
950
LOG_DEBUG("no retry handler for this err code, no need retry", K(v),
951
K(THIS_WORKER.get_timeout_ts()), K(v.result_.get_stmt_type()),
952
K(v.session_.get_retry_info().get_last_query_retry_err()));
956
// Common pre-retry check. Inner SQL goes through ObInnerBeforeRetryCheckPolicy. Other SQL
// (presumably the else-branch, whose line is missing here) goes through
// ObBeforeRetryCheckPolicy followed by ObStmtTypeRetryPolicy.
void ObQueryRetryCtrl::before_func(ObRetryParam &v)
958
if (OB_UNLIKELY(v.is_inner_sql_)) {
959
ObRetryObject retry_obj(v);
960
ObInnerBeforeRetryCheckPolicy before_retry;
961
retry_obj.test(before_retry);
963
ObRetryObject retry_obj(v);
964
ObBeforeRetryCheckPolicy before_retry;
965
ObStmtTypeRetryPolicy check_stmt_type;
966
retry_obj.test(before_retry).test(check_stmt_type);
970
// Common post-retry bookkeeping.
//  - Logs the retry decision, except (apparently) for OB_TRY_LOCK_ROW_CONFLICT,
//    OB_ERR_PROXY_REROUTE and PL's OB_READ_NOTHING. The lines between the condition and the
//    log are partly missing here.
//  - When a retry will happen, records v.err_ as the last retry error and bumps the session
//    retry count.
//  - Logs an error on broken invariants: client_ret_ must equal err_ when retrying, and must
//    never be OB_SUCCESS.
void ObQueryRetryCtrl::after_func(ObRetryParam &v)
972
if (OB_TRY_LOCK_ROW_CONFLICT == v.client_ret_
973
|| OB_ERR_PROXY_REROUTE == v.client_ret_
974
|| (v.is_from_pl_ && OB_READ_NOTHING == v.client_ret_)) {
977
// OB_READ_NOTHING inside PL is not logged
979
LOG_WARN_RET(v.client_ret_, "[RETRY] check if need retry", K(v), "need_retry", RETRY_TYPE_NONE != v.retry_type_);
981
if (RETRY_TYPE_NONE != v.retry_type_) {
982
v.session_.get_retry_info_for_update().set_last_query_retry_err(v.err_);
983
v.session_.get_retry_info_for_update().inc_retry_cnt();
984
if (OB_UNLIKELY(v.err_ != v.client_ret_)) {
985
LOG_ERROR_RET(OB_ERR_UNEXPECTED, "when need retry, v.client_ret_ must be equal to err", K(v));
988
if (OB_UNLIKELY(OB_SUCCESS == v.client_ret_)) {
989
LOG_ERROR_RET(OB_ERR_UNEXPECTED, "no matter need retry or not, v.client_ret_ should not be OB_SUCCESS", K(v));
993
int ObQueryRetryCtrl::init()
995
int ret = OB_SUCCESS;
997
OX(map_.create(8192, "RetryCtrl", "RetryCtrl"));
1000
// tag: unused, just for organization
1002
// func: processor for obmp* query
1003
// inner_func: processor for inner connection query
1004
// das_func: processor for DAS task retry
1005
#ifndef ERR_RETRY_FUNC
1006
#define ERR_RETRY_FUNC(tag, r, func, inner_func, das_func) \
1007
if (OB_SUCC(ret)) { \
1008
if (OB_SUCCESS != (ret = map_.set_refactored(r, RetryFuncs(func, inner_func, das_func)))) { \
1009
LOG_ERROR("Duplicated error code registered", "code", #r, KR(ret)); \
1014
// register your error code retry handler here, no order required
1016
ERR_RETRY_FUNC("SCHEMA", OB_SCHEMA_ERROR, schema_error_proc, empty_proc, nullptr);
1017
ERR_RETRY_FUNC("SCHEMA", OB_TENANT_EXIST, schema_error_proc, empty_proc, nullptr);
1018
ERR_RETRY_FUNC("SCHEMA", OB_TENANT_NOT_EXIST, schema_error_proc, empty_proc, nullptr);
1019
ERR_RETRY_FUNC("SCHEMA", OB_ERR_BAD_DATABASE, schema_error_proc, empty_proc, nullptr);
1020
ERR_RETRY_FUNC("SCHEMA", OB_DATABASE_EXIST, schema_error_proc, empty_proc, nullptr);
1021
ERR_RETRY_FUNC("SCHEMA", OB_TABLEGROUP_NOT_EXIST, schema_error_proc, empty_proc, nullptr);
1022
ERR_RETRY_FUNC("SCHEMA", OB_TABLEGROUP_EXIST, schema_error_proc, empty_proc, nullptr);
1023
ERR_RETRY_FUNC("SCHEMA", OB_TABLE_NOT_EXIST, schema_error_proc, inner_common_schema_error_proc, nullptr);
1024
ERR_RETRY_FUNC("SCHEMA", OB_ERR_TABLE_EXIST, schema_error_proc, empty_proc, nullptr);
1025
ERR_RETRY_FUNC("SCHEMA", OB_ERR_BAD_FIELD_ERROR, schema_error_proc, empty_proc, nullptr);
1026
ERR_RETRY_FUNC("SCHEMA", OB_ERR_COLUMN_DUPLICATE, schema_error_proc, empty_proc, nullptr);
1027
ERR_RETRY_FUNC("SCHEMA", OB_ERR_USER_EXIST, schema_error_proc, empty_proc, nullptr);
1028
ERR_RETRY_FUNC("SCHEMA", OB_ERR_USER_NOT_EXIST, schema_error_proc, empty_proc, nullptr);
1029
ERR_RETRY_FUNC("SCHEMA", OB_ERR_NO_PRIVILEGE, schema_error_proc, empty_proc, nullptr);
1030
ERR_RETRY_FUNC("SCHEMA", OB_ERR_NO_DB_PRIVILEGE, schema_error_proc, empty_proc, nullptr);
1031
ERR_RETRY_FUNC("SCHEMA", OB_ERR_NO_TABLE_PRIVILEGE, schema_error_proc, empty_proc, nullptr);
1032
ERR_RETRY_FUNC("SCHEMA", OB_ERR_WAIT_REMOTE_SCHEMA_REFRESH, schema_error_proc, inner_schema_error_proc, nullptr);
1033
ERR_RETRY_FUNC("SCHEMA", OB_ERR_REMOTE_SCHEMA_NOT_FULL, schema_error_proc, inner_schema_error_proc, nullptr);
1034
ERR_RETRY_FUNC("SCHEMA", OB_ERR_SP_ALREADY_EXISTS, schema_error_proc, empty_proc, nullptr);
1035
ERR_RETRY_FUNC("SCHEMA", OB_ERR_SP_DOES_NOT_EXIST, schema_error_proc, empty_proc, nullptr);
1036
ERR_RETRY_FUNC("SCHEMA", OB_OBJECT_NAME_NOT_EXIST, schema_error_proc, empty_proc, nullptr);
1037
ERR_RETRY_FUNC("SCHEMA", OB_OBJECT_NAME_EXIST, schema_error_proc, empty_proc, nullptr);
1038
ERR_RETRY_FUNC("SCHEMA", OB_SCHEMA_EAGAIN, schema_error_proc, inner_schema_error_proc, nullptr);
1039
ERR_RETRY_FUNC("SCHEMA", OB_SCHEMA_NOT_UPTODATE, schema_error_proc, inner_schema_error_proc, nullptr);
1040
ERR_RETRY_FUNC("SCHEMA", OB_ERR_PARALLEL_DDL_CONFLICT, schema_error_proc, inner_schema_error_proc, nullptr);
1041
ERR_RETRY_FUNC("SCHEMA", OB_AUTOINC_CACHE_NOT_EQUAL, autoinc_cache_not_equal_retry_proc, autoinc_cache_not_equal_retry_proc, nullptr);
1044
ERR_RETRY_FUNC("LOCATION", OB_LOCATION_LEADER_NOT_EXIST, location_error_nothing_readable_proc, inner_location_error_nothing_readable_proc, ObDASRetryCtrl::tablet_location_retry_proc);
1045
ERR_RETRY_FUNC("LOCATION", OB_LS_LOCATION_LEADER_NOT_EXIST, location_error_nothing_readable_proc, inner_location_error_nothing_readable_proc, ObDASRetryCtrl::tablet_location_retry_proc);
1046
ERR_RETRY_FUNC("LOCATION", OB_NO_READABLE_REPLICA, location_error_nothing_readable_proc, inner_location_error_nothing_readable_proc, ObDASRetryCtrl::tablet_location_retry_proc);
1047
ERR_RETRY_FUNC("LOCATION", OB_NOT_MASTER, location_error_proc, inner_location_error_proc, ObDASRetryCtrl::tablet_location_retry_proc);
1048
ERR_RETRY_FUNC("LOCATION", OB_RS_NOT_MASTER, location_error_proc, inner_location_error_proc, ObDASRetryCtrl::tablet_location_retry_proc);
1049
ERR_RETRY_FUNC("LOCATION", OB_RS_SHUTDOWN, location_error_proc, inner_location_error_proc, ObDASRetryCtrl::tablet_location_retry_proc);
1050
ERR_RETRY_FUNC("LOCATION", OB_PARTITION_NOT_EXIST, location_error_proc, inner_location_error_proc, ObDASRetryCtrl::tablet_location_retry_proc);
1051
ERR_RETRY_FUNC("LOCATION", OB_LOCATION_NOT_EXIST, location_error_proc, inner_location_error_proc, ObDASRetryCtrl::tablet_location_retry_proc);
1052
ERR_RETRY_FUNC("LOCATION", OB_PARTITION_IS_STOPPED, location_error_proc, inner_location_error_proc, ObDASRetryCtrl::tablet_location_retry_proc);
1053
ERR_RETRY_FUNC("LOCATION", OB_SERVER_IS_INIT, location_error_proc, inner_location_error_proc, ObDASRetryCtrl::tablet_location_retry_proc);
1054
ERR_RETRY_FUNC("LOCATION", OB_SERVER_IS_STOPPING, location_error_proc, inner_location_error_proc, ObDASRetryCtrl::tablet_location_retry_proc);
1055
ERR_RETRY_FUNC("LOCATION", OB_TENANT_NOT_IN_SERVER, location_error_proc, inner_location_error_proc, ObDASRetryCtrl::tablet_location_retry_proc);
1056
ERR_RETRY_FUNC("LOCATION", OB_TRANS_RPC_TIMEOUT, location_error_proc, inner_location_error_proc, nullptr);
1057
ERR_RETRY_FUNC("LOCATION", OB_USE_DUP_FOLLOW_AFTER_DML, location_error_proc, inner_location_error_proc, ObDASRetryCtrl::tablet_location_retry_proc);
1058
ERR_RETRY_FUNC("LOCATION", OB_TRANS_STMT_NEED_RETRY, location_error_proc, inner_location_error_proc, nullptr);
1059
ERR_RETRY_FUNC("LOCATION", OB_LS_NOT_EXIST, location_error_proc, inner_location_error_proc, ObDASRetryCtrl::tablet_location_retry_proc);
1060
// OB_TABLET_NOT_EXIST may be caused by old version schema or incorrect location.
1061
// Just use location_error_proc to retry sql and a new schema guard will be obtained during the retry process.
1062
ERR_RETRY_FUNC("LOCATION", OB_TABLET_NOT_EXIST, location_error_proc, inner_location_error_proc, ObDASRetryCtrl::tablet_not_exist_retry_proc);
1063
ERR_RETRY_FUNC("LOCATION", OB_LS_LOCATION_NOT_EXIST, location_error_proc, inner_location_error_proc, ObDASRetryCtrl::tablet_location_retry_proc);
1064
ERR_RETRY_FUNC("LOCATION", OB_PARTITION_IS_BLOCKED, location_error_proc, inner_location_error_proc, ObDASRetryCtrl::tablet_location_retry_proc);
1065
ERR_RETRY_FUNC("LOCATION", OB_MAPPING_BETWEEN_TABLET_AND_LS_NOT_EXIST, location_error_proc,inner_location_error_proc, ObDASRetryCtrl::tablet_not_exist_retry_proc);
1067
ERR_RETRY_FUNC("LOCATION", OB_GET_LOCATION_TIME_OUT, location_error_proc, inner_table_location_error_proc, ObDASRetryCtrl::tablet_location_retry_proc);
1072
ERR_RETRY_FUNC("NETWORK", OB_RPC_CONNECT_ERROR, peer_server_status_uncertain_proc, inner_peer_server_status_uncertain_proc, ObDASRetryCtrl::task_network_retry_proc);
1073
ERR_RETRY_FUNC("NETWORK", OB_RPC_SEND_ERROR, peer_server_status_uncertain_proc, inner_peer_server_status_uncertain_proc, ObDASRetryCtrl::task_network_retry_proc);
1074
ERR_RETRY_FUNC("NETWORK", OB_RPC_POST_ERROR, peer_server_status_uncertain_proc, inner_peer_server_status_uncertain_proc, ObDASRetryCtrl::task_network_retry_proc);
1077
ERR_RETRY_FUNC("STORAGE", OB_SNAPSHOT_DISCARDED, snapshot_discard_proc, short_wait_retry_proc, nullptr);
1078
ERR_RETRY_FUNC("STORAGE", OB_DATA_NOT_UPTODATE, long_wait_retry_proc, short_wait_retry_proc, nullptr);
1079
ERR_RETRY_FUNC("STORAGE", OB_REPLICA_NOT_READABLE, long_wait_retry_proc, short_wait_retry_proc, ObDASRetryCtrl::tablet_nothing_readable_proc);
1080
ERR_RETRY_FUNC("STORAGE", OB_PARTITION_IS_SPLITTING, short_wait_retry_proc, short_wait_retry_proc, nullptr);
1081
ERR_RETRY_FUNC("STORAGE", OB_DISK_HUNG, nonblock_location_error_proc, empty_proc, nullptr);
1084
ERR_RETRY_FUNC("TRX", OB_TRY_LOCK_ROW_CONFLICT, try_lock_row_conflict_proc, inner_try_lock_row_conflict_proc, nullptr);
1085
ERR_RETRY_FUNC("TRX", OB_TRANSACTION_SET_VIOLATION, trx_set_violation_proc, trx_set_violation_proc, nullptr);
1086
ERR_RETRY_FUNC("TRX", OB_TRANS_CANNOT_SERIALIZE, trx_can_not_serialize_proc, trx_can_not_serialize_proc, nullptr);
1087
ERR_RETRY_FUNC("TRX", OB_GTS_NOT_READY, short_wait_retry_proc, short_wait_retry_proc, nullptr);
1088
ERR_RETRY_FUNC("TRX", OB_GTI_NOT_READY, short_wait_retry_proc, short_wait_retry_proc, nullptr);
1089
ERR_RETRY_FUNC("TRX", OB_TRANS_WEAK_READ_VERSION_NOT_READY, short_wait_retry_proc, short_wait_retry_proc, nullptr);
1090
ERR_RETRY_FUNC("TRX", OB_SEQ_NO_REORDER_UNDER_PDML, short_wait_retry_proc, short_wait_retry_proc, nullptr);
1093
ERR_RETRY_FUNC("SQL", OB_ERR_INSUFFICIENT_PX_WORKER, px_thread_not_enough_proc, short_wait_retry_proc, nullptr);
1094
// create a new interval part when inserting a row which has no matched part,
1095
// wait and retry, will see new part
1096
ERR_RETRY_FUNC("SQL", OB_NO_PARTITION_FOR_INTERVAL_PART, short_wait_retry_proc, short_wait_retry_proc, nullptr);
1097
ERR_RETRY_FUNC("SQL", OB_BATCHED_MULTI_STMT_ROLLBACK, batch_execute_opt_retry_proc, batch_execute_opt_retry_proc, nullptr);
1098
ERR_RETRY_FUNC("SQL", OB_SQL_RETRY_SPM, force_local_retry_proc, force_local_retry_proc, nullptr);
1099
ERR_RETRY_FUNC("SQL", OB_NEED_SWITCH_CONSUMER_GROUP, switch_consumer_group_retry_proc, empty_proc, nullptr);
1102
ERR_RETRY_FUNC("SQL", OB_TIMEOUT, timeout_proc, timeout_proc, nullptr);
1103
ERR_RETRY_FUNC("SQL", OB_TRANS_TIMEOUT, timeout_proc, timeout_proc, nullptr);
1104
ERR_RETRY_FUNC("SQL", OB_TRANS_STMT_TIMEOUT, timeout_proc, timeout_proc, nullptr);
1109
#undef ERR_RETRY_FUNC
1113
void ObQueryRetryCtrl::destroy()
1115
// don't want to add a lock here
1116
// must ensure calling destroy after all threads exit
1120
ObQueryRetryCtrl::ObQueryRetryCtrl()
1121
: curr_query_tenant_local_schema_version_(0),
1122
curr_query_tenant_global_schema_version_(0),
1123
curr_query_sys_local_schema_version_(0),
1124
curr_query_sys_global_schema_version_(0),
1126
retry_type_(RETRY_TYPE_NONE),
1127
retry_err_code_(OB_SUCCESS)
1131
ObQueryRetryCtrl::~ObQueryRetryCtrl()
1135
int ObQueryRetryCtrl::get_das_retry_func(int err, ObDASRetryCtrl::retry_func &retry_func)
1137
int ret = OB_SUCCESS;
1138
retry_func = nullptr;
1140
if (OB_FAIL(map_.get_refactored(err, funcs))) {
1141
if (OB_HASH_NOT_EXIST == ret) {
1145
retry_func = funcs.element<2>();
1150
int ObQueryRetryCtrl::get_func(int err, bool is_inner_sql, retry_func &func)
1152
int ret = OB_SUCCESS;
1154
if (OB_FAIL(map_.get_refactored(err, funcs))) {
1155
if (OB_HASH_NOT_EXIST == ret) {
1160
func = is_inner_sql ? funcs.element<1>() : funcs.element<0>();
1165
void ObQueryRetryCtrl::test_and_save_retry_state(const ObGlobalContext &gctx,
1166
const ObSqlCtx &ctx,
1167
ObResultSet &result,
1170
bool force_local_retry,
1172
bool is_part_of_pl_sql)
1174
int ret = OB_SUCCESS;
1176
retry_type_ = RETRY_TYPE_NONE;
1177
retry_err_code_ = OB_SUCCESS;
1178
retry_func func = nullptr;
1179
ObSQLSessionInfo *session = result.get_exec_context().get_my_session();
1180
if (OB_ISNULL(session)) {
1181
// this is possible. #issue/43953721
1182
LOG_WARN("session is null in exec_context. maybe OOM. don't retry", K(err));
1183
} else if (OB_FAIL(get_func(err, is_inner_sql, func))) {
1184
// note: if no err proc registered, a default handler
1185
// 'empty_proc' is used as processor func
1186
LOG_WARN("fail get retry func", K(err), K(ret));
1187
} else if (OB_ISNULL(func)) {
1188
client_ret = OB_ERR_UNEXPECTED;
1189
LOG_WARN("invalid retry processor, no retry", K(err));
1191
// you can't tell exact stmt retry times for a SQL in PL as PL may do whole block retry
1192
// so we use retry_times_ as stmt_retry_times for any stmt in PL
1193
// if pl + stmt_retry_times == 0 scene, will cause timeout early.
1194
// So the number of retry times here is at least 1
1195
const int64_t stmt_retry_times =
1196
is_part_of_pl_sql ? (retry_times_ == 0 ? 1 : retry_times_):
1197
session->get_retry_info().get_retry_cnt();
1198
ObRetryParam retry_param(ctx, result, *session,
1199
curr_query_tenant_local_schema_version_,
1200
curr_query_tenant_global_schema_version_,
1201
curr_query_sys_local_schema_version_,
1202
curr_query_sys_global_schema_version_,
1211
// do some common checks in this hook, which is not bond to certain error code
1212
ObQueryRetryCtrl::before_func(retry_param);
1213
// this 'if' check is necessary, as direct call to func may override
1214
// the decision made in 'before_func', which is not what you want.
1215
if (!retry_param.no_more_test_) {
1218
// always execute after func hook to set some states
1219
ObQueryRetryCtrl::after_func(retry_param);
1221
if (RETRY_TYPE_NONE != retry_type_) {
1222
// this retry times only apply to current thread retry.
1223
// reset to 0 after each packet retry
1226
// xiaochu: I don't like the idea 'retry_err_code_', remove it later
1227
if (RETRY_TYPE_NONE != retry_type_) {
1228
retry_err_code_ = client_ret;
1230
if (RETRY_TYPE_NONE != retry_type_) {
1231
result.set_close_fail_callback([this](const int err, int &client_ret)-> void { this->on_close_resultset_fail_(err, client_ret); });
1235
void ObQueryRetryCtrl::on_close_resultset_fail_(const int err, int &client_ret)
1237
// some unretryable error happened in close result set phase
1238
if (OB_SUCCESS != err && RETRY_TYPE_NONE != retry_type_) {
1239
// the txn relative error in close stmt
1240
// thses error will cause the txn must to be rollbacked
1241
// and can not accept new request any more, so if retry
1242
// current stmt, it must be failed, hence we cancel retry
1243
if (OB_TRANS_NEED_ROLLBACK == err ||
1244
OB_TRANS_INVALID_STATE == err ||
1245
OB_TRANS_HAS_DECIDED == err) {
1246
retry_type_ = RETRY_TYPE_NONE;
1247
// also clear the packet retry
1248
THIS_WORKER.unset_need_retry();
1249
// rewrite the client error code
1250
// when decide to cancel the retry, return an unretryable error
1251
// is better, because it won't leak the internal error to user