oceanbase
1507 строк · 53.1 Кб
/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */
12
13#define USING_LOG_PREFIX RS14
15#include "ob_disaster_recovery_task_mgr.h"16
17#include "lib/lock/ob_mutex.h"18#include "lib/stat/ob_diagnose_info.h"19#include "lib/profile/ob_trace_id.h"20#include "lib/alloc/ob_malloc_allocator.h"21#include "share/ob_debug_sync.h"22#include "share/ob_srv_rpc_proxy.h"23#include "share/config/ob_server_config.h"24#include "ob_disaster_recovery_task_executor.h"25#include "rootserver/ob_root_balancer.h"26#include "ob_rs_event_history_table_operator.h"27#include "share/ob_rpc_struct.h"28#include "observer/ob_server_struct.h"29#include "sql/executor/ob_executor_rpc_proxy.h"30#include "rootserver/ob_disaster_recovery_task.h" // for ObDRTaskType31#include "share/ob_share_util.h" // for ObShareUtil32#include "lib/lock/ob_tc_rwlock.h" // for common::RWLock33#include "rootserver/ob_disaster_recovery_task.h"34#include "rootserver/tenant_snapshot/ob_tenant_snapshot_util.h" // for ObTenantSnapshotUtil35#include "share/inner_table/ob_inner_table_schema_constants.h"36#include "share/ob_all_server_tracer.h"37#include "storage/tablelock/ob_lock_inner_connection_util.h" // for ObInnerConnectionLockUtil38#include "observer/ob_inner_sql_connection.h"39
40namespace oceanbase41{
42using namespace common;43using namespace lib;44using namespace obrpc;45using namespace transaction::tablelock;46using namespace share;47
48namespace rootserver49{
50ObDRTaskQueue::ObDRTaskQueue() : inited_(false),51config_(nullptr),52task_alloc_(),53wait_list_(),54schedule_list_(),55task_map_(),56rpc_proxy_(nullptr),57priority_(ObDRTaskPriority::MAX_PRI)58{
59}
60
// TODO@jingyu.cr: need to make clear the difference between reuse() and reset()
62ObDRTaskQueue::~ObDRTaskQueue()63{
64reuse();65}
66
67void ObDRTaskQueue::reuse()68{
69while (!wait_list_.is_empty()) {70ObDRTask *t = wait_list_.remove_first();71remove_task_from_map_and_free_it_(t);72}73while (!schedule_list_.is_empty()) {74ObDRTask *t = schedule_list_.remove_first();75remove_task_from_map_and_free_it_(t);76}77task_map_.clear();78}
79
80void ObDRTaskQueue::reset()81{
82wait_list_.reset();83schedule_list_.reset();84task_map_.clear();85}
86
87void ObDRTaskQueue::free_task_(ObDRTask *&task)88{
89if (OB_NOT_NULL(task)) {90task->~ObDRTask();91task_alloc_.free(task);92task = nullptr;93}94}
95
96void ObDRTaskQueue::remove_task_from_map_and_free_it_(ObDRTask *&task)97{
98if (OB_NOT_NULL(task)) {99task_map_.erase_refactored(task->get_task_key());100free_task_(task);101}102}
103
104int ObDRTaskQueue::init(105common::ObServerConfig &config,106const int64_t bucket_num,107obrpc::ObSrvRpcProxy *rpc_proxy,108ObDRTaskPriority priority)109{
110int ret = OB_SUCCESS;111if (OB_UNLIKELY(inited_)) {112ret = OB_INIT_TWICE;113LOG_WARN("init twice", KR(ret));114} else if (OB_UNLIKELY(bucket_num <= 0)115|| OB_ISNULL(rpc_proxy)116|| (ObDRTaskPriority::LOW_PRI != priority && ObDRTaskPriority::HIGH_PRI != priority)) {117ret = OB_INVALID_ARGUMENT;118LOG_WARN("invalid argument", KR(ret), K(bucket_num), KP(rpc_proxy), K(priority));119} else if (OB_FAIL(task_map_.create(bucket_num, "DRTaskMap"))) {120LOG_WARN("fail to create task map", KR(ret), K(bucket_num));121} else if (OB_FAIL(task_alloc_.init(122ObMallocAllocator::get_instance(), OB_MALLOC_MIDDLE_BLOCK_SIZE,123ObMemAttr(common::OB_SERVER_TENANT_ID, "DRTaskAlloc")))) {124LOG_WARN("fail to init task allocator", KR(ret));125} else {126config_ = &config;127rpc_proxy_ = rpc_proxy;128priority_ = priority;129inited_ = true;130}131return ret;132}
133
134int ObDRTaskQueue::check_task_in_scheduling(135const ObDRTaskKey &task_key,136bool &task_in_scheduling) const137{
138int ret = OB_SUCCESS;139if (OB_UNLIKELY(!inited_)) {140ret = OB_NOT_INIT;141LOG_WARN("not init", KR(ret));142} else if (OB_UNLIKELY(!task_key.is_valid())) {143ret = OB_INVALID_ARGUMENT;144LOG_WARN("invalid argument", KR(ret), K(task_key));145} else {146ObDRTask *task = nullptr;147int tmp_ret = task_map_.get_refactored(task_key, task);148if (OB_SUCCESS == tmp_ret) {149if (OB_ISNULL(task)) {150ret = OB_ERR_UNEXPECTED;151LOG_WARN("a null task ptr getted from task_map", KR(ret), K(task_key));152} else {153task_in_scheduling = task->in_schedule();154}155} else if (OB_HASH_NOT_EXIST == tmp_ret) {156// task not exist means task not executing157task_in_scheduling = false;158} else {159ret = tmp_ret;160LOG_WARN("fail to get from map", KR(ret), K(task_key));161}162}163return ret;164}
165
166int ObDRTaskQueue::check_task_exist(167const ObDRTaskKey &task_key,168bool &task_exist)169{
170int ret = OB_SUCCESS;171if (OB_UNLIKELY(!inited_)) {172ret = OB_NOT_INIT;173LOG_WARN("not init", KR(ret));174} else if (OB_UNLIKELY(!task_key.is_valid())) {175ret = OB_INVALID_ARGUMENT;176LOG_WARN("invalid argument", KR(ret), K(task_key));177} else {178ObDRTask *task = nullptr;179int tmp_ret = task_map_.get_refactored(task_key, task);180if (OB_SUCCESS == tmp_ret) {181task_exist = true;182} else if (OB_HASH_NOT_EXIST == tmp_ret) {183task_exist = false;184} else {185ret = OB_ERR_UNEXPECTED;186LOG_WARN("fail to get from task_map", KR(ret), K(task_key));187}188}189return ret;190}
191
192int ObDRTaskQueue::push_task_in_wait_list(193ObDRTaskMgr &task_mgr,194const ObDRTaskQueue &sibling_queue,195const ObDRTask &task,196bool &has_task_in_schedule)197{
198int ret = OB_SUCCESS;199has_task_in_schedule = false;200if (OB_UNLIKELY(!inited_)) {201ret = OB_NOT_INIT;202LOG_WARN("not init", KR(ret));203} else {204const ObDRTaskKey &task_key = task.get_task_key();205ObDRTask *task_ptr = nullptr;206int tmp_ret = task_map_.get_refactored(task_key, task_ptr);207if (OB_HASH_NOT_EXIST == tmp_ret) {208if (OB_FAIL(do_push_task_in_wait_list_(task_mgr, sibling_queue, task, has_task_in_schedule))) {209LOG_WARN("fail to push back", KR(ret));210}211} else if (OB_SUCCESS == tmp_ret) {212ret = OB_ENTRY_EXIST;213LOG_INFO("disaster recovery task exist", KR(ret), K(task_key), K(task), KP(this));214} else {215ret = tmp_ret;216LOG_WARN("fail to check task exist", KR(ret), K(task_key));217}218}219return ret;220}
221
222int ObDRTaskQueue::push_task_in_schedule_list(223ObDRTask &task)224{
225// STEP 1: push task into schedule list226// STEP 2: push task into task_map227// STEP 3: set task in schedule228int ret = OB_SUCCESS;229void *raw_ptr = nullptr;230ObDRTask *new_task = nullptr;231
232if (OB_UNLIKELY(!inited_)) {233ret = OB_NOT_INIT;234LOG_WARN("not init", KR(ret));235} else if (OB_ISNULL(raw_ptr = task_alloc_.alloc(task.get_clone_size()))) {236ret = OB_ALLOCATE_MEMORY_FAILED;237LOG_WARN("fail to allocate task", KR(ret));238} else if (OB_FAIL(task.clone(raw_ptr, new_task))) {239LOG_WARN("fail to clone task", KR(ret), K(task));240} else if (OB_ISNULL(new_task)) {241ret = OB_ERR_UNEXPECTED;242LOG_WARN("new_task is nullptr", KR(ret));243} else if (OB_FAIL(task_map_.set_refactored(new_task->get_task_key(), new_task))) {244LOG_WARN("fail to set map", KR(ret), "task_key", new_task->get_task_key());245} else {246// set schedule_time for this task247new_task->set_schedule();248if (!schedule_list_.add_last(new_task)) {249ret = OB_ERR_UNEXPECTED;250LOG_WARN("fail to add task to schedule list", KR(ret), KPC(new_task));251} else {252FLOG_INFO("[DRTASK_NOTICE] finish add task into schedule list", KPC(new_task));253}254}255
256if (OB_FAIL(ret)) {257if (OB_NOT_NULL(new_task)) {258remove_task_from_map_and_free_it_(new_task);259} else if (OB_NOT_NULL(raw_ptr)) {260task_alloc_.free(raw_ptr);261raw_ptr = nullptr;262}263}264return ret;265}
266
267int ObDRTaskQueue::do_push_task_in_wait_list_(268ObDRTaskMgr &task_mgr,269const ObDRTaskQueue &sibling_queue,270const ObDRTask &task,271bool &has_task_in_schedule)272{
273int ret = OB_SUCCESS;274void *raw_ptr = nullptr;275ObDRTask *new_task = nullptr;276if (OB_UNLIKELY(!inited_)) {277ret = OB_NOT_INIT;278LOG_WARN("not init", KR(ret));279} else if (OB_ISNULL(config_)) {280ret = OB_ERR_UNEXPECTED;281LOG_WARN("config_ ptr is null", KR(ret), KP(config_));282} else if (OB_ISNULL(raw_ptr = task_alloc_.alloc(283task.get_clone_size()))) {284ret = OB_ALLOCATE_MEMORY_FAILED;285LOG_WARN("fail to alloc task", KR(ret), "size", task.get_clone_size());286} else if (OB_FAIL(task.clone(raw_ptr, new_task))) {287LOG_WARN("fail to clone task", KR(ret), K(task));288} else if (OB_ISNULL(new_task)) {289ret = OB_ERR_UNEXPECTED;290LOG_WARN("new task ptr is null", KR(ret));291} else if (!wait_list_.add_last(new_task)) {292ret = OB_ERR_UNEXPECTED;293LOG_WARN("fail to add new task to wait list", KR(ret), "task", *new_task);294} else {295has_task_in_schedule = false;296bool sibling_in_schedule = false;297if (OB_FAIL(sibling_queue.check_task_in_scheduling(298new_task->get_task_key(), sibling_in_schedule))) {299LOG_WARN("fail to check has in schedule task", KR(ret),300"task_key", new_task->get_task_key());301} else if (OB_FAIL(task_map_.set_refactored(302new_task->get_task_key(), new_task))) {303LOG_WARN("fail to set map", KR(ret), "task_key", new_task->get_task_key());304} else if (task_mgr.get_reach_concurrency_limit() && !sibling_in_schedule) {305task_mgr.clear_reach_concurrency_limit();306}307if (OB_SUCC(ret)) {308has_task_in_schedule = sibling_in_schedule;309if (OB_FAIL(set_sibling_in_schedule(new_task->get_task_key(), sibling_in_schedule))) {310LOG_WARN("fail to set sibling in schedule", KR(ret),311"task_key", new_task->get_task_key(), K(sibling_in_schedule));312}313}314
315if (OB_FAIL(ret)) {316wait_list_.remove(new_task);317} else {318LOG_INFO("success to push a task in waiting list", K(task), K(has_task_in_schedule));319}320}321
322if (OB_FAIL(ret)) {323if (OB_NOT_NULL(new_task)) {324remove_task_from_map_and_free_it_(new_task);325} else if (OB_NOT_NULL(raw_ptr)) {326task_alloc_.free(raw_ptr);327raw_ptr = nullptr;328}329}330return ret;331}
332
333int ObDRTaskQueue::pop_task(334ObDRTask *&task)335{
336int ret = OB_SUCCESS;337task = nullptr;338if (OB_UNLIKELY(!inited_)) {339ret = OB_NOT_INIT;340LOG_WARN("task queue not init", KR(ret));341} else if (OB_ISNULL(config_)) {342ret = OB_ERR_UNEXPECTED;343LOG_WARN("config_ ptr is null", KR(ret), KP(config_));344} else {345DLIST_FOREACH(t, wait_list_) {346if (t->is_sibling_in_schedule()) {347// task can not pop348LOG_INFO("can not pop this task because a sibling task already in schedule", KPC(t));349} else {350task = t;351break;352}353}354if (OB_NOT_NULL(task)) {355// when task not empty, we move it from wait to schedule list,356LOG_INFO("a task from queue to pop found", KPC(task));357wait_list_.remove(task);358if (!schedule_list_.add_last(task)) {359ret = OB_ERR_UNEXPECTED;360LOG_WARN("fail to add task to schedule list", KR(ret));361} else {362task->set_schedule();363LOG_INFO("success to set task in schedule normally", KPC(task));364}365// if fail to add to schedule list, clean it directly366if (OB_FAIL(ret)) {367remove_task_from_map_and_free_it_(task);368}369}370}371return ret;372}
373
374int ObDRTaskQueue::get_task(375const share::ObTaskId &task_id,376const ObDRTaskKey &task_key,377ObDRTask *&task)378{
379int ret = OB_SUCCESS;380if (OB_UNLIKELY(!inited_)) {381ret = OB_NOT_INIT;382LOG_WARN("not init", KR(ret));383} else {384ObDRTask *my_task = nullptr;385int tmp_ret = task_map_.get_refactored(task_key, my_task);386if (OB_HASH_NOT_EXIST == tmp_ret) {387task = nullptr;388} else if (OB_SUCCESS == tmp_ret) {389if (OB_ISNULL(my_task)) {390ret = OB_ERR_UNEXPECTED;391LOG_WARN("my_task ptr is null", KR(ret), KP(my_task));392} else if (my_task->get_task_id() == task_id) {393task = my_task;394} else {395task = nullptr;396}397} else {398ret = tmp_ret;399LOG_WARN("fail to get task from map", KR(ret), K(task_key), K(task_id));400}401}402return ret;403}
404
405int ObDRTaskQueue::check_task_need_cleaning_(406const ObDRTask &task,407bool &need_cleanning,408ObDRTaskRetComment &ret_comment)409{
410int ret = OB_SUCCESS;411// do not clean this task by default412// need_cleanning = true under these cases413// (1) server not exist414// (2) server is permanant offline415// (3) rpc ls_check_dr_task_exist successfully told us task not exist416// (4) task is timeout while any failure during whole procedure417need_cleanning = false;418Bool task_exist = false;419const ObAddr &dst_server = task.get_dst_server();420share::ObServerInfoInTable server_info;421if (OB_UNLIKELY(!inited_)) {422ret = OB_NOT_INIT;423LOG_WARN("task queue not init", KR(ret));424} else if (OB_ISNULL(rpc_proxy_)) {425ret = OB_ERR_UNEXPECTED;426LOG_WARN("some ptr is null", KR(ret), KP(rpc_proxy_));427} else if (OB_FAIL(SVR_TRACER.get_server_info(dst_server, server_info))) {428LOG_WARN("fail to get server_info", KR(ret), "server", dst_server);429// case 1. server not exist430if (OB_ENTRY_NOT_EXIST == ret) {431ret = OB_SUCCESS;432FLOG_INFO("the reason to clean this task: server not exist", K(task));433need_cleanning = true;434ret_comment = ObDRTaskRetComment::CLEAN_TASK_DUE_TO_SERVER_NOT_EXIST;435}436} else if (server_info.is_permanent_offline()) {437// case 2. server is permanant offline438FLOG_INFO("the reason to clean this task: server permanent offline", K(task), K(server_info));439need_cleanning = true;440ret_comment = ObDRTaskRetComment::CLEAN_TASK_DUE_TO_SERVER_PERMANENT_OFFLINE;441} else if (server_info.is_alive()) {442ObDRTaskExistArg arg;443arg.task_id_ = task.get_task_id();444arg.tenant_id_ = task.get_tenant_id();445arg.ls_id_ = task.get_ls_id();446if (OB_FAIL(rpc_proxy_->to(task.get_dst_server()).by(task.get_tenant_id())447.ls_check_dr_task_exist(arg, task_exist))) {448LOG_WARN("fail to check task exist", KR(ret), "tenant_id", task.get_tenant_id(),449"task_id", task.get_task_id(), "dst", task.get_dst_server());450} else if (!task_exist) {451// case 3. 
rpc ls_check_dr_task_exist successfully told us task not exist452FLOG_INFO("the reason to clean this task: task not running", K(task));453need_cleanning = true;454ret_comment = ObDRTaskRetComment::CLEAN_TASK_DUE_TO_TASK_NOT_RUNNING;455}456} else if (server_info.is_temporary_offline()) {457ret = OB_SERVER_NOT_ALIVE;458LOG_WARN("server status is not alive, task may be cleanned later", KR(ret), "server", task.get_dst_server(), K(server_info), K(task));459} else {460ret = OB_ERR_UNEXPECTED;461LOG_WARN("unexpected server status", KR(ret), "server", task.get_dst_server(), K(server_info), K(task));462}463
464// case 4. task is timeout while any OB_FAIL occurs465if (OB_FAIL(ret) && task.is_already_timeout()) {466FLOG_INFO("the reason to clean this task: task is timeout", KR(ret), K(task));467ret = OB_SUCCESS;468need_cleanning = true;469ret_comment = ObDRTaskRetComment::CLEAN_TASK_DUE_TO_TASK_TIMEOUT;470}471return ret;472}
473
474int ObDRTaskQueue::handle_not_in_progress_task(475ObDRTaskMgr &task_mgr)476{
477int ret = OB_SUCCESS;478if (OB_UNLIKELY(!inited_)) {479ret = OB_NOT_INIT;480LOG_WARN("task queue not init", KR(ret));481} else {482const int ret_code = OB_LS_REPLICA_TASK_RESULT_UNCERTAIN;483ObDRTaskRetComment ret_comment = ObDRTaskRetComment::MAX;484bool need_cleaning = false;485DLIST_FOREACH(t, schedule_list_) {486int tmp_ret = OB_SUCCESS;487need_cleaning = false;488DEBUG_SYNC(BEFORE_CHECK_CLEAN_DRTASK);489if (OB_SUCCESS != (tmp_ret = check_task_need_cleaning_(*t, need_cleaning, ret_comment))) {490LOG_WARN("fail to check this task exist for cleaning", KR(tmp_ret), KPC(t));491} else if (need_cleaning492&& OB_SUCCESS != (tmp_ret = task_mgr.async_add_cleaning_task_to_updater(493t->get_task_id(),494t->get_task_key(),495ret_code,496true,/*need_record_event*/497ret_comment))) {498LOG_WARN("do execute over failed", KR(tmp_ret), KPC(t), K(ret_comment));499}500}501}502return ret;503}
504
505int ObDRTaskQueue::finish_schedule(506ObDRTask *task)507{
508int ret = OB_SUCCESS;509if (OB_UNLIKELY(!inited_)) {510ret = OB_NOT_INIT;511LOG_WARN("task queue not init", KR(ret));512} else if (OB_ISNULL(task)) {513ret = OB_INVALID_ARGUMENT;514LOG_WARN("invalid argument", KR(ret), KP(task));515} else if (OB_UNLIKELY(!task->in_schedule())) {516ret = OB_STATE_NOT_MATCH;517LOG_WARN("task state not match", KR(ret), KPC(task));518} else {519// remove from schedule_list_520schedule_list_.remove(task);521FLOG_INFO("[DRTASK_NOTICE] success to finish schedule task", KR(ret), KPC(task));522remove_task_from_map_and_free_it_(task);523}524return ret;525}
526
527int ObDRTaskQueue::set_sibling_in_schedule(528const ObDRTask &task,529const bool in_schedule)530{
531int ret = OB_SUCCESS;532if (OB_UNLIKELY(!inited_)) {533ret = OB_NOT_INIT;534LOG_WARN("not init", KR(ret));535} else {536const ObDRTaskKey &task_key = task.get_task_key();537ObDRTask *my_task = nullptr;538int tmp_ret = task_map_.get_refactored(task_key, my_task);539if (OB_HASH_NOT_EXIST == tmp_ret) {540// bypass541} else if (OB_SUCCESS == tmp_ret) {542if (OB_ISNULL(my_task)) {543ret = OB_ERR_UNEXPECTED;544LOG_WARN("my_task ptr is null", KR(ret), K(task));545} else {546my_task->set_sibling_in_schedule(in_schedule);547}548} else {549ret = tmp_ret;550LOG_WARN("fail to get task from map", KR(ret), K(task_key));551}552}553return ret;554}
555
556int ObDRTaskQueue::set_sibling_in_schedule(557const ObDRTaskKey &task_key,558const bool in_schedule)559{
560int ret = OB_SUCCESS;561if (OB_UNLIKELY(!inited_)) {562ret = OB_NOT_INIT;563LOG_WARN("not init", KR(ret));564} else if (OB_UNLIKELY(!task_key.is_valid())) {565ret = OB_INVALID_ARGUMENT;566LOG_WARN("invalid argument", KR(ret), K(task_key));567} else {568ObDRTask *my_task = nullptr;569int tmp_ret = task_map_.get_refactored(task_key, my_task);570if (OB_HASH_NOT_EXIST == tmp_ret) {571// bypass572} else if (OB_SUCCESS == tmp_ret) {573if (OB_ISNULL(my_task)) {574ret = OB_ERR_UNEXPECTED;575LOG_WARN("my_task ptr is null", KR(ret), K(task_key), KP(my_task));576} else {577my_task->set_sibling_in_schedule(in_schedule);578}579} else {580ret = tmp_ret;581LOG_WARN("fail to get task from map", KR(ret), K(task_key));582}583}584return ret;585}
586
587int ObDRTaskQueue::dump_statistic() const588{
589int ret = OB_SUCCESS;590if (OB_UNLIKELY(!inited_)) {591ret = OB_NOT_INIT;592LOG_WARN("not init", KR(ret));593} else {594DLIST_FOREACH(t, schedule_list_) {595FLOG_INFO("[DRTASK_NOTICE] tasks in schedule list", "priority", get_priority_str(), "task_key", t->get_task_key(),596"task_id", t->get_task_id(), "task_type", t->get_disaster_recovery_task_type());597}598DLIST_FOREACH(t, wait_list_) {599FLOG_INFO("[DRTASK_NOTICE] tasks in wait list", "priority", get_priority_str(), "task_key", t->get_task_key(),600"task_id", t->get_task_id(), "task_type", t->get_disaster_recovery_task_type());601}602}603return ret;604}
605
606int ObDRTaskMgr::init(607const common::ObAddr &server,608common::ObServerConfig &config,609ObDRTaskExecutor &task_executor,610obrpc::ObSrvRpcProxy *rpc_proxy,611common::ObMySQLProxy *sql_proxy,612share::schema::ObMultiVersionSchemaService *schema_service)613{
614int ret = OB_SUCCESS;615static const int64_t thread_count = 1;616if (OB_UNLIKELY(inited_ || !stopped_)) {617ret = OB_INIT_TWICE;618LOG_WARN("init twice", KR(ret), K(inited_), K_(stopped));619} else if (OB_UNLIKELY(!server.is_valid())620|| OB_ISNULL(rpc_proxy)621|| OB_ISNULL(sql_proxy)622|| OB_ISNULL(schema_service)) {623ret = OB_INVALID_ARGUMENT;624LOG_WARN("invalid argument", KR(ret), K(server), KP(rpc_proxy),625KP(sql_proxy), KP(schema_service));626} else if (OB_FAIL(cond_.init(ObWaitEventIds::REBALANCE_TASK_MGR_COND_WAIT))) {627LOG_WARN("fail to init cond", KR(ret));628} else if (OB_FAIL(create(thread_count, "DRTaskMgr"))) {629LOG_WARN("fail to create disaster recovery task mgr", KR(ret));630} else {631config_ = &config;632self_ = server;633task_executor_ = &task_executor;634rpc_proxy_ = rpc_proxy;635sql_proxy_ = sql_proxy;636schema_service_ = schema_service;637if (OB_FAIL(high_task_queue_.init(638config, TASK_QUEUE_LIMIT, rpc_proxy_, ObDRTaskPriority::HIGH_PRI))) {639LOG_WARN("fail to init high priority task queue", KR(ret));640} else if (OB_FAIL(low_task_queue_.init(641config, TASK_QUEUE_LIMIT, rpc_proxy_, ObDRTaskPriority::LOW_PRI))) {642LOG_WARN("fail to init low priority task queue", KR(ret));643} else if (OB_FAIL(disaster_recovery_task_table_updater_.init(sql_proxy, this))) {644LOG_WARN("fail to init a ObDRTaskTableUpdater", KR(ret));645} else {646inited_ = true;647}648}649return ret;650}
651
652int ObDRTaskMgr::start()653{
654int ret = OB_SUCCESS;655if (OB_UNLIKELY(!inited_)) {656ret = OB_NOT_INIT;657LOG_WARN("task mgr not inited", KR(ret), K(inited_));658} else if (OB_UNLIKELY(!stopped_)) {659ret = OB_ERR_UNEXPECTED;660LOG_WARN("can not start ObDRTaskMgr twice", KR(ret), K_(stopped));661} else if (OB_ISNULL(sql_proxy_)) {662ret = OB_INVALID_ARGUMENT;663LOG_WARN("invalid argument", KR(ret), KP(sql_proxy_));664} else if (OB_FAIL(ObRsReentrantThread::start())) {665LOG_WARN("fail to start ObRsReentrantThread", KR(ret));666} else if (OB_FAIL(disaster_recovery_task_table_updater_.start())) {667LOG_WARN("fail to start disaster_recovery_task_table_updater", KR(ret));668} else {669stopped_ = false;670FLOG_INFO("success to start ObDRTaskMgr");671}672return ret;673}
674
675void ObDRTaskMgr::stop()676{
677loaded_ = false;678stopped_ = true;679ObRsReentrantThread::stop();680disaster_recovery_task_table_updater_.stop();681ObThreadCondGuard guard(cond_);682cond_.broadcast();683for (int64_t i = 0; i < static_cast<int64_t>(ObDRTaskPriority::MAX_PRI); ++i) {684queues_[i].reuse();685}686FLOG_INFO("success to stop ObDRTaskMgr");687}
688
689void ObDRTaskMgr::wait()690{
691ObRsReentrantThread::wait();692disaster_recovery_task_table_updater_.wait();693}
694
695int ObDRTaskMgr::check_inner_stat_() const696{
697int ret = OB_SUCCESS;698if (OB_UNLIKELY(!inited_ || stopped_ || !loaded_)) {699ret = OB_NOT_INIT;700LOG_WARN("ObDRTaskMgr is not inited or is stopped or not loaded", KR(ret), K_(inited), K_(stopped), K_(loaded));701}702return ret;703}
704
705void ObDRTaskMgr::run3()706{
707FLOG_INFO("Disaster recovery task mgr start");708if (OB_UNLIKELY(!inited_ || stopped_)) {709LOG_WARN_RET(OB_NOT_INIT, "ObDRTaskMgr not init", K(inited_), K_(stopped));710} else {711int64_t last_dump_ts = ObTimeUtility::current_time();712int64_t last_check_task_in_progress_ts = ObTimeUtility::current_time();713int ret = OB_SUCCESS;714int tmp_ret = OB_SUCCESS;715while (!stop_) {716// thread detect717if (!loaded_ && OB_FAIL(load_task_to_schedule_list_())) {718LOG_WARN("fail to load task infos into schedule list, will retry until success", KR(ret));719} else {720update_last_run_timestamp();721
722common::ObArenaAllocator allocator;723ObDRTask *task = nullptr;724if (OB_FAIL(try_pop_task(allocator, task))) {725LOG_WARN("fail to try pop task", KR(ret));726} else if (OB_NOT_NULL(task)) {727const ObAddr &dst_server = task->get_dst_server();728share::ObServerInfoInTable server_info;729if (OB_FAIL(SVR_TRACER.get_server_info(dst_server, server_info))) {730LOG_WARN("fail to get server_info", KR(ret), K(dst_server));731} else if (server_info.is_permanent_offline()) {732// dest server permanent offline, do not execute this task, just clean it733LOG_INFO("[DRTASK_NOTICE] dest server is permanent offline, task can not execute", K(dst_server), K(server_info));734ObThreadCondGuard guard(cond_);735if (OB_SUCCESS != (tmp_ret = async_add_cleaning_task_to_updater(736task->get_task_id(),737task->get_task_key(),738OB_REBALANCE_TASK_CANT_EXEC,739false/*need_record_event*/,740ObDRTaskRetComment::CANNOT_EXECUTE_DUE_TO_SERVER_PERMANENT_OFFLINE,741false/*reach_data_copy_concurrency*/))) {742LOG_WARN("fail to do execute over", KR(tmp_ret), KPC(task));743}744} else {745if (OB_SUCCESS != (tmp_ret = task->log_execute_start())) {746LOG_WARN("fail to log task start", KR(tmp_ret), KPC(task));747}748if (OB_FAIL(execute_task(*task))) {749LOG_WARN("fail to send", KR(ret), KPC(task));750}751}752free_task_(allocator, task);753} else {754LOG_TRACE("task is nullptr after try_pop_task");755}756if (OB_SUCCESS != (tmp_ret = try_dump_statistic_(757last_dump_ts))) {758LOG_WARN("fail to try dump statistic", KR(tmp_ret), K(last_dump_ts));759}760if (OB_SUCCESS != (tmp_ret = try_clean_not_in_schedule_task_in_schedule_list_(761last_check_task_in_progress_ts))) {762LOG_WARN("fail to try check task in progress", KR(tmp_ret), K(last_check_task_in_progress_ts));763}764}765}766}767FLOG_INFO("disaster task mgr exits");768}
769
770int ObDRTaskMgr::check_task_in_executing(771const ObDRTaskKey &task_key,772const ObDRTaskPriority priority,773bool &task_in_executing)774{
775int ret = OB_SUCCESS;776if (OB_FAIL(check_inner_stat_())) {777LOG_WARN("fail to check inner stat", KR(ret), K_(inited), K_(stopped), K_(loaded));778} else if (ObDRTaskPriority::HIGH_PRI != priority779&& ObDRTaskPriority::LOW_PRI != priority) {780ret = OB_INVALID_ARGUMENT;781LOG_WARN("invalid argument", KR(ret), K(priority));782} else {783ObThreadCondGuard guard(cond_);784ObDRTaskQueue &queue = ObDRTaskPriority::LOW_PRI == priority785? low_task_queue_786: high_task_queue_;787if (OB_FAIL(queue.check_task_in_scheduling(task_key, task_in_executing))) {788LOG_WARN("fail to check task exist", KR(ret), K(task_key));789}790}791return ret;792}
793
794int ObDRTaskMgr::check_task_exist(795const ObDRTaskKey &task_key,796const ObDRTaskPriority priority,797bool &task_exist)798{
799int ret = OB_SUCCESS;800if (OB_FAIL(check_inner_stat_())) {801LOG_WARN("fail to check inner stat", KR(ret), K_(inited), K_(stopped), K_(loaded));802} else if (ObDRTaskPriority::HIGH_PRI != priority803&& ObDRTaskPriority::LOW_PRI != priority) {804ret = OB_INVALID_ARGUMENT;805LOG_WARN("invalid argument", KR(ret), K(priority));806} else {807ObThreadCondGuard guard(cond_);808ObDRTaskQueue &queue = ObDRTaskPriority::LOW_PRI == priority809? low_task_queue_810: high_task_queue_;811if (OB_FAIL(queue.check_task_exist(task_key, task_exist))) {812LOG_WARN("fail to check task exist", KR(ret), K(task_key));813}814}815return ret;816}
817
818int ObDRTaskMgr::add_task(819const ObDRTask &task)820{
821int ret = OB_SUCCESS;822if (OB_FAIL(check_inner_stat_())) {823ret = OB_NOT_INIT;824LOG_WARN("fail to check inner stat", KR(ret), K_(inited), K_(loaded), K_(stopped));825} else if (OB_UNLIKELY(!task.is_valid())) {826ret = OB_INVALID_ARGUMENT;827LOG_WARN("invalid dr task", KR(ret), K(task));828} else {829ObThreadCondGuard guard(cond_);830ObDRTaskQueue &queue = task.is_high_priority_task()831? high_task_queue_832: low_task_queue_;833ObDRTaskQueue &sibling_queue = task.is_high_priority_task()834? low_task_queue_835: high_task_queue_;836bool has_task_in_schedule = false;837if (OB_UNLIKELY(queue.task_cnt() >= TASK_QUEUE_LIMIT)) {838ret = OB_SIZE_OVERFLOW;839LOG_WARN("disaster recovery task queue is full", KR(ret), "task_cnt", queue.task_cnt());840} else if (OB_FAIL(queue.push_task_in_wait_list(*this, sibling_queue, task, has_task_in_schedule))) {841if (OB_ENTRY_EXIST != ret) {842LOG_WARN("fail to push task", KR(ret), K(task));843} else {844ret = OB_SUCCESS;845LOG_INFO("task already exist in queue", K(task));846}847} else {848int64_t wait_cnt = 0;849int64_t schedule_cnt = 0;850if (OB_FAIL(inner_get_task_cnt_(wait_cnt, schedule_cnt))) {851LOG_WARN("fail to get task cnt", KR(ret));852} else if (!has_task_in_schedule853&& 0 == get_reach_concurrency_limit()) {854cond_.broadcast();855LOG_INFO("success to broad cast cond_", K(wait_cnt), K(schedule_cnt));856}857clear_reach_concurrency_limit();858LOG_INFO("[DRTASK_NOTICE] add task to disaster recovery task mgr finish", KR(ret), K(task));859}860}861return ret;862}
863
864int ObDRTaskMgr::deal_with_task_reply(865const ObDRTaskReplyResult &reply)866{
867int ret = OB_SUCCESS;868ObDRTaskKey task_key;869if (OB_FAIL(check_inner_stat_())) {870ret = OB_NOT_INIT;871LOG_WARN("fail to check inner stat", KR(ret), K_(inited), K_(loaded), K_(stopped));872} else if (OB_FAIL(task_key.init(873reply.tenant_id_,874reply.ls_id_.id(),8750, /* set to 0 */8760, /* set to 0 */877ObDRTaskKeyType::FORMAL_DR_KEY))) {878LOG_WARN("fail to init task key", KR(ret), K(reply));879} else {880int tmp_ret = OB_SUCCESS;881ObDRTask *task = nullptr;882ObThreadCondGuard guard(cond_);883if (OB_SUCCESS != (tmp_ret = get_task_by_id_(reply.task_id_, task_key, task))) {884if (OB_ENTRY_NOT_EXIST == tmp_ret) {885// task not exist, try record this reply result886ROOTSERVICE_EVENT_ADD("disaster_recovery", "finish_disaster_recovery_task",887"tenant_id", reply.tenant_id_,888"ls_id", reply.ls_id_.id(),889"task_id", reply.task_id_,890"execute_result", reply.result_,891"ret_comment", ob_disaster_recovery_task_ret_comment_strs(ObDRTaskRetComment::RECEIVE_FROM_STORAGE_RPC));892} else {893LOG_WARN("fail to get task from task manager", KR(tmp_ret), K(reply), K(task_key));894}895} else if (OB_SUCCESS != (tmp_ret = task->log_execute_result(reply.result_, ObDRTaskRetComment::RECEIVE_FROM_STORAGE_RPC))){896LOG_WARN("fail to log execute result", KR(tmp_ret), K(reply));897}898
899if (OB_FAIL(async_add_cleaning_task_to_updater(900reply.task_id_,901task_key,902reply.result_,903false,/*need_record_event*/904ObDRTaskRetComment::RECEIVE_FROM_STORAGE_RPC,905true/*need_clear_server_data_in_limit*/))) {906LOG_WARN("fail to do execute over", KR(ret), K(reply));907}908}909return ret;910}
911
912int ObDRTaskMgr::async_add_cleaning_task_to_updater(913const share::ObTaskId &task_id,914const ObDRTaskKey &task_key,915const int ret_code,916const bool need_record_event,917const ObDRTaskRetComment &ret_comment,918const bool need_clear_server_data_in_limit)919{
920int ret = OB_SUCCESS;921ObDRTask *task = nullptr;922if (OB_FAIL(check_inner_stat_())) {923LOG_WARN("fail to check inner stat", KR(ret), K_(inited), K_(stopped), K_(loaded));924} else if (OB_FAIL(get_task_by_id_(task_id, task_key, task))) {925if (OB_ENTRY_NOT_EXIST == ret) {926LOG_WARN("fail to get task, task may be cleaned earlier", KR(ret), K(task_id), K(task_key));927ret = OB_SUCCESS;928} else {929LOG_WARN("fail to get task from task manager", KR(ret), K(task_id), K(task_key));930}931}932if (OB_SUCC(ret)933&& OB_NOT_NULL(task)934&& OB_FAIL(disaster_recovery_task_table_updater_.async_update(935task->get_tenant_id(),936task->get_ls_id(),937task->get_disaster_recovery_task_type(),938task_key,939ret_code,940need_clear_server_data_in_limit,941task_id,942need_record_event,943ret_comment))) {944LOG_WARN("fail to async update a dr task", KR(ret), "tenant_id", task->get_tenant_id(),945"ls_id", task->get_ls_id(), K(task_id), K(need_record_event), K(ret_comment));946}947return ret;948}
949
950int ObDRTaskMgr::do_cleaning(951const share::ObTaskId &task_id,952const ObDRTaskKey &task_key,953const int ret_code,954const bool need_clear_server_data_in_limit,955const bool need_record_event,956const ObDRTaskRetComment &ret_comment)957{
958int ret = OB_SUCCESS;959ObThreadCondGuard guard(cond_);960if (OB_FAIL(check_inner_stat_())) {961LOG_WARN("fail to check inner stat", KR(ret), K_(inited), K_(stopped), K_(loaded));962} else {963ObDRTaskQueue *task_queue = nullptr;964ObDRTask *task = nullptr;965common::ObAddr dst_server;966for (int64_t i = 0; OB_SUCC(ret) && i < ARRAYSIZEOF(queues_); ++i) {967if (OB_FAIL(queues_[i].get_task(task_id, task_key, task))) {968LOG_WARN("fail to get schedule task from queue", KR(ret), "priority", queues_[i].get_priority_str());969} else if (OB_NOT_NULL(task)) {970task_queue = &queues_[i];971break;972}973}974
975if (OB_SUCC(ret)) {976if (OB_ISNULL(task)) {977LOG_INFO("in schedule taks not found, maybe not sync because of network traffic",978K(task_id), K(task_key), K(ret_code));979} else {980if (need_record_event) {981(void)log_task_result(*task, ret_code, ret_comment);982}983dst_server = task->get_dst_server();984if (OB_FAIL(set_sibling_in_schedule(*task, false/* not in schedule*/))) {985LOG_WARN("fail to set sibling in schedule", KR(ret), KPC(task));986} else if (OB_ISNULL(task_queue)) {987LOG_INFO("task_queue is null"); // by pass988} else if (OB_FAIL(task_queue->finish_schedule(task))) {989LOG_WARN("fail to finish scheduling task", KR(ret), KPC(task));990}991}992clear_reach_concurrency_limit();993}994}995return ret;996}
997
998int ObDRTaskMgr::get_all_task_count(999int64_t &high_wait_cnt,1000int64_t &high_schedule_cnt,1001int64_t &low_wait_cnt,1002int64_t &low_schedule_cnt)1003{
1004int ret = OB_SUCCESS;1005if (OB_FAIL(check_inner_stat_())) {1006ret = OB_NOT_INIT;1007LOG_WARN("not init", KR(ret), K_(inited), K_(loaded), K_(stopped));1008} else {1009ObThreadCondGuard guard(cond_);1010high_wait_cnt = get_high_priority_queue_().get_wait_list().get_size();1011high_schedule_cnt = get_high_priority_queue_().get_schedule_list().get_size();1012low_wait_cnt = get_low_priority_queue_().get_wait_list().get_size();1013low_schedule_cnt = get_low_priority_queue_().get_schedule_list().get_size();1014}1015return ret;1016}
1017
1018int ObDRTaskMgr::log_task_result(1019const ObDRTask &task,1020const int ret_code,1021const ObDRTaskRetComment &ret_comment)1022{
1023int ret = OB_SUCCESS;1024if (OB_FAIL(task.log_execute_result(ret_code, ret_comment))) {1025LOG_WARN("fail to log execute task", KR(ret), K(task), KR(ret_code), K(ret_comment));1026}1027return ret;1028}
1029
1030int ObDRTaskMgr::get_task_by_id_(1031const share::ObTaskId &task_id,1032const ObDRTaskKey &task_key,1033ObDRTask *&task)1034{
1035int ret = OB_SUCCESS;1036ObDRTask *task_to_get = nullptr;1037void *raw_ptr = nullptr;1038for (int64_t i = 0; OB_SUCC(ret) && i < ARRAYSIZEOF(queues_); ++i) {1039if (OB_FAIL(queues_[i].get_task(task_id, task_key, task_to_get))) {1040LOG_WARN("fail to get schedule task from queue", KR(ret), "priority", queues_[i].get_priority_str());1041} else if (OB_NOT_NULL(task_to_get)) {1042break;1043}1044}1045if (OB_SUCC(ret) && OB_ISNULL(task_to_get)) {1046task = nullptr;1047ret = OB_ENTRY_NOT_EXIST;1048LOG_WARN("task not exist, maybe cleaned earier", KR(ret), K(task_id), K(task_key));1049} else {1050task = task_to_get;1051}1052return ret;1053}
1054
1055void ObDRTaskMgr::free_task_(1056common::ObIAllocator &allocator,1057ObDRTask *&task)1058{
1059if (OB_NOT_NULL(task)) {1060task->~ObDRTask();1061allocator.free(task);1062task = nullptr;1063}1064}
1065
1066int ObDRTaskMgr::load_task_to_schedule_list_()1067{
1068int ret = OB_SUCCESS;1069int tmp_ret = OB_SUCCESS;1070ObThreadCondGuard guard(cond_);1071ObArray<uint64_t> tenant_id_array;1072
1073if (OB_UNLIKELY(!inited_ || stopped_)) {1074LOG_WARN("fail to check inner stat", KR(ret), K_(inited), K_(stopped));1075} else if (OB_ISNULL(schema_service_) || OB_ISNULL(sql_proxy_)) {1076ret = OB_INVALID_ARGUMENT;1077LOG_WARN("schema_service_ or sql_proxy_ is nullptr", KR(ret), KP(schema_service_), KP(sql_proxy_));1078} else if (OB_UNLIKELY(ObTenantUtils::get_tenant_ids(schema_service_, tenant_id_array))) {1079LOG_WARN("fail to get tenant id array", KR(ret));1080} else {1081// clear schedule_list and wait_list in two queues1082for (int64_t i = 0; i < static_cast<int64_t>(ObDRTaskPriority::MAX_PRI); ++i) {1083queues_[i].reuse();1084}1085clear_reach_concurrency_limit();1086for (int64_t i = 0; OB_SUCC(ret) && i < tenant_id_array.count(); ++i) {1087// load this tenant's task info into schedule_list1088// TODO@jingyu.cr: need to isolate different tenant1089const uint64_t tenant_id = tenant_id_array.at(i);1090const uint64_t sql_tenant_id = gen_meta_tenant_id(tenant_id);1091ObSqlString sql;1092ObTimeoutCtx ctx;1093SMART_VAR(ObISQLClient::ReadResult, result) {1094if (OB_FAIL(rootserver::ObRootUtils::get_rs_default_timeout_ctx(ctx))) {1095LOG_WARN("fail to get timeout ctx", KR(ret), K(ctx));1096} else if (OB_FAIL(sql.append_fmt(1097"SELECT * FROM %s WHERE tenant_id = %ld",1098share::OB_ALL_LS_REPLICA_TASK_TNAME, tenant_id))) {1099LOG_WARN("fail to assign sql", KR(ret), K(tenant_id), K(sql_tenant_id));1100} else if (OB_FAIL(sql_proxy_->read(result, sql_tenant_id, sql.ptr()))) {1101LOG_WARN("execute sql failed", KR(ret),1102K(tenant_id), K(sql_tenant_id), "sql", sql.ptr());1103} else if (OB_ISNULL(result.get_result())) {1104ret = OB_ERR_UNEXPECTED;1105LOG_WARN("get mysql result failed", KR(ret), "sql", sql.ptr());1106} else if (OB_FAIL(load_single_tenant_task_infos_(*result.get_result()))) {1107LOG_WARN("load single tenant's task info failed", KR(ret), K(tenant_id), K(sql_tenant_id));1108} else {1109FLOG_INFO("success to load single tenant's task info", 
K(tenant_id));1110}1111}1112}1113if (OB_SUCC(ret)) {1114loaded_ = true;1115}1116}1117return ret;1118}
1119
1120int ObDRTaskMgr::load_single_tenant_task_infos_(1121sqlclient::ObMySQLResult &res)1122{
1123int ret = OB_SUCCESS;1124if (OB_UNLIKELY(!inited_ || stopped_)) {1125LOG_WARN("fail to check inner stat", KR(ret), K_(inited), K_(stopped));1126} else {1127while (OB_SUCC(ret)) {1128if (OB_FAIL(res.next())) {1129if (OB_ITER_END == ret) {1130ret = OB_SUCCESS;1131} else {1132LOG_WARN("get next result failed", KR(ret));1133}1134break;1135} else if (OB_FAIL(load_task_info_(res))) {1136LOG_WARN("fail to build and load this task info", KR(ret));1137}1138}1139}1140return ret;1141}
1142
1143int ObDRTaskMgr::load_task_info_(1144sqlclient::ObMySQLResult &res)1145{
1146int ret = OB_SUCCESS;1147if (OB_UNLIKELY(!inited_ || stopped_)) {1148LOG_WARN("fail to check inner stat", KR(ret), K_(inited), K_(stopped));1149} else {1150common::ObString task_type;1151int64_t priority = 2;1152(void)GET_COL_IGNORE_NULL(res.get_varchar, "task_type", task_type);1153(void)GET_COL_IGNORE_NULL(res.get_int, "priority", priority);1154if (OB_FAIL(ret)) {1155} else if (task_type == common::ObString("MIGRATE REPLICA")) {1156SMART_VAR(ObMigrateLSReplicaTask, tmp_task) {1157if (OB_FAIL(tmp_task.build_task_from_sql_result(res))) {1158LOG_WARN("fail to build migrate task info from res", KR(ret));1159} else if (OB_FAIL(queues_[priority].push_task_in_schedule_list(tmp_task))) {1160LOG_WARN("fail to load a ObMigrateLSReplicaTask into schedule list", KR(ret));1161}1162}1163} else if (task_type == common::ObString("ADD REPLICA")) {1164SMART_VAR(ObAddLSReplicaTask, tmp_task) {1165if (OB_FAIL(tmp_task.build_task_from_sql_result(res))) {1166LOG_WARN("fail to build ObAddLSReplicaTask from res", KR(ret));1167} else if (OB_FAIL(queues_[priority].push_task_in_schedule_list(tmp_task))) {1168LOG_WARN("fail to load ObAddLSReplicaTask into schedule list", KR(ret));1169}1170}1171} else if (task_type == common::ObString("TYPE TRANSFORM")) {1172SMART_VAR(ObLSTypeTransformTask, tmp_task) {1173if (OB_FAIL(tmp_task.build_task_from_sql_result(res))) {1174LOG_WARN("fail to build ObLSTypeTransformTask from res", KR(ret));1175} else if (OB_FAIL(queues_[priority].push_task_in_schedule_list(tmp_task))) {1176LOG_WARN("fail to load ObLSTypeTransformTask into schedule list", KR(ret));1177}1178}1179} else if (0 == task_type.case_compare(ob_disaster_recovery_task_type_strs(ObDRTaskType::LS_REMOVE_NON_PAXOS_REPLICA))1180|| 0 == task_type.case_compare(ob_disaster_recovery_task_type_strs(ObDRTaskType::LS_REMOVE_PAXOS_REPLICA))) {1181SMART_VAR(ObRemoveLSReplicaTask, tmp_task) {1182if (OB_FAIL(tmp_task.build_task_from_sql_result(res))) {1183LOG_WARN("fail to build ObRemoveLSReplicaTask from 
res", KR(ret));1184} else if (OB_FAIL(queues_[priority].push_task_in_schedule_list(tmp_task))) {1185LOG_WARN("fail to load ObRemoveLSReplicaTask into schedule list", KR(ret));1186}1187}1188} else if (task_type == common::ObString("MODIFY PAXOS REPLICA NUMBER")) {1189SMART_VAR(ObLSModifyPaxosReplicaNumberTask, tmp_task) {1190if (OB_FAIL(tmp_task.build_task_from_sql_result(res))) {1191LOG_WARN("fail to build ObLSModifyPaxosReplicaNumberTask from res", KR(ret));1192} else if (OB_FAIL(queues_[priority].push_task_in_schedule_list(tmp_task))) {1193LOG_WARN("fail to load ObLSModifyPaxosReplicaNumberTask into schedule list", KR(ret));1194}1195}1196} else {1197ret = OB_INVALID_ARGUMENT;1198LOG_WARN("unexpected task type", KR(ret), K(task_type));1199}1200}1201return ret;1202}
1203
1204int ObDRTaskMgr::persist_task_info_(1205const ObDRTask &task,1206ObDRTaskRetComment &ret_comment)1207{
1208int ret = OB_SUCCESS;1209ret_comment = ObDRTaskRetComment::MAX;1210share::ObDMLSqlSplicer dml;1211ObSqlString sql;1212int64_t affected_rows = 0;1213const uint64_t sql_tenant_id = gen_meta_tenant_id(task.get_tenant_id());1214ObMySQLTransaction trans;1215const int64_t timeout = GCONF.internal_sql_execute_timeout;1216observer::ObInnerSQLConnection *conn = NULL;1217ObConflictCaseWithClone case_to_check(ObConflictCaseWithClone::MODIFY_REPLICA);1218
1219if (OB_FAIL(check_inner_stat_())) {1220LOG_WARN("fail to check inner stat", KR(ret), K_(inited), K_(stopped), K_(loaded));1221} else if (OB_ISNULL(sql_proxy_)) {1222ret = OB_INVALID_ARGUMENT;1223LOG_WARN("invalid argument", KR(ret));1224} else if (OB_FAIL(trans.start(sql_proxy_, sql_tenant_id))) {1225LOG_WARN("failed to start trans", KR(ret), K(sql_tenant_id));1226} else if (OB_FAIL(task.fill_dml_splicer(dml))) {1227LOG_WARN("fill dml splicer failed", KR(ret));1228} else if (OB_FAIL(dml.splice_insert_sql(share::OB_ALL_LS_REPLICA_TASK_TNAME, sql))) {1229LOG_WARN("fail to splice batch insert update sql", KR(ret), K(sql));1230} else if (OB_ISNULL(conn = static_cast<observer::ObInnerSQLConnection *>(trans.get_connection()))) {1231ret = OB_ERR_UNEXPECTED;1232LOG_WARN("conn_ is NULL", KR(ret));1233} else if (OB_FAIL(ObInnerConnectionLockUtil::lock_table(sql_tenant_id,1234OB_ALL_LS_REPLICA_TASK_TID,1235EXCLUSIVE,1236timeout,1237conn))) {1238LOG_WARN("lock dest table failed", KR(ret), K(sql_tenant_id));1239} else if (OB_FAIL(ObTenantSnapshotUtil::check_tenant_not_in_cloning_procedure(task.get_tenant_id(), case_to_check))) {1240LOG_WARN("fail to check whether tenant is in cloning procedure", KR(ret));1241ret_comment = CANNOT_PERSIST_TASK_DUE_TO_CLONE_CONFLICT;1242} else if (OB_FAIL(trans.write(sql_tenant_id, sql.ptr(), affected_rows))) {1243LOG_WARN("execute sql failed", KR(ret), "tenant_id",task.get_tenant_id(), K(sql_tenant_id), K(sql));1244}1245if (trans.is_started()) {1246int tmp_ret = OB_SUCCESS;1247if (OB_TMP_FAIL(trans.end(OB_SUCC(ret)))) {1248LOG_WARN("trans end failed", KR(tmp_ret), KR(ret));1249ret = OB_SUCC(ret) ? tmp_ret : ret;1250}1251}1252FLOG_INFO("[DRTASK_NOTICE] finish persist task into inner table", KR(ret), K(task));1253return ret;1254}
1255
1256int ObDRTaskMgr::try_dump_statistic_(1257int64_t &last_dump_ts) const1258{
1259int ret = OB_SUCCESS;1260if (OB_FAIL(check_inner_stat_())) {1261LOG_WARN("fail to check inner stat", KR(ret), K_(inited), K_(stopped), K_(loaded));1262} else {1263ObThreadCondGuard guard(cond_);1264const int64_t now = ObTimeUtility::current_time();1265if (now > last_dump_ts + config_->balancer_log_interval) {1266last_dump_ts = now;1267int tmp_ret = inner_dump_statistic_();1268if (OB_SUCCESS != tmp_ret) {1269LOG_WARN("task manager dump statistics failed", KR(tmp_ret));1270}1271};1272}1273return ret;1274}
1275
1276int ObDRTaskMgr::inner_dump_statistic_() const1277{
1278int ret = OB_SUCCESS;1279if (OB_FAIL(check_inner_stat_())) {1280LOG_WARN("fail to check inner stat", KR(ret), K_(inited), K_(stopped), K_(loaded));1281} else {1282LOG_INFO("[DRTASK_NOTICE] disaster recovery task manager statistics",1283"waiting_high_priority_task_cnt", high_task_queue_.wait_task_cnt(),1284"executing_high_priority_task_cnt", high_task_queue_.in_schedule_task_cnt(),1285"waiting_low_priority_task_cnt", low_task_queue_.wait_task_cnt(),1286"executing_low_priority_task_cnt", low_task_queue_.in_schedule_task_cnt());1287for (int64_t i = 0; i < ARRAYSIZEOF(queues_); ++i) {1288// ignore error to make sure checking two queues1289if (OB_FAIL(queues_[i].dump_statistic())) {1290LOG_WARN("fail to dump statistic for this queue", KR(ret), "priority", queues_[i].get_priority_str());1291}1292}1293}1294return ret;1295}
1296
1297int ObDRTaskMgr::try_clean_not_in_schedule_task_in_schedule_list_(1298int64_t &last_check_task_in_progress_ts)1299{
1300int ret = OB_SUCCESS;1301if (OB_FAIL(check_inner_stat_())) {1302LOG_WARN("fail to check inner stat", KR(ret), K_(inited), K_(stopped), K_(loaded));1303} else {1304int64_t wait = 0;1305int64_t schedule = 0;1306ObThreadCondGuard guard(cond_);1307if (OB_FAIL(inner_get_task_cnt_(wait, schedule))) {1308LOG_WARN("fail to get task cnt", KR(ret));1309} else if (schedule <= 0) {1310// bypass1311} else {1312const int64_t now = ObTimeUtility::current_time();1313if (now > last_check_task_in_progress_ts + schedule * CHECK_IN_PROGRESS_INTERVAL_PER_TASK) {1314last_check_task_in_progress_ts = now;1315int tmp_ret = inner_clean_not_in_schedule_task_in_schedule_list_();1316if (OB_SUCCESS != tmp_ret) {1317LOG_WARN("fail to do check task in progress", KR(tmp_ret));1318}1319}1320}1321}1322return ret;1323}
1324
1325int ObDRTaskMgr::inner_clean_not_in_schedule_task_in_schedule_list_()1326{
1327int ret = OB_SUCCESS;1328if (OB_FAIL(check_inner_stat_())) {1329LOG_WARN("fail to check inner stat", KR(ret), K_(inited), K_(stopped), K_(loaded));1330} else {1331for (int64_t i = 0; i < ARRAYSIZEOF(queues_); ++i) {1332// ignore error to make sure checking two queues1333if (OB_FAIL(queues_[i].handle_not_in_progress_task(*this))) {1334LOG_WARN("fail to handle not in progress task in this queue", KR(ret),1335"priority", queues_[i].get_priority_str());1336}1337}1338}1339FLOG_INFO("finish inner check task in progress", KR(ret));1340return ret;1341}
1342
1343int ObDRTaskMgr::inner_get_task_cnt_(1344int64_t &wait_cnt,1345int64_t &in_schedule_cnt) const1346{
1347int ret = OB_SUCCESS;1348if (OB_FAIL(check_inner_stat_())) {1349LOG_WARN("fail to check inner stat", KR(ret), K_(inited), K_(stopped), K_(loaded));1350} else {1351wait_cnt = 0;1352in_schedule_cnt = 0;1353for (int64_t i = 0; i < ARRAYSIZEOF(queues_); ++i) {1354wait_cnt += queues_[i].wait_task_cnt();1355in_schedule_cnt += queues_[i].in_schedule_task_cnt();1356}1357}1358return ret;1359}
1360
1361int ObDRTaskMgr::try_pop_task(1362common::ObIAllocator &allocator,1363ObDRTask *&task)1364{
1365int ret = OB_SUCCESS;1366ObThreadCondGuard guard(cond_);1367int64_t wait_cnt = 0;1368int64_t in_schedule_cnt = 0;1369ObDRTask *my_task = nullptr;1370void *raw_ptr = nullptr;1371if (OB_FAIL(check_inner_stat_())) {1372LOG_WARN("fail to check inner stat", KR(ret), K_(inited), K_(stopped), K_(loaded));1373} else if (OB_FAIL(inner_get_task_cnt_(wait_cnt, in_schedule_cnt))) {1374LOG_WARN("fail to get task cnt", KR(ret));1375} else if (wait_cnt > 01376&& 0 == concurrency_limited_ts_) {1377if (OB_FAIL(pop_task(my_task))) {1378LOG_WARN("fail to pop task", KR(ret));1379} else if (OB_ISNULL(my_task)) {1380task = nullptr;1381} else if (OB_ISNULL(raw_ptr = allocator.alloc(my_task->get_clone_size()))) {1382ret = OB_ALLOCATE_MEMORY_FAILED;1383LOG_WARN("fail to allocate task", KR(ret));1384} else if (OB_FAIL(my_task->clone(raw_ptr, task))) {1385LOG_WARN("fail to clone task", KR(ret), "source_task", *my_task);1386} else if (OB_ISNULL(task)) {1387ret = OB_ERR_UNEXPECTED;1388LOG_WARN("task ptr is null", KR(ret));1389} else {1390my_task->set_execute_time(ObTimeUtility::current_time());1391}1392
1393if (OB_FAIL(ret)) {1394if (OB_NOT_NULL(task)) {1395free_task_(allocator, task);1396} else if (OB_NOT_NULL(raw_ptr)) {1397allocator.free(raw_ptr);1398raw_ptr = nullptr;1399}1400}1401} else {1402int64_t now = ObTimeUtility::current_time();1403cond_.wait(get_schedule_interval());1404if (get_reach_concurrency_limit() + CONCURRENCY_LIMIT_INTERVAL < now) {1405clear_reach_concurrency_limit();1406LOG_TRACE("success to clear concurrency limit");1407}1408}1409if (OB_SUCC(ret) && OB_NOT_NULL(task)) {1410LOG_INFO("[DRTASK_NOTICE] success to pop a task", KPC(task), K_(concurrency_limited_ts),1411K(in_schedule_cnt));1412}1413return ret;1414}
1415
1416int ObDRTaskMgr::pop_task(1417ObDRTask *&task)1418{
1419int ret = OB_SUCCESS;1420int64_t wait_cnt = 0;1421if (OB_FAIL(check_inner_stat_())) {1422LOG_WARN("fail to check inner stat", KR(ret), K_(inited), K_(stopped), K_(loaded));1423} else {1424task = nullptr;1425for (int64_t i = 0; OB_SUCC(ret) && i < ARRAYSIZEOF(queues_); ++i) {1426if (queues_[i].wait_task_cnt() > 0) {1427wait_cnt += queues_[i].wait_task_cnt();1428if (OB_FAIL(queues_[i].pop_task(task))) {1429LOG_WARN("pop_task from queue failed", KR(ret), "priority", queues_[i].get_priority_str());1430} else if (OB_NOT_NULL(task)) {1431break;1432}1433}1434}1435if (OB_SUCC(ret)) {1436if (OB_ISNULL(task)) {1437if (wait_cnt > 0) {1438set_reach_concurrency_limit();1439}1440} else {1441const bool in_schedule = true;1442if (OB_FAIL(set_sibling_in_schedule(*task, in_schedule))) {1443LOG_WARN("set sibling in schedule failed", KR(ret), KPC(task));1444}1445}1446}1447}1448return ret;1449}
1450
1451int ObDRTaskMgr::execute_task(1452const ObDRTask &task)1453{
1454int ret = OB_SUCCESS;1455ObCurTraceId::init(self_);1456FLOG_INFO("execute disaster recovery task", K(task));1457int dummy_ret = OB_SUCCESS;1458ObDRTaskRetComment ret_comment = ObDRTaskRetComment::MAX;1459if (OB_FAIL(check_inner_stat_())) {1460LOG_WARN("fail to check inner stat", KR(ret), K_(inited), K_(stopped), K_(loaded));1461} else if (OB_FAIL(persist_task_info_(task, ret_comment))) {1462LOG_WARN("fail to persist task info into table", KR(ret));1463} else if (OB_FAIL(task_executor_->execute(task, dummy_ret, ret_comment))) {1464LOG_WARN("fail to execute disaster recovery task", KR(ret));1465}1466if (OB_FAIL(ret)) {1467//TODO@jingyu.cr:1468// (1) use rwlock instead of threadcond1469// (2) deal with block in status1470(void)log_task_result(task, ret, ret_comment);1471ObThreadCondGuard guard(cond_);1472const bool data_in_limit = (OB_REACH_SERVER_DATA_COPY_IN_CONCURRENCY_LIMIT == ret);1473if (OB_SUCCESS != async_add_cleaning_task_to_updater(1474task.get_task_id(),1475task.get_task_key(),1476ret,1477false,/*need_record_event*/1478ret_comment,1479!data_in_limit)) {1480LOG_WARN("fail to do execute over", KR(ret), K(task));1481}1482}1483return ret;1484}
1485
1486int ObDRTaskMgr::set_sibling_in_schedule(1487const ObDRTask &task,1488const bool in_schedule)1489{
1490int ret = OB_SUCCESS;1491if (OB_FAIL(check_inner_stat_())) {1492LOG_WARN("fail to check inner stat", KR(ret), K_(inited), K_(stopped), K_(loaded));1493} else {1494for (int64_t i = 0; OB_SUCC(ret) && i < ARRAYSIZEOF(queues_); ++i) {1495if (OB_FAIL(queues_[i].set_sibling_in_schedule(task, in_schedule))) {1496if (i == 0) {1497LOG_WARN("fail to set sibling in schedule in high priority queue", KR(ret), K(task));1498} else {1499LOG_WARN("fail to set sibling in schedule in low priority queue", KR(ret), K(task));1500}1501}1502}1503}1504return ret;1505}
1506} // end namespace rootserver1507} // end namespace oceanbase1508