oceanbase
591 строка · 26.3 Кб
/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */
12
13#define USING_LOG_PREFIX RS_RESTORE14
15#include "ob_tenant_clone_util.h"16#include "share/tenant_snapshot/ob_tenant_snapshot_table_operator.h"17#include "share/restore/ob_tenant_clone_table_operator.h"18#include "share/location_cache/ob_location_service.h"19#include "rootserver/tenant_snapshot/ob_tenant_snapshot_util.h"20
21using namespace oceanbase::rootserver;22using namespace oceanbase::share;23
24//a source tenant only has one clone_job at the same time
25int ObTenantCloneUtil::check_source_tenant_has_clone_job(26common::ObISQLClient &sql_client,27const uint64_t source_tenant_id,28bool &has_job)29{
30int ret = OB_SUCCESS;31has_job = false;32ObTenantCloneTableOperator clone_op;33ObCloneJob clone_job;34
35if (OB_FAIL(clone_op.init(OB_SYS_TENANT_ID, &sql_client))) {36LOG_WARN("fail to init clone op", KR(ret));37} else if (OB_FAIL(clone_op.get_clone_job_by_source_tenant_id(38source_tenant_id, clone_job))) {39if (OB_ENTRY_NOT_EXIST == ret) {40ret = OB_SUCCESS;41} else {42LOG_WARN("fail to get job", KR(ret), K(source_tenant_id));43}44} else {45has_job = true;46}47
48return ret;49}
50
51int ObTenantCloneUtil::check_clone_tenant_exist(common::ObISQLClient &sql_client,52const ObString &clone_tenant_name,53bool &is_exist)54{
55int ret = OB_SUCCESS;56is_exist = false;57ObTenantCloneTableOperator clone_op;58ObCloneJob clone_job;59
60if (OB_FAIL(clone_op.init(OB_SYS_TENANT_ID, &sql_client))) {61LOG_WARN("fail to init clone op", KR(ret));62} else if (OB_FAIL(clone_op.get_clone_job_by_clone_tenant_name(63clone_tenant_name, false/*need lock*/, clone_job))) {64if (OB_ENTRY_NOT_EXIST == ret) {65ret = OB_SUCCESS;66} else {67LOG_WARN("fail to get job", KR(ret), K(clone_tenant_name));68}69} else {70is_exist = true;71LOG_INFO("clone job exist", KR(ret), K(clone_tenant_name));72}73
74return ret;75}
76
77int ObTenantCloneUtil::fill_clone_job(const int64_t job_id,78const obrpc::ObCloneTenantArg &arg,79const uint64_t source_tenant_id,80const ObString &source_tenant_name,81const ObTenantSnapItem &snapshot_item,82ObCloneJob &clone_job)83{
84int ret = OB_SUCCESS;85clone_job.reset();86ObTenantCloneJobType job_type = ObTenantCloneJobType::CLONE_JOB_MAX_TYPE;87common::ObCurTraceId::TraceId trace_id;88
89if (OB_UNLIKELY(job_id < 090|| !arg.is_valid()91|| !is_user_tenant(source_tenant_id)92|| source_tenant_name.empty())) {93ret = OB_INVALID_ARGUMENT;94LOG_WARN("invalid argument", KR(ret), K(job_id), K(arg), K(source_tenant_id),95K(source_tenant_name), K(snapshot_item));96} else if (FALSE_IT(job_type = snapshot_item.is_valid() ?97ObTenantCloneJobType::RESTORE :98ObTenantCloneJobType::FORK)) {99} else {100ObCurTraceId::TraceId *cur_trace_id = ObCurTraceId::get_trace_id();101if (nullptr != cur_trace_id) {102trace_id = *cur_trace_id;103} else {104trace_id.init(GCONF.self_addr_);105}106const ObCloneJob::ObCloneJobInitArg init_arg = {107.trace_id_ = trace_id,108.tenant_id_ = OB_SYS_TENANT_ID,109.job_id_ = job_id,110.source_tenant_id_ = source_tenant_id,111.source_tenant_name_ = source_tenant_name,112.clone_tenant_id_ = OB_INVALID_TENANT_ID,113.clone_tenant_name_ = arg.get_new_tenant_name(),114.tenant_snapshot_id_ = snapshot_item.get_tenant_snapshot_id(),115.tenant_snapshot_name_ = snapshot_item.get_snapshot_name(),116.resource_pool_id_ = OB_INVALID_ID,117.resource_pool_name_ = arg.get_resource_pool_name(),118.unit_config_name_ = arg.get_unit_config_name(),119.restore_scn_ = snapshot_item.get_snapshot_scn(),120.status_ = ObTenantCloneStatus(ObTenantCloneStatus::Status::CLONE_SYS_LOCK),121.job_type_ = job_type,122.ret_code_ = OB_SUCCESS,123};124if (OB_FAIL(clone_job.init(init_arg))) {125LOG_WARN("fail to init clone job", KR(ret), K(init_arg));126}127}128
129return ret;130}
131
132int ObTenantCloneUtil::record_clone_job(common::ObISQLClient &sql_client,133const share::ObCloneJob &clone_job)134{
135int ret = OB_SUCCESS;136ObTenantCloneTableOperator clone_op;137uint64_t source_tenant_id = clone_job.get_source_tenant_id();138ObString clone_tenant_name = clone_job.get_clone_tenant_name();139bool has_job = false;140bool tenant_exist_in_clone_job = false;141
142if (OB_UNLIKELY(!clone_job.is_valid())) {143ret = OB_INVALID_ARGUMENT;144LOG_WARN("invalid argument", KR(ret), K(clone_job));145} else if (OB_FAIL(check_source_tenant_has_clone_job(sql_client, source_tenant_id, has_job))) {146LOG_WARN("fail to check source tenant has clone job", KR(ret), K(source_tenant_id));147} else if (has_job) {148ret = OB_OP_NOT_ALLOW;149LOG_WARN("source tenant already has clone job", KR(ret), K(source_tenant_id));150LOG_USER_ERROR(OB_OP_NOT_ALLOW, "source tenant has a running clone job, clone tenant now");151} else if (OB_FAIL(ObTenantCloneUtil::check_clone_tenant_exist(sql_client,152clone_tenant_name, tenant_exist_in_clone_job))) {153LOG_WARN("failed to check clone tenant exist in clone job", KR(ret), K(clone_tenant_name));154} else if (tenant_exist_in_clone_job) {155ret = OB_OP_NOT_ALLOW;156LOG_WARN("duplicate clone tenant name in clone job", KR(ret), K(clone_tenant_name));157LOG_USER_ERROR(OB_OP_NOT_ALLOW, "clone tenant name exists in a running clone job, clone tenant now");158} else if (OB_FAIL(clone_op.init(OB_SYS_TENANT_ID, &sql_client))) {159LOG_WARN("fail to init clone op", KR(ret));160} else if (OB_FAIL(clone_op.insert_clone_job(clone_job))) {161LOG_WARN("fail to insert clone job", KR(ret), K(clone_job));162}163
164return ret;165}
166
167int ObTenantCloneUtil::update_resource_pool_id_of_clone_job(common::ObISQLClient &sql_client,168const int64_t job_id,169const uint64_t resource_pool_id)170{
171int ret = OB_SUCCESS;172ObTenantCloneTableOperator clone_op;173
174if (OB_UNLIKELY(job_id < 0 || OB_INVALID_ID == resource_pool_id)) {175ret = OB_INVALID_ARGUMENT;176LOG_WARN("invalid argument", KR(ret), K(job_id), K(resource_pool_id));177} else if (OB_FAIL(clone_op.init(OB_SYS_TENANT_ID, &sql_client))) {178LOG_WARN("fail to init clone op", KR(ret));179} else if (OB_FAIL(clone_op.update_job_resource_pool_id(job_id, resource_pool_id))) {180LOG_WARN("fail to update clone job resource pool id", KR(ret), K(job_id), K(resource_pool_id));181}182
183return ret;184}
185
186int ObTenantCloneUtil::update_snapshot_info_for_fork_job(common::ObISQLClient &sql_client,187const int64_t job_id,188const ObTenantSnapshotID tenant_snapshot_id,189const ObString &tenant_snapshot_name)190{
191int ret = OB_SUCCESS;192ObTenantCloneTableOperator clone_op;193
194if (OB_UNLIKELY(job_id < 0 || !tenant_snapshot_id.is_valid() ||195tenant_snapshot_name.empty())) {196ret = OB_INVALID_ARGUMENT;197LOG_WARN("invalid argument", KR(ret), K(job_id), K(tenant_snapshot_id), K(tenant_snapshot_name));198} else if (OB_FAIL(clone_op.init(OB_SYS_TENANT_ID, &sql_client))) {199LOG_WARN("fail to init clone op", KR(ret));200} else if (OB_FAIL(clone_op.update_job_snapshot_info(job_id,201tenant_snapshot_id,202tenant_snapshot_name))) {203LOG_WARN("fail to update clone job id and name", KR(ret), K(job_id), K(tenant_snapshot_id), K(tenant_snapshot_name));204}205
206return ret;207}
208
209int ObTenantCloneUtil::update_restore_scn_for_fork_job(common::ObISQLClient &sql_client,210const int64_t job_id,211const SCN &restore_scn)212{
213int ret = OB_SUCCESS;214ObTenantCloneTableOperator clone_op;215
216if (OB_UNLIKELY(OB_INVALID_ID == job_id || !restore_scn.is_valid())) {217ret = OB_INVALID_ARGUMENT;218LOG_WARN("invalid argument", KR(ret), K(job_id), K(restore_scn));219} else if (OB_FAIL(clone_op.init(OB_SYS_TENANT_ID, &sql_client))) {220LOG_WARN("fail to init clone op", KR(ret));221} else if (OB_FAIL(clone_op.update_job_snapshot_scn(job_id,222restore_scn))) {223LOG_WARN("fail to update clone job snapshot scn", KR(ret), K(job_id), K(restore_scn));224}225
226return ret;227}
228
229int ObTenantCloneUtil::insert_user_tenant_clone_job(common::ObISQLClient &sql_client,230const ObString &clone_tenant_name,231const uint64_t user_tenant_id)232{
233int ret = OB_SUCCESS;234if (OB_UNLIKELY(clone_tenant_name.empty() || !is_user_tenant(user_tenant_id))) {235ret = OB_INVALID_ARGUMENT;236LOG_WARN("invalid argument", KR(ret), K(clone_tenant_name), K(user_tenant_id));237} else {238ObTenantCloneTableOperator clone_op;239ObCloneJob clone_job;240ObCloneJob user_clone_job;241if (OB_FAIL(clone_op.init(OB_SYS_TENANT_ID, &sql_client))) {242LOG_WARN("fail init clone op", KR(ret));243} else if (OB_FAIL(clone_op.get_clone_job_by_clone_tenant_name(244clone_tenant_name, false/*need_lock*/, clone_job))) {245LOG_WARN("fail to get clone job", KR(ret), K(clone_tenant_name));246} else if (OB_FAIL(user_clone_job.assign(clone_job))) {247LOG_WARN("fail to assign clone job", KR(ret), K(clone_job));248} else {249user_clone_job.set_tenant_id(user_tenant_id);250user_clone_job.set_clone_tenant_id(user_tenant_id);251user_clone_job.set_status(ObTenantCloneStatus::Status::CLONE_USER_PREPARE);252ObTenantCloneTableOperator user_clone_op;253if (OB_FAIL(user_clone_op.init(user_tenant_id, &sql_client))) {254LOG_WARN("fail init clone op", KR(ret), K(user_tenant_id));255} else if (OB_FAIL(user_clone_op.insert_clone_job(user_clone_job))) {256LOG_WARN("fail to insert clone job", KR(ret), K(user_clone_job));257}258}259}260return ret;261}
262
263int ObTenantCloneUtil::recycle_clone_job(common::ObISQLClient &sql_client,264const ObCloneJob &job)265{
266int ret = OB_SUCCESS;267ObTenantCloneTableOperator clone_op;268ObMySQLTransaction trans;269const uint64_t tenant_id = job.get_tenant_id();270
271if (OB_UNLIKELY(!job.is_valid())) {272ret = OB_INVALID_ARGUMENT;273LOG_WARN("invalid argument", KR(ret), K(job));274} else if (OB_FAIL(trans.start(&sql_client, gen_meta_tenant_id(tenant_id)))) {275LOG_WARN("fail to start trans", KR(ret), K(gen_meta_tenant_id(tenant_id)));276} else if (OB_FAIL(clone_op.init(tenant_id, &trans))) {277LOG_WARN("fail to init", KR(ret), K(tenant_id));278} else if (OB_FAIL(clone_op.insert_clone_job_history(job))) {279LOG_WARN("fail to insert clone job history", KR(ret), K(job));280} else if (OB_FAIL(clone_op.remove_clone_job(job))) {281LOG_WARN("fail to remove clone job", KR(ret), K(job));282}283if (trans.is_started()) {284int tmp_ret = OB_SUCCESS;285if (OB_TMP_FAIL(trans.end(OB_SUCC(ret)))) {286LOG_WARN("trans end failed", "is_commit", OB_SUCCESS == ret, KR(tmp_ret), KR(ret));287ret = (OB_SUCC(ret)) ? tmp_ret : ret;288}289}290
291return ret;292}
293
294int ObTenantCloneUtil::notify_clone_scheduler(const uint64_t tenant_id)295{
296int ret = OB_SUCCESS;297common::ObAddr leader_addr;298obrpc::ObNotifyCloneSchedulerArg arg;299arg.set_tenant_id(tenant_id);300obrpc::ObNotifyCloneSchedulerResult res;301
302if (OB_UNLIKELY(!is_sys_tenant(tenant_id))) {303ret = OB_INVALID_ARGUMENT;304LOG_WARN("invalid argument", KR(ret), K(tenant_id));305} else if (OB_ISNULL(GCTX.srv_rpc_proxy_) || OB_ISNULL(GCTX.location_service_)) {306ret = OB_ERR_UNEXPECTED;307LOG_WARN("pointer is null", KR(ret), KP(GCTX.srv_rpc_proxy_), KP(GCTX.location_service_));308} else if (OB_FAIL(GCTX.location_service_->get_leader_with_retry_until_timeout(309GCONF.cluster_id, tenant_id, ObLSID(ObLSID::SYS_LS_ID), leader_addr))) {310LOG_WARN("failed to get leader address", KR(ret), K(tenant_id));311} else if (OB_FAIL(GCTX.srv_rpc_proxy_->to(leader_addr).by(tenant_id).notify_clone_scheduler(arg, res))) {312LOG_WARN("failed to notify clone scheduler", KR(ret), K(leader_addr), K(arg));313} else {314int res_ret = res.get_result();315if (OB_SUCCESS != res_ret) {316ret = res_ret;317LOG_WARN("the result of notify clone scheduler failed", KR(res_ret), K(leader_addr), K(arg));318}319}320return ret;321}
322
323int ObTenantCloneUtil::release_clone_tenant_resource_of_clone_job(const ObCloneJob &clone_job)324{
325int ret = OB_SUCCESS;326const int64_t timeout = GCONF._ob_ddl_timeout;327const uint64_t resource_pool_id = clone_job.get_resource_pool_id();328const uint64_t clone_tenant_id = clone_job.get_clone_tenant_id();329const ObString &clone_tenant_name = clone_job.get_clone_tenant_name();330
331if (!clone_job.get_status().is_sys_failed_status()) {332ret = OB_ERR_UNEXPECTED;333LOG_WARN("try to release resource of a processing or success job", KR(ret), K(clone_job));334} else if (OB_ISNULL(GCTX.rs_rpc_proxy_)) {335ret = OB_ERR_UNEXPECTED;336LOG_WARN("unexpected null", KR(ret));337} else if (OB_INVALID_ID == resource_pool_id) {338// clone tenant and resource pool have not been created339// it's no need to release resource340} else {341if (OB_INVALID_TENANT_ID != clone_tenant_id) {342obrpc::ObDropTenantArg arg;343ObArenaAllocator allocator;344arg.exec_tenant_id_ = clone_job.get_tenant_id();345arg.if_exist_ = false;346arg.delay_to_drop_ = false;347arg.force_drop_ = true;348arg.drop_only_in_restore_ = true;349arg.tenant_id_ = clone_tenant_id;350
351if (OB_FAIL(deep_copy_ob_string(allocator, clone_tenant_name, arg.tenant_name_))) {352LOG_WARN("fail to assign", KR(ret), K(clone_job));353} else if (OB_FAIL(GCTX.rs_rpc_proxy_->timeout(timeout).drop_tenant(arg))) {354if (ret == OB_TENANT_NOT_EXIST) {355ret = OB_SUCCESS;356} else {357LOG_WARN("fail to drop tenant", KR(ret), K(clone_job), K(arg));358}359}360LOG_INFO("recycle clone tenant", KR(ret), K(clone_job));361}362if (OB_SUCC(ret)) {363obrpc::ObDropResourcePoolArg arg;364arg.exec_tenant_id_ = clone_job.get_tenant_id();365arg.pool_id_ = resource_pool_id;366arg.if_exist_ = true;367if (OB_FAIL(GCTX.rs_rpc_proxy_->timeout(timeout).drop_resource_pool(arg))) {368LOG_WARN("drop_resource_pool failed", KR(ret), K(clone_job));369}370LOG_INFO("recycle clone resource pool", KR(ret), K(clone_job));371}372}373
374return ret;375}
376
377int ObTenantCloneUtil::release_source_tenant_resource_of_clone_job(common::ObISQLClient &sql_client,378const ObCloneJob &clone_job)379{
380int ret = OB_SUCCESS;381int tmp_ret = OB_SUCCESS;382ObMySQLTransaction trans;383ObTenantSnapshotTableOperator table_op;384const int64_t job_id = clone_job.get_job_id();385const uint64_t source_tenant_id = clone_job.get_source_tenant_id();386const ObTenantSnapshotID tenant_snapshot_id = clone_job.get_tenant_snapshot_id();387const ObTenantCloneJobType job_type = clone_job.get_job_type();388const ObTenantCloneStatus status = clone_job.get_status();389bool is_already_unlocked = false;390bool is_source_tenant_exist = true;391bool need_notify_tenant_snapshot_scheduler = false;392
393share::schema::ObSchemaGetterGuard schema_guard;394
395if (!clone_job.get_status().is_sys_release_resource_status()) {396ret = OB_ERR_UNEXPECTED;397LOG_WARN("try to release resource of a processing or success job", KR(ret), K(clone_job));398} else if (OB_FAIL(GCTX.schema_service_->get_tenant_schema_guard(OB_SYS_TENANT_ID, schema_guard))) {399LOG_WARN("fail to get schema guard", KR(ret), K(source_tenant_id));400} else if (OB_FAIL(schema_guard.check_tenant_exist(source_tenant_id, is_source_tenant_exist))) {401LOG_WARN("get tenant ids failed", K(ret));402} else if (OB_UNLIKELY(!is_source_tenant_exist)) {403LOG_INFO("source tenant doesn't exist while release source tenant resource", KR(ret), K(source_tenant_id));404} else if (OB_FAIL(schema_guard.reset())) {405LOG_WARN("fail to reset schema guard", KR(ret));406} else if (OB_FAIL(trans.start(&sql_client, gen_meta_tenant_id(source_tenant_id)))) {407LOG_WARN("trans start failed", KR(ret), K(clone_job));408} else if (OB_FAIL(table_op.init(source_tenant_id, &trans))) {409LOG_WARN("failed to init table op", KR(ret), K(clone_job));410} else { // is_source_tenant_exist == true411// release global lock412ObTenantSnapItem global_lock;413if (OB_FAIL(table_op.get_tenant_snap_item(414ObTenantSnapshotID(ObTenantSnapshotTableOperator::GLOBAL_STATE_ID),415true /*for update*/,416global_lock))) {417if (OB_TENANT_SNAPSHOT_NOT_EXIST == ret) {418ret = OB_SUCCESS;419LOG_INFO("global lock has not been created", KR(ret), K(clone_job));420is_already_unlocked = true;421} else {422LOG_WARN("fail to get global_lock", KR(ret), K(clone_job));423}424} else if (ObTenantSnapStatus::CLONING != global_lock.get_status()) {425is_already_unlocked = true;426LOG_INFO("global lock has been released", KR(ret), K(clone_job));427} else if (OB_FAIL(ObTenantSnapshotUtil::unlock_tenant_snapshot_simulated_mutex_from_clone_release_task(428trans,429source_tenant_id,430job_id,431ObTenantSnapStatus::CLONING,432is_already_unlocked))) {433LOG_WARN("fail to unlock", KR(ret), K(clone_job), K(global_lock));434}435
436// release snapshot437// "is_already_unlocked == true" means the release trans of this clone job has been committed,438// or clone job is failed at the first status (CLONE_SYS_LOCK)439// thus, no need to handle with the snapshot440if (OB_SUCC(ret) && !is_already_unlocked) {441ObTenantSnapItem tenant_snapshot_item;442ObTenantSnapStatus next_snap_status = ObTenantCloneJobType::RESTORE == job_type ?443ObTenantSnapStatus::NORMAL :444ObTenantSnapStatus::DELETING;445if (!tenant_snapshot_id.is_valid()) {446if (ObTenantCloneJobType::FORK == job_type &&447!status.is_sys_valid_snapshot_status_for_fork()) {448LOG_INFO("fork tenant snapshot has not been created", K(clone_job));449} else {450ret = OB_ERR_UNEXPECTED;451LOG_WARN("tenant snapshot is invalid", KR(ret), K(clone_job));452}453} else if (OB_FAIL(ObTenantSnapshotUtil::get_tenant_snapshot_info(trans,454source_tenant_id,455tenant_snapshot_id,456tenant_snapshot_item))) {457if (OB_TENANT_SNAPSHOT_NOT_EXIST == ret && ObTenantCloneJobType::FORK == job_type) {458// fork clone job will generate tenant_snapshot_id at first, and then create snapshot.459// thus, it is possible that job has valid tenant_snapshot_id, but the snapshot doesn't exist460ret = OB_SUCCESS;461LOG_INFO("tenant snapshot has not been created", K(clone_job));462} else {463LOG_WARN("fail to get tenant snapshot", KR(ret), K(clone_job));464}465} else if (tenant_snapshot_item.get_status() == next_snap_status) {466LOG_INFO("tenant snapshot item already in next status",467K(clone_job), K(tenant_snapshot_item), K(next_snap_status));468} else if (OB_FAIL(table_op.update_tenant_snap_item(tenant_snapshot_id,469tenant_snapshot_item.get_status(),470next_snap_status))) {471LOG_WARN("failed to update snapshot status", KR(ret), K(clone_job));472} else if (ObTenantSnapStatus::DELETING == next_snap_status) {473ObTenantSnapJobItem job_item(source_tenant_id,474tenant_snapshot_id,475ObTenantSnapOperation::DELETE,476clone_job.get_trace_id());477if 
(OB_FAIL(table_op.insert_tenant_snap_job_item(job_item))) {478LOG_WARN("fail to insert tenant snapshot job", KR(ret), K(job_item));479} else if (OB_FAIL(ObTenantSnapshotUtil::recycle_tenant_snapshot_ls_replicas(trans, source_tenant_id,480clone_job.get_tenant_snapshot_name()))) {481LOG_WARN("fail to recycle tenant snapshot ls replicas", KR(ret), K(clone_job));482} else {483need_notify_tenant_snapshot_scheduler = true;484LOG_INFO("release source tenant resource", KR(ret), K(clone_job));485}486}487}488}489
490if (trans.is_started()) {491if (OB_TMP_FAIL(trans.end(OB_SUCC(ret)))) {492LOG_WARN("trans end failed", "is_commit", OB_SUCCESS == ret, K(tmp_ret));493ret = (OB_SUCC(ret)) ? tmp_ret : ret;494}495}496
497if (OB_SUCC(ret) && need_notify_tenant_snapshot_scheduler) {498if (OB_TMP_FAIL(ObTenantSnapshotUtil::notify_scheduler(source_tenant_id))) {499LOG_WARN("notify tenant snapshot scheduler failed", KR(tmp_ret), K(clone_job));500}501}502
503return ret;504}
505
506int ObTenantCloneUtil::get_clone_job_failed_message(common::ObISQLClient &sql_client,507const int64_t job_id,508const uint64_t tenant_id,509ObIAllocator &allocator,510ObString &err_msg)511{
512int ret = OB_SUCCESS;513ObTenantCloneTableOperator clone_op;514
515if (OB_UNLIKELY(job_id < 0 || !is_sys_tenant(tenant_id))) {516ret = OB_INVALID_ARGUMENT;517LOG_WARN("invalid argument", KR(ret), K(job_id), K(tenant_id));518} else if (OB_FAIL(clone_op.init(tenant_id, &sql_client))) {519LOG_WARN("fail to init", KR(ret), K(tenant_id));520} else if (OB_FAIL(clone_op.get_job_failed_message(job_id, allocator, err_msg))) {521LOG_WARN("fail to get clone job failed message", KR(ret), K(job_id), K(tenant_id));522}523return ret;524}
525
526//This function is called by the user executing "cancel clone" sql.
527int ObTenantCloneUtil::cancel_clone_job(common::ObISQLClient &sql_client,528const ObString &clone_tenant_name,529bool &clone_already_finish)530{
531int ret = OB_SUCCESS;532clone_already_finish = false;533ObTenantCloneTableOperator clone_op;534ObCloneJob clone_job;535ObMySQLTransaction trans;536ObSqlString err_msg;537const ObTenantCloneStatus next_status(ObTenantCloneStatus::Status::CLONE_SYS_CANCELING);538
539if (OB_UNLIKELY(clone_tenant_name.empty())) {540ret = OB_INVALID_ARGUMENT;541LOG_WARN("invalid argument", KR(ret), K(clone_tenant_name));542} else if (OB_FAIL(trans.start(&sql_client, OB_SYS_TENANT_ID))) {543LOG_WARN("failed to start trans", KR(ret));544} else if (OB_FAIL(clone_op.init(OB_SYS_TENANT_ID, &trans))) {545LOG_WARN("fail init clone op", KR(ret));546} else if (OB_FAIL(clone_op.get_clone_job_by_clone_tenant_name(547clone_tenant_name, true/*need_lock*/, clone_job))) {548if (OB_ENTRY_NOT_EXIST != ret) {549LOG_WARN("fail to get clone job", KR(ret), K(clone_tenant_name));550} else {551ret = OB_SUCCESS;552clone_already_finish = true;553}554} else if (clone_job.get_status().is_user_status()) {555ret = OB_ERR_UNEXPECTED;556LOG_WARN("unexpected sys clone job status", KR(ret), K(clone_job));557} else if (ObTenantCloneStatus::Status::CLONE_SYS_RELEASE_RESOURCE == clone_job.get_status()558|| !clone_job.get_status().is_sys_processing_status()) {559clone_already_finish = true;560} else if (OB_FAIL(clone_op.update_job_status(clone_job.get_job_id(),561clone_job.get_status(), /*old_status*/562next_status))) {563LOG_WARN("fail to update job status", KR(ret), K(clone_tenant_name), K(clone_job));564} else if (OB_FAIL(err_msg.append_fmt("clone job has been canceled in %s status",565ObTenantCloneStatus::get_clone_status_str(clone_job.get_status())))) {566} else if (OB_FAIL(clone_op.update_job_failed_info(clone_job.get_job_id(), OB_CANCELED, err_msg.string()))) {567LOG_WARN("fail to update job failed info", KR(ret), K(clone_job));568}569
570if (trans.is_started()) {571int tmp_ret = OB_SUCCESS;572if (OB_TMP_FAIL(trans.end(OB_SUCC(ret)))) {573LOG_WARN("trans end failed", "is_commit", (OB_SUCCESS == ret), KR(tmp_ret));574ret = (OB_SUCC(ret)) ? tmp_ret : ret;575}576}577
578if (OB_SUCC(ret)) {579LOG_INFO("[RESTORE] switch job status", KR(ret), K(clone_job), K(next_status));580
581const char *prev_status_str = ObTenantCloneStatus::get_clone_status_str(clone_job.get_status());582const char *cur_status_str = ObTenantCloneStatus::get_clone_status_str(next_status);583
584ROOTSERVICE_EVENT_ADD("clone", "change_clone_status",585"job_id", clone_job.get_job_id(),586K(ret),587"prev_clone_status", prev_status_str,588"cur_clone_status", cur_status_str);589}590return ret;591}
592