oceanbase

Форк
0
/
ob_disaster_recovery_task_mgr.cpp 
1507 строк · 53.1 Кб
1
/**
2
 * Copyright (c) 2021 OceanBase
3
 * OceanBase CE is licensed under Mulan PubL v2.
4
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
5
 * You may obtain a copy of Mulan PubL v2 at:
6
 *          http://license.coscl.org.cn/MulanPubL-2.0
7
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
 * See the Mulan PubL v2 for more details.
11
 */
12

13
#define USING_LOG_PREFIX RS
14

15
#include "ob_disaster_recovery_task_mgr.h"
16

17
#include "lib/lock/ob_mutex.h"
18
#include "lib/stat/ob_diagnose_info.h"
19
#include "lib/profile/ob_trace_id.h"
20
#include "lib/alloc/ob_malloc_allocator.h"
21
#include "share/ob_debug_sync.h"
22
#include "share/ob_srv_rpc_proxy.h"
23
#include "share/config/ob_server_config.h"
24
#include "ob_disaster_recovery_task_executor.h"
25
#include "rootserver/ob_root_balancer.h"
26
#include "ob_rs_event_history_table_operator.h"
27
#include "share/ob_rpc_struct.h"
28
#include "observer/ob_server_struct.h"
29
#include "sql/executor/ob_executor_rpc_proxy.h"
30
#include "rootserver/ob_disaster_recovery_task.h"    // for ObDRTaskType
31
#include "share/ob_share_util.h"                     // for ObShareUtil
32
#include "lib/lock/ob_tc_rwlock.h"                   // for common::RWLock
33
#include "rootserver/ob_disaster_recovery_task.h"
34
#include "rootserver/tenant_snapshot/ob_tenant_snapshot_util.h" // for ObTenantSnapshotUtil
35
#include "share/inner_table/ob_inner_table_schema_constants.h"
36
#include "share/ob_all_server_tracer.h"
37
#include "storage/tablelock/ob_lock_inner_connection_util.h" // for ObInnerConnectionLockUtil
38
#include "observer/ob_inner_sql_connection.h"
39

40
namespace oceanbase
41
{
42
using namespace common;
43
using namespace lib;
44
using namespace obrpc;
45
using namespace transaction::tablelock;
46
using namespace share;
47

48
namespace rootserver
49
{
50
ObDRTaskQueue::ObDRTaskQueue() : inited_(false),
51
                                 config_(nullptr),
52
                                 task_alloc_(),
53
                                 wait_list_(),
54
                                 schedule_list_(),
55
                                 task_map_(),
56
                                 rpc_proxy_(nullptr),
57
                                 priority_(ObDRTaskPriority::MAX_PRI)
58
{
59
}
60

61
//TODO@jingyu.cr: need to make clear resue() and reset()
62
ObDRTaskQueue::~ObDRTaskQueue()
63
{
64
  reuse();
65
}
66

67
void ObDRTaskQueue::reuse()
68
{
69
  while (!wait_list_.is_empty()) {
70
    ObDRTask *t = wait_list_.remove_first();
71
    remove_task_from_map_and_free_it_(t);
72
  }
73
  while (!schedule_list_.is_empty()) {
74
    ObDRTask *t = schedule_list_.remove_first();
75
    remove_task_from_map_and_free_it_(t);
76
  }
77
  task_map_.clear();
78
}
79

80
void ObDRTaskQueue::reset()
81
{
82
  wait_list_.reset();
83
  schedule_list_.reset();
84
  task_map_.clear();
85
}
86

87
void ObDRTaskQueue::free_task_(ObDRTask *&task)
88
{
89
  if (OB_NOT_NULL(task)) {
90
    task->~ObDRTask();
91
    task_alloc_.free(task);
92
    task = nullptr;
93
  }
94
}
95

96
void ObDRTaskQueue::remove_task_from_map_and_free_it_(ObDRTask *&task)
97
{
98
  if (OB_NOT_NULL(task)) {
99
    task_map_.erase_refactored(task->get_task_key());
100
    free_task_(task);
101
  }
102
}
103

104
int ObDRTaskQueue::init(
105
    common::ObServerConfig &config,
106
    const int64_t bucket_num,
107
    obrpc::ObSrvRpcProxy *rpc_proxy,
108
    ObDRTaskPriority priority)
109
{
110
  int ret = OB_SUCCESS;
111
  if (OB_UNLIKELY(inited_)) {
112
    ret = OB_INIT_TWICE;
113
    LOG_WARN("init twice", KR(ret));
114
  } else if (OB_UNLIKELY(bucket_num <= 0)
115
             || OB_ISNULL(rpc_proxy)
116
             || (ObDRTaskPriority::LOW_PRI != priority && ObDRTaskPriority::HIGH_PRI != priority)) {
117
    ret = OB_INVALID_ARGUMENT;
118
    LOG_WARN("invalid argument", KR(ret), K(bucket_num), KP(rpc_proxy), K(priority));
119
  } else if (OB_FAIL(task_map_.create(bucket_num, "DRTaskMap"))) {
120
    LOG_WARN("fail to create task map", KR(ret), K(bucket_num));
121
  } else if (OB_FAIL(task_alloc_.init(
122
          ObMallocAllocator::get_instance(), OB_MALLOC_MIDDLE_BLOCK_SIZE,
123
          ObMemAttr(common::OB_SERVER_TENANT_ID, "DRTaskAlloc")))) {
124
    LOG_WARN("fail to init task allocator", KR(ret));
125
  } else {
126
    config_ = &config;
127
    rpc_proxy_ = rpc_proxy;
128
    priority_ = priority;
129
    inited_ = true;
130
  }
131
  return ret;
132
}
133

134
int ObDRTaskQueue::check_task_in_scheduling(
135
    const ObDRTaskKey &task_key,
136
    bool &task_in_scheduling) const
137
{
138
  int ret = OB_SUCCESS;
139
  if (OB_UNLIKELY(!inited_)) {
140
    ret = OB_NOT_INIT;
141
    LOG_WARN("not init", KR(ret));
142
  } else if (OB_UNLIKELY(!task_key.is_valid())) {
143
    ret = OB_INVALID_ARGUMENT;
144
    LOG_WARN("invalid argument", KR(ret), K(task_key));
145
  } else {
146
    ObDRTask *task = nullptr;
147
    int tmp_ret = task_map_.get_refactored(task_key, task);
148
    if (OB_SUCCESS == tmp_ret) {
149
      if (OB_ISNULL(task)) {
150
        ret = OB_ERR_UNEXPECTED;
151
        LOG_WARN("a null task ptr getted from task_map", KR(ret), K(task_key));
152
      } else {
153
        task_in_scheduling = task->in_schedule();
154
      }
155
    } else if (OB_HASH_NOT_EXIST == tmp_ret) {
156
      // task not exist means task not executing
157
      task_in_scheduling = false;
158
    } else {
159
      ret = tmp_ret;
160
      LOG_WARN("fail to get from map", KR(ret), K(task_key));
161
    }
162
  }
163
  return ret;
164
}
165

166
int ObDRTaskQueue::check_task_exist(
167
    const ObDRTaskKey &task_key,
168
    bool &task_exist)
169
{
170
  int ret = OB_SUCCESS;
171
  if (OB_UNLIKELY(!inited_)) {
172
    ret = OB_NOT_INIT;
173
    LOG_WARN("not init", KR(ret));
174
  } else if (OB_UNLIKELY(!task_key.is_valid())) {
175
    ret = OB_INVALID_ARGUMENT;
176
    LOG_WARN("invalid argument", KR(ret), K(task_key));
177
  } else {
178
    ObDRTask *task = nullptr;
179
    int tmp_ret = task_map_.get_refactored(task_key, task);
180
    if (OB_SUCCESS == tmp_ret) {
181
      task_exist = true;
182
    } else if (OB_HASH_NOT_EXIST == tmp_ret) {
183
      task_exist = false;
184
    } else {
185
      ret = OB_ERR_UNEXPECTED;
186
      LOG_WARN("fail to get from task_map", KR(ret), K(task_key));
187
    }
188
  }
189
  return ret;
190
}
191

192
int ObDRTaskQueue::push_task_in_wait_list(
193
    ObDRTaskMgr &task_mgr,
194
    const ObDRTaskQueue &sibling_queue,
195
    const ObDRTask &task,
196
    bool &has_task_in_schedule)
197
{
198
  int ret = OB_SUCCESS;
199
  has_task_in_schedule = false;
200
  if (OB_UNLIKELY(!inited_)) {
201
    ret = OB_NOT_INIT;
202
    LOG_WARN("not init", KR(ret));
203
  } else {
204
    const ObDRTaskKey &task_key = task.get_task_key();
205
    ObDRTask *task_ptr = nullptr;
206
    int tmp_ret = task_map_.get_refactored(task_key, task_ptr);
207
    if (OB_HASH_NOT_EXIST == tmp_ret) {
208
      if (OB_FAIL(do_push_task_in_wait_list_(task_mgr, sibling_queue, task, has_task_in_schedule))) {
209
        LOG_WARN("fail to push back", KR(ret));
210
      }
211
    } else if (OB_SUCCESS == tmp_ret) {
212
      ret = OB_ENTRY_EXIST;
213
      LOG_INFO("disaster recovery task exist", KR(ret), K(task_key), K(task), KP(this));
214
    } else {
215
      ret = tmp_ret;
216
      LOG_WARN("fail to check task exist", KR(ret), K(task_key));
217
    }
218
  }
219
  return ret;
220
}
221

222
int ObDRTaskQueue::push_task_in_schedule_list(
223
    ObDRTask &task)
224
{
225
  // STEP 1: push task into schedule list
226
  // STEP 2: push task into task_map
227
  // STEP 3: set task in schedule
228
  int ret = OB_SUCCESS;
229
  void *raw_ptr = nullptr;
230
  ObDRTask *new_task = nullptr;
231

232
  if (OB_UNLIKELY(!inited_)) {
233
    ret = OB_NOT_INIT;
234
    LOG_WARN("not init", KR(ret));
235
  } else if (OB_ISNULL(raw_ptr = task_alloc_.alloc(task.get_clone_size()))) {
236
    ret = OB_ALLOCATE_MEMORY_FAILED;
237
    LOG_WARN("fail to allocate task", KR(ret));
238
  } else if (OB_FAIL(task.clone(raw_ptr, new_task))) {
239
    LOG_WARN("fail to clone task", KR(ret), K(task));
240
  } else if (OB_ISNULL(new_task)) {
241
    ret = OB_ERR_UNEXPECTED;
242
    LOG_WARN("new_task is nullptr", KR(ret));
243
  } else if (OB_FAIL(task_map_.set_refactored(new_task->get_task_key(), new_task))) {
244
    LOG_WARN("fail to set map", KR(ret), "task_key", new_task->get_task_key());
245
  } else {
246
    // set schedule_time for this task
247
    new_task->set_schedule();
248
    if (!schedule_list_.add_last(new_task)) {
249
      ret = OB_ERR_UNEXPECTED;
250
      LOG_WARN("fail to add task to schedule list", KR(ret), KPC(new_task));
251
    } else {
252
      FLOG_INFO("[DRTASK_NOTICE] finish add task into schedule list", KPC(new_task));
253
    }
254
  }
255

256
  if (OB_FAIL(ret)) {
257
    if (OB_NOT_NULL(new_task)) {
258
      remove_task_from_map_and_free_it_(new_task);
259
    } else if (OB_NOT_NULL(raw_ptr)) {
260
      task_alloc_.free(raw_ptr);
261
      raw_ptr = nullptr;
262
    }
263
  }
264
  return ret;
265
}
266

267
int ObDRTaskQueue::do_push_task_in_wait_list_(
268
    ObDRTaskMgr &task_mgr,
269
    const ObDRTaskQueue &sibling_queue,
270
    const ObDRTask &task,
271
    bool &has_task_in_schedule)
272
{
273
  int ret = OB_SUCCESS;
274
  void *raw_ptr = nullptr;
275
  ObDRTask *new_task = nullptr;
276
  if (OB_UNLIKELY(!inited_)) {
277
    ret = OB_NOT_INIT;
278
    LOG_WARN("not init", KR(ret));
279
  } else if (OB_ISNULL(config_)) {
280
    ret = OB_ERR_UNEXPECTED;
281
    LOG_WARN("config_ ptr is null", KR(ret), KP(config_));
282
  } else if (OB_ISNULL(raw_ptr = task_alloc_.alloc(
283
            task.get_clone_size()))) {
284
    ret = OB_ALLOCATE_MEMORY_FAILED;
285
    LOG_WARN("fail to alloc task", KR(ret), "size", task.get_clone_size());
286
  } else if (OB_FAIL(task.clone(raw_ptr, new_task))) {
287
    LOG_WARN("fail to clone task", KR(ret), K(task));
288
  } else if (OB_ISNULL(new_task)) {
289
    ret = OB_ERR_UNEXPECTED;
290
    LOG_WARN("new task ptr is null", KR(ret));
291
  } else if (!wait_list_.add_last(new_task)) {
292
    ret = OB_ERR_UNEXPECTED;
293
    LOG_WARN("fail to add new task to wait list", KR(ret), "task", *new_task);
294
  } else {
295
    has_task_in_schedule = false;
296
    bool sibling_in_schedule = false;
297
    if (OB_FAIL(sibling_queue.check_task_in_scheduling(
298
            new_task->get_task_key(), sibling_in_schedule))) {
299
      LOG_WARN("fail to check has in schedule task", KR(ret),
300
               "task_key", new_task->get_task_key());
301
    } else if (OB_FAIL(task_map_.set_refactored(
302
            new_task->get_task_key(), new_task))) {
303
      LOG_WARN("fail to set map", KR(ret), "task_key", new_task->get_task_key());
304
    } else if (task_mgr.get_reach_concurrency_limit() && !sibling_in_schedule) {
305
      task_mgr.clear_reach_concurrency_limit();
306
    }
307
    if (OB_SUCC(ret)) {
308
      has_task_in_schedule = sibling_in_schedule;
309
      if (OB_FAIL(set_sibling_in_schedule(new_task->get_task_key(), sibling_in_schedule))) {
310
        LOG_WARN("fail to set sibling in schedule", KR(ret),
311
                 "task_key", new_task->get_task_key(), K(sibling_in_schedule));
312
      }
313
    }
314

315
    if (OB_FAIL(ret)) {
316
      wait_list_.remove(new_task);
317
    } else {
318
      LOG_INFO("success to push a task in waiting list", K(task), K(has_task_in_schedule));
319
    }
320
  }
321

322
  if (OB_FAIL(ret)) {
323
    if (OB_NOT_NULL(new_task)) {
324
      remove_task_from_map_and_free_it_(new_task);
325
    } else if (OB_NOT_NULL(raw_ptr)) {
326
      task_alloc_.free(raw_ptr);
327
      raw_ptr = nullptr;
328
    }
329
  }
330
  return ret;
331
}
332

333
int ObDRTaskQueue::pop_task(
334
    ObDRTask *&task)
335
{
336
  int ret = OB_SUCCESS;
337
  task = nullptr;
338
  if (OB_UNLIKELY(!inited_)) {
339
    ret = OB_NOT_INIT;
340
    LOG_WARN("task queue not init", KR(ret));
341
  } else if (OB_ISNULL(config_)) {
342
    ret = OB_ERR_UNEXPECTED;
343
    LOG_WARN("config_ ptr is null", KR(ret), KP(config_));
344
  } else {
345
    DLIST_FOREACH(t, wait_list_) {
346
      if (t->is_sibling_in_schedule()) {
347
        // task can not pop
348
        LOG_INFO("can not pop this task because a sibling task already in schedule", KPC(t));
349
      } else {
350
        task = t;
351
        break;
352
      }
353
    }
354
    if (OB_NOT_NULL(task)) {
355
      // when task not empty, we move it from wait to schedule list,
356
      LOG_INFO("a task from queue to pop found", KPC(task));
357
      wait_list_.remove(task);
358
      if (!schedule_list_.add_last(task)) {
359
        ret = OB_ERR_UNEXPECTED;
360
        LOG_WARN("fail to add task to schedule list", KR(ret));
361
      } else {
362
        task->set_schedule();
363
        LOG_INFO("success to set task in schedule normally", KPC(task));
364
      }
365
      // if fail to add to schedule list, clean it directly
366
      if (OB_FAIL(ret)) {
367
        remove_task_from_map_and_free_it_(task);
368
      }
369
    }
370
  }
371
  return ret;
372
}
373

374
int ObDRTaskQueue::get_task(
375
    const share::ObTaskId &task_id,
376
    const ObDRTaskKey &task_key,
377
    ObDRTask *&task)
378
{
379
  int ret = OB_SUCCESS;
380
  if (OB_UNLIKELY(!inited_)) {
381
    ret = OB_NOT_INIT;
382
    LOG_WARN("not init", KR(ret));
383
  } else {
384
    ObDRTask *my_task = nullptr;
385
    int tmp_ret = task_map_.get_refactored(task_key, my_task);
386
    if (OB_HASH_NOT_EXIST == tmp_ret) {
387
      task = nullptr;
388
    } else if (OB_SUCCESS == tmp_ret) {
389
      if (OB_ISNULL(my_task)) {
390
        ret = OB_ERR_UNEXPECTED;
391
        LOG_WARN("my_task ptr is null", KR(ret), KP(my_task));
392
      } else if (my_task->get_task_id() == task_id) {
393
        task = my_task;
394
      } else {
395
        task = nullptr;
396
      }
397
    } else {
398
      ret = tmp_ret;
399
      LOG_WARN("fail to get task from map", KR(ret), K(task_key), K(task_id));
400
    }
401
  }
402
  return ret;
403
}
404

405
int ObDRTaskQueue::check_task_need_cleaning_(
406
    const ObDRTask &task,
407
    bool &need_cleanning,
408
    ObDRTaskRetComment &ret_comment)
409
{
410
  int ret = OB_SUCCESS;
411
  // do not clean this task by default
412
  // need_cleanning = true under these cases
413
  //   (1) server not exist
414
  //   (2) server is permanant offline
415
  //   (3) rpc ls_check_dr_task_exist successfully told us task not exist
416
  //   (4) task is timeout while any failure during whole procedure
417
  need_cleanning = false;
418
  Bool task_exist = false;
419
  const ObAddr &dst_server = task.get_dst_server();
420
  share::ObServerInfoInTable server_info;
421
  if (OB_UNLIKELY(!inited_)) {
422
    ret = OB_NOT_INIT;
423
    LOG_WARN("task queue not init", KR(ret));
424
  } else if (OB_ISNULL(rpc_proxy_)) {
425
    ret = OB_ERR_UNEXPECTED;
426
    LOG_WARN("some ptr is null", KR(ret), KP(rpc_proxy_));
427
  } else if (OB_FAIL(SVR_TRACER.get_server_info(dst_server, server_info))) {
428
    LOG_WARN("fail to get server_info", KR(ret), "server", dst_server);
429
     // case 1. server not exist
430
    if (OB_ENTRY_NOT_EXIST == ret) {
431
      ret = OB_SUCCESS;
432
      FLOG_INFO("the reason to clean this task: server not exist", K(task));
433
      need_cleanning = true;
434
      ret_comment = ObDRTaskRetComment::CLEAN_TASK_DUE_TO_SERVER_NOT_EXIST;
435
    }
436
  } else if (server_info.is_permanent_offline()) {
437
    // case 2. server is permanant offline
438
    FLOG_INFO("the reason to clean this task: server permanent offline", K(task), K(server_info));
439
    need_cleanning = true;
440
    ret_comment = ObDRTaskRetComment::CLEAN_TASK_DUE_TO_SERVER_PERMANENT_OFFLINE;
441
  } else if (server_info.is_alive()) {
442
    ObDRTaskExistArg arg;
443
    arg.task_id_ = task.get_task_id();
444
    arg.tenant_id_ = task.get_tenant_id();
445
    arg.ls_id_ = task.get_ls_id();
446
    if (OB_FAIL(rpc_proxy_->to(task.get_dst_server()).by(task.get_tenant_id())
447
                .ls_check_dr_task_exist(arg, task_exist))) {
448
      LOG_WARN("fail to check task exist", KR(ret), "tenant_id", task.get_tenant_id(),
449
               "task_id", task.get_task_id(), "dst", task.get_dst_server());
450
    } else if (!task_exist) {
451
      // case 3. rpc ls_check_dr_task_exist successfully told us task not exist
452
      FLOG_INFO("the reason to clean this task: task not running", K(task));
453
      need_cleanning = true;
454
      ret_comment = ObDRTaskRetComment::CLEAN_TASK_DUE_TO_TASK_NOT_RUNNING;
455
    }
456
  } else if (server_info.is_temporary_offline()) {
457
    ret = OB_SERVER_NOT_ALIVE;
458
    LOG_WARN("server status is not alive, task may be cleanned later", KR(ret), "server", task.get_dst_server(), K(server_info), K(task));
459
  } else {
460
    ret = OB_ERR_UNEXPECTED;
461
    LOG_WARN("unexpected server status", KR(ret), "server", task.get_dst_server(), K(server_info), K(task));
462
  }
463

464
  // case 4. task is timeout while any OB_FAIL occurs
465
  if (OB_FAIL(ret) && task.is_already_timeout()) {
466
    FLOG_INFO("the reason to clean this task: task is timeout", KR(ret), K(task));
467
    ret = OB_SUCCESS;
468
    need_cleanning = true;
469
    ret_comment = ObDRTaskRetComment::CLEAN_TASK_DUE_TO_TASK_TIMEOUT;
470
  }
471
  return ret;
472
}
473

474
int ObDRTaskQueue::handle_not_in_progress_task(
475
    ObDRTaskMgr &task_mgr)
476
{
477
  int ret = OB_SUCCESS;
478
  if (OB_UNLIKELY(!inited_)) {
479
    ret = OB_NOT_INIT;
480
    LOG_WARN("task queue not init", KR(ret));
481
  } else {
482
    const int ret_code = OB_LS_REPLICA_TASK_RESULT_UNCERTAIN;
483
    ObDRTaskRetComment ret_comment = ObDRTaskRetComment::MAX;
484
    bool need_cleaning = false;
485
    DLIST_FOREACH(t, schedule_list_) {
486
      int tmp_ret = OB_SUCCESS;
487
      need_cleaning = false;
488
      DEBUG_SYNC(BEFORE_CHECK_CLEAN_DRTASK);
489
      if (OB_SUCCESS != (tmp_ret = check_task_need_cleaning_(*t, need_cleaning, ret_comment))) {
490
        LOG_WARN("fail to check this task exist for cleaning", KR(tmp_ret), KPC(t));
491
      } else if (need_cleaning
492
                 && OB_SUCCESS != (tmp_ret = task_mgr.async_add_cleaning_task_to_updater(
493
                                         t->get_task_id(),
494
                                         t->get_task_key(),
495
                                         ret_code,
496
                                         true,/*need_record_event*/
497
                                         ret_comment))) {
498
        LOG_WARN("do execute over failed", KR(tmp_ret), KPC(t), K(ret_comment));
499
      }
500
    }
501
  }
502
  return ret;
503
}
504

505
int ObDRTaskQueue::finish_schedule(
506
    ObDRTask *task)
507
{
508
  int ret = OB_SUCCESS;
509
  if (OB_UNLIKELY(!inited_)) {
510
    ret = OB_NOT_INIT;
511
    LOG_WARN("task queue not init", KR(ret));
512
  } else if (OB_ISNULL(task)) {
513
    ret = OB_INVALID_ARGUMENT;
514
    LOG_WARN("invalid argument", KR(ret), KP(task));
515
  } else if (OB_UNLIKELY(!task->in_schedule())) {
516
    ret = OB_STATE_NOT_MATCH;
517
    LOG_WARN("task state not match", KR(ret), KPC(task));
518
  } else {
519
    // remove from schedule_list_
520
    schedule_list_.remove(task);
521
    FLOG_INFO("[DRTASK_NOTICE] success to finish schedule task", KR(ret), KPC(task));
522
    remove_task_from_map_and_free_it_(task);
523
  }
524
  return ret;
525
}
526

527
int ObDRTaskQueue::set_sibling_in_schedule(
528
    const ObDRTask &task,
529
    const bool in_schedule)
530
{
531
  int ret = OB_SUCCESS;
532
  if (OB_UNLIKELY(!inited_)) {
533
    ret = OB_NOT_INIT;
534
    LOG_WARN("not init", KR(ret));
535
  } else {
536
    const ObDRTaskKey &task_key = task.get_task_key();
537
    ObDRTask *my_task = nullptr;
538
    int tmp_ret = task_map_.get_refactored(task_key, my_task);
539
    if (OB_HASH_NOT_EXIST == tmp_ret) {
540
      // bypass
541
    } else if (OB_SUCCESS == tmp_ret) {
542
      if (OB_ISNULL(my_task)) {
543
        ret = OB_ERR_UNEXPECTED;
544
        LOG_WARN("my_task ptr is null", KR(ret), K(task));
545
      } else {
546
        my_task->set_sibling_in_schedule(in_schedule);
547
      }
548
    } else {
549
      ret = tmp_ret;
550
      LOG_WARN("fail to get task from map", KR(ret), K(task_key));
551
    }
552
  }
553
  return ret;
554
}
555

556
int ObDRTaskQueue::set_sibling_in_schedule(
557
    const ObDRTaskKey &task_key,
558
    const bool in_schedule)
559
{
560
  int ret = OB_SUCCESS;
561
  if (OB_UNLIKELY(!inited_)) {
562
    ret = OB_NOT_INIT;
563
    LOG_WARN("not init", KR(ret));
564
  } else if (OB_UNLIKELY(!task_key.is_valid())) {
565
    ret = OB_INVALID_ARGUMENT;
566
    LOG_WARN("invalid argument", KR(ret), K(task_key));
567
  } else {
568
    ObDRTask *my_task = nullptr;
569
    int tmp_ret = task_map_.get_refactored(task_key, my_task);
570
    if (OB_HASH_NOT_EXIST == tmp_ret) {
571
      // bypass
572
    } else if (OB_SUCCESS == tmp_ret) {
573
      if (OB_ISNULL(my_task)) {
574
        ret = OB_ERR_UNEXPECTED;
575
        LOG_WARN("my_task ptr is null", KR(ret), K(task_key), KP(my_task));
576
      } else {
577
        my_task->set_sibling_in_schedule(in_schedule);
578
      }
579
    } else {
580
      ret = tmp_ret;
581
      LOG_WARN("fail to get task from map", KR(ret), K(task_key));
582
    }
583
  }
584
  return ret;
585
}
586

587
int ObDRTaskQueue::dump_statistic() const
588
{
589
  int ret = OB_SUCCESS;
590
  if (OB_UNLIKELY(!inited_)) {
591
    ret = OB_NOT_INIT;
592
    LOG_WARN("not init", KR(ret));
593
  } else {
594
    DLIST_FOREACH(t, schedule_list_) {
595
      FLOG_INFO("[DRTASK_NOTICE] tasks in schedule list", "priority", get_priority_str(), "task_key", t->get_task_key(),
596
                "task_id", t->get_task_id(), "task_type", t->get_disaster_recovery_task_type());
597
    }
598
    DLIST_FOREACH(t, wait_list_) {
599
      FLOG_INFO("[DRTASK_NOTICE] tasks in wait list", "priority", get_priority_str(), "task_key", t->get_task_key(),
600
                "task_id", t->get_task_id(), "task_type", t->get_disaster_recovery_task_type());
601
    }
602
  }
603
  return ret;
604
}
605

606
int ObDRTaskMgr::init(
607
    const common::ObAddr &server,
608
    common::ObServerConfig &config,
609
    ObDRTaskExecutor &task_executor,
610
    obrpc::ObSrvRpcProxy *rpc_proxy,
611
    common::ObMySQLProxy *sql_proxy,
612
    share::schema::ObMultiVersionSchemaService *schema_service)
613
{
614
  int ret = OB_SUCCESS;
615
  static const int64_t thread_count = 1;
616
  if (OB_UNLIKELY(inited_ || !stopped_)) {
617
    ret = OB_INIT_TWICE;
618
    LOG_WARN("init twice", KR(ret), K(inited_), K_(stopped));
619
  } else if (OB_UNLIKELY(!server.is_valid())
620
          || OB_ISNULL(rpc_proxy)
621
          || OB_ISNULL(sql_proxy)
622
          || OB_ISNULL(schema_service)) {
623
    ret = OB_INVALID_ARGUMENT;
624
    LOG_WARN("invalid argument", KR(ret), K(server), KP(rpc_proxy),
625
             KP(sql_proxy), KP(schema_service));
626
  } else if (OB_FAIL(cond_.init(ObWaitEventIds::REBALANCE_TASK_MGR_COND_WAIT))) {
627
    LOG_WARN("fail to init cond", KR(ret));
628
  } else if (OB_FAIL(create(thread_count, "DRTaskMgr"))) {
629
    LOG_WARN("fail to create disaster recovery task mgr", KR(ret));
630
  } else {
631
    config_ = &config;
632
    self_ = server;
633
    task_executor_ = &task_executor;
634
    rpc_proxy_ = rpc_proxy;
635
    sql_proxy_ = sql_proxy;
636
    schema_service_ = schema_service;
637
    if (OB_FAIL(high_task_queue_.init(
638
            config, TASK_QUEUE_LIMIT, rpc_proxy_, ObDRTaskPriority::HIGH_PRI))) {
639
      LOG_WARN("fail to init high priority task queue", KR(ret));
640
    } else if (OB_FAIL(low_task_queue_.init(
641
            config, TASK_QUEUE_LIMIT, rpc_proxy_, ObDRTaskPriority::LOW_PRI))) {
642
      LOG_WARN("fail to init low priority task queue", KR(ret));
643
    } else if (OB_FAIL(disaster_recovery_task_table_updater_.init(sql_proxy, this))) {
644
      LOG_WARN("fail to init a ObDRTaskTableUpdater", KR(ret));
645
    } else {
646
      inited_ = true;
647
    }
648
  }
649
  return ret;
650
}
651

652
int ObDRTaskMgr::start()
653
{
654
  int ret = OB_SUCCESS;
655
  if (OB_UNLIKELY(!inited_)) {
656
    ret = OB_NOT_INIT;
657
    LOG_WARN("task mgr not inited", KR(ret), K(inited_));
658
  } else if (OB_UNLIKELY(!stopped_)) {
659
    ret = OB_ERR_UNEXPECTED;
660
    LOG_WARN("can not start ObDRTaskMgr twice", KR(ret), K_(stopped));
661
  } else if (OB_ISNULL(sql_proxy_)) {
662
    ret = OB_INVALID_ARGUMENT;
663
    LOG_WARN("invalid argument", KR(ret), KP(sql_proxy_));
664
  } else if (OB_FAIL(ObRsReentrantThread::start())) {
665
    LOG_WARN("fail to start ObRsReentrantThread", KR(ret));
666
  } else if (OB_FAIL(disaster_recovery_task_table_updater_.start())) {
667
    LOG_WARN("fail to start disaster_recovery_task_table_updater", KR(ret));
668
  } else {
669
    stopped_ = false;
670
    FLOG_INFO("success to start ObDRTaskMgr");
671
  }
672
  return ret;
673
}
674

675
void ObDRTaskMgr::stop()
676
{
677
  loaded_ = false;
678
  stopped_ = true;
679
  ObRsReentrantThread::stop();
680
  disaster_recovery_task_table_updater_.stop();
681
  ObThreadCondGuard guard(cond_);
682
  cond_.broadcast();
683
  for (int64_t i = 0; i < static_cast<int64_t>(ObDRTaskPriority::MAX_PRI); ++i) {
684
    queues_[i].reuse();
685
  }
686
  FLOG_INFO("success to stop ObDRTaskMgr");
687
}
688

689
void ObDRTaskMgr::wait()
690
{
691
  ObRsReentrantThread::wait();
692
  disaster_recovery_task_table_updater_.wait();
693
}
694

695
int ObDRTaskMgr::check_inner_stat_() const
696
{
697
  int ret = OB_SUCCESS;
698
  if (OB_UNLIKELY(!inited_ || stopped_ || !loaded_)) {
699
    ret = OB_NOT_INIT;
700
    LOG_WARN("ObDRTaskMgr is not inited or is stopped or not loaded", KR(ret), K_(inited), K_(stopped), K_(loaded));
701
  }
702
  return ret;
703
}
704

705
void ObDRTaskMgr::run3()
706
{
707
  FLOG_INFO("Disaster recovery task mgr start");
708
  if (OB_UNLIKELY(!inited_ || stopped_)) {
709
    LOG_WARN_RET(OB_NOT_INIT, "ObDRTaskMgr not init", K(inited_), K_(stopped));
710
  } else {
711
    int64_t last_dump_ts = ObTimeUtility::current_time();
712
    int64_t last_check_task_in_progress_ts = ObTimeUtility::current_time();
713
    int ret = OB_SUCCESS;
714
    int tmp_ret = OB_SUCCESS;
715
    while (!stop_) {
716
      // thread detect
717
      if (!loaded_ && OB_FAIL(load_task_to_schedule_list_())) {
718
        LOG_WARN("fail to load task infos into schedule list, will retry until success", KR(ret));
719
      } else {
720
        update_last_run_timestamp();
721

722
        common::ObArenaAllocator allocator;
723
        ObDRTask *task = nullptr;
724
        if (OB_FAIL(try_pop_task(allocator, task))) {
725
          LOG_WARN("fail to try pop task", KR(ret));
726
        } else if (OB_NOT_NULL(task)) {
727
          const ObAddr &dst_server = task->get_dst_server();
728
          share::ObServerInfoInTable server_info;
729
          if (OB_FAIL(SVR_TRACER.get_server_info(dst_server, server_info))) {
730
            LOG_WARN("fail to get server_info", KR(ret), K(dst_server));
731
          } else if (server_info.is_permanent_offline()) {
732
            // dest server permanent offline, do not execute this task, just clean it
733
            LOG_INFO("[DRTASK_NOTICE] dest server is permanent offline, task can not execute", K(dst_server), K(server_info));
734
            ObThreadCondGuard guard(cond_);
735
            if (OB_SUCCESS != (tmp_ret = async_add_cleaning_task_to_updater(
736
                                  task->get_task_id(),
737
                                  task->get_task_key(),
738
                                  OB_REBALANCE_TASK_CANT_EXEC,
739
                                  false/*need_record_event*/,
740
                                  ObDRTaskRetComment::CANNOT_EXECUTE_DUE_TO_SERVER_PERMANENT_OFFLINE,
741
                                  false/*reach_data_copy_concurrency*/))) {
742
              LOG_WARN("fail to do execute over", KR(tmp_ret), KPC(task));
743
            }
744
          } else {
745
            if (OB_SUCCESS != (tmp_ret = task->log_execute_start())) {
746
              LOG_WARN("fail to log task start", KR(tmp_ret), KPC(task));
747
            }
748
            if (OB_FAIL(execute_task(*task))) {
749
              LOG_WARN("fail to send", KR(ret), KPC(task));
750
            }
751
          }
752
          free_task_(allocator, task);
753
        } else {
754
          LOG_TRACE("task is nullptr after try_pop_task");
755
        }
756
        if (OB_SUCCESS != (tmp_ret = try_dump_statistic_(
757
              last_dump_ts))) {
758
          LOG_WARN("fail to try dump statistic", KR(tmp_ret), K(last_dump_ts));
759
        }
760
        if (OB_SUCCESS != (tmp_ret = try_clean_not_in_schedule_task_in_schedule_list_(
761
              last_check_task_in_progress_ts))) {
762
           LOG_WARN("fail to try check task in progress", KR(tmp_ret), K(last_check_task_in_progress_ts));
763
        }
764
      }
765
    }
766
  }
767
  FLOG_INFO("disaster task mgr exits");
768
}
769

770
int ObDRTaskMgr::check_task_in_executing(
771
    const ObDRTaskKey &task_key,
772
    const ObDRTaskPriority priority,
773
    bool &task_in_executing)
774
{
775
  int ret = OB_SUCCESS;
776
  if (OB_FAIL(check_inner_stat_())) {
777
    LOG_WARN("fail to check inner stat", KR(ret), K_(inited), K_(stopped), K_(loaded));
778
  } else if (ObDRTaskPriority::HIGH_PRI != priority
779
             && ObDRTaskPriority::LOW_PRI != priority) {
780
    ret = OB_INVALID_ARGUMENT;
781
    LOG_WARN("invalid argument", KR(ret), K(priority));
782
  } else {
783
    ObThreadCondGuard guard(cond_);
784
    ObDRTaskQueue &queue = ObDRTaskPriority::LOW_PRI == priority
785
                           ? low_task_queue_
786
                           : high_task_queue_;
787
    if (OB_FAIL(queue.check_task_in_scheduling(task_key, task_in_executing))) {
788
      LOG_WARN("fail to check task exist", KR(ret), K(task_key));
789
    }
790
  }
791
  return ret;
792
}
793

794
int ObDRTaskMgr::check_task_exist(
795
    const ObDRTaskKey &task_key,
796
    const ObDRTaskPriority priority,
797
    bool &task_exist)
798
{
799
  int ret = OB_SUCCESS;
800
  if (OB_FAIL(check_inner_stat_())) {
801
    LOG_WARN("fail to check inner stat", KR(ret), K_(inited), K_(stopped), K_(loaded));
802
  } else if (ObDRTaskPriority::HIGH_PRI != priority
803
             && ObDRTaskPriority::LOW_PRI != priority) {
804
    ret = OB_INVALID_ARGUMENT;
805
    LOG_WARN("invalid argument", KR(ret), K(priority));
806
  } else {
807
    ObThreadCondGuard guard(cond_);
808
    ObDRTaskQueue &queue = ObDRTaskPriority::LOW_PRI == priority
809
                           ? low_task_queue_
810
                           : high_task_queue_;
811
    if (OB_FAIL(queue.check_task_exist(task_key, task_exist))) {
812
      LOG_WARN("fail to check task exist", KR(ret), K(task_key));
813
    }
814
  }
815
  return ret;
816
}
817

818
int ObDRTaskMgr::add_task(
819
    const ObDRTask &task)
820
{
821
  int ret = OB_SUCCESS;
822
  if (OB_FAIL(check_inner_stat_())) {
823
    ret = OB_NOT_INIT;
824
    LOG_WARN("fail to check inner stat", KR(ret), K_(inited), K_(loaded), K_(stopped));
825
  } else if (OB_UNLIKELY(!task.is_valid())) {
826
    ret = OB_INVALID_ARGUMENT;
827
    LOG_WARN("invalid dr task", KR(ret), K(task));
828
  } else {
829
    ObThreadCondGuard guard(cond_);
830
    ObDRTaskQueue &queue = task.is_high_priority_task()
831
                           ? high_task_queue_
832
                           : low_task_queue_;
833
    ObDRTaskQueue &sibling_queue = task.is_high_priority_task()
834
                                   ? low_task_queue_
835
                                   : high_task_queue_;
836
    bool has_task_in_schedule = false;
837
    if (OB_UNLIKELY(queue.task_cnt() >= TASK_QUEUE_LIMIT)) {
838
      ret = OB_SIZE_OVERFLOW;
839
      LOG_WARN("disaster recovery task queue is full", KR(ret), "task_cnt", queue.task_cnt());
840
    } else if (OB_FAIL(queue.push_task_in_wait_list(*this, sibling_queue, task, has_task_in_schedule))) {
841
      if (OB_ENTRY_EXIST != ret) {
842
        LOG_WARN("fail to push task", KR(ret), K(task));
843
      } else {
844
        ret = OB_SUCCESS;
845
        LOG_INFO("task already exist in queue", K(task));
846
      }
847
    } else {
848
      int64_t wait_cnt = 0;
849
      int64_t schedule_cnt = 0;
850
      if (OB_FAIL(inner_get_task_cnt_(wait_cnt, schedule_cnt))) {
851
        LOG_WARN("fail to get task cnt", KR(ret));
852
      } else if (!has_task_in_schedule
853
                 && 0 == get_reach_concurrency_limit()) {
854
        cond_.broadcast();
855
        LOG_INFO("success to broad cast cond_", K(wait_cnt), K(schedule_cnt));
856
      }
857
      clear_reach_concurrency_limit();
858
      LOG_INFO("[DRTASK_NOTICE] add task to disaster recovery task mgr finish", KR(ret), K(task));
859
    }
860
  }
861
  return ret;
862
}
863

864
int ObDRTaskMgr::deal_with_task_reply(
865
    const ObDRTaskReplyResult &reply)
866
{
867
  int ret = OB_SUCCESS;
868
  ObDRTaskKey task_key;
869
  if (OB_FAIL(check_inner_stat_())) {
870
    ret = OB_NOT_INIT;
871
    LOG_WARN("fail to check inner stat", KR(ret), K_(inited), K_(loaded), K_(stopped));
872
  } else if (OB_FAIL(task_key.init(
873
          reply.tenant_id_,
874
          reply.ls_id_.id(),
875
          0, /* set to 0 */
876
          0, /* set to 0 */
877
          ObDRTaskKeyType::FORMAL_DR_KEY))) {
878
    LOG_WARN("fail to init task key", KR(ret), K(reply));
879
  } else {
880
    int tmp_ret = OB_SUCCESS;
881
    ObDRTask *task = nullptr;
882
    ObThreadCondGuard guard(cond_);
883
    if (OB_SUCCESS != (tmp_ret = get_task_by_id_(reply.task_id_, task_key, task))) {
884
      if (OB_ENTRY_NOT_EXIST == tmp_ret) {
885
        // task not exist, try record this reply result
886
        ROOTSERVICE_EVENT_ADD("disaster_recovery", "finish_disaster_recovery_task",
887
                            "tenant_id", reply.tenant_id_,
888
                            "ls_id", reply.ls_id_.id(),
889
                            "task_id", reply.task_id_,
890
                            "execute_result", reply.result_,
891
                            "ret_comment", ob_disaster_recovery_task_ret_comment_strs(ObDRTaskRetComment::RECEIVE_FROM_STORAGE_RPC));
892
      } else {
893
        LOG_WARN("fail to get task from task manager", KR(tmp_ret), K(reply), K(task_key));
894
      }
895
    } else if (OB_SUCCESS != (tmp_ret = task->log_execute_result(reply.result_, ObDRTaskRetComment::RECEIVE_FROM_STORAGE_RPC))){
896
      LOG_WARN("fail to log execute result", KR(tmp_ret), K(reply));
897
    }
898

899
    if (OB_FAIL(async_add_cleaning_task_to_updater(
900
                    reply.task_id_,
901
                    task_key,
902
                    reply.result_,
903
                    false,/*need_record_event*/
904
                    ObDRTaskRetComment::RECEIVE_FROM_STORAGE_RPC,
905
                    true/*need_clear_server_data_in_limit*/))) {
906
      LOG_WARN("fail to do execute over", KR(ret), K(reply));
907
    }
908
  }
909
  return ret;
910
}
911

912
int ObDRTaskMgr::async_add_cleaning_task_to_updater(
913
    const share::ObTaskId &task_id,
914
    const ObDRTaskKey &task_key,
915
    const int ret_code,
916
    const bool need_record_event,
917
    const ObDRTaskRetComment &ret_comment,
918
    const bool need_clear_server_data_in_limit)
919
{
920
  int ret = OB_SUCCESS;
921
  ObDRTask *task = nullptr;
922
  if (OB_FAIL(check_inner_stat_())) {
923
    LOG_WARN("fail to check inner stat", KR(ret), K_(inited), K_(stopped), K_(loaded));
924
  } else if (OB_FAIL(get_task_by_id_(task_id, task_key, task))) {
925
    if (OB_ENTRY_NOT_EXIST == ret) {
926
      LOG_WARN("fail to get task, task may be cleaned earlier", KR(ret), K(task_id), K(task_key));
927
      ret = OB_SUCCESS;
928
    } else {
929
      LOG_WARN("fail to get task from task manager", KR(ret), K(task_id), K(task_key));
930
    }
931
  }
932
  if (OB_SUCC(ret)
933
      && OB_NOT_NULL(task)
934
      && OB_FAIL(disaster_recovery_task_table_updater_.async_update(
935
                      task->get_tenant_id(),
936
                      task->get_ls_id(),
937
                      task->get_disaster_recovery_task_type(),
938
                      task_key,
939
                      ret_code,
940
                      need_clear_server_data_in_limit,
941
                      task_id,
942
                      need_record_event,
943
                      ret_comment))) {
944
    LOG_WARN("fail to async update a dr task", KR(ret), "tenant_id", task->get_tenant_id(),
945
             "ls_id", task->get_ls_id(), K(task_id), K(need_record_event), K(ret_comment));
946
  }
947
  return ret;
948
}
949

950
int ObDRTaskMgr::do_cleaning(
951
    const share::ObTaskId &task_id,
952
    const ObDRTaskKey &task_key,
953
    const int ret_code,
954
    const bool need_clear_server_data_in_limit,
955
    const bool need_record_event,
956
    const ObDRTaskRetComment &ret_comment)
957
{
958
  int ret = OB_SUCCESS;
959
  ObThreadCondGuard guard(cond_);
960
  if (OB_FAIL(check_inner_stat_())) {
961
    LOG_WARN("fail to check inner stat", KR(ret), K_(inited), K_(stopped), K_(loaded));
962
  } else {
963
    ObDRTaskQueue *task_queue = nullptr;
964
    ObDRTask *task = nullptr;
965
    common::ObAddr dst_server;
966
    for (int64_t i = 0; OB_SUCC(ret) && i < ARRAYSIZEOF(queues_); ++i) {
967
      if (OB_FAIL(queues_[i].get_task(task_id, task_key, task))) {
968
        LOG_WARN("fail to get schedule task from queue", KR(ret), "priority", queues_[i].get_priority_str());
969
      } else if (OB_NOT_NULL(task)) {
970
        task_queue = &queues_[i];
971
        break;
972
      }
973
    }
974

975
    if (OB_SUCC(ret)) {
976
      if (OB_ISNULL(task)) {
977
        LOG_INFO("in schedule taks not found, maybe not sync because of network traffic",
978
                 K(task_id), K(task_key), K(ret_code));
979
      } else {
980
        if (need_record_event) {
981
          (void)log_task_result(*task, ret_code, ret_comment);
982
        }
983
        dst_server = task->get_dst_server();
984
        if (OB_FAIL(set_sibling_in_schedule(*task, false/* not in schedule*/))) {
985
          LOG_WARN("fail to set sibling in schedule", KR(ret), KPC(task));
986
        } else if (OB_ISNULL(task_queue)) {
987
          LOG_INFO("task_queue is null"); // by pass
988
        } else if (OB_FAIL(task_queue->finish_schedule(task))) {
989
          LOG_WARN("fail to finish scheduling task", KR(ret), KPC(task));
990
        }
991
      }
992
      clear_reach_concurrency_limit();
993
    }
994
  }
995
  return ret;
996
}
997

998
int ObDRTaskMgr::get_all_task_count(
999
    int64_t &high_wait_cnt,
1000
    int64_t &high_schedule_cnt,
1001
    int64_t &low_wait_cnt,
1002
    int64_t &low_schedule_cnt)
1003
{
1004
  int ret = OB_SUCCESS;
1005
  if (OB_FAIL(check_inner_stat_())) {
1006
    ret = OB_NOT_INIT;
1007
    LOG_WARN("not init", KR(ret), K_(inited), K_(loaded), K_(stopped));
1008
  } else {
1009
    ObThreadCondGuard guard(cond_);
1010
    high_wait_cnt = get_high_priority_queue_().get_wait_list().get_size();
1011
    high_schedule_cnt = get_high_priority_queue_().get_schedule_list().get_size();
1012
    low_wait_cnt = get_low_priority_queue_().get_wait_list().get_size();
1013
    low_schedule_cnt = get_low_priority_queue_().get_schedule_list().get_size();
1014
  }
1015
  return ret;
1016
}
1017

1018
int ObDRTaskMgr::log_task_result(
1019
    const ObDRTask &task,
1020
    const int ret_code,
1021
    const ObDRTaskRetComment &ret_comment)
1022
{
1023
  int ret = OB_SUCCESS;
1024
  if (OB_FAIL(task.log_execute_result(ret_code, ret_comment))) {
1025
    LOG_WARN("fail to log execute task", KR(ret), K(task), KR(ret_code), K(ret_comment));
1026
  }
1027
  return ret;
1028
}
1029

1030
int ObDRTaskMgr::get_task_by_id_(
1031
    const share::ObTaskId &task_id,
1032
    const ObDRTaskKey &task_key,
1033
    ObDRTask *&task)
1034
{
1035
  int ret = OB_SUCCESS;
1036
  ObDRTask *task_to_get = nullptr;
1037
  void *raw_ptr = nullptr;
1038
  for (int64_t i = 0; OB_SUCC(ret) && i < ARRAYSIZEOF(queues_); ++i) {
1039
    if (OB_FAIL(queues_[i].get_task(task_id, task_key, task_to_get))) {
1040
      LOG_WARN("fail to get schedule task from queue", KR(ret), "priority", queues_[i].get_priority_str());
1041
    } else if (OB_NOT_NULL(task_to_get)) {
1042
      break;
1043
    }
1044
  }
1045
  if (OB_SUCC(ret) && OB_ISNULL(task_to_get)) {
1046
    task = nullptr;
1047
    ret = OB_ENTRY_NOT_EXIST;
1048
    LOG_WARN("task not exist, maybe cleaned earier", KR(ret), K(task_id), K(task_key));
1049
  } else {
1050
    task = task_to_get;
1051
  }
1052
  return ret;
1053
}
1054

1055
void ObDRTaskMgr::free_task_(
1056
     common::ObIAllocator &allocator,
1057
     ObDRTask *&task)
1058
{
1059
  if (OB_NOT_NULL(task)) {
1060
    task->~ObDRTask();
1061
    allocator.free(task);
1062
    task = nullptr;
1063
  }
1064
}
1065

1066
int ObDRTaskMgr::load_task_to_schedule_list_()
1067
{
1068
  int ret = OB_SUCCESS;
1069
  int tmp_ret = OB_SUCCESS;
1070
  ObThreadCondGuard guard(cond_);
1071
  ObArray<uint64_t> tenant_id_array;
1072

1073
  if (OB_UNLIKELY(!inited_ || stopped_)) {
1074
    LOG_WARN("fail to check inner stat", KR(ret), K_(inited), K_(stopped));
1075
  } else if (OB_ISNULL(schema_service_) || OB_ISNULL(sql_proxy_)) {
1076
    ret = OB_INVALID_ARGUMENT;
1077
    LOG_WARN("schema_service_ or sql_proxy_ is nullptr", KR(ret), KP(schema_service_), KP(sql_proxy_));
1078
  } else if (OB_UNLIKELY(ObTenantUtils::get_tenant_ids(schema_service_, tenant_id_array))) {
1079
    LOG_WARN("fail to get tenant id array", KR(ret));
1080
  } else {
1081
    // clear schedule_list and wait_list in two queues
1082
    for (int64_t i = 0; i < static_cast<int64_t>(ObDRTaskPriority::MAX_PRI); ++i) {
1083
      queues_[i].reuse();
1084
    }
1085
    clear_reach_concurrency_limit();
1086
    for (int64_t i = 0; OB_SUCC(ret) && i < tenant_id_array.count(); ++i) {
1087
      // load this tenant's task info into schedule_list
1088
      // TODO@jingyu.cr: need to isolate different tenant
1089
      const uint64_t tenant_id = tenant_id_array.at(i);
1090
      const uint64_t sql_tenant_id = gen_meta_tenant_id(tenant_id);
1091
      ObSqlString sql;
1092
      ObTimeoutCtx ctx;
1093
      SMART_VAR(ObISQLClient::ReadResult, result) {
1094
        if (OB_FAIL(rootserver::ObRootUtils::get_rs_default_timeout_ctx(ctx))) {
1095
          LOG_WARN("fail to get timeout ctx", KR(ret), K(ctx));
1096
        } else if (OB_FAIL(sql.append_fmt(
1097
            "SELECT * FROM %s WHERE tenant_id = %ld",
1098
            share::OB_ALL_LS_REPLICA_TASK_TNAME, tenant_id))) {
1099
          LOG_WARN("fail to assign sql", KR(ret), K(tenant_id), K(sql_tenant_id));
1100
        } else if (OB_FAIL(sql_proxy_->read(result, sql_tenant_id, sql.ptr()))) {
1101
          LOG_WARN("execute sql failed", KR(ret),
1102
              K(tenant_id), K(sql_tenant_id), "sql", sql.ptr());
1103
        } else if (OB_ISNULL(result.get_result())) {
1104
          ret = OB_ERR_UNEXPECTED;
1105
          LOG_WARN("get mysql result failed", KR(ret), "sql", sql.ptr());
1106
        } else if (OB_FAIL(load_single_tenant_task_infos_(*result.get_result()))) {
1107
          LOG_WARN("load single tenant's task info failed", KR(ret), K(tenant_id), K(sql_tenant_id));
1108
        } else {
1109
          FLOG_INFO("success to load single tenant's task info", K(tenant_id));
1110
        }
1111
      }
1112
    }
1113
    if (OB_SUCC(ret)) {
1114
      loaded_ = true;
1115
    }
1116
  }
1117
  return ret;
1118
}
1119

1120
int ObDRTaskMgr::load_single_tenant_task_infos_(
1121
    sqlclient::ObMySQLResult &res)
1122
{
1123
  int ret = OB_SUCCESS;
1124
  if (OB_UNLIKELY(!inited_ || stopped_)) {
1125
    LOG_WARN("fail to check inner stat", KR(ret), K_(inited), K_(stopped));
1126
  } else {
1127
    while (OB_SUCC(ret)) {
1128
      if (OB_FAIL(res.next())) {
1129
        if (OB_ITER_END == ret) {
1130
          ret = OB_SUCCESS;
1131
        } else {
1132
          LOG_WARN("get next result failed", KR(ret));
1133
        }
1134
        break;
1135
      } else if (OB_FAIL(load_task_info_(res))) {
1136
        LOG_WARN("fail to build and load this task info", KR(ret));
1137
      }
1138
    }
1139
  }
1140
  return ret;
1141
}
1142

1143
int ObDRTaskMgr::load_task_info_(
1144
    sqlclient::ObMySQLResult &res)
1145
{
1146
  int ret = OB_SUCCESS;
1147
  if (OB_UNLIKELY(!inited_ || stopped_)) {
1148
    LOG_WARN("fail to check inner stat", KR(ret), K_(inited), K_(stopped));
1149
  } else {
1150
    common::ObString task_type;
1151
    int64_t priority = 2;
1152
    (void)GET_COL_IGNORE_NULL(res.get_varchar, "task_type", task_type);
1153
    (void)GET_COL_IGNORE_NULL(res.get_int, "priority", priority);
1154
    if (OB_FAIL(ret)) {
1155
    } else if (task_type == common::ObString("MIGRATE REPLICA")) {
1156
      SMART_VAR(ObMigrateLSReplicaTask, tmp_task) {
1157
        if (OB_FAIL(tmp_task.build_task_from_sql_result(res))) {
1158
          LOG_WARN("fail to build migrate task info from res", KR(ret));
1159
        } else if (OB_FAIL(queues_[priority].push_task_in_schedule_list(tmp_task))) {
1160
          LOG_WARN("fail to load a ObMigrateLSReplicaTask into schedule list", KR(ret));
1161
        }
1162
      }
1163
    } else if (task_type == common::ObString("ADD REPLICA")) {
1164
      SMART_VAR(ObAddLSReplicaTask, tmp_task) {
1165
        if (OB_FAIL(tmp_task.build_task_from_sql_result(res))) {
1166
          LOG_WARN("fail to build ObAddLSReplicaTask from res", KR(ret));
1167
        } else if (OB_FAIL(queues_[priority].push_task_in_schedule_list(tmp_task))) {
1168
          LOG_WARN("fail to load ObAddLSReplicaTask into schedule list", KR(ret));
1169
        }
1170
      }
1171
    } else if (task_type == common::ObString("TYPE TRANSFORM")) {
1172
      SMART_VAR(ObLSTypeTransformTask, tmp_task) {
1173
        if (OB_FAIL(tmp_task.build_task_from_sql_result(res))) {
1174
          LOG_WARN("fail to build ObLSTypeTransformTask from res", KR(ret));
1175
        } else if (OB_FAIL(queues_[priority].push_task_in_schedule_list(tmp_task))) {
1176
          LOG_WARN("fail to load ObLSTypeTransformTask into schedule list", KR(ret));
1177
        }
1178
      }
1179
    } else if (0 == task_type.case_compare(ob_disaster_recovery_task_type_strs(ObDRTaskType::LS_REMOVE_NON_PAXOS_REPLICA))
1180
               || 0 == task_type.case_compare(ob_disaster_recovery_task_type_strs(ObDRTaskType::LS_REMOVE_PAXOS_REPLICA))) {
1181
      SMART_VAR(ObRemoveLSReplicaTask, tmp_task) {
1182
        if (OB_FAIL(tmp_task.build_task_from_sql_result(res))) {
1183
          LOG_WARN("fail to build ObRemoveLSReplicaTask from res", KR(ret));
1184
        } else if (OB_FAIL(queues_[priority].push_task_in_schedule_list(tmp_task))) {
1185
          LOG_WARN("fail to load ObRemoveLSReplicaTask into schedule list", KR(ret));
1186
        }
1187
      }
1188
    } else if (task_type == common::ObString("MODIFY PAXOS REPLICA NUMBER")) {
1189
      SMART_VAR(ObLSModifyPaxosReplicaNumberTask, tmp_task) {
1190
        if (OB_FAIL(tmp_task.build_task_from_sql_result(res))) {
1191
          LOG_WARN("fail to build ObLSModifyPaxosReplicaNumberTask from res", KR(ret));
1192
        } else if (OB_FAIL(queues_[priority].push_task_in_schedule_list(tmp_task))) {
1193
          LOG_WARN("fail to load ObLSModifyPaxosReplicaNumberTask into schedule list", KR(ret));
1194
        }
1195
      }
1196
    } else {
1197
      ret = OB_INVALID_ARGUMENT;
1198
      LOG_WARN("unexpected task type", KR(ret), K(task_type));
1199
    }
1200
  }
1201
  return ret;
1202
}
1203

1204
int ObDRTaskMgr::persist_task_info_(
1205
    const ObDRTask &task,
1206
    ObDRTaskRetComment &ret_comment)
1207
{
1208
  int ret = OB_SUCCESS;
1209
  ret_comment = ObDRTaskRetComment::MAX;
1210
  share::ObDMLSqlSplicer dml;
1211
  ObSqlString sql;
1212
  int64_t affected_rows = 0;
1213
  const uint64_t sql_tenant_id = gen_meta_tenant_id(task.get_tenant_id());
1214
  ObMySQLTransaction trans;
1215
  const int64_t timeout = GCONF.internal_sql_execute_timeout;
1216
  observer::ObInnerSQLConnection *conn = NULL;
1217
  ObConflictCaseWithClone case_to_check(ObConflictCaseWithClone::MODIFY_REPLICA);
1218

1219
  if (OB_FAIL(check_inner_stat_())) {
1220
    LOG_WARN("fail to check inner stat", KR(ret), K_(inited), K_(stopped), K_(loaded));
1221
  } else if (OB_ISNULL(sql_proxy_)) {
1222
    ret = OB_INVALID_ARGUMENT;
1223
    LOG_WARN("invalid argument", KR(ret));
1224
  } else if (OB_FAIL(trans.start(sql_proxy_, sql_tenant_id))) {
1225
    LOG_WARN("failed to start trans", KR(ret), K(sql_tenant_id));
1226
  } else if (OB_FAIL(task.fill_dml_splicer(dml))) {
1227
    LOG_WARN("fill dml splicer failed", KR(ret));
1228
  } else if (OB_FAIL(dml.splice_insert_sql(share::OB_ALL_LS_REPLICA_TASK_TNAME, sql))) {
1229
    LOG_WARN("fail to splice batch insert update sql", KR(ret), K(sql));
1230
  } else if (OB_ISNULL(conn = static_cast<observer::ObInnerSQLConnection *>(trans.get_connection()))) {
1231
    ret = OB_ERR_UNEXPECTED;
1232
    LOG_WARN("conn_ is NULL", KR(ret));
1233
  } else if (OB_FAIL(ObInnerConnectionLockUtil::lock_table(sql_tenant_id,
1234
                                                           OB_ALL_LS_REPLICA_TASK_TID,
1235
                                                           EXCLUSIVE,
1236
                                                           timeout,
1237
                                                           conn))) {
1238
    LOG_WARN("lock dest table failed", KR(ret), K(sql_tenant_id));
1239
  } else if (OB_FAIL(ObTenantSnapshotUtil::check_tenant_not_in_cloning_procedure(task.get_tenant_id(), case_to_check))) {
1240
    LOG_WARN("fail to check whether tenant is in cloning procedure", KR(ret));
1241
    ret_comment = CANNOT_PERSIST_TASK_DUE_TO_CLONE_CONFLICT;
1242
  } else if (OB_FAIL(trans.write(sql_tenant_id, sql.ptr(), affected_rows))) {
1243
    LOG_WARN("execute sql failed", KR(ret), "tenant_id",task.get_tenant_id(), K(sql_tenant_id), K(sql));
1244
  }
1245
  if (trans.is_started()) {
1246
    int tmp_ret = OB_SUCCESS;
1247
    if (OB_TMP_FAIL(trans.end(OB_SUCC(ret)))) {
1248
      LOG_WARN("trans end failed", KR(tmp_ret), KR(ret));
1249
      ret = OB_SUCC(ret) ? tmp_ret : ret;
1250
    }
1251
  }
1252
  FLOG_INFO("[DRTASK_NOTICE] finish persist task into inner table", KR(ret), K(task));
1253
  return ret;
1254
}
1255

1256
int ObDRTaskMgr::try_dump_statistic_(
1257
    int64_t &last_dump_ts) const
1258
{
1259
  int ret = OB_SUCCESS;
1260
  if (OB_FAIL(check_inner_stat_())) {
1261
    LOG_WARN("fail to check inner stat", KR(ret), K_(inited), K_(stopped), K_(loaded));
1262
  } else {
1263
    ObThreadCondGuard guard(cond_);
1264
    const int64_t now = ObTimeUtility::current_time();
1265
    if (now > last_dump_ts + config_->balancer_log_interval) {
1266
      last_dump_ts = now;
1267
      int tmp_ret = inner_dump_statistic_();
1268
      if (OB_SUCCESS != tmp_ret) {
1269
        LOG_WARN("task manager dump statistics failed", KR(tmp_ret));
1270
      }
1271
    };
1272
  }
1273
  return ret;
1274
}
1275

1276
int ObDRTaskMgr::inner_dump_statistic_() const
1277
{
1278
  int ret = OB_SUCCESS;
1279
  if (OB_FAIL(check_inner_stat_())) {
1280
    LOG_WARN("fail to check inner stat", KR(ret), K_(inited), K_(stopped), K_(loaded));
1281
  } else {
1282
    LOG_INFO("[DRTASK_NOTICE] disaster recovery task manager statistics",
1283
        "waiting_high_priority_task_cnt", high_task_queue_.wait_task_cnt(),
1284
        "executing_high_priority_task_cnt", high_task_queue_.in_schedule_task_cnt(),
1285
        "waiting_low_priority_task_cnt", low_task_queue_.wait_task_cnt(),
1286
        "executing_low_priority_task_cnt", low_task_queue_.in_schedule_task_cnt());
1287
    for (int64_t i = 0; i < ARRAYSIZEOF(queues_); ++i) {
1288
      // ignore error to make sure checking two queues 
1289
      if (OB_FAIL(queues_[i].dump_statistic())) {
1290
        LOG_WARN("fail to dump statistic for this queue", KR(ret), "priority", queues_[i].get_priority_str());
1291
      }
1292
    }
1293
  }
1294
  return ret;
1295
}
1296

1297
int ObDRTaskMgr::try_clean_not_in_schedule_task_in_schedule_list_(
1298
    int64_t &last_check_task_in_progress_ts)
1299
{
1300
  int ret = OB_SUCCESS;
1301
  if (OB_FAIL(check_inner_stat_())) {
1302
    LOG_WARN("fail to check inner stat", KR(ret), K_(inited), K_(stopped), K_(loaded));
1303
  } else {
1304
    int64_t wait = 0;
1305
    int64_t schedule = 0;
1306
    ObThreadCondGuard guard(cond_);
1307
    if (OB_FAIL(inner_get_task_cnt_(wait, schedule))) {
1308
      LOG_WARN("fail to get task cnt", KR(ret));
1309
    } else if (schedule <= 0) {
1310
      // bypass
1311
    } else {
1312
      const int64_t now = ObTimeUtility::current_time();
1313
      if (now > last_check_task_in_progress_ts + schedule * CHECK_IN_PROGRESS_INTERVAL_PER_TASK) {
1314
        last_check_task_in_progress_ts = now;
1315
        int tmp_ret = inner_clean_not_in_schedule_task_in_schedule_list_();
1316
        if (OB_SUCCESS != tmp_ret) {
1317
          LOG_WARN("fail to do check task in progress", KR(tmp_ret));
1318
        }
1319
      }
1320
    }
1321
  }
1322
  return ret;
1323
}
1324

1325
int ObDRTaskMgr::inner_clean_not_in_schedule_task_in_schedule_list_()
1326
{
1327
  int ret = OB_SUCCESS;
1328
  if (OB_FAIL(check_inner_stat_())) {
1329
    LOG_WARN("fail to check inner stat", KR(ret), K_(inited), K_(stopped), K_(loaded));
1330
  } else {
1331
    for (int64_t i = 0; i < ARRAYSIZEOF(queues_); ++i) {
1332
      // ignore error to make sure checking two queues 
1333
      if (OB_FAIL(queues_[i].handle_not_in_progress_task(*this))) {
1334
        LOG_WARN("fail to handle not in progress task in this queue", KR(ret),
1335
                 "priority", queues_[i].get_priority_str());
1336
      }
1337
    }
1338
  }
1339
  FLOG_INFO("finish inner check task in progress", KR(ret));
1340
  return ret;
1341
}
1342

1343
int ObDRTaskMgr::inner_get_task_cnt_(
1344
    int64_t &wait_cnt,
1345
    int64_t &in_schedule_cnt) const
1346
{
1347
  int ret = OB_SUCCESS;
1348
  if (OB_FAIL(check_inner_stat_())) {
1349
    LOG_WARN("fail to check inner stat", KR(ret), K_(inited), K_(stopped), K_(loaded));
1350
  } else {
1351
    wait_cnt = 0;
1352
    in_schedule_cnt = 0;
1353
    for (int64_t i = 0; i < ARRAYSIZEOF(queues_); ++i) {
1354
      wait_cnt += queues_[i].wait_task_cnt();
1355
      in_schedule_cnt += queues_[i].in_schedule_task_cnt();
1356
    }
1357
  }
1358
  return ret;
1359
}
1360

1361
int ObDRTaskMgr::try_pop_task(
1362
    common::ObIAllocator &allocator,
1363
    ObDRTask *&task)
1364
{
1365
  int ret = OB_SUCCESS;
1366
  ObThreadCondGuard guard(cond_);
1367
  int64_t wait_cnt = 0;
1368
  int64_t in_schedule_cnt = 0;
1369
  ObDRTask *my_task = nullptr;
1370
  void *raw_ptr = nullptr;
1371
  if (OB_FAIL(check_inner_stat_())) {
1372
    LOG_WARN("fail to check inner stat", KR(ret), K_(inited), K_(stopped), K_(loaded));
1373
  } else if (OB_FAIL(inner_get_task_cnt_(wait_cnt, in_schedule_cnt))) {
1374
    LOG_WARN("fail to get task cnt", KR(ret));
1375
  } else if (wait_cnt > 0
1376
             && 0 == concurrency_limited_ts_) {
1377
    if (OB_FAIL(pop_task(my_task))) {
1378
      LOG_WARN("fail to pop task", KR(ret));
1379
    } else if (OB_ISNULL(my_task)) {
1380
      task = nullptr;
1381
    } else if (OB_ISNULL(raw_ptr = allocator.alloc(my_task->get_clone_size()))) {
1382
      ret = OB_ALLOCATE_MEMORY_FAILED;
1383
      LOG_WARN("fail to allocate task", KR(ret));
1384
    } else if (OB_FAIL(my_task->clone(raw_ptr, task))) {
1385
      LOG_WARN("fail to clone task", KR(ret), "source_task", *my_task);
1386
    } else if (OB_ISNULL(task)) {
1387
      ret = OB_ERR_UNEXPECTED;
1388
      LOG_WARN("task ptr is null", KR(ret));
1389
    } else {
1390
      my_task->set_execute_time(ObTimeUtility::current_time());
1391
    }
1392
    
1393
    if (OB_FAIL(ret)) {
1394
      if (OB_NOT_NULL(task)) {
1395
        free_task_(allocator, task);
1396
      } else if (OB_NOT_NULL(raw_ptr)) {
1397
        allocator.free(raw_ptr);
1398
        raw_ptr = nullptr;
1399
      }
1400
    }
1401
  } else {
1402
    int64_t now = ObTimeUtility::current_time();
1403
    cond_.wait(get_schedule_interval());
1404
    if (get_reach_concurrency_limit() + CONCURRENCY_LIMIT_INTERVAL < now) {
1405
      clear_reach_concurrency_limit();
1406
      LOG_TRACE("success to clear concurrency limit");
1407
    }
1408
  }
1409
  if (OB_SUCC(ret) && OB_NOT_NULL(task)) {
1410
    LOG_INFO("[DRTASK_NOTICE] success to pop a task", KPC(task), K_(concurrency_limited_ts),
1411
             K(in_schedule_cnt));
1412
  }
1413
  return ret;
1414
}
1415

1416
int ObDRTaskMgr::pop_task(
1417
    ObDRTask *&task)
1418
{
1419
  int ret = OB_SUCCESS;
1420
  int64_t wait_cnt = 0;
1421
  if (OB_FAIL(check_inner_stat_())) {
1422
    LOG_WARN("fail to check inner stat", KR(ret), K_(inited), K_(stopped), K_(loaded));
1423
  } else {
1424
    task = nullptr;
1425
    for (int64_t i = 0; OB_SUCC(ret) && i < ARRAYSIZEOF(queues_); ++i) {
1426
      if (queues_[i].wait_task_cnt() > 0) {
1427
        wait_cnt += queues_[i].wait_task_cnt();
1428
        if (OB_FAIL(queues_[i].pop_task(task))) {
1429
          LOG_WARN("pop_task from queue failed", KR(ret), "priority", queues_[i].get_priority_str());
1430
        } else if (OB_NOT_NULL(task)) {
1431
          break;
1432
        }
1433
      }
1434
    }
1435
    if (OB_SUCC(ret)) {
1436
      if (OB_ISNULL(task)) {
1437
        if (wait_cnt > 0) {
1438
          set_reach_concurrency_limit();
1439
        }
1440
      } else {
1441
        const bool in_schedule = true;
1442
        if (OB_FAIL(set_sibling_in_schedule(*task, in_schedule))) {
1443
          LOG_WARN("set sibling in schedule failed", KR(ret), KPC(task));
1444
        }
1445
      }
1446
    }
1447
  }
1448
  return ret;
1449
}
1450

1451
int ObDRTaskMgr::execute_task(
1452
    const ObDRTask &task)
1453
{
1454
  int ret = OB_SUCCESS;
1455
  ObCurTraceId::init(self_);
1456
  FLOG_INFO("execute disaster recovery task", K(task));
1457
  int dummy_ret = OB_SUCCESS;
1458
  ObDRTaskRetComment ret_comment = ObDRTaskRetComment::MAX;
1459
  if (OB_FAIL(check_inner_stat_())) {
1460
    LOG_WARN("fail to check inner stat", KR(ret), K_(inited), K_(stopped), K_(loaded));
1461
  } else if (OB_FAIL(persist_task_info_(task, ret_comment))) {
1462
    LOG_WARN("fail to persist task info into table", KR(ret));
1463
  } else if (OB_FAIL(task_executor_->execute(task, dummy_ret, ret_comment))) {
1464
    LOG_WARN("fail to execute disaster recovery task", KR(ret));
1465
  }
1466
  if (OB_FAIL(ret)) {
1467
    //TODO@jingyu.cr:
1468
    // (1) use rwlock instead of threadcond
1469
    // (2) deal with block in status
1470
    (void)log_task_result(task, ret, ret_comment);
1471
    ObThreadCondGuard guard(cond_);
1472
    const bool data_in_limit = (OB_REACH_SERVER_DATA_COPY_IN_CONCURRENCY_LIMIT == ret);
1473
    if (OB_SUCCESS != async_add_cleaning_task_to_updater(
1474
          task.get_task_id(),
1475
          task.get_task_key(),
1476
          ret,
1477
          false,/*need_record_event*/
1478
          ret_comment,
1479
          !data_in_limit)) {
1480
      LOG_WARN("fail to do execute over", KR(ret), K(task));
1481
    }
1482
  }
1483
  return ret;
1484
}
1485

1486
int ObDRTaskMgr::set_sibling_in_schedule(
1487
    const ObDRTask &task,
1488
    const bool in_schedule)
1489
{
1490
  int ret = OB_SUCCESS;
1491
  if (OB_FAIL(check_inner_stat_())) {
1492
    LOG_WARN("fail to check inner stat", KR(ret), K_(inited), K_(stopped), K_(loaded));
1493
  } else {
1494
    for (int64_t i = 0; OB_SUCC(ret) && i < ARRAYSIZEOF(queues_); ++i) {
1495
      if (OB_FAIL(queues_[i].set_sibling_in_schedule(task, in_schedule))) {
1496
        if (i == 0) {
1497
          LOG_WARN("fail to set sibling in schedule in high priority queue", KR(ret), K(task));
1498
        } else {
1499
          LOG_WARN("fail to set sibling in schedule in low priority queue", KR(ret), K(task));
1500
        }
1501
      }
1502
    }
1503
  }
1504
  return ret;
1505
}
1506
} // end namespace rootserver
1507
} // end namespace oceanbase
1508

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.