oceanbase
892 строки · 35.4 Кб
/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */
12
13#define USING_LOG_PREFIX RS14
15#include "ob_recover_table_job_scheduler.h"16#include "rootserver/ob_rs_event_history_table_operator.h"17#include "rootserver/restore/ob_recover_table_initiator.h"18#include "rootserver/restore/ob_restore_service.h"19#include "share/backup/ob_backup_data_table_operator.h"20#include "share/ob_primary_standby_service.h"21#include "share/location_cache/ob_location_service.h"22#include "share/restore/ob_physical_restore_table_operator.h"23#include "share/restore/ob_import_util.h"24#include "storage/tablelock/ob_lock_inner_connection_util.h"25#include "observer/ob_inner_sql_connection.h"26
27using namespace oceanbase;28using namespace rootserver;29using namespace share;30
31void ObRecoverTableJobScheduler::reset()32{
33rs_rpc_proxy_ = nullptr;34sql_proxy_ = nullptr;35schema_service_ = nullptr;36srv_rpc_proxy_ = nullptr;37is_inited_ = false;38tenant_id_ = OB_INVALID_TENANT_ID;39}
40
41int ObRecoverTableJobScheduler::init(42share::schema::ObMultiVersionSchemaService &schema_service,43common::ObMySQLProxy &sql_proxy,44obrpc::ObCommonRpcProxy &rs_rpc_proxy,45obrpc::ObSrvRpcProxy &srv_rpc_proxy)46{
47int ret = OB_SUCCESS;48const uint64_t tenant_id = gen_user_tenant_id(MTL_ID());49if (IS_INIT) {50ret = OB_INIT_TWICE;51LOG_WARN("ObRecoverTableJobScheduler init twice", K(ret));52} else if (OB_FAIL(helper_.init(tenant_id))) {53LOG_WARN("failed to init table op", K(ret), K(tenant_id));54} else {55schema_service_ = &schema_service;56sql_proxy_ = &sql_proxy;57rs_rpc_proxy_ = &rs_rpc_proxy;58srv_rpc_proxy_ = &srv_rpc_proxy;59tenant_id_ = tenant_id;60is_inited_ = true;61}62return ret;63}
64
65void ObRecoverTableJobScheduler::wakeup_()66{
67ObRestoreService *restore_service = nullptr;68if (OB_ISNULL(restore_service = MTL(ObRestoreService *))) {69LOG_ERROR_RET(OB_ERR_UNEXPECTED, "restore service must not be null");70} else {71restore_service->wakeup();72}73}
74
75void ObRecoverTableJobScheduler::do_work()76{
77int ret = OB_SUCCESS;78ObArray<share::ObRecoverTableJob> jobs;79if (IS_NOT_INIT) {80ret = OB_NOT_INIT;81LOG_WARN("not init ObSysRecoverTableJobScheduler", K(ret));82} else if (OB_FAIL(check_compatible_())) {83LOG_WARN("check compatible failed", K(ret));84} else if (OB_FAIL(helper_.get_all_recover_table_job(*sql_proxy_, jobs))) {85LOG_WARN("failed to get recover all recover table job", K(ret));86} else {87ObCurTraceId::init(GCTX.self_addr());88ARRAY_FOREACH(jobs, i) {89ObRecoverTableJob &job = jobs.at(i);90if (!job.is_valid()) {91ret = OB_ERR_UNEXPECTED;92LOG_WARN("recover table job is not valid", K(ret), K(job));93} else if (is_sys_tenant(job.get_tenant_id())) {94sys_process_(job);95} else if (is_user_tenant(job.get_tenant_id())) {96user_process_(job);97} else {98ret = OB_ERR_UNEXPECTED;99LOG_WARN("invalid tenant", K(ret), K(job));100}101}102}103
104}
105
106int ObRecoverTableJobScheduler::check_compatible_() const107{
108int ret = OB_SUCCESS;109uint64_t data_version = 0;110if (OB_FAIL(GET_MIN_DATA_VERSION(tenant_id_, data_version))) {111LOG_WARN("fail to get data version", K(ret), K_(tenant_id));112} else if (data_version < DATA_VERSION_4_2_1_0) {113ret = OB_OP_NOT_ALLOW;114LOG_WARN("min data version is smaller than v4.2.1", K(ret), K_(tenant_id), K(data_version));115} else if (is_sys_tenant(tenant_id_)) {116} else if (OB_FAIL(GET_MIN_DATA_VERSION(gen_meta_tenant_id(tenant_id_), data_version))) {117LOG_WARN("fail to get data version", K(ret), "tenant_id", gen_meta_tenant_id(tenant_id_));118} else if (data_version < DATA_VERSION_4_2_1_0) {119ret = OB_OP_NOT_ALLOW;120LOG_WARN("min data version is smaller than v4.2.1", K(ret), K_(tenant_id), K(data_version));121}122
123return ret;124}
125
126int ObRecoverTableJobScheduler::try_advance_status_(share::ObRecoverTableJob &job, const int err_code)127{
128int ret = OB_SUCCESS;129share::ObRecoverTableStatus next_status;130const uint64_t tenant_id = job.get_tenant_id();131const int64_t job_id = job.get_job_id();132bool need_advance_status = true;133if (err_code != OB_SUCCESS) {134if (ObImportTableUtil::can_retrieable_err(err_code)) {135need_advance_status = false;136} else {137share::ObTaskId trace_id(*ObCurTraceId::get_trace_id());138next_status = ObRecoverTableStatus::FAILED;139if (job.get_result().is_comment_setted()) {140} else if (OB_FAIL(job.get_result().set_result(141err_code, trace_id, GCONF.self_addr_))) {142LOG_WARN("failed to set result", K(ret));143}144LOG_WARN("[RECOVER_TABLE]recover table job failed", K(err_code), K(job));145ROOTSERVICE_EVENT_ADD("recover_table", "recover_table_failed", K(tenant_id), K(job_id), K(err_code), K(trace_id));146}147} else if (job.get_tenant_id() == OB_SYS_TENANT_ID) {148next_status = ObRecoverTableStatus::get_sys_next_status(job.get_status());149} else {150next_status = ObRecoverTableStatus::get_user_next_status(job.get_status());151}152if (next_status.is_finish()) {153job.set_end_ts(ObTimeUtility::current_time());154}155if (OB_FAIL(ret)) {156} else if (need_advance_status && OB_FAIL(helper_.advance_status(*sql_proxy_, job, next_status))) {157LOG_WARN("failed to advance statsu", K(ret), K(job), K(next_status));158} else {159wakeup_();160ROOTSERVICE_EVENT_ADD("recover_table", "advance_status", K(tenant_id), K(job_id), K(next_status));161}162return ret;163}
164
165void ObRecoverTableJobScheduler::sys_process_(share::ObRecoverTableJob &job)166{
167int ret = OB_SUCCESS;168LOG_INFO("ready to schedule sys recover table job", K(job));169switch(job.get_status()) {170case ObRecoverTableStatus::Status::PREPARE: {171if (OB_FAIL(sys_prepare_(job))) {172LOG_WARN("failed to do sys prepare work", K(ret), K(job));173}174break;175}176case ObRecoverTableStatus::Status::RECOVERING: {177if (OB_FAIL(recovering_(job))) {178LOG_WARN("failed to do sys recovering work", K(ret), K(job));179}180break;181}182case ObRecoverTableStatus::Status::COMPLETED:183case ObRecoverTableStatus::Status::FAILED: {184if (OB_FAIL(sys_finish_(job))) {185LOG_WARN("failed to do sys finish work", K(ret), K(job));186}187break;188}189default: {190ret = OB_ERR_SYS;191LOG_WARN("invalid sys recover job status", K(ret), K(job));192break;193}194}195}
196
197int ObRecoverTableJobScheduler::check_target_tenant_version_(share::ObRecoverTableJob &job)198{
199int ret = OB_SUCCESS;200uint64_t data_version = 0;201const uint64_t target_tenant_id = job.get_target_tenant_id();202// check data version203if (OB_FAIL(GET_MIN_DATA_VERSION(target_tenant_id, data_version))) {204LOG_WARN("fail to get data version", K(ret), K(target_tenant_id));205} else if (data_version < DATA_VERSION_4_2_1_0) {206ret = OB_OP_NOT_ALLOW;207LOG_WARN("min data version is smaller than v4.2.1", K(ret), K(target_tenant_id), K(data_version));208} else if (OB_FAIL(GET_MIN_DATA_VERSION(gen_meta_tenant_id(target_tenant_id), data_version))) {209LOG_WARN("fail to get data version", K(ret), "target_tenant_id", gen_meta_tenant_id(target_tenant_id));210} else if (data_version < DATA_VERSION_4_2_1_0) {211ret = OB_OP_NOT_ALLOW;212LOG_WARN("min data version is smaller than v4.2.1", K(ret), K(target_tenant_id), K(data_version));213}214
215if (OB_FAIL(ret)) {216int tmp_ret = OB_SUCCESS;217schema::ObSchemaGetterGuard guard;218if (OB_TMP_FAIL(ObImportTableUtil::get_tenant_schema_guard(*schema_service_, job.get_target_tenant_id(), guard))) {219if (OB_TENANT_NOT_EXIST == tmp_ret) {220ret = tmp_ret;221}222LOG_WARN("failed to get tenant schema guard", K(tmp_ret));223}224}225return ret;226}
227
228int ObRecoverTableJobScheduler::sys_prepare_(share::ObRecoverTableJob &job)229{
230int ret = OB_SUCCESS;231ObRecoverTableJob target_job;232share::ObRecoverTablePersistHelper helper;233DEBUG_SYNC(BEFORE_INSERT_UERR_RECOVER_TABLE_JOB);234if (OB_FAIL(check_target_tenant_version_(job))) {235LOG_WARN("failed to check target tenant version", K(ret));236} else if (OB_FAIL(helper.init(job.get_target_tenant_id()))) {237LOG_WARN("failed to init recover table persist helper", K(ret));238}239
240ObMySQLTransaction trans;241const uint64_t meta_tenant_id = gen_meta_tenant_id(job.get_target_tenant_id());242if (FAILEDx(trans.start(sql_proxy_, meta_tenant_id))) {243LOG_WARN("failed to start trans", K(ret));244} else if (OB_FAIL(lock_recover_table_(meta_tenant_id, trans))) {245LOG_WARN("failed to lock recover table", K(ret));246} else if (OB_FAIL(helper.get_recover_table_job_by_initiator(trans, job, target_job))) {247if (OB_ENTRY_NOT_EXIST == ret) {248if (OB_FAIL(helper.get_recover_table_job_history_by_initiator(trans, job, target_job))) {249if (OB_ENTRY_NOT_EXIST == ret) {250if (OB_FAIL(insert_user_job_(job, trans, helper))) {251LOG_WARN("failed to insert user job", K(ret), K(job));252} else {253ROOTSERVICE_EVENT_ADD("recover_table", "insert_user_job",254"tenant_id", job.get_tenant_id(),255"job_id", job.get_job_id());256}257} else {258LOG_WARN("failed to get target tenant recover table job history", K(ret), K(job));259}260}261} else {262LOG_WARN("failed to get target tenant recover table job", K(ret), K(job));263}264}265
266if (trans.is_started()) {267int tmp_ret = OB_SUCCESS;268if (OB_TMP_FAIL(trans.end(OB_SUCC(ret)))) {269ret = OB_SUCC(ret) ? tmp_ret : ret;270LOG_WARN("failed to end trans", K(ret));271}272}273
274#ifdef ERRSIM275ret = OB_E(EventTable::EN_INSERT_USER_RECOVER_JOB_FAILED) OB_SUCCESS;276if (OB_FAIL(ret)) {277ROOTSERVICE_EVENT_ADD("recover_table_errsim", "insert_user_job_failed");278}279#endif280int tmp_ret = OB_SUCCESS;281if (OB_SUCCESS != (tmp_ret = try_advance_status_(job, ret))) {282LOG_INFO("failed to advance status", K(tmp_ret), K(ret), K(job));283}284return ret;285}
286
287int ObRecoverTableJobScheduler::lock_recover_table_(288const uint64_t tenant_id, ObMySQLTransaction &trans)289{
290int ret = OB_SUCCESS;291ObLockTableRequest recover_job_arg;292recover_job_arg.table_id_ = OB_ALL_RECOVER_TABLE_JOB_TID;293recover_job_arg.lock_mode_ = EXCLUSIVE;294recover_job_arg.timeout_us_ = 0; // try lock295recover_job_arg.op_type_ = IN_TRANS_COMMON_LOCK; // unlock when trans end296observer::ObInnerSQLConnection *conn = nullptr;297if (OB_ISNULL(conn = dynamic_cast<observer::ObInnerSQLConnection *>(trans.get_connection()))) {298ret = OB_ERR_UNEXPECTED;299LOG_WARN("conn is NULL", KR(ret));300} else if (OB_FAIL(transaction::tablelock::ObInnerConnectionLockUtil::lock_table(tenant_id, recover_job_arg, conn))) {301LOG_WARN("failed to lock table", K(ret));302}303return ret;304}
305
306int ObRecoverTableJobScheduler::insert_user_job_(307const share::ObRecoverTableJob &job,308ObMySQLTransaction &trans,309share::ObRecoverTablePersistHelper &helper)310{
311int ret = OB_SUCCESS;312ObRecoverTableJob target_job;313
314if (OB_FAIL(target_job.assign(job))) {315LOG_WARN("failed to assign target job", K(ret));316} else {317target_job.set_tenant_id(job.get_target_tenant_id());318target_job.set_initiator_tenant_id(job.get_tenant_id());319target_job.set_initiator_job_id(job.get_job_id());320target_job.set_target_tenant_id(target_job.get_tenant_id());321}322int64_t job_id = 0;323if (FAILEDx(ObLSBackupInfoOperator::get_next_job_id(trans, job.get_target_tenant_id(), job_id))) {324LOG_WARN("failed to get next job id", K(ret), "tenant_id", job.get_target_tenant_id());325} else if (OB_FALSE_IT(target_job.set_job_id(job_id))) {326} else if (OB_FAIL(helper.insert_recover_table_job(trans, target_job))) {327LOG_WARN("failed to insert initial recover table job", K(ret));328}329
330return ret;331}
332
333int ObRecoverTableJobScheduler::recovering_(share::ObRecoverTableJob &job)334{
335int ret = OB_SUCCESS;336share::ObRecoverTablePersistHelper helper;337ObRecoverTableJob target_job;338bool user_job_finish = true;339bool user_tenant_not_exist = false;340int tmp_ret = OB_SUCCESS;341DEBUG_SYNC(BEFORE_RECOVER_UESR_RECOVER_TABLE_JOB);342if (OB_FAIL(helper.init(job.get_target_tenant_id()))) {343LOG_WARN("failed to init recover table persist helper", K(ret));344} else if (OB_FAIL(helper.get_recover_table_job_history_by_initiator(*sql_proxy_, job, target_job))) {345if (OB_ENTRY_NOT_EXIST == ret) {346user_job_finish = false;347ret = OB_SUCCESS;348} else {349LOG_WARN("failed to get target tenant recover table job history", K(ret), K(job));350}351} else {352ROOTSERVICE_EVENT_ADD("recover_table", "sys_wait_user_recover_finish",353"tenant_id", job.get_tenant_id(),354"job_id", job.get_job_id());355job.set_result(target_job.get_result());356}357if (OB_FAIL(ret)) {358schema::ObSchemaGetterGuard guard;359if (OB_TMP_FAIL(ObImportTableUtil::get_tenant_schema_guard(*schema_service_, job.get_target_tenant_id(), guard))) {360if (OB_TENANT_NOT_EXIST == tmp_ret) {361user_tenant_not_exist = true;362}363LOG_WARN("failed to get tenant schema guard", K(tmp_ret));364}365}366
367if ((OB_FAIL(ret) && user_tenant_not_exist) || (OB_SUCC(ret) && user_job_finish)) {368if (OB_SUCC(ret) && !job.get_result().is_succeed()) {369ret = OB_LS_RESTORE_FAILED;370}371job.set_end_ts(ObTimeUtility::current_time());372if (OB_SUCCESS != (tmp_ret = try_advance_status_(job, ret))) {373LOG_INFO("failed to advance status", K(tmp_ret), K(ret), K(job));374}375}376return ret;377}
378
379int ObRecoverTableJobScheduler::sys_finish_(const share::ObRecoverTableJob &job)380{
381int ret = OB_SUCCESS;382ObMySQLTransaction trans;383bool drop_aux_tenant = GCONF._auto_drop_recovering_auxiliary_tenant;384if (drop_aux_tenant && OB_FAIL(drop_aux_tenant_(job))) {385LOG_WARN("failed ot drop aux tenant", K(ret));386} else if (OB_FAIL(trans.start(sql_proxy_, OB_SYS_TENANT_ID))) {387LOG_WARN("failed to start trans", K(ret));388} else {389if (OB_FAIL(helper_.insert_recover_table_job_history(trans, job))) {390LOG_WARN("failed to insert recover table job history", K(ret), K(job));391} else if (OB_FAIL(helper_.delete_recover_table_job(trans, job))) {392LOG_WARN("failed to delete recover table job", K(ret), K(job));393}394
395if (OB_SUCC(ret)) {396if (OB_FAIL(trans.end(true))) {397LOG_WARN("failed to commit", K(ret));398} else {399ROOTSERVICE_EVENT_ADD("recover_table", "sys_recover_finish",400"tenant_id", job.get_tenant_id(),401"job_id", job.get_job_id());402}403} else {404int tmp_ret = OB_SUCCESS;405if (OB_SUCCESS != (tmp_ret = trans.end(false))) {406LOG_WARN("failed to roll back", K(tmp_ret), K(ret));407}408}409}410return ret;411}
412
413int ObRecoverTableJobScheduler::drop_aux_tenant_(const share::ObRecoverTableJob &job)414{
415int ret = OB_SUCCESS;416obrpc::ObDropTenantArg drop_tenant_arg;417drop_tenant_arg.exec_tenant_id_ = OB_SYS_TENANT_ID;418drop_tenant_arg.if_exist_ = false;419drop_tenant_arg.force_drop_ = true;420drop_tenant_arg.delay_to_drop_ = false;421drop_tenant_arg.open_recyclebin_ = false;422drop_tenant_arg.tenant_name_ = job.get_aux_tenant_name();423drop_tenant_arg.drop_only_in_restore_ = false;424common::ObAddr rs_addr;425if (OB_ISNULL(GCTX.rs_rpc_proxy_) || OB_ISNULL(GCTX.rs_mgr_)) {426ret = OB_ERR_UNEXPECTED;427LOG_WARN("rootserver rpc proxy or rs mgr must not be NULL", K(ret), K(GCTX));428} else if (OB_FAIL(GCTX.rs_mgr_->get_master_root_server(rs_addr))) {429LOG_WARN("failed to get rootservice address", K(ret));430} else if (OB_FAIL(GCTX.rs_rpc_proxy_->to(rs_addr).drop_tenant(drop_tenant_arg))) {431if (OB_TENANT_NOT_EXIST == ret) {432ret = OB_SUCCESS;433} else {434LOG_WARN("failed to drop tenant", K(ret), K(drop_tenant_arg));435}436} else {437LOG_INFO("[RECOVER_TABLE]drop aux tenant succeed", K(job));438ROOTSERVICE_EVENT_ADD("recover_table", "drop_aux_tenant",439"tenant_id", job.get_tenant_id(),440"job_id", job.get_job_id(),441"aux_tenant_name", job.get_aux_tenant_name());442}443return ret;444}
445
446void ObRecoverTableJobScheduler::user_process_(share::ObRecoverTableJob &job)447{
448int ret = OB_SUCCESS;449LOG_INFO("ready to schedule user recover table job", K(job));450switch(job.get_status()) {451case ObRecoverTableStatus::Status::PREPARE: {452if (OB_FAIL(user_prepare_(job))) {453LOG_WARN("failed to do user prepare work", K(ret), K(job));454}455break;456}457case ObRecoverTableStatus::Status::RESTORE_AUX_TENANT: {458if (OB_FAIL(restore_aux_tenant_(job))) {459LOG_WARN("failed to do user restore aux tenant work", K(ret), K(job));460}461break;462}463case ObRecoverTableStatus::Status::ACTIVE_AUX_TENANT: {464if (OB_FAIL(active_aux_tenant_(job))) {465LOG_WARN("failed to do user active aux tenant work", K(ret), K(job));466}467break;468}469case ObRecoverTableStatus::Status::GEN_IMPORT_JOB: {470if (OB_FAIL(gen_import_job_(job))) {471LOG_WARN("failed to do user import work", K(ret), K(job));472}473break;474}475case ObRecoverTableStatus::Status::CANCELING: {476if (OB_FAIL(canceling_(job))) {477LOG_WARN("failed to do user canceling", K(ret), K(job));478}479break;480}481case ObRecoverTableStatus::Status::IMPORTING: {482if (OB_FAIL(importing_(job))) {483LOG_WARN("failed to do user importing work", K(ret), K(job));484}485break;486}487case ObRecoverTableStatus::Status::COMPLETED:488case ObRecoverTableStatus::Status::FAILED: {489if (OB_FAIL(user_finish_(job))) {490LOG_WARN("failed to do user finish work", K(ret), K(job));491}492break;493}494default: {495ret = OB_ERR_SYS;496LOG_WARN("invalid sys recover job status", K(ret), K(job));497break;498}499}500}
501
502int ObRecoverTableJobScheduler::canceling_(share::ObRecoverTableJob &job)503{
504int ret = OB_SUCCESS;505share::ObImportTableJobPersistHelper helper;506share::ObImportTableJob import_job;507bool cancel_import_job_finish = false;508if (OB_FAIL(helper.init(job.get_tenant_id()))) {509LOG_WARN("failed to init helper", K(ret));510} else if (OB_FAIL(helper.get_import_table_job_by_initiator(511*sql_proxy_, job.get_tenant_id(), job.get_job_id(), import_job))) {512if (OB_ENTRY_NOT_EXIST == ret) {513cancel_import_job_finish = true;514ret = OB_SUCCESS;515} else {516LOG_WARN("failed to get import table job by initiator", K(ret));517}518}519
520if (OB_SUCC(ret) && cancel_import_job_finish) {521share::ObTaskId trace_id(*ObCurTraceId::get_trace_id());522job.get_result().set_result(OB_CANCELED, trace_id, GCONF.self_addr_);523if (OB_FAIL(try_advance_status_(job, OB_CANCELED))) {524LOG_WARN("failed to advance status", K(ret));525} else {526LOG_INFO("[RECOVER_TABLE]cancel recover table job finish", K(job), K(import_job));527ROOTSERVICE_EVENT_ADD("recover_table", "cancel recover job finish",528"tenant_id", job.get_tenant_id(),529"recover_job_id", job.get_job_id());530}531}532return ret;533}
534
535int ObRecoverTableJobScheduler::user_prepare_(share::ObRecoverTableJob &job)536{
537int ret = OB_SUCCESS;538if (OB_FAIL(try_advance_status_(job, ret))) {539LOG_WARN("failed to advance status", K(ret));540}541return ret;542}
543
544int ObRecoverTableJobScheduler::restore_aux_tenant_(share::ObRecoverTableJob &job)545{
546int ret = OB_SUCCESS;547ObRestorePersistHelper restore_helper;548ObHisRestoreJobPersistInfo restore_history_info;549bool aux_tenant_restore_finish = true;550int tmp_ret = OB_SUCCESS;551DEBUG_SYNC(BEFORE_RESTORE_AUX_TENANT);552if (OB_FAIL(restore_helper.init(OB_SYS_TENANT_ID, share::OBCG_STORAGE /*group_id*/))) {553LOG_WARN("failed to init retore helper", K(ret));554} else if (OB_FAIL(restore_helper.get_restore_job_history(555*sql_proxy_, job.get_initiator_job_id(), job.get_initiator_tenant_id(), restore_history_info))) {556if (OB_ENTRY_NOT_EXIST == ret) {557aux_tenant_restore_finish = false;558ret = OB_SUCCESS;559if (REACH_TIME_INTERVAL(60 * 1000 * 1000)) {560LOG_INFO("[RECOVER_TABLE]aux tenant restore not finish, wait later", K(job));561}562} else {563LOG_WARN("failed to get restore job history", K(ret),564"initiator_job_id", job.get_job_id(), "initiator_tenant_id", job.get_tenant_id());565}566} else {567LOG_INFO("[RECOVER_TABLE]aux tenant restore finish", K(restore_history_info), K(job));568ROOTSERVICE_EVENT_ADD("recover_table", "restore_aux_tenant_finish",569"tenant_id", job.get_tenant_id(),570"job_id", job.get_job_id(),571"aux_tenant_name", job.get_aux_tenant_name());572const uint64_t aux_tenant_id = restore_history_info.restore_tenant_id_;573schema::ObSchemaGetterGuard guard;574schema::ObTenantStatus status;575if (!restore_history_info.is_restore_success()) {576ret = OB_LS_RESTORE_FAILED; // TODO(zeyong) adjust error code to restore tenant failed later.577LOG_WARN("[RECOVER_TABLE]restore aux tenant failed", K(ret), K(restore_history_info), K(job));578job.get_result().set_result(false, restore_history_info.comment_);579} else if (OB_FAIL(check_aux_tenant_(job, aux_tenant_id))) {580LOG_WARN("failed to check aux tenant", K(ret), K(aux_tenant_id));581}582
583int tmp_ret = OB_SUCCESS;584if (OB_SUCCESS != (tmp_ret = try_advance_status_(job, ret))) {585LOG_WARN("failed to advance status", K(tmp_ret), K(ret));586}587}588return ret;589}
590
591int ObRecoverTableJobScheduler::active_aux_tenant_(share::ObRecoverTableJob &job)592{
593int ret = OB_SUCCESS;594int tmp_ret = OB_SUCCESS;595ObRestorePersistHelper restore_helper;596ObHisRestoreJobPersistInfo restore_history_info;597if (OB_FAIL(restore_helper.init(OB_SYS_TENANT_ID, share::OBCG_STORAGE /*group_id*/))) {598LOG_WARN("failed to init retore helper", K(ret));599} else if (OB_FAIL(restore_helper.get_restore_job_history(600*sql_proxy_, job.get_initiator_job_id(), job.get_initiator_tenant_id(), restore_history_info))) {601LOG_WARN("failed to get restore job history", K(ret),602"initiator_job_id", job.get_job_id(), "initiator_tenant_id", job.get_tenant_id());603} else if (OB_FAIL(ban_multi_version_recycling_(job, restore_history_info.restore_tenant_id_))) {604LOG_WARN("failed to ban multi version cecycling", K(ret));605} else if (OB_FAIL(failover_to_primary_(job, restore_history_info.restore_tenant_id_))) {606LOG_WARN("failed to failover to primary", K(ret), K(restore_history_info));607}608if (OB_FAIL(ret)) {609int tmp_ret = OB_SUCCESS;610schema::ObSchemaGetterGuard guard;611if (OB_TMP_FAIL(ObImportTableUtil::get_tenant_schema_guard(*schema_service_,612restore_history_info.restore_tenant_id_,613guard))) {614if (OB_TENANT_NOT_EXIST == tmp_ret) {615ret = tmp_ret;616}617LOG_WARN("failed to get tenant schema guard", K(tmp_ret));618}619}620
621if (OB_SUCC(ret) || OB_TENANT_NOT_EXIST == ret) {622if (OB_SUCCESS != (tmp_ret = try_advance_status_(job, ret))) {623LOG_WARN("failed to advance status", K(tmp_ret), K(ret));624}625}626return ret;627}
628
629int ObRecoverTableJobScheduler::ban_multi_version_recycling_(share::ObRecoverTableJob &job, const uint64_t aux_tenant_id)630{
631int ret = OB_SUCCESS;632const int64_t tenant_id = aux_tenant_id;633const int64_t MAX_UNDO_RETENTION = 31536000; // 1 year634int64_t affected_row = 0;635ObSqlString sql;636MTL_SWITCH(OB_SYS_TENANT_ID) {637if (OB_FAIL(sql.assign_fmt("alter system set undo_retention = %ld", MAX_UNDO_RETENTION))) {638LOG_WARN("failed to assign fmt", K(ret));639} else if (OB_FAIL(sql_proxy_->write(tenant_id, sql.ptr(), affected_row))) {640LOG_WARN("failed to set undo retention", K(ret));641}642}643return ret;644}
645
646int ObRecoverTableJobScheduler::failover_to_primary_(647share::ObRecoverTableJob &job, const uint64_t aux_tenant_id)648{
649int ret = OB_SUCCESS;650common::ObAddr leader;651obrpc::ObSwitchTenantArg switch_tenant_arg;652MTL_SWITCH(OB_SYS_TENANT_ID) {653if (OB_FAIL(switch_tenant_arg.init(aux_tenant_id, obrpc::ObSwitchTenantArg::OpType::FAILOVER_TO_PRIMARY, "", false))) {654LOG_WARN("failed to init switch tenant arg", K(ret), K(aux_tenant_id));655} else if (OB_FAIL(OB_PRIMARY_STANDBY_SERVICE.switch_tenant(switch_tenant_arg))) {656LOG_WARN("failed to switch_tenant", KR(ret), K(switch_tenant_arg));657} else {658LOG_INFO("[RECOVER_TABLE]succeed to switch aux tenant role to primary", K(aux_tenant_id), K(job));659}660}661return ret;662}
663
664int ObRecoverTableJobScheduler::check_aux_tenant_(share::ObRecoverTableJob &job, const uint64_t aux_tenant_id)665{
666int ret = OB_SUCCESS;667int tmp_ret = OB_SUCCESS;668schema::ObTenantStatus status;669schema::ObSchemaGetterGuard aux_tenant_guard;670schema::ObSchemaGetterGuard recover_tenant_guard;671bool is_compatible = true;672if (OB_FAIL(schema_service_->get_tenant_schema_guard(aux_tenant_id, aux_tenant_guard))) {673if (OB_TENANT_NOT_EXIST == ret) {674ObImportResult::Comment comment;675if (OB_TMP_FAIL(databuff_printf(comment.ptr(), comment.capacity(),676"aux tenant %.*s has been dropped", job.get_aux_tenant_name().length(), job.get_aux_tenant_name().ptr()))) {677LOG_WARN("failed to databuff printf", K(ret));678} else {679job.get_result().set_result(false, comment);680}681}682LOG_WARN("failed to get tenant schema guard", K(ret), K(aux_tenant_id));683} else if (OB_FAIL(aux_tenant_guard.get_tenant_status(aux_tenant_id, status))) {684LOG_WARN("failed to get tenant status", K(ret), K(aux_tenant_id));685} else if (schema::ObTenantStatus::TENANT_STATUS_NORMAL != status) {686ret = OB_STATE_NOT_MATCH;687LOG_WARN("aux tenant status is not normal", K(ret), K(aux_tenant_id), K(status));688} else if (OB_FAIL(schema_service_->get_tenant_schema_guard(job.get_tenant_id(), recover_tenant_guard))) {689LOG_WARN("failed to get tenant schema guard", K(ret), "tenant_id", job.get_tenant_id());690} else if (OB_FAIL(check_tenant_compatibility(aux_tenant_guard, recover_tenant_guard, is_compatible))) {691LOG_WARN("failed to get check tenant compatibility", K(ret));692} else if (!is_compatible) {693ret = OB_NOT_SUPPORTED;694LOG_WARN("recover from different compatibility tenant is not supported", K(ret));695if (OB_TMP_FAIL(job.get_result().set_result(false, "recover from different compatibility tenant is not supported"))) {696LOG_WARN("failed to set result", K(ret), K(tmp_ret));697}698} else if (OB_FAIL(check_case_sensitive_compatibility(aux_tenant_guard, recover_tenant_guard, is_compatible))) {699LOG_WARN("failed to check case sensitive compatibility", K(ret));700} else if (!is_compatible) {701ret = 
OB_NOT_SUPPORTED;702LOG_WARN("recover from different case sensitive compatibility tenant is not supported", K(ret));703if (OB_TMP_FAIL(job.get_result().set_result(false, "recover from different case sensitive compatibility tenant is not supported"))) {704LOG_WARN("failed to set result", K(ret), K(tmp_ret));705}706}707return ret;708}
709
710int ObRecoverTableJobScheduler::check_tenant_compatibility(711share::schema::ObSchemaGetterGuard &aux_tenant_guard,712share::schema::ObSchemaGetterGuard &recover_tenant_guard,713bool &is_compatible)714{
715int ret = OB_SUCCESS;716lib::Worker::CompatMode aux_compat_mode;717lib::Worker::CompatMode recover_compat_mode;718is_compatible = false;719const uint64_t aux_tenant_id = aux_tenant_guard.get_tenant_id();720const uint64_t recover_tenant_id = recover_tenant_guard.get_tenant_id();721if (OB_FAIL(aux_tenant_guard.get_tenant_compat_mode(aux_tenant_id, aux_compat_mode))) {722LOG_WARN("failed to get tenant compat mode", K(ret), K(aux_tenant_id));723} else if (OB_FAIL(recover_tenant_guard.get_tenant_compat_mode(recover_tenant_id, recover_compat_mode))) {724LOG_WARN("failed to get tenant compat mode", K(ret), K(recover_tenant_id));725} else {726is_compatible = aux_compat_mode == recover_compat_mode;727if (!is_compatible) {728LOG_WARN("[RECOVER_TABLE]tenant compat mode is different", K(is_compatible),729K(aux_tenant_id),730K(aux_compat_mode),731K(recover_tenant_id),732K(recover_compat_mode));733}734}735return ret;736}
737
738int ObRecoverTableJobScheduler::check_case_sensitive_compatibility(739share::schema::ObSchemaGetterGuard &aux_tenant_guard,740share::schema::ObSchemaGetterGuard &recover_tenant_guard,741bool &is_compatible)742{
743int ret = OB_SUCCESS;744common::ObNameCaseMode aux_mode;745common::ObNameCaseMode recover_mode;746is_compatible = false;747const uint64_t aux_tenant_id = aux_tenant_guard.get_tenant_id();748const uint64_t recover_tenant_id = recover_tenant_guard.get_tenant_id();749if (OB_FAIL(aux_tenant_guard.get_tenant_name_case_mode(aux_tenant_id, aux_mode))) {750LOG_WARN("failed to get tenant name case mode", K(ret), K(aux_tenant_id));751} else if (OB_FAIL(recover_tenant_guard.get_tenant_name_case_mode(recover_tenant_id, recover_mode))) {752LOG_WARN("failed to get tenant name case mode", K(ret), K(recover_tenant_id));753} else {754is_compatible = aux_mode == recover_mode;755if (!is_compatible) {756LOG_WARN("[RECOVER_TABLE]tenant name case mode is different", K(is_compatible),757K(aux_tenant_id),758K(aux_mode),759K(recover_tenant_id),760K(recover_mode));761}762}763return ret;764}
765
766int ObRecoverTableJobScheduler::gen_import_job_(share::ObRecoverTableJob &job)767{
768int ret = OB_SUCCESS;769LOG_INFO("[RECOVER_TABLE]generate import table job", K(job));770share::ObImportTableJobPersistHelper import_helper;771share::ObImportTableJob import_job;772import_job.set_tenant_id(job.get_tenant_id());773import_job.set_job_id(job.get_job_id());774import_job.set_initiator_job_id(job.get_job_id());775import_job.set_initiator_tenant_id(job.get_tenant_id());776import_job.set_start_ts(ObTimeUtility::current_time());777import_job.set_status(ObImportTableJobStatus::INIT);778int tmp_ret = OB_SUCCESS;779schema::ObSchemaGetterGuard guard;780uint64_t tenant_id = OB_INVALID_TENANT_ID;781ObMySQLTransaction trans;782int64_t job_id = 0;783if (OB_FAIL(import_job.set_src_tenant_name(job.get_aux_tenant_name()))) {784LOG_WARN("failed to set src tenant name", K(ret));785} else if (OB_FAIL(schema_service_->get_tenant_schema_guard(OB_SYS_TENANT_ID, guard))) {786LOG_WARN("failed to get schema guard", K(ret));787} else if (OB_FAIL(guard.get_tenant_id(job.get_aux_tenant_name(), tenant_id))) {788if (OB_ERR_INVALID_TENANT_NAME == ret) {789ObImportResult::Comment comment;790if (OB_TMP_FAIL(databuff_printf(comment.ptr(), comment.capacity(),791"aux tenant %.*s has been dropped", job.get_aux_tenant_name().length(), job.get_aux_tenant_name().ptr()))) {792LOG_WARN("failed to databuff printf", K(ret));793} else {794job.get_result().set_result(false, comment);795}796}797LOG_WARN("failed to get tenant id", K(ret), "aux_tenant_name", job.get_aux_tenant_name());798} else if (OB_FALSE_IT(import_job.set_src_tenant_id(tenant_id))) {799} else if (OB_FAIL(import_job.get_import_arg().assign(job.get_import_arg()))) {800LOG_WARN("failed to assign import arg", K(ret));801} else if (OB_FAIL(import_helper.init(import_job.get_tenant_id()))) {802LOG_WARN("failed to init import job", K(ret));803} else if (OB_FAIL(trans.start(sql_proxy_, gen_meta_tenant_id(job.get_tenant_id())))) {804LOG_WARN("failed to start trans", K(ret));805} else if 
(OB_FAIL(ObLSBackupInfoOperator::get_next_job_id(trans, job.get_tenant_id(), job_id))) {806LOG_WARN("failed to get next job id", K(ret));807} else if (OB_FALSE_IT(import_job.set_job_id(job_id))) {808} else if (OB_FAIL(import_helper.insert_import_table_job(trans, import_job))) {809LOG_WARN("failed to insert into improt table job", K(ret));810}811
812if (trans.is_started()) {813if (OB_TMP_FAIL(trans.end(OB_SUCC(ret)))) {814ret = OB_SUCC(ret) ? tmp_ret : ret;815LOG_WARN("failed to commit trans", K(ret), K(tmp_ret));816}817if (OB_SUCC(ret)) {818LOG_INFO("[RECOVER_TABLE]succeed generate import job", K(job), K(import_job));819ROOTSERVICE_EVENT_ADD("recover_table", "generate_import_job",820"tenant_id", job.get_tenant_id(),821"job_id", job.get_job_id(),822"import_job_id", import_job.get_job_id());823}824}825
826if (OB_TMP_FAIL(try_advance_status_(job, ret))) {827LOG_WARN("failed to advance status", K(tmp_ret), K(ret));828}829return ret;830}
831
832int ObRecoverTableJobScheduler::importing_(share::ObRecoverTableJob &job)833{
834int ret = OB_SUCCESS;835share::ObImportTableJobPersistHelper helper;836share::ObImportTableJob import_job;837if (OB_FAIL(helper.init(job.get_tenant_id()))) {838LOG_WARN("failed to init helper", K(ret));839} else if (OB_FAIL(helper.get_import_table_job_history_by_initiator(840*sql_proxy_, job.get_tenant_id(), job.get_job_id(), import_job))) {841if (OB_ENTRY_NOT_EXIST == ret) {842if (REACH_TIME_INTERVAL(60 * 1000 * 1000)) {843LOG_INFO("[RECOVER_TABLE]import table is not finish, wait later", K(job));844}845ret = OB_SUCCESS;846} else {847LOG_WARN("failed to get recover table job histroy by initiator", K(ret));848}849} else if (OB_FALSE_IT(job.set_end_ts(ObTimeUtility::current_time()))) {850} else if (OB_FALSE_IT(job.set_result(import_job.get_result()))) {851} else if (!job.get_result().is_succeed() && OB_FALSE_IT(ret = OB_LS_RESTORE_FAILED)) {852} else if (OB_FAIL(try_advance_status_(job, ret))) {853LOG_WARN("failed to advance status", K(ret));854} else {855LOG_INFO("[RECOVER_TABLE]import table job finish", K(job), K(import_job));856ROOTSERVICE_EVENT_ADD("recover_table", "import job finish",857"tenant_id", job.get_tenant_id(),858"job_id", job.get_job_id(),859"import_job_id", import_job.get_job_id());860}861return ret;862}
863
864int ObRecoverTableJobScheduler::user_finish_(const share::ObRecoverTableJob &job)865{
866int ret = OB_SUCCESS;867ObMySQLTransaction trans;868const uint64_t tenant_id = job.get_tenant_id();869const int64_t job_id = job.get_job_id();870if (OB_FAIL(trans.start(sql_proxy_, gen_meta_tenant_id(tenant_id)))) {871LOG_WARN("failed to start trans", K(ret));872} else if (OB_FAIL(helper_.insert_recover_table_job_history(trans, job))) {873LOG_WARN("failed to insert recover table job history", K(ret), K(job));874} else if (OB_FAIL(helper_.delete_recover_table_job(trans, job))) {875LOG_WARN("failed to delete recover table job", K(ret), K(job));876}877
878if (trans.is_started()) {879int tmp_ret = OB_SUCCESS;880if (OB_TMP_FAIL(trans.end(OB_SUCC(ret)))) {881ret = OB_SUCC(ret) ? tmp_ret : ret;882LOG_WARN("end trans failed", K(ret), K(tmp_ret));883}884if (OB_SUCC(ret)) {885LOG_INFO("[RECOVER_TABLE] recover table finish", K(job));886ROOTSERVICE_EVENT_ADD("recover_table", "recover table job finish",887"tenant_id", job.get_tenant_id(),888"job_id", job.get_job_id());889}890}891return ret;892}
893