oceanbase

Форк
0
/
ob_query_retry_ctrl.cpp 
1257 строк · 54.6 Кб
1
/**
2
 * Copyright (c) 2021 OceanBase
3
 * OceanBase CE is licensed under Mulan PubL v2.
4
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
5
 * You may obtain a copy of Mulan PubL v2 at:
6
 *          http://license.coscl.org.cn/MulanPubL-2.0
7
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
 * See the Mulan PubL v2 for more details.
11
 */
12

13
#define USING_LOG_PREFIX SERVER
14

15
#include "observer/mysql/ob_query_retry_ctrl.h"
16
#include "sql/ob_sql_context.h"
17
#include "sql/resolver/ob_stmt.h"
18
#include "pl/ob_pl.h"
19
#include "storage/tx/ob_trans_define.h"
20
#include "observer/mysql/ob_mysql_result_set.h"
21
#include "observer/ob_server_struct.h"
22
#include "observer/mysql/obmp_query.h"
23

24
namespace oceanbase
25
{
26
using namespace common;
27
using namespace sql;
28
using namespace share::schema;
29
using namespace oceanbase::transaction;
30

31

32
namespace observer
33
{
34

35
// Registry of retry handlers (RetryFuncs) keyed by an int -- presumably the error code; it is
// populated outside this chunk. NOTE(review): NoPthreadDefendMode suggests the map does no
// internal locking, so it is presumably filled once at startup and only read afterwards -- confirm.
common::hash::ObHashMap<int, ObQueryRetryCtrl::RetryFuncs, common::hash::NoPthreadDefendMode> ObQueryRetryCtrl::map_;
36

37
// Chooses between a "packet retry" (throw the request back to the queue; the worker is flagged
// with set_need_retry()) and a "local retry" (retry in place on the current thread).
// Packet retry is used only when none of the blocking conditions below holds.
void ObRetryPolicy::try_packet_retry(ObRetryParam &v) const
{
  const ObMultiStmtItem &stmt_item = v.ctx_.multi_stmt_item_;
  // Evaluated left to right with short-circuiting, in the same order as the checks:
  //  - the caller explicitly forces a local retry;
  //  - batched multi-stmt optimization is active, which cannot be packet-retried;
  //  - a multi-stmt request that is not on its first statement: the statements already
  //    executed cannot be rolled back, so the request must not be re-queued;
  //  - THIS_WORKER.can_retry() == false: throw-back-to-queue was disabled by some other logic.
  const bool retry_in_place = v.force_local_retry_
                              || stmt_item.is_batched_multi_stmt()
                              || (stmt_item.is_part_of_multi_stmt() && stmt_item.get_seq_num() > 0)
                              || !THIS_WORKER.can_retry();
  if (retry_in_place) {
    v.retry_type_ = RETRY_TYPE_LOCAL;
  } else {
    v.retry_type_ = RETRY_TYPE_PACKET;
    THIS_WORKER.set_need_retry();
  }
}
56

57
void ObRetryPolicy::sleep_before_local_retry(ObRetryParam &v,
58
                                             RetrySleepType retry_sleep_type,
59
                                             int64_t base_sleep_us,
60
                                             int64_t timeout_timestamp) const
61
{
62
  int ret = OB_SUCCESS;
63
  int64_t sleep_us = 0;
64
  switch(retry_sleep_type) {
65
    case RETRY_SLEEP_TYPE_LINEAR: {
66
      sleep_us = base_sleep_us * linear_timeout_factor(v.stmt_retry_times_);
67
      break;
68
    }
69
    case RETRY_SLEEP_TYPE_INDEX: {
70
      sleep_us = base_sleep_us * index_timeout_factor(v.stmt_retry_times_);
71
      break;
72
    }
73
    case RETRY_SLEEP_TYPE_NONE: {
74
      sleep_us = 0;
75
      break;
76
    }
77
    default: {
78
      ret = OB_ERR_UNEXPECTED;
79
      LOG_ERROR("unexpected retry sleep type", K(ret), K(base_sleep_us),
80
                K(retry_sleep_type), K(v.stmt_retry_times_), K(timeout_timestamp));
81
      break;
82
    }
83
  }
84
  if (RETRY_SLEEP_TYPE_NONE != retry_sleep_type && OB_SUCC(ret)) {
85
    int64_t remain_us = timeout_timestamp - ObTimeUtility::current_time();
86
    if (sleep_us > remain_us) {
87
      sleep_us = remain_us;
88
    }
89
    if (sleep_us > 0) {
90
      LOG_INFO("will sleep", K(sleep_us), K(remain_us), K(base_sleep_us),
91
               K(retry_sleep_type), K(v.stmt_retry_times_), K(v.err_), K(timeout_timestamp));
92
      THIS_WORKER.sched_wait();
93
      ob_usleep(static_cast<uint32_t>(sleep_us));
94
      THIS_WORKER.sched_run();
95
      if (THIS_WORKER.is_timeout()) {
96
        v.client_ret_ = OB_TIMEOUT;
97
        v.retry_type_ = RETRY_TYPE_NONE;
98
        v.no_more_test_ = true;
99
        LOG_WARN("this worker is timeout after retry sleep. no more retry", K(v));
100
      }
101
    } else {
102
      LOG_INFO("already timeout, do not need sleep", K(sleep_us), K(remain_us), K(base_sleep_us),
103
               K(retry_sleep_type), K(v.stmt_retry_times_), K(timeout_timestamp));
104
    }
105
  }
106
}
107

108
template<bool is_async>
109
class ObRefreshLocationCachePolicy : public ObRetryPolicy
110
{
111
public:
112
  ObRefreshLocationCachePolicy() = default;
113
  ~ObRefreshLocationCachePolicy() = default;
114
  virtual void test(ObRetryParam &v) const override
115
  {
116
    v.result_.force_refresh_location_cache(is_async, v.err_);
117
  }
118
};
119

120
// Non-blocking (asynchronous) and blocking flavors of the location-cache refresh policy.
using ObRefreshLocationCacheNonblockPolicy = ObRefreshLocationCachePolicy<true>;
using ObRefreshLocationCacheBlockPolicy = ObRefreshLocationCachePolicy<false>;
122

123

124
template<RetrySleepType SleepType, int64_t WaitUs>
125
class ObCommonRetryPolicy : public ObRetryPolicy
126
{
127
public:
128
  ObCommonRetryPolicy() = default;
129
  ~ObCommonRetryPolicy() = default;
130
  virtual void test(ObRetryParam &v) const override
131
  {
132
    try_packet_retry(v);
133
    if (RETRY_TYPE_LOCAL == v.retry_type_) {
134
      sleep_before_local_retry(v,
135
                               SleepType,
136
                               WaitUs,
137
                               THIS_WORKER.get_timeout_ts());
138
    }
139
  }
140
};
141

142
// Ready-made combinations of back-off shape and base wait for ObCommonRetryPolicy.
using ObCommonRetryNoWaitPolicy          = ObCommonRetryPolicy<RETRY_SLEEP_TYPE_NONE, 0>;
using ObCommonRetryLinearLongWaitPolicy  = ObCommonRetryPolicy<RETRY_SLEEP_TYPE_LINEAR, ObRetryPolicy::WAIT_RETRY_LONG_US>;
using ObCommonRetryLinearShortWaitPolicy = ObCommonRetryPolicy<RETRY_SLEEP_TYPE_LINEAR, ObRetryPolicy::WAIT_RETRY_SHORT_US>;
using ObCommonRetryIndexLongWaitPolicy   = ObCommonRetryPolicy<RETRY_SLEEP_TYPE_INDEX, ObRetryPolicy::WAIT_RETRY_LONG_US>;
using ObCommonRetryIndexShortWaitPolicy  = ObCommonRetryPolicy<RETRY_SLEEP_TYPE_INDEX, ObRetryPolicy::WAIT_RETRY_SHORT_US>;
147

148

149
class ObFastFailRetryPolicy : public ObRetryPolicy
150
{
151
public:
152
  ObFastFailRetryPolicy() = default;
153
  ~ObFastFailRetryPolicy() = default;
154
  virtual void test(ObRetryParam &v) const override
155
  {
156
    if (v.session_.get_retry_info_for_update()
157
        .should_fast_fail(v.session_.get_effective_tenant_id())) {
158
      v.client_ret_ = v.err_;
159
      v.retry_type_ = RETRY_TYPE_NONE;
160
      v.no_more_test_ = true;
161
      LOG_WARN_RET(v.err_, "server down error, fast fail", K(v));
162
    }
163
  }
164
};
165

166
class ObForceLocalRetryPolicy : public ObRetryPolicy
167
{
168
public:
169
  ObForceLocalRetryPolicy() = default;
170
  ~ObForceLocalRetryPolicy() = default;
171
  virtual void test(ObRetryParam &v) const override
172
  {
173
    v.retry_type_ = RETRY_TYPE_LOCAL;
174
  }
175
};
176

177
class ObBatchExecOptRetryPolicy : public ObRetryPolicy
178
{
179
public:
180
  ObBatchExecOptRetryPolicy() = default;
181
  ~ObBatchExecOptRetryPolicy() = default;
182
  virtual void test(ObRetryParam &v) const override
183
  {
184
    if (v.ctx_.is_do_insert_batch_opt()) {
185
      v.retry_type_ = RETRY_TYPE_LOCAL;
186
    } else {
187
      v.retry_type_ = RETRY_TYPE_NONE;
188
    }
189
  }
190
};
191

192
class ObSwitchConsumerGroupRetryPolicy : public ObRetryPolicy
193
{
194
public:
195
  ObSwitchConsumerGroupRetryPolicy() = default;
196
  ~ObSwitchConsumerGroupRetryPolicy() = default;
197
  virtual void test(ObRetryParam &v) const override
198
  {
199
    try_packet_retry(v);
200
    if (RETRY_TYPE_LOCAL == v.retry_type_) {
201
      LOG_WARN_RET(v.err_, "set retry packet failed, retry at local",
202
        K(v.ctx_.multi_stmt_item_.is_part_of_multi_stmt()),
203
        K(v.ctx_.multi_stmt_item_.get_seq_num()));
204
      v.session_.set_group_id_not_expected(true);
205
      v.result_.get_exec_context().set_need_disconnect(false);
206
    }
207
  }
208
};
209

210
class ObBeforeRetryCheckPolicy : public ObRetryPolicy
211
{
212
public:
213
  ObBeforeRetryCheckPolicy() = default;
214
  ~ObBeforeRetryCheckPolicy() = default;
215
  virtual void test(ObRetryParam &v) const override
216
  {
217
    int ret = OB_SUCCESS;
218
    if (v.session_.is_terminate(ret)) {
219
      v.no_more_test_ = true;
220
      v.retry_type_ = RETRY_TYPE_NONE;
221
      // In the kill client session scenario, the server session will be marked
222
      // with the SESSION_KILLED mark. In the retry scenario, there will be an error
223
      // code covering 5066, so the judgment logic is added here.
224
      if (ret == OB_ERR_SESSION_INTERRUPTED && v.err_ == OB_ERR_KILL_CLIENT_SESSION) {
225
        v.client_ret_ = v.err_;
226
      } else{
227
        v.client_ret_ = ret; // session terminated
228
      }
229
      LOG_WARN("execution was terminated", K(ret), K(v.client_ret_), K(v.err_));
230
    } else if (THIS_WORKER.is_timeout()) {
231
      v.no_more_test_ = true;
232
      v.retry_type_ = RETRY_TYPE_NONE;
233
      if (OB_ERR_INSUFFICIENT_PX_WORKER == v.err_ ||
234
          OB_ERR_EXCLUSIVE_LOCK_CONFLICT == v.err_ ||
235
          OB_ERR_EXCLUSIVE_LOCK_CONFLICT_NOWAIT == v.err_) {
236
        v.client_ret_ = v.err_;
237
      } else if (is_try_lock_row_err(v.session_.get_retry_info().get_last_query_retry_err())) {
238
        // timeout caused by locking, should return OB_ERR_EXCLUSIVE_LOCK_CONFLICT
239
        v.client_ret_ = OB_ERR_EXCLUSIVE_LOCK_CONFLICT;
240
      } else {
241
        v.client_ret_ = OB_TIMEOUT;
242
      }
243
      LOG_WARN("this worker is timeout, do not need retry", K(v),
244
               K(THIS_WORKER.get_timeout_ts()), K(v.result_.get_stmt_type()),
245
               K(v.session_.get_retry_info().get_last_query_retry_err()));
246
      if (v.session_.get_retry_info().is_rpc_timeout() || is_transaction_rpc_timeout_err(v.err_)) {
247
        // rpc超时了,可能是location cache不对,异步刷新location cache
248
        v.result_.force_refresh_location_cache(true, v.err_); // 非阻塞
249
        LOG_WARN("sql rpc timeout, or trans rpc timeout, maybe location is changed, "
250
                 "refresh location cache non blockly", K(v),
251
                 K(v.session_.get_retry_info().is_rpc_timeout()));
252
      }
253
    }
254
  }
255
};
256

257
class ObStmtTypeRetryPolicy : public ObRetryPolicy
258
{
259
public:
260
  ObStmtTypeRetryPolicy() = default;
261
  ~ObStmtTypeRetryPolicy() = default;
262

263
  bool is_direct_load(ObRetryParam &v) const
264
  {
265
    ObExecContext &exec_ctx = v.result_.get_exec_context();
266
    return exec_ctx.get_table_direct_insert_ctx().get_is_direct();
267
  }
268

269
  bool is_load_local(ObRetryParam &v) const
270
  {
271
    bool bret = false;
272
    const ObICmd *cmd = v.result_.get_cmd();
273
    if (OB_NOT_NULL(cmd) && cmd->get_cmd_type() == stmt::T_LOAD_DATA) {
274
      const ObLoadDataStmt *load_data_stmt = static_cast<const ObLoadDataStmt *>(cmd);
275
      bret = load_data_stmt->get_load_arguments().load_file_storage_ == ObLoadFileLocation::CLIENT_DISK;
276
    }
277
    return bret;
278
  }
279

280
  virtual void test(ObRetryParam &v) const override
281
  {
282
    int err = v.err_;
283
    if (v.result_.is_pl_stmt(v.result_.get_stmt_type()) && !v.session_.get_pl_can_retry()) {
284
      LOG_WARN_RET(err, "current pl can not retry, commit may have occurred",
285
               K(v), K(v.result_.get_stmt_type()));
286
      v.client_ret_ = err;
287
      v.retry_type_ = RETRY_TYPE_NONE;
288
      v.no_more_test_ = true;
289
    } else if (ObStmt::force_skip_retry_stmt(v.result_.get_stmt_type())) {
290
      v.client_ret_ = err;
291
      v.retry_type_ = RETRY_TYPE_NONE;
292
      v.no_more_test_ = true;
293
    } else if (ObStmt::is_ddl_stmt(v.result_.get_stmt_type(), v.result_.has_global_variable())) {
294
      if (is_ddl_stmt_packet_retry_err(err)) {
295
        try_packet_retry(v);
296
      } else {
297
        v.client_ret_ = err;
298
        v.retry_type_ = RETRY_TYPE_NONE;
299
      }
300
      v.no_more_test_ = true;
301
    } else if (is_load_local(v)) {
302
      v.client_ret_ = err;
303
      v.retry_type_ = RETRY_TYPE_NONE;
304
      v.no_more_test_ = true;
305
    } else if (is_direct_load(v)) {
306
      if (is_direct_load_retry_err(err)) {
307
        try_packet_retry(v);
308
      } else {
309
        v.client_ret_ = err;
310
        v.retry_type_ = RETRY_TYPE_NONE;
311
      }
312
      v.no_more_test_ = true;
313
    }
314
  }
315
};
316

317

318
class ObCheckSchemaUpdatePolicy : public ObRetryPolicy
319
{
320
public:
321
  ObCheckSchemaUpdatePolicy() = default;
322
  ~ObCheckSchemaUpdatePolicy() = default;
323
  virtual void test(ObRetryParam &v) const override
324
  {
325
    int ret = OB_SUCCESS;
326
    // 设计讨论参考:
327
    if (NULL == GCTX.schema_service_) {
328
      v.client_ret_ = OB_INVALID_ARGUMENT;
329
      v.retry_type_ = RETRY_TYPE_NONE;
330
      v.no_more_test_ = true;
331
      LOG_WARN("invalid argument", K(v));
332
    } else {
333
      ObSchemaGetterGuard schema_guard;
334
      int64_t local_tenant_version_latest = 0;
335
      int64_t local_sys_version_latest = 0;
336
      if (OB_FAIL(GCTX.schema_service_->get_tenant_schema_guard(
337
                  v.session_.get_effective_tenant_id(), schema_guard))) {
338
        // 不需要重试了,同时让它返回get_schema_guard出错的错误码,因为是由它引起不重试的
339
        LOG_WARN("get schema guard failed", K(v), K(ret));
340
        v.client_ret_ = ret;
341
        v.retry_type_ = RETRY_TYPE_NONE;
342
        v.no_more_test_ = true;
343
      } else if (OB_FAIL(schema_guard.get_schema_version(
344
                  v.session_.get_effective_tenant_id(), local_tenant_version_latest))) {
345
        LOG_WARN("fail get tenant schema version", K(v), K(ret));
346
        v.client_ret_ = ret;
347
        v.retry_type_ = RETRY_TYPE_NONE;
348
        v.no_more_test_ = true;
349
      } else if (OB_FAIL(schema_guard.get_schema_version(
350
                  OB_SYS_TENANT_ID, local_sys_version_latest))) {
351
        LOG_WARN("fail get sys schema version", K(v), K(ret));
352
        v.client_ret_ = ret;
353
        v.retry_type_ = RETRY_TYPE_NONE;
354
        v.no_more_test_ = true;
355
      } else {
356
        bool local_schema_not_full = GCTX.schema_service_->is_schema_error_need_retry(
357
                                     &schema_guard, v.session_.get_effective_tenant_id());
358
        int64_t local_tenant_version_start = v.curr_query_tenant_local_schema_version_;
359
        int64_t global_tenant_version_start = v.curr_query_tenant_global_schema_version_;
360
        int64_t local_sys_version_start = v.curr_query_sys_local_schema_version_;
361
        int64_t global_sys_version_start = v.curr_query_sys_global_schema_version_;
362
        // (c1) 需要考虑远端机器的Schema比本地落后,远端机器抛出Schema错误的情景
363
        //      当远端抛出Schema错误的时候,强行将所有Schema错误转化成OB_ERR_WAIT_REMOTE_SCHEMA_REFRESH
364
        //      权限不足也会触发该重试规则,因为远端schema刷新不及时可能误报权限不足,此时是需要重试的
365
        // (c4) 弱一致性读场景,会校验schema版本是否大于等于数据的schema版本,
366
        //      如果schema版本旧,则要求重试;
367
        //      目的是保证:始终采用新schema解析老数据
368
        // (c5) 梳理了OB_SCHEMA_EAGAIN使用的地方,主路径上出现了该错误码的地方需要触发SQL重试
369
        // (c2) 表存在或不存在/数据库存在或不存在/用户存在或不存在,并且local和global版本不等时重试
370
        // (c3) 其它任何sql开始执行时local version比当前local version小导致schema错误的情况
371
        // (c6) For local server, related tenant's schema maybe not refreshed yet when observer restarts or create tenant.
372
        // (c7) For remote server, related tenant's schema maybe not refreshed yet when observer restarts or create tenant.
373
        if ((OB_ERR_WAIT_REMOTE_SCHEMA_REFRESH == v.err_) || // (c1)
374
            (OB_SCHEMA_NOT_UPTODATE == v.err_) || // (c4)
375
            (OB_SCHEMA_EAGAIN == v.err_) || // (c5)
376
            (global_tenant_version_start > local_tenant_version_start) || // (c2)
377
            (global_sys_version_start > local_sys_version_start) || // (c2)
378
            (local_tenant_version_latest > local_tenant_version_start) || // (c3)
379
            (local_sys_version_latest > local_sys_version_start) || // (c3)
380
            (local_schema_not_full) || // (c6)
381
            (OB_ERR_REMOTE_SCHEMA_NOT_FULL == v.err_) // (c7)
382
           ) {
383
          if (v.stmt_retry_times_ < ObQueryRetryCtrl::MAX_SCHEMA_ERROR_LOCAL_RETRY_TIMES) {
384
            v.retry_type_ = RETRY_TYPE_LOCAL;
385
          } else {
386
            try_packet_retry(v);
387
          }
388
          if (RETRY_TYPE_LOCAL == v.retry_type_) {
389
            // 线性重试响应更快
390
            sleep_before_local_retry(v,
391
                                     RETRY_SLEEP_TYPE_LINEAR,
392
                                     WAIT_RETRY_SHORT_US,
393
                                     THIS_WORKER.get_timeout_ts());
394
          }
395
        } else {
396
          // 这里的client_ret不好决定,让它依然返回err
397
          v.client_ret_ = v.err_;
398
          v.retry_type_ = RETRY_TYPE_NONE;
399
          v.no_more_test_ = true;
400
        }
401
      }
402
    }
403
  }
404
};
405

406
// if tenant status is abnormal, do not retry sql
407
class ObCheckTenantStatusPolicy : public ObRetryPolicy
408
{
409
public:
410
  ObCheckTenantStatusPolicy() = default;
411
  ~ObCheckTenantStatusPolicy() = default;
412
  virtual void test(ObRetryParam &v) const override
413
  {
414
    int ret = OB_SUCCESS;
415
    if (OB_ISNULL(GCTX.schema_service_)) {
416
      ret = OB_INVALID_ARGUMENT;
417
      LOG_TRACE("invalid schema_service", KR(ret), K(v));
418
    } else {
419
      ObSchemaGetterGuard schema_guard;
420
      const ObTenantSchema *tenant_schema = NULL;
421
      if (OB_FAIL(GCTX.schema_service_->get_tenant_schema_guard(
422
          OB_SYS_TENANT_ID,
423
          schema_guard))) {
424
        LOG_TRACE("get sys tenant schema guard failed", KR(ret), K(v));
425
      } else if (OB_FAIL(schema_guard.get_tenant_info(
426
          v.session_.get_effective_tenant_id(),
427
          tenant_schema))) {
428
        LOG_TRACE("fail get tenant info", KR(ret),
429
            "tenant_id", v.session_.get_effective_tenant_id(), K(v));
430
      } else if (OB_ISNULL(tenant_schema) || !tenant_schema->is_normal()) {
431
        // use LOG_TRACE to prevent too much warning during creating tenant
432
        LOG_TRACE("tenant status is abnormal, do not retry",
433
            "tenant_id", v.session_.get_effective_tenant_id(), KPC(tenant_schema), K(v));
434
        // tenant status is abnormal, do not retry and return v.err_
435
        v.client_ret_ = v.err_;
436
        v.retry_type_ = RETRY_TYPE_NONE;
437
        v.no_more_test_ = true;
438
      } else {
439
        // tenant status is normal, check passed
440
      }
441
    }
442
  }
443
};
444

445
class ObDMLPeerServerStateUncertainPolicy : public ObRetryPolicy
446
{
447
public:
448
  ObDMLPeerServerStateUncertainPolicy() = default;
449
  ~ObDMLPeerServerStateUncertainPolicy() = default;
450
  virtual void test(ObRetryParam &v) const override
451
  {
452
    if (OB_ISNULL(v.result_.get_physical_plan())) {
453
      // issue#43741246, plan not generated, won't be a remote trans
454
      // safe to continue with other retry test
455
    } else if (ObStmt::is_dml_write_stmt(v.result_.get_stmt_type())) {
456
      // bugfix:
457
      // bugfix:
458
      bool autocommit = v.session_.get_local_autocommit();
459
      ObPhyPlanType plan_type = v.result_.get_physical_plan()->get_plan_type();
460
      bool in_transaction = v.session_.is_in_transaction();
461
      if (ObSqlTransUtil::is_remote_trans(autocommit, in_transaction, plan_type)) {
462
        // 当前observer内部无法进行重试
463
        // err是OB_RPC_CONNECT_ERROR
464
        v.client_ret_ = v.err_;
465
        v.retry_type_ = RETRY_TYPE_NONE;
466
        v.no_more_test_ = true;
467
        LOG_WARN_RET(v.err_, "server down error, the write dml is remote, don't retry",
468
                 K(autocommit), K(plan_type), K(in_transaction), K(v));
469
      }
470
    }
471
  }
472
};
473

474

475
class ObLockRowConflictRetryPolicy : public ObRetryPolicy
476
{
477
public:
478
  ObLockRowConflictRetryPolicy() = default;
479
  ~ObLockRowConflictRetryPolicy() = default;
480
  virtual void test(ObRetryParam &v) const override
481
  {
482
    // sql which in pl will local retry first. see ObInnerSQLConnection::process_retry.
483
    // sql which not in pl use the same strategy to avoid never getting the lock.
484
    if (v.force_local_retry_ || (v.local_retry_times_ <= 1 && !v.result_.is_pl_stmt(v.result_.get_stmt_type()))) {
485
      v.retry_type_ = RETRY_TYPE_LOCAL;
486
    } else {
487
      const ObMultiStmtItem &multi_stmr_item = v.ctx_.multi_stmt_item_;
488
      try_packet_retry(v);
489
    }
490
  }
491
};
492

493

494
class ObTrxSetViolationRetryPolicy : public ObRetryPolicy
495
{
496
public:
497
  ObTrxSetViolationRetryPolicy() = default;
498
  ~ObTrxSetViolationRetryPolicy() = default;
499
  virtual void test(ObRetryParam &v) const override
500
  {
501
    if (ObQueryRetryCtrl::is_isolation_RR_or_SE(v.session_.get_tx_isolation())) {
502
      v.client_ret_ = OB_TRANS_CANNOT_SERIALIZE;
503
      v.retry_type_ = RETRY_TYPE_NONE;
504
      v.no_more_test_ = true;
505
      LOG_WARN_RET(v.client_ret_, "transaction cannot serialize", K(v));
506
    }
507
  }
508
};
509

510
class ObTrxCannotSerializeRetryPolicy : public ObRetryPolicy
511
{
512
public:
513
  ObTrxCannotSerializeRetryPolicy() = default;
514
  ~ObTrxCannotSerializeRetryPolicy() = default;
515
  virtual void test(ObRetryParam &v) const override
516
  {
517
    v.client_ret_ = OB_TRANS_CANNOT_SERIALIZE;
518
    v.retry_type_ = RETRY_TYPE_NONE;
519
    v.no_more_test_ = true;
520
    LOG_WARN_RET(v.client_ret_, "transaction cannot serialize", K(v));
521
  }
522
};
523

524
class ObPxThreadNotEnoughRetryPolicy : public ObRetryPolicy
525
{
526
public:
527
  ObPxThreadNotEnoughRetryPolicy() = default;
528
  ~ObPxThreadNotEnoughRetryPolicy() = default;
529
  virtual void test(ObRetryParam &v) const override
530
  {
531
    if (v.force_local_retry_) {
532
      v.retry_type_ = RETRY_TYPE_LOCAL;
533
    } else {
534
      try_packet_retry(v);
535
      if (RETRY_TYPE_LOCAL == v.retry_type_) {
536
        v.client_ret_ = v.err_;
537
        v.retry_type_ = RETRY_TYPE_NONE;
538
        v.no_more_test_ = true;
539
        LOG_WARN_RET(v.client_ret_, "can not retry local. need to terminate to prevent thread resouce deadlock", K(v));
540
      }
541
    }
542
  }
543
};
544

545
////////// special inner retry policy for inner connection ////////////
546
//
547
class ObInnerCommonCheckSchemaPolicy : public ObRetryPolicy
548
{
549
public:
550
  ObInnerCommonCheckSchemaPolicy() = default;
551
  ~ObInnerCommonCheckSchemaPolicy() = default;
552
  virtual void test(ObRetryParam &v) const override
553
  {
554
    int ret = OB_SUCCESS;
555
    bool local_schema_not_full = GSCHEMASERVICE.is_schema_error_need_retry(
556
                                 NULL, v.session_.get_effective_tenant_id());
557
    if (local_schema_not_full || OB_ERR_REMOTE_SCHEMA_NOT_FULL == v.err_) {
558
      v.no_more_test_ = true;
559
      v.retry_type_ = RETRY_TYPE_LOCAL;
560
      sleep_before_local_retry(v,
561
                          RETRY_SLEEP_TYPE_LINEAR,
562
                          WAIT_RETRY_SHORT_US,
563
                          THIS_WORKER.get_timeout_ts());
564
    }
565
  }
566
};
567

568
class ObInnerCheckSchemaPolicy : public ObRetryPolicy
569
{
570
public:
571
  ObInnerCheckSchemaPolicy() = default;
572
  ~ObInnerCheckSchemaPolicy() = default;
573
  virtual void test(ObRetryParam &v) const override
574
  {
575
    int ret = OB_SUCCESS;
576
    // As DDL is not reentranable in OceanBase, we have to retry those SQL issued by DDL in place
577
    // is_user_session=true: create table t1 as select ...
578
    // is_ddl=true: create index idx1 on ...
579
    if (v.session_.is_user_session() || v.session_.get_ddl_info().is_ddl()) {
580
      v.no_more_test_ = true;
581
      v.retry_type_ = RETRY_TYPE_LOCAL;
582
    } else {
583
      v.no_more_test_ = true;
584
      v.retry_type_ = RETRY_TYPE_NONE;
585
      v.client_ret_ = v.err_;
586
    }
587
  }
588
};
589

590
class ObInnerLockRowConflictRetryPolicy : public ObRetryPolicy
591
{
592
public:
593
  ObInnerLockRowConflictRetryPolicy() = default;
594
  ~ObInnerLockRowConflictRetryPolicy() = default;
595
  virtual void test(ObRetryParam &v) const override
596
  {
597
    // sql which in pl will local retry first. see ObInnerSQLConnection::process_retry.
598
    // sql which not in pl use the same strategy to avoid never getting the lock.
599
    if (v.is_from_pl_) {
600
      if (v.local_retry_times_ <= 1 ||
601
          !v.session_.get_pl_can_retry() ||
602
          ObSQLUtils::is_in_autonomous_block(v.session_.get_cur_exec_ctx())) {
603
        v.no_more_test_ = true;
604
        v.retry_type_ = RETRY_TYPE_LOCAL;
605
      } else {
606
        v.no_more_test_ = true;
607
        v.retry_type_ = RETRY_TYPE_NONE;
608
        v.client_ret_ = v.err_;
609
      }
610
    } else {
611
      // for DDL etc
612
      v.no_more_test_ = true;
613
      v.retry_type_ = RETRY_TYPE_LOCAL;
614
    }
615
  }
616
};
617

618
class ObInnerBeforeRetryCheckPolicy: public ObRetryPolicy
619
{
620
public:
621
  ObInnerBeforeRetryCheckPolicy() = default;
622
  ~ObInnerBeforeRetryCheckPolicy() = default;
623
  virtual void test(ObRetryParam &v) const override
624
  {
625
    int ret = OB_SUCCESS;
626
    // nested transaction already supported In 32x and can only rollback nested sql.
627
    // for forigen key, we keep old logic and do not retry. for pl will retry current nested sql.
628
    if (is_nested_conn(v) && !is_static_engine_retry(v.err_) && !v.is_from_pl_) {
629
      // right now, top session will retry, bug we can do something here like refresh XXX cache.
630
      // in future, nested session can retry if nested transaction is supported.
631
      v.no_more_test_ = true;
632
      v.retry_type_ = RETRY_TYPE_NONE;
633
      v.client_ret_ = v.err_;
634
    } else if (v.session_.is_terminate(ret)) {
635
      v.no_more_test_ = true;
636
      v.retry_type_ = RETRY_TYPE_NONE;
637
      // In the kill client session scenario, the server session will be marked
638
      // with the SESSION_KILLED mark. In the retry scenario, there will be an error
639
      // code covering 5066, so the judgment logic is added here.
640
      if (ret == OB_ERR_SESSION_INTERRUPTED && v.err_ == OB_ERR_KILL_CLIENT_SESSION) {
641
        v.client_ret_ = v.err_;
642
      } else{
643
        v.client_ret_ = ret; // session terminated
644
      }
645
      LOG_WARN("execution was terminated", K(ret), K(v.client_ret_), K(v.err_));
646
    } else if (THIS_WORKER.is_timeout()) {
647
      v.no_more_test_ = true;
648
      v.retry_type_ = RETRY_TYPE_NONE;
649
      v.client_ret_ = OB_TIMEOUT;
650
    }
651
  }
652
private:
653
  //is_nested_conn means this connection is triggered by a foreign key or PL object
654
  //because session and ObExecContext are linked in SQL engine,
655
  //in the inner sql connection stage,
656
  //the ObExecContext belonging to the current inner connection has not been linked to session.
657
  //for nested SQL, the ObExecContext on the current session belongs to the parent statement
658
  //so session.cur_exec_ctx_ is the parent ctx of the nested SQL
659
  bool is_nested_conn(ObRetryParam &v) const
660
  {
661
    ObExecContext *parent_ctx = v.session_.get_cur_exec_ctx();
662
    bool is_pl_nested = (parent_ctx != nullptr
663
                         && ObStmt::is_dml_stmt(parent_ctx->get_sql_ctx()->stmt_type_)
664
                         && parent_ctx->get_pl_stack_ctx() != nullptr
665
                         && !parent_ctx->get_pl_stack_ctx()->in_autonomous());
666
    bool is_fk_nested = (parent_ctx != nullptr && parent_ctx->get_das_ctx().is_fk_cascading_);
667
    bool is_online_stat_gathering_nested = (parent_ctx != nullptr && parent_ctx->is_online_stats_gathering());
668
    return is_pl_nested || is_fk_nested || is_online_stat_gathering_nested;
669
  }
670
};
671

672
class ObAutoincCacheNotEqualRetryPolicy: public ObRetryPolicy
673
{
674
public:
675
  ObAutoincCacheNotEqualRetryPolicy() = default;
676
  ~ObAutoincCacheNotEqualRetryPolicy() = default;
677
  virtual void test(ObRetryParam &v) const override
678
  {
679
    if (v.stmt_retry_times_ < ObQueryRetryCtrl::MAX_SCHEMA_ERROR_LOCAL_RETRY_TIMES) {
680
      v.retry_type_ = RETRY_TYPE_LOCAL;
681
    } else {
682
      try_packet_retry(v);
683
    }
684
  }
685
};
686

687

688
////////// end of policies ////////////
689

690

691

692
// Handler for PX worker shortage: applies ObPxThreadNotEnoughRetryPolicy.
void ObQueryRetryCtrl::px_thread_not_enough_proc(ObRetryParam &v)
{
  ObRetryObject retry(v);
  ObPxThreadNotEnoughRetryPolicy px_policy;
  retry.test(px_policy);
}
698

699
// Handler for transaction-set violations: stop under RR/SE isolation, otherwise retry with a
// short linear back-off.
void ObQueryRetryCtrl::trx_set_violation_proc(ObRetryParam &v)
{
  ObRetryObject retry(v);
  ObTrxSetViolationRetryPolicy violation_policy;
  ObCommonRetryLinearShortWaitPolicy short_wait_policy;
  retry.test(violation_policy).test(short_wait_policy);
}
706

707
// Handler for non-serializable transactions: never retried.
void ObQueryRetryCtrl::trx_can_not_serialize_proc(ObRetryParam &v)
{
  ObRetryObject retry(v);
  ObTrxCannotSerializeRetryPolicy serialize_policy;
  retry.test(serialize_policy);
}
713

714
// Handler for row-lock conflicts: applies ObLockRowConflictRetryPolicy.
void ObQueryRetryCtrl::try_lock_row_conflict_proc(ObRetryParam &v)
{
  ObRetryObject retry(v);
  ObLockRowConflictRetryPolicy lock_policy;
  retry.test(lock_policy);
}
720

721

722
// Handler for location errors. It fast-fails if required, otherwise retries with a long back-off,
// then refreshes the location cache: blocking when the retry is local, non-blocking otherwise.
void ObQueryRetryCtrl::location_error_proc(ObRetryParam &v)
{
  ObRetryObject retry(v);
  ObFastFailRetryPolicy fast_fail_policy;
  ObCommonRetryIndexLongWaitPolicy long_wait_policy;
  retry.test(fast_fail_policy).test(long_wait_policy);

  if (RETRY_TYPE_LOCAL != v.retry_type_) {
    ObRefreshLocationCacheNonblockPolicy nonblock_refresh_policy;
    retry.test(nonblock_refresh_policy);
  } else {
    ObRefreshLocationCacheBlockPolicy block_refresh_policy; // FIXME: why block?
    retry.test(block_refresh_policy);
  }
}
737

738
// Like location_error_proc, but the location cache refresh is always non-blocking.
void ObQueryRetryCtrl::nonblock_location_error_proc(ObRetryParam &v)
{
  ObRetryObject retry(v);
  ObFastFailRetryPolicy fast_fail_policy;
  ObCommonRetryIndexLongWaitPolicy long_wait_policy;
  ObRefreshLocationCacheNonblockPolicy refresh_policy;
  retry.test(fast_fail_policy)
       .test(long_wait_policy)
       .test(refresh_policy);
}
746

747
// Handler for "nothing readable" location errors.
// Strong-consistency read: the leader is unreadable, possibly because the invalid-servers list
// filtered it out. Weak-consistency read: no replica is left to choose, possibly because the
// invalid-servers list filtered them all out.
// To cope better with a leader that is briefly unreachable, the retry info is cleared (mainly
// the invalid servers) before retrying. clear() is used instead of reset() so the retry info
// stays initialized and still passes defensive checks.
void ObQueryRetryCtrl::location_error_nothing_readable_proc(ObRetryParam &v)
{
  auto &retry_info = v.session_.get_retry_info_for_update();
  retry_info.clear();
  ObQueryRetryCtrl::location_error_proc(v);
}
756

757
// Handler for an uncertain peer server state. In order it: fast-fails if required, refreshes
// the location cache without blocking, stops for remote DML writes, and otherwise retries
// with a long back-off.
void ObQueryRetryCtrl::peer_server_status_uncertain_proc(ObRetryParam &v)
{
  ObRetryObject retry(v);
  ObFastFailRetryPolicy fast_fail_policy;
  ObRefreshLocationCacheNonblockPolicy refresh_policy;
  ObDMLPeerServerStateUncertainPolicy remote_dml_policy; // aborts the chain for remote DML trans
  ObCommonRetryIndexLongWaitPolicy long_wait_policy;
  retry.test(fast_fail_policy)
       .test(refresh_policy)
       .test(remote_dml_policy)
       .test(long_wait_policy);
}
766

767
// Handler for schema errors: applies ObCheckSchemaUpdatePolicy.
void ObQueryRetryCtrl::schema_error_proc(ObRetryParam &v)
{
  ObRetryObject retry(v);
  ObCheckSchemaUpdatePolicy schema_policy;
  retry.test(schema_policy);
}
773

774
// Handler for auto-increment cache mismatch: bounded local retries, then a short linear back-off.
void ObQueryRetryCtrl::autoinc_cache_not_equal_retry_proc(ObRetryParam &v)
{
  ObRetryObject retry(v);
  ObAutoincCacheNotEqualRetryPolicy autoinc_policy;
  ObCommonRetryLinearShortWaitPolicy short_wait_policy;
  retry.test(autoinc_policy).test(short_wait_policy);
}
781

782
// Handler for OB_SNAPSHOT_DISCARDED (user queries).
// Under RR / serializable isolation (is_isolation_RR_or_SE) the error goes
// straight back to the client with no retry. Otherwise the first occurrence
// is retried locally, and later ones go through the no-wait retry policy.
void ObQueryRetryCtrl::snapshot_discard_proc(ObRetryParam &v)
{
  if (ObQueryRetryCtrl::is_isolation_RR_or_SE(v.session_.get_tx_isolation())) {
    v.client_ret_ = v.err_;
    v.retry_type_ = RETRY_TYPE_NONE;
    LOG_WARN_RET(v.client_ret_, "snapshot discarded in serializable isolation should not retry", K(v));
    return;
  }

  // We read from a standby that lags too far behind, or from a replica that is
  // still replaying logs. This "replica not readable" kind of error is retried
  // in the current thread at most once.
  const int64_t MAX_DATA_NOT_READABLE_ERROR_LOCAL_RETRY_TIMES = 1;
  if (v.stmt_retry_times_ >= MAX_DATA_NOT_READABLE_ERROR_LOCAL_RETRY_TIMES) {
    ObRetryObject chain(v);
    ObCommonRetryNoWaitPolicy no_wait_policy;
    chain.test(no_wait_policy);
  } else {
    v.retry_type_ = RETRY_TYPE_LOCAL;
  }
}
802

803
// Generic handler: retry using the index-based long wait policy.
void ObQueryRetryCtrl::long_wait_retry_proc(ObRetryParam &v)
{
  ObRetryObject chain(v);
  ObCommonRetryIndexLongWaitPolicy long_wait_policy;
  chain.test(long_wait_policy);
}
809

810
// Generic handler: retry using the linear short wait policy.
void ObQueryRetryCtrl::short_wait_retry_proc(ObRetryParam &v)
{
  ObRetryObject chain(v);
  ObCommonRetryLinearShortWaitPolicy short_wait_policy;
  chain.test(short_wait_policy);
}
816

817
// Generic handler: delegate to ObForceLocalRetryPolicy.
void ObQueryRetryCtrl::force_local_retry_proc(ObRetryParam &v)
{
  ObRetryObject chain(v);
  ObForceLocalRetryPolicy local_policy;
  chain.test(local_policy);
}
823

824
// Handler for OB_BATCHED_MULTI_STMT_ROLLBACK: delegate to ObBatchExecOptRetryPolicy.
void ObQueryRetryCtrl::batch_execute_opt_retry_proc(ObRetryParam &v)
{
  ObRetryObject chain(v);
  ObBatchExecOptRetryPolicy batch_policy;
  chain.test(batch_policy);
}
830

831
// Handler for OB_NEED_SWITCH_CONSUMER_GROUP: delegate to ObSwitchConsumerGroupRetryPolicy.
void ObQueryRetryCtrl::switch_consumer_group_retry_proc(ObRetryParam &v)
{
  ObRetryObject chain(v);
  ObSwitchConsumerGroupRetryPolicy group_policy;
  chain.test(group_policy);
}
837

838
// Handler for OB_TIMEOUT / OB_TRANS_TIMEOUT / OB_TRANS_STMT_TIMEOUT.
// - SPM builds only: if a plain OB_TIMEOUT hit the first execution of an SPM
//   plan that has need_spm_timeout_ set, switch the SPM context to the
//   fallback plan, clear the flag, and force a local retry.
// - Otherwise, if the previous retry was caused by a try-lock-row error,
//   report OB_ERR_EXCLUSIVE_LOCK_CONFLICT and stop retrying.
// Any other case leaves v untouched.
void ObQueryRetryCtrl::timeout_proc(ObRetryParam &v)
{
#ifdef OB_BUILD_SPM
  if (OB_UNLIKELY(v.err_ == OB_TIMEOUT &&
                  ObSpmCacheCtx::STAT_FIRST_EXECUTE_PLAN == v.ctx_.spm_ctx_.spm_stat_ &&
                  v.ctx_.spm_ctx_.need_spm_timeout_)) {
    ObSqlCtx &mutable_ctx = const_cast<ObSqlCtx &>(v.ctx_);
    mutable_ctx.spm_ctx_.spm_stat_ = ObSpmCacheCtx::STAT_FALLBACK_EXECUTE_PLAN;
    mutable_ctx.spm_ctx_.need_spm_timeout_ = false;
    ObRetryObject chain(v);
    ObForceLocalRetryPolicy local_policy;
    chain.test(local_policy);
  } else
#endif
  if (is_try_lock_row_err(v.session_.get_retry_info().get_last_query_retry_err())) {
    v.client_ret_ = OB_ERR_EXCLUSIVE_LOCK_CONFLICT;
    v.retry_type_ = RETRY_TYPE_NONE;
  }
}
860

861
/////// For inner SQL only ///////////////
862
// Inner-SQL handler for OB_TRY_LOCK_ROW_CONFLICT: delegate to ObInnerLockRowConflictRetryPolicy.
void ObQueryRetryCtrl::inner_try_lock_row_conflict_proc(ObRetryParam &v)
{
  ObRetryObject chain(v);
  ObInnerLockRowConflictRetryPolicy lock_conflict_policy;
  chain.test(lock_conflict_policy);
}
868

869
void ObQueryRetryCtrl::inner_table_location_error_proc(ObRetryParam &v)
{
  // Typically inner SQL could not obtain a location during execution,
  // possibly because a server is down. The tables involved are inner tables,
  // so refreshing this SQL's location cache is pointless and is skipped.
  // Only the index-based long wait is applied.
  ObRetryObject chain(v);
  ObCommonRetryIndexLongWaitPolicy long_wait_policy;
  chain.test(long_wait_policy);
}
877

878
// Inner-SQL location-error handler.
// The tenant status check always runs first. Then:
//   1. the tenant check ended testing (no_more_test_) -> nothing more to do;
//   2. DDL inner SQL -> index-based long wait + blocking location refresh;
//   3. SQL triggered by a user request (e.g. PL), detected by a trace id whose
//      first two words are non-zero -> fast-fail + linear short wait +
//      blocking location refresh;
//   4. any other inner SQL -> empty_proc (no retry).
void ObQueryRetryCtrl::inner_location_error_proc(ObRetryParam &v)
{
  const uint64_t *tid = ObCurTraceId::get();
  const bool triggered_by_user = (nullptr != tid && 0 != tid[0] && 0 != tid[1]);
  ObRetryObject chain(v);
  ObCheckTenantStatusPolicy tenant_policy;
  ObRefreshLocationCacheBlockPolicy block_refresh_policy;
  chain.test(tenant_policy);

  if (v.no_more_test_) {
    // case 1: tenant status is abnormal, do not retry
  } else if (v.session_.get_ddl_info().is_ddl()) {
    // case 2: inner sql ddl need retry (add by shuangcan.yjw)
    ObCommonRetryIndexLongWaitPolicy long_wait_policy;
    chain.test(long_wait_policy).test(block_refresh_policy);
  } else if (triggered_by_user) {
    // case 3: only user-triggered requests get fast fail
    ObFastFailRetryPolicy fast_fail_policy;
    ObCommonRetryLinearShortWaitPolicy short_wait_policy;
    chain.test(fast_fail_policy).test(short_wait_policy).test(block_refresh_policy);
  } else {
    // case 4: leave other inner sql alone
    empty_proc(v);
  }
}
902

903
void ObQueryRetryCtrl::inner_location_error_nothing_readable_proc(ObRetryParam &v)
{
  // Strong-consistency read: the leader became unreadable, possibly because the
  // invalid-servers list filtered it out.
  // Weak-consistency read: no replica is left to choose from, possibly because
  // the invalid-servers list filtered all of them out.
  // To better cope with a brief leader network outage, wipe the retry info
  // (mainly the invalid servers) and then retry. clear() is used rather than
  // reset() because the info must stay in its inited state to pass the
  // defensive checks.
  auto &retry_info = v.session_.get_retry_info_for_update();
  retry_info.clear();
  ObQueryRetryCtrl::inner_location_error_proc(v);
}
912

913
// Inner-SQL schema handler that applies only the common schema check policy.
void ObQueryRetryCtrl::inner_common_schema_error_proc(ObRetryParam &v)
{
  ObRetryObject chain(v);
  ObInnerCommonCheckSchemaPolicy common_policy;
  chain.test(common_policy);
}
919

920

921
// Inner-SQL schema handler: the common schema check first, then the inner
// schema check policy.
void ObQueryRetryCtrl::inner_schema_error_proc(ObRetryParam &v)
{
  ObRetryObject chain(v);
  ObInnerCommonCheckSchemaPolicy common_policy;
  ObInnerCheckSchemaPolicy inner_policy;
  chain.test(common_policy)
       .test(inner_policy);
}
928

929
// Inner-SQL handler for peer-status-uncertain errors. Only DDL inner SQL is
// retried (fast-fail + linear short wait); everything else falls through to
// empty_proc (no retry).
void ObQueryRetryCtrl::inner_peer_server_status_uncertain_proc(ObRetryParam &v)
{
  ObRetryObject chain(v);
  if (!v.session_.get_ddl_info().is_ddl()) {
    empty_proc(v);
  } else {
    ObFastFailRetryPolicy fast_fail_policy;
    ObCommonRetryLinearShortWaitPolicy short_wait_policy;
    chain.test(fast_fail_policy)
         .test(short_wait_policy);
  }
}
940

941
/////// system defined common func /////////////
942

943
// Default handler for error codes that have no registered retry processor.
// Following the rule "report to the user the last error that stopped the
// retry", the original error becomes the client error and no retry happens.
// A debug line is logged unless the error is OB_ERR_PROXY_REROUTE.
void ObQueryRetryCtrl::empty_proc(ObRetryParam &v)
{
  v.retry_type_ = RETRY_TYPE_NONE;
  v.client_ret_ = v.err_;
  if (OB_ERR_PROXY_REROUTE == v.client_ret_) {
    return;
  }
  LOG_DEBUG("no retry handler for this err code, no need retry", K(v),
           K(THIS_WORKER.get_timeout_ts()), K(v.result_.get_stmt_type()),
           K(v.session_.get_retry_info().get_last_query_retry_err()));
}
955

956
// Checks applied before any error-specific handler, regardless of error code.
// Inner SQL gets ObInnerBeforeRetryCheckPolicy. User SQL gets
// ObBeforeRetryCheckPolicy followed by ObStmtTypeRetryPolicy.
void ObQueryRetryCtrl::before_func(ObRetryParam &v)
{
  ObRetryObject chain(v);
  if (OB_LIKELY(!v.is_inner_sql_)) {
    ObBeforeRetryCheckPolicy precheck_policy;
    ObStmtTypeRetryPolicy stmt_type_policy;
    chain.test(precheck_policy).test(stmt_type_policy);
  } else {
    ObInnerBeforeRetryCheckPolicy inner_precheck_policy;
    chain.test(inner_precheck_policy);
  }
}
969

970
// Hook run after every retry decision.
// - Logs the decision, except for lock conflicts (avoid log flooding), proxy
//   reroutes (secondary routing) and OB_READ_NOTHING inside PL.
// - When a retry was chosen, records the error as the session's last retry
//   error and bumps the session retry count.
// - Logs an error on invariant violations: on retry client_ret_ must equal
//   err_, and client_ret_ must never be OB_SUCCESS.
void ObQueryRetryCtrl::after_func(ObRetryParam &v)
{
  const bool will_retry = RETRY_TYPE_NONE != v.retry_type_;
  const bool suppress_log = OB_TRY_LOCK_ROW_CONFLICT == v.client_ret_
                            || OB_ERR_PROXY_REROUTE == v.client_ret_
                            || (v.is_from_pl_ && OB_READ_NOTHING == v.client_ret_);
  if (!suppress_log) {
    LOG_WARN_RET(v.client_ret_, "[RETRY] check if need retry", K(v), "need_retry", will_retry);
  }

  if (will_retry) {
    auto &retry_info = v.session_.get_retry_info_for_update();
    retry_info.set_last_query_retry_err(v.err_);
    retry_info.inc_retry_cnt();
    if (OB_UNLIKELY(v.client_ret_ != v.err_)) {
      LOG_ERROR_RET(OB_ERR_UNEXPECTED, "when need retry, v.client_ret_ must be equal to err", K(v));
    }
  }

  if (OB_UNLIKELY(OB_SUCCESS == v.client_ret_)) {
    LOG_ERROR_RET(OB_ERR_UNEXPECTED, "no matter need retry or not, v.client_ret_ should not be OB_SUCCESS", K(v));
  }
}
992

993
int ObQueryRetryCtrl::init()
994
{
995
  int ret = OB_SUCCESS;
996

997
  OX(map_.create(8192, "RetryCtrl", "RetryCtrl"));
998

999
  // Macro parameters:
1000
  //  tag: unused, just for organization
1001
  //  r: error code
1002
  //  func: processor for obmp* query
1003
  //  inner_func: processor for inner connection query
1004
  //  das_func: processor for DAS task retry
1005
#ifndef ERR_RETRY_FUNC
1006
#define ERR_RETRY_FUNC(tag, r, func, inner_func, das_func) \
1007
  if (OB_SUCC(ret)) { \
1008
    if (OB_SUCCESS != (ret = map_.set_refactored(r, RetryFuncs(func, inner_func, das_func)))) { \
1009
      LOG_ERROR("Duplicated error code registered", "code", #r, KR(ret)); \
1010
    } \
1011
  }
1012
#endif
1013

1014
  // register your error code retry handler here, no order required
1015
  /* schema */
1016
  ERR_RETRY_FUNC("SCHEMA",   OB_SCHEMA_ERROR,                    schema_error_proc,          empty_proc,                                           nullptr);
1017
  ERR_RETRY_FUNC("SCHEMA",   OB_TENANT_EXIST,                    schema_error_proc,          empty_proc,                                           nullptr);
1018
  ERR_RETRY_FUNC("SCHEMA",   OB_TENANT_NOT_EXIST,                schema_error_proc,          empty_proc,                                           nullptr);
1019
  ERR_RETRY_FUNC("SCHEMA",   OB_ERR_BAD_DATABASE,                schema_error_proc,          empty_proc,                                           nullptr);
1020
  ERR_RETRY_FUNC("SCHEMA",   OB_DATABASE_EXIST,                  schema_error_proc,          empty_proc,                                           nullptr);
1021
  ERR_RETRY_FUNC("SCHEMA",   OB_TABLEGROUP_NOT_EXIST,            schema_error_proc,          empty_proc,                                           nullptr);
1022
  ERR_RETRY_FUNC("SCHEMA",   OB_TABLEGROUP_EXIST,                schema_error_proc,          empty_proc,                                           nullptr);
1023
  ERR_RETRY_FUNC("SCHEMA",   OB_TABLE_NOT_EXIST,                 schema_error_proc,          inner_common_schema_error_proc,                       nullptr);
1024
  ERR_RETRY_FUNC("SCHEMA",   OB_ERR_TABLE_EXIST,                 schema_error_proc,          empty_proc,                                           nullptr);
1025
  ERR_RETRY_FUNC("SCHEMA",   OB_ERR_BAD_FIELD_ERROR,             schema_error_proc,          empty_proc,                                           nullptr);
1026
  ERR_RETRY_FUNC("SCHEMA",   OB_ERR_COLUMN_DUPLICATE,            schema_error_proc,          empty_proc,                                           nullptr);
1027
  ERR_RETRY_FUNC("SCHEMA",   OB_ERR_USER_EXIST,                  schema_error_proc,          empty_proc,                                           nullptr);
1028
  ERR_RETRY_FUNC("SCHEMA",   OB_ERR_USER_NOT_EXIST,              schema_error_proc,          empty_proc,                                           nullptr);
1029
  ERR_RETRY_FUNC("SCHEMA",   OB_ERR_NO_PRIVILEGE,                schema_error_proc,          empty_proc,                                           nullptr);
1030
  ERR_RETRY_FUNC("SCHEMA",   OB_ERR_NO_DB_PRIVILEGE,             schema_error_proc,          empty_proc,                                           nullptr);
1031
  ERR_RETRY_FUNC("SCHEMA",   OB_ERR_NO_TABLE_PRIVILEGE,          schema_error_proc,          empty_proc,                                           nullptr);
1032
  ERR_RETRY_FUNC("SCHEMA",   OB_ERR_WAIT_REMOTE_SCHEMA_REFRESH,  schema_error_proc,          inner_schema_error_proc,                              nullptr);
1033
  ERR_RETRY_FUNC("SCHEMA",   OB_ERR_REMOTE_SCHEMA_NOT_FULL,      schema_error_proc,          inner_schema_error_proc,                              nullptr);
1034
  ERR_RETRY_FUNC("SCHEMA",   OB_ERR_SP_ALREADY_EXISTS,           schema_error_proc,          empty_proc,                                           nullptr);
1035
  ERR_RETRY_FUNC("SCHEMA",   OB_ERR_SP_DOES_NOT_EXIST,           schema_error_proc,          empty_proc,                                           nullptr);
1036
  ERR_RETRY_FUNC("SCHEMA",   OB_OBJECT_NAME_NOT_EXIST,           schema_error_proc,          empty_proc,                                           nullptr);
1037
  ERR_RETRY_FUNC("SCHEMA",   OB_OBJECT_NAME_EXIST,               schema_error_proc,          empty_proc,                                           nullptr);
1038
  ERR_RETRY_FUNC("SCHEMA",   OB_SCHEMA_EAGAIN,                   schema_error_proc,          inner_schema_error_proc,                              nullptr);
1039
  ERR_RETRY_FUNC("SCHEMA",   OB_SCHEMA_NOT_UPTODATE,             schema_error_proc,          inner_schema_error_proc,                              nullptr);
1040
  ERR_RETRY_FUNC("SCHEMA",   OB_ERR_PARALLEL_DDL_CONFLICT,       schema_error_proc,          inner_schema_error_proc,                              nullptr);
1041
  ERR_RETRY_FUNC("SCHEMA",   OB_AUTOINC_CACHE_NOT_EQUAL,         autoinc_cache_not_equal_retry_proc, autoinc_cache_not_equal_retry_proc, nullptr);
1042

1043
  /* location */
1044
  ERR_RETRY_FUNC("LOCATION", OB_LOCATION_LEADER_NOT_EXIST,       location_error_nothing_readable_proc, inner_location_error_nothing_readable_proc, ObDASRetryCtrl::tablet_location_retry_proc);
1045
  ERR_RETRY_FUNC("LOCATION", OB_LS_LOCATION_LEADER_NOT_EXIST,    location_error_nothing_readable_proc, inner_location_error_nothing_readable_proc, ObDASRetryCtrl::tablet_location_retry_proc);
1046
  ERR_RETRY_FUNC("LOCATION", OB_NO_READABLE_REPLICA,             location_error_nothing_readable_proc, inner_location_error_nothing_readable_proc, ObDASRetryCtrl::tablet_location_retry_proc);
1047
  ERR_RETRY_FUNC("LOCATION", OB_NOT_MASTER,                      location_error_proc,        inner_location_error_proc,                            ObDASRetryCtrl::tablet_location_retry_proc);
1048
  ERR_RETRY_FUNC("LOCATION", OB_RS_NOT_MASTER,                   location_error_proc,        inner_location_error_proc,                            ObDASRetryCtrl::tablet_location_retry_proc);
1049
  ERR_RETRY_FUNC("LOCATION", OB_RS_SHUTDOWN,                     location_error_proc,        inner_location_error_proc,                            ObDASRetryCtrl::tablet_location_retry_proc);
1050
  ERR_RETRY_FUNC("LOCATION", OB_PARTITION_NOT_EXIST,             location_error_proc,        inner_location_error_proc,                            ObDASRetryCtrl::tablet_location_retry_proc);
1051
  ERR_RETRY_FUNC("LOCATION", OB_LOCATION_NOT_EXIST,              location_error_proc,        inner_location_error_proc,                            ObDASRetryCtrl::tablet_location_retry_proc);
1052
  ERR_RETRY_FUNC("LOCATION", OB_PARTITION_IS_STOPPED,            location_error_proc,        inner_location_error_proc,                            ObDASRetryCtrl::tablet_location_retry_proc);
1053
  ERR_RETRY_FUNC("LOCATION", OB_SERVER_IS_INIT,                  location_error_proc,        inner_location_error_proc,                            ObDASRetryCtrl::tablet_location_retry_proc);
1054
  ERR_RETRY_FUNC("LOCATION", OB_SERVER_IS_STOPPING,              location_error_proc,        inner_location_error_proc,                            ObDASRetryCtrl::tablet_location_retry_proc);
1055
  ERR_RETRY_FUNC("LOCATION", OB_TENANT_NOT_IN_SERVER,            location_error_proc,        inner_location_error_proc,                            ObDASRetryCtrl::tablet_location_retry_proc);
1056
  ERR_RETRY_FUNC("LOCATION", OB_TRANS_RPC_TIMEOUT,               location_error_proc,        inner_location_error_proc,                            nullptr);
1057
  ERR_RETRY_FUNC("LOCATION", OB_USE_DUP_FOLLOW_AFTER_DML,        location_error_proc,        inner_location_error_proc,                            ObDASRetryCtrl::tablet_location_retry_proc);
1058
  ERR_RETRY_FUNC("LOCATION", OB_TRANS_STMT_NEED_RETRY,           location_error_proc,        inner_location_error_proc,                            nullptr);
1059
  ERR_RETRY_FUNC("LOCATION", OB_LS_NOT_EXIST,                    location_error_proc,        inner_location_error_proc,                            ObDASRetryCtrl::tablet_location_retry_proc);
1060
  // OB_TABLET_NOT_EXIST may be caused by old version schema or incorrect location.
1061
  // Just use location_error_proc to retry sql and a new schema guard will be obtained during the retry process.
1062
  ERR_RETRY_FUNC("LOCATION", OB_TABLET_NOT_EXIST,                location_error_proc,        inner_location_error_proc,                            ObDASRetryCtrl::tablet_not_exist_retry_proc);
1063
  ERR_RETRY_FUNC("LOCATION", OB_LS_LOCATION_NOT_EXIST,           location_error_proc,        inner_location_error_proc,                            ObDASRetryCtrl::tablet_location_retry_proc);
1064
  ERR_RETRY_FUNC("LOCATION", OB_PARTITION_IS_BLOCKED,            location_error_proc,        inner_location_error_proc,                            ObDASRetryCtrl::tablet_location_retry_proc);
1065
  ERR_RETRY_FUNC("LOCATION", OB_MAPPING_BETWEEN_TABLET_AND_LS_NOT_EXIST, location_error_proc,inner_location_error_proc,                            ObDASRetryCtrl::tablet_not_exist_retry_proc);
1066

1067
  ERR_RETRY_FUNC("LOCATION", OB_GET_LOCATION_TIME_OUT,           location_error_proc,        inner_table_location_error_proc,                      ObDASRetryCtrl::tablet_location_retry_proc);
1068

1069

1070

1071
  /* network */
1072
  ERR_RETRY_FUNC("NETWORK",  OB_RPC_CONNECT_ERROR,               peer_server_status_uncertain_proc, inner_peer_server_status_uncertain_proc,       ObDASRetryCtrl::task_network_retry_proc);
1073
  ERR_RETRY_FUNC("NETWORK",  OB_RPC_SEND_ERROR,                  peer_server_status_uncertain_proc, inner_peer_server_status_uncertain_proc,       ObDASRetryCtrl::task_network_retry_proc);
1074
  ERR_RETRY_FUNC("NETWORK",  OB_RPC_POST_ERROR,                  peer_server_status_uncertain_proc, inner_peer_server_status_uncertain_proc,       ObDASRetryCtrl::task_network_retry_proc);
1075

1076
  /* storage */
1077
  ERR_RETRY_FUNC("STORAGE",  OB_SNAPSHOT_DISCARDED,              snapshot_discard_proc,         short_wait_retry_proc,                             nullptr);
1078
  ERR_RETRY_FUNC("STORAGE",  OB_DATA_NOT_UPTODATE,               long_wait_retry_proc,          short_wait_retry_proc,                             nullptr);
1079
  ERR_RETRY_FUNC("STORAGE",  OB_REPLICA_NOT_READABLE,            long_wait_retry_proc,          short_wait_retry_proc,                             ObDASRetryCtrl::tablet_nothing_readable_proc);
1080
  ERR_RETRY_FUNC("STORAGE",  OB_PARTITION_IS_SPLITTING,          short_wait_retry_proc,         short_wait_retry_proc,                             nullptr);
1081
  ERR_RETRY_FUNC("STORAGE",  OB_DISK_HUNG,                       nonblock_location_error_proc,  empty_proc,                                        nullptr);
1082

1083
  /* trx */
1084
  ERR_RETRY_FUNC("TRX",      OB_TRY_LOCK_ROW_CONFLICT,           try_lock_row_conflict_proc, inner_try_lock_row_conflict_proc,                     nullptr);
1085
  ERR_RETRY_FUNC("TRX",      OB_TRANSACTION_SET_VIOLATION,       trx_set_violation_proc,     trx_set_violation_proc,                               nullptr);
1086
  ERR_RETRY_FUNC("TRX",      OB_TRANS_CANNOT_SERIALIZE,          trx_can_not_serialize_proc, trx_can_not_serialize_proc,                           nullptr);
1087
  ERR_RETRY_FUNC("TRX",      OB_GTS_NOT_READY,                   short_wait_retry_proc,      short_wait_retry_proc,                                nullptr);
1088
  ERR_RETRY_FUNC("TRX",      OB_GTI_NOT_READY,                   short_wait_retry_proc,      short_wait_retry_proc,                                nullptr);
1089
  ERR_RETRY_FUNC("TRX",      OB_TRANS_WEAK_READ_VERSION_NOT_READY, short_wait_retry_proc,    short_wait_retry_proc,                                nullptr);
1090
  ERR_RETRY_FUNC("TRX",      OB_SEQ_NO_REORDER_UNDER_PDML,       short_wait_retry_proc,      short_wait_retry_proc,                                nullptr);
1091

1092
  /* sql */
1093
  ERR_RETRY_FUNC("SQL",      OB_ERR_INSUFFICIENT_PX_WORKER,      px_thread_not_enough_proc,  short_wait_retry_proc,                                nullptr);
1094
  // create a new interval part when inserting a row which has no matched part,
1095
  // wait and retry, will see new part
1096
  ERR_RETRY_FUNC("SQL",      OB_NO_PARTITION_FOR_INTERVAL_PART,  short_wait_retry_proc,             short_wait_retry_proc,                         nullptr);
1097
  ERR_RETRY_FUNC("SQL",      OB_BATCHED_MULTI_STMT_ROLLBACK,     batch_execute_opt_retry_proc,      batch_execute_opt_retry_proc,                  nullptr);
1098
  ERR_RETRY_FUNC("SQL",      OB_SQL_RETRY_SPM,                   force_local_retry_proc,            force_local_retry_proc,                        nullptr);
1099
  ERR_RETRY_FUNC("SQL",      OB_NEED_SWITCH_CONSUMER_GROUP,      switch_consumer_group_retry_proc,  empty_proc,                                    nullptr);
1100

1101
  /* timeout */
1102
  ERR_RETRY_FUNC("SQL",      OB_TIMEOUT,                         timeout_proc,                timeout_proc,                                        nullptr);
1103
  ERR_RETRY_FUNC("SQL",      OB_TRANS_TIMEOUT,                   timeout_proc,                timeout_proc,                                        nullptr);
1104
  ERR_RETRY_FUNC("SQL",      OB_TRANS_STMT_TIMEOUT,              timeout_proc,                timeout_proc,                                        nullptr);
1105

1106
  /* ddl */
1107

1108

1109
#undef ERR_RETRY_FUNC
1110
  return ret;
1111
}
1112

1113
// Releases the static retry-function map.
// Not synchronized on purpose: the caller must guarantee that every thread
// that might consult the map has already exited.
void ObQueryRetryCtrl::destroy()
{
  ObQueryRetryCtrl::map_.destroy();
}
1119

1120
ObQueryRetryCtrl::ObQueryRetryCtrl()
1121
  : curr_query_tenant_local_schema_version_(0),
1122
    curr_query_tenant_global_schema_version_(0),
1123
    curr_query_sys_local_schema_version_(0),
1124
    curr_query_sys_global_schema_version_(0),
1125
    retry_times_(0),
1126
    retry_type_(RETRY_TYPE_NONE),
1127
    retry_err_code_(OB_SUCCESS)
1128
{
1129
}
1130

1131
// Nothing to release per instance; the shared map is torn down by destroy().
ObQueryRetryCtrl::~ObQueryRetryCtrl() = default;
1134

1135
int ObQueryRetryCtrl::get_das_retry_func(int err, ObDASRetryCtrl::retry_func &retry_func)
1136
{
1137
  int ret = OB_SUCCESS;
1138
  retry_func = nullptr;
1139
  RetryFuncs funcs;
1140
  if (OB_FAIL(map_.get_refactored(err, funcs))) {
1141
    if (OB_HASH_NOT_EXIST == ret) {
1142
      ret = OB_SUCCESS;
1143
    }
1144
  } else {
1145
    retry_func = funcs.element<2>();
1146
  }
1147
  return ret;
1148
}
1149

1150
int ObQueryRetryCtrl::get_func(int err, bool is_inner_sql, retry_func &func)
1151
{
1152
  int ret = OB_SUCCESS;
1153
  RetryFuncs funcs;
1154
  if (OB_FAIL(map_.get_refactored(err, funcs))) {
1155
    if (OB_HASH_NOT_EXIST == ret) {
1156
      func = empty_proc;
1157
      ret = OB_SUCCESS;
1158
    }
1159
  } else {
1160
    func = is_inner_sql ? funcs.element<1>() : funcs.element<0>();
1161
  }
1162
  return ret;
1163
}
1164

1165
void ObQueryRetryCtrl::test_and_save_retry_state(const ObGlobalContext &gctx,
1166
                                                 const ObSqlCtx &ctx,
1167
                                                 ObResultSet &result,
1168
                                                 int err,
1169
                                                 int &client_ret,
1170
                                                 bool force_local_retry,
1171
                                                 bool is_inner_sql,
1172
                                                 bool is_part_of_pl_sql)
1173
{
1174
  int ret = OB_SUCCESS;
1175
  client_ret = err;
1176
  retry_type_ = RETRY_TYPE_NONE;
1177
  retry_err_code_ = OB_SUCCESS;
1178
  retry_func func = nullptr;
1179
  ObSQLSessionInfo *session = result.get_exec_context().get_my_session();
1180
  if (OB_ISNULL(session)) {
1181
    // this is possible. #issue/43953721
1182
    LOG_WARN("session is null in exec_context. maybe OOM. don't retry", K(err));
1183
  } else if (OB_FAIL(get_func(err, is_inner_sql, func))) {
1184
    // note: if no err proc registered, a default handler
1185
    // 'empty_proc' is used as processor func
1186
    LOG_WARN("fail get retry func", K(err), K(ret));
1187
  } else if (OB_ISNULL(func)) {
1188
    client_ret = OB_ERR_UNEXPECTED;
1189
    LOG_WARN("invalid retry processor, no retry", K(err));
1190
  } else {
1191
    // you can't tell exact stmt retry times for a SQL in PL as PL may do whole block retry
1192
    // so we use retry_times_ as stmt_retry_times for any stmt in PL
1193
    // if pl + stmt_retry_times == 0 scene, will cause timeout early.
1194
    // So the number of retry times here is at least 1
1195
    const int64_t stmt_retry_times =
1196
        is_part_of_pl_sql ? (retry_times_ == 0 ? 1 : retry_times_):
1197
        session->get_retry_info().get_retry_cnt();
1198
    ObRetryParam retry_param(ctx, result, *session,
1199
                             curr_query_tenant_local_schema_version_,
1200
                             curr_query_tenant_global_schema_version_,
1201
                             curr_query_sys_local_schema_version_,
1202
                             curr_query_sys_global_schema_version_,
1203
                             force_local_retry,
1204
                             is_inner_sql,
1205
                             is_part_of_pl_sql,
1206
                             stmt_retry_times,
1207
                             retry_times_,
1208
                             err,
1209
                             retry_type_,
1210
                             client_ret);
1211
    // do some common checks in this hook, which is not bond to certain error code
1212
    ObQueryRetryCtrl::before_func(retry_param);
1213
    // this 'if' check is necessary, as direct call to func may override
1214
    // the decision made in 'before_func', which is not what you want.
1215
    if (!retry_param.no_more_test_) {
1216
      func(retry_param);
1217
    }
1218
    // always execute after func hook to set some states
1219
    ObQueryRetryCtrl::after_func(retry_param);
1220
  }
1221
  if (RETRY_TYPE_NONE != retry_type_) {
1222
    // this retry times only apply to current thread retry.
1223
    // reset to 0 after each packet retry
1224
    retry_times_++;
1225
  }
1226
  // xiaochu: I don't like the idea 'retry_err_code_', remove it later
1227
  if (RETRY_TYPE_NONE != retry_type_) {
1228
    retry_err_code_ = client_ret;
1229
  }
1230
  if (RETRY_TYPE_NONE != retry_type_) {
1231
    result.set_close_fail_callback([this](const int err, int &client_ret)-> void { this->on_close_resultset_fail_(err, client_ret); });
1232
  }
1233
}
1234

1235
void ObQueryRetryCtrl::on_close_resultset_fail_(const int err, int &client_ret)
1236
{
1237
  // some unretryable error happened in close result set phase
1238
  if (OB_SUCCESS != err && RETRY_TYPE_NONE != retry_type_) {
1239
    // the txn relative error in close stmt
1240
    // thses error will cause the txn must to be rollbacked
1241
    // and can not accept new request any more, so if retry
1242
    // current stmt, it must be failed, hence we cancel retry
1243
    if (OB_TRANS_NEED_ROLLBACK == err ||
1244
        OB_TRANS_INVALID_STATE == err ||
1245
        OB_TRANS_HAS_DECIDED == err) {
1246
      retry_type_ = RETRY_TYPE_NONE;
1247
      // also clear the packet retry
1248
      THIS_WORKER.unset_need_retry();
1249
      // rewrite the client error code
1250
      // when decide to cancel the retry, return an unretryable error
1251
      // is better, because it won't leak the internal error to user
1252
      client_ret = err;
1253
    }
1254
  }
1255
}
1256
}/* ns observer*/
1257
}/* ns oceanbase */
1258

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.