oceanbase
1511 строк · 60.6 Кб
1/**
2* Copyright (c) 2021 OceanBase
3* OceanBase CE is licensed under Mulan PubL v2.
4* You can use this software according to the terms and conditions of the Mulan PubL v2.
5* You may obtain a copy of Mulan PubL v2 at:
6* http://license.coscl.org.cn/MulanPubL-2.0
7* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10* See the Mulan PubL v2 for more details.
11*/
12
13#define USING_LOG_PREFIX RS_RESTORE
14
15#include "ob_restore_scheduler.h"
16#include "rootserver/ob_ddl_service.h"
17#include "rootserver/ob_rs_async_rpc_proxy.h"
18#include "rootserver/ob_rs_event_history_table_operator.h"
19#include "rootserver/ob_unit_manager.h"//convert_pool_name_lis
20#include "rootserver/ob_ls_service_helper.h"//create_new_ls_in_trans
21#include "rootserver/ob_common_ls_service.h"//do_create_user_ls
22#include "rootserver/ob_tenant_role_transition_service.h"
23#include "share/ob_schema_status_proxy.h"
24#include "share/schema/ob_schema_utils.h"
25#include "share/schema/ob_schema_mgr.h"
26#include "share/ob_upgrade_utils.h"
27#include "lib/mysqlclient/ob_mysql_transaction.h" //ObMySQLTransaction
28#include "share/ls/ob_ls_status_operator.h" //ObLSStatusOperator
29#include "share/ls/ob_ls_operator.h"//ObLSAttr
30#include "storage/backup/ob_backup_data_store.h"//ObBackupDataLSAttrDesc
31#include "share/restore/ob_physical_restore_info.h"//ObPhysicalRestoreInfo
32#include "share/restore/ob_physical_restore_table_operator.h"//ObPhysicalRestoreTableOperator
33#include "share/ob_tenant_info_proxy.h"//ObAllTenantInfo
34#include "share/restore/ob_log_restore_source_mgr.h"
35#include "share/ls/ob_ls_recovery_stat_operator.h"//ObLSRecoveryStatOperator
36#include "share/ob_rpc_struct.h"
37#include "share/ob_primary_standby_service.h"
38#include "logservice/palf/log_define.h"//scn
39#include "share/scn.h"
40#include "ob_restore_service.h"
41#ifdef OB_BUILD_TDE_SECURITY
42#include "share/ob_master_key_getter.h"
43#endif
44
45namespace oceanbase
46{
47namespace rootserver
48{
49using namespace common;
50using namespace share;
51using namespace share::schema;
52using namespace obrpc;
53using namespace palf;
54
55ObRestoreScheduler::ObRestoreScheduler()
56: inited_(false), schema_service_(NULL),
57sql_proxy_(NULL), rpc_proxy_(NULL),
58srv_rpc_proxy_(NULL), lst_operator_(NULL),
59restore_service_(nullptr), self_addr_(),
60tenant_id_(OB_INVALID_TENANT_ID)
61{
62}
63
64ObRestoreScheduler::~ObRestoreScheduler()
65{
66}
67
68int ObRestoreScheduler::init(ObRestoreService &restore_service)
69{
70int ret = OB_SUCCESS;
71if (inited_) {
72ret = OB_INIT_TWICE;
73LOG_WARN("init twice", KR(ret));
74} else if (OB_ISNULL(GCTX.schema_service_) || OB_ISNULL(GCTX.sql_proxy_)
75|| OB_ISNULL(GCTX.rs_rpc_proxy_) || OB_ISNULL(GCTX.srv_rpc_proxy_)
76|| OB_ISNULL(GCTX.lst_operator_)) {
77ret = OB_INVALID_ARGUMENT;
78LOG_WARN("invalid argument", KR(ret), KP(GCTX.schema_service_), KP(GCTX.sql_proxy_),
79KP(GCTX.rs_rpc_proxy_), KP(GCTX.srv_rpc_proxy_), KP(GCTX.lst_operator_));
80} else {
81schema_service_ = GCTX.schema_service_;
82sql_proxy_ = GCTX.sql_proxy_;
83rpc_proxy_ = GCTX.rs_rpc_proxy_;
84srv_rpc_proxy_ = GCTX.srv_rpc_proxy_;
85lst_operator_ = GCTX.lst_operator_;
86restore_service_ = &restore_service;
87tenant_id_ = is_sys_tenant(MTL_ID()) ? MTL_ID() : gen_user_tenant_id(MTL_ID());
88self_addr_ = GCTX.self_addr();
89inited_ = true;
90}
91return ret;
92}
93void ObRestoreScheduler::do_work()
94{
95LOG_INFO("[RESTORE] restore scheduler start");
96int ret = OB_SUCCESS;
97if (!inited_) {
98ret = OB_NOT_INIT;
99LOG_WARN("not inited", K(ret));
100} else {
101ObCurTraceId::init(GCTX.self_addr());
102LOG_INFO("[RESTORE] try process restore job");
103ObArray<ObPhysicalRestoreJob> job_infos;
104ObPhysicalRestoreTableOperator restore_op;
105if (OB_FAIL(restore_op.init(sql_proxy_, tenant_id_, share::OBCG_STORAGE /*group_id*/))) {
106LOG_WARN("fail init", K(ret), K(tenant_id_));
107} else if (OB_FAIL(restore_op.get_jobs(job_infos))) {
108LOG_WARN("fail to get jobs", KR(ret), K(tenant_id_));
109} else {
110FOREACH_CNT_X(job_info, job_infos, !restore_service_->has_set_stop()) { // ignore ret
111if (OB_ISNULL(job_info)) {
112ret = OB_ERR_UNEXPECTED;
113LOG_WARN("job info is null", K(ret));
114} else if (is_sys_tenant(tenant_id_)) {
115if (OB_FAIL(process_sys_restore_job(*job_info))) {
116LOG_WARN("failed to process sys restore job", KR(ret), KPC(job_info));
117}
118} else if (OB_FAIL(process_restore_job(*job_info))) {
119LOG_WARN("fail to process restore job", K(ret), KPC(job_info));
120}
121}
122}
123ret = OB_SUCCESS;
124restore_service_->idle();
125}
126LOG_INFO("[RESTORE] restore scheduler quit");
127return;
128}
129
130int ObRestoreScheduler::process_sys_restore_job(const ObPhysicalRestoreJob &job)
131{
132int ret = OB_SUCCESS;
133if (!inited_) {
134ret = OB_NOT_INIT;
135LOG_WARN("not inited", K(ret));
136} else if (OB_UNLIKELY(!is_sys_tenant(MTL_ID()))) {
137ret = OB_ERR_UNEXPECTED;
138LOG_WARN("not sys tenant", KR(ret));
139} else {
140switch (job.get_status()) {
141case PHYSICAL_RESTORE_CREATE_TENANT:
142ret = restore_tenant(job);
143break;
144case PHYSICAL_RESTORE_WAIT_TENANT_RESTORE_FINISH:
145ret = restore_wait_tenant_finish(job);
146break;
147case PHYSICAL_RESTORE_SUCCESS:
148ret = tenant_restore_finish(job);
149break;
150case PHYSICAL_RESTORE_FAIL:
151ret = tenant_restore_finish(job);
152break;
153default:
154ret = OB_ERR_UNEXPECTED;
155LOG_WARN("status not match", K(ret), K(job));
156break;
157}
158if (PHYSICAL_RESTORE_FAIL != job.get_status()) {
159int tmp_ret = OB_SUCCESS;
160if (OB_SUCCESS != (tmp_ret = try_recycle_job(job))) {
161LOG_WARN("fail to recycle job", K(tmp_ret), K(job));
162}
163}
164LOG_INFO("[RESTORE] doing restore", K(ret), K(job));
165}
166return ret;
167}
168
169
170int ObRestoreScheduler::process_restore_job(const ObPhysicalRestoreJob &job)
171{
172int ret = OB_SUCCESS;
173if (!inited_) {
174ret = OB_NOT_INIT;
175LOG_WARN("not inited", K(ret));
176} else if (OB_UNLIKELY(is_sys_tenant(MTL_ID()))) {
177ret = OB_ERR_UNEXPECTED;
178LOG_WARN("not sys tenant", KR(ret));
179} else {
180switch (job.get_status()) {
181case PHYSICAL_RESTORE_PRE:
182ret = restore_pre(job);
183break;
184case PHYSICAL_RESTORE_CREATE_INIT_LS:
185ret = restore_init_ls(job);
186break;
187case PHYSICAL_RESTORE_WAIT_CONSISTENT_SCN:
188ret = restore_wait_to_consistent_scn(job);
189break;
190case PHYSICAL_RESTORE_WAIT_LS:
191ret = restore_wait_ls_finish(job);
192break;
193case PHYSICAL_RESTORE_POST_CHECK:
194ret = post_check(job);
195break;
196case PHYSICAL_RESTORE_UPGRADE:
197ret = restore_upgrade(job);
198break;
199case PHYSICAL_RESTORE_SUCCESS:
200ret = restore_finish(job);
201break;
202case PHYSICAL_RESTORE_FAIL:
203ret = restore_finish(job);
204break;
205default:
206ret = OB_ERR_UNEXPECTED;
207LOG_WARN("status not match", K(ret), K(job));
208break;
209}
210//TODO, table restore
211LOG_INFO("[RESTORE] doing restore", K(ret), K(job));
212}
213return ret;
214}
215
216// restore_tenant is not reentrant
217int ObRestoreScheduler::restore_tenant(const ObPhysicalRestoreJob &job_info)
218{
219int ret = OB_SUCCESS;
220ObCreateTenantArg arg;
221//the pool list of job_info is obstring without '\0'
222ObSqlString pool_list;
223UInt64 tenant_id = OB_INVALID_TENANT_ID;
224DEBUG_SYNC(BEFORE_PHYSICAL_RESTORE_TENANT);
225int64_t timeout = GCONF._ob_ddl_timeout;
226if (!inited_) {
227ret = OB_NOT_INIT;
228LOG_WARN("not inited", K(ret));
229} else if (OB_FAIL(restore_service_->check_stop())) {
230LOG_WARN("restore scheduler stopped", K(ret));
231} else if (OB_INVALID_TENANT_ID != job_info.get_tenant_id()) {
232// restore_tenant can only be executed once.
233// only update job status
234} else if (OB_FAIL(pool_list.assign(job_info.get_pool_list()))) {
235LOG_WARN("failed to assign pool list", KR(ret), K(job_info));
236} else if (OB_FAIL(fill_create_tenant_arg(job_info, pool_list, arg))) {
237LOG_WARN("fail to fill create tenant arg", K(ret), K(pool_list), K(job_info));
238} else if (OB_FAIL(rpc_proxy_->timeout(timeout).create_tenant(arg, tenant_id))) {
239LOG_WARN("fail to create tenant", K(ret), K(arg));
240} else {
241ObPhysicalRestoreTableOperator restore_op;
242const int64_t job_id = job_info.get_job_id();
243const uint64_t new_tenant_id = tenant_id;
244if (OB_FAIL(restore_op.init(sql_proxy_, tenant_id_, share::OBCG_STORAGE /*group_id*/))) {
245LOG_WARN("fail init", K(ret), K(tenant_id_));
246} else if (OB_FAIL(restore_op.update_restore_option(
247job_id, "tenant_id", new_tenant_id))) {
248LOG_WARN("update restore option", K(ret), K(new_tenant_id), K(job_id), K(tenant_id_));
249} else if (OB_FAIL(may_update_restore_concurrency_(new_tenant_id, job_info))) {
250LOG_WARN("failed to update restore concurrency", K(ret), K(new_tenant_id), K(job_info));
251} else {
252restore_service_->wakeup();
253}
254}
255int tmp_ret = OB_SUCCESS;
256if (OB_SUCCESS != (tmp_ret = try_update_job_status(*sql_proxy_, ret, job_info))) {
257LOG_WARN("fail to update job status", K(ret), K(tmp_ret), K(job_info));
258}
259LOG_INFO("[RESTORE] restore tenant", K(ret), K(arg), K(job_info));
260return ret;
261}
262
263int ObRestoreScheduler::fill_create_tenant_arg(
264const ObPhysicalRestoreJob &job,
265const ObSqlString &pool_list,
266ObCreateTenantArg &arg)
267{
268int ret = OB_SUCCESS;
269ObSchemaGetterGuard schema_guard;
270if (!inited_) {
271ret = OB_NOT_INIT;
272LOG_WARN("not inited", K(ret));
273} else if (OB_FAIL(restore_service_->check_stop())) {
274LOG_WARN("restore scheduler stopped", K(ret));
275} else if (OB_FAIL(schema_service_->get_tenant_schema_guard(
276OB_SYS_TENANT_ID, schema_guard))) {
277LOG_WARN("fail to get tenant schema guard", K(ret));
278} else if(lib::Worker::CompatMode::ORACLE != job.get_compat_mode() && lib::Worker::CompatMode::MYSQL != job.get_compat_mode()) {
279ret = OB_INVALID_ARGUMENT;
280LOG_WARN("invalid compat mode", K(ret));
281} else {
282/*
283* restore_tenant will only run trans one when create tenant.
284* Consider the following tenant options:
285* 1) need backup: tenant_name,compatibility_mode
286* 2) need backup and replace(maybe): zone_list,primary_zone,locality,previous_locality
287* 3) not backup yet:locked,default_tablegroup_id,info TODO: (yanmu.ztl)
288* 4) no need to backup:drop_tenant_time,status,collation_type
289* 6) abandoned: replica_num,read_only,rewrite_merge_version,logonly_replica_num,
290* storage_format_version,storage_format_work_version
291*/
292ObCompatibilityMode mode = lib::Worker::CompatMode::ORACLE == job.get_compat_mode() ?
293ObCompatibilityMode::ORACLE_MODE :
294ObCompatibilityMode::MYSQL_MODE;
295arg.exec_tenant_id_ = OB_SYS_TENANT_ID;
296arg.tenant_schema_.set_tenant_name(job.get_tenant_name());
297arg.tenant_schema_.set_compatibility_mode(mode);
298arg.if_not_exist_ = false;
299arg.is_restore_ = true;
300// create tmp tenant for recover table
301arg.is_tmp_tenant_for_recover_ = job.get_recover_table();
302// Physical restore is devided into 2 stages. Recover to 'consistent_scn' which was recorded during
303// data backup first, then to user specified scn.
304arg.recovery_until_scn_ = job.get_consistent_scn();
305arg.compatible_version_ = job.get_source_data_version();
306if (OB_FAIL(assign_pool_list(pool_list.ptr(), arg.pool_list_))) {
307LOG_WARN("fail to get pool list", K(ret), K(pool_list));
308}
309
310if (OB_SUCC(ret)) {
311ObTenantSchema &tenant_schema = arg.tenant_schema_;
312const ObString& locality_str = job.get_locality();
313const ObString &primary_zone = job.get_primary_zone();
314if (!primary_zone.empty()) {
315// specific primary_zone
316tenant_schema.set_primary_zone(primary_zone);
317}
318if (!locality_str.empty()) {
319tenant_schema.set_locality(locality_str);
320}
321}
322if (FAILEDx(ObRestoreUtil::get_restore_ls_palf_base_info(job, SYS_LS, arg.palf_base_info_))) {
323LOG_WARN("failed to get sys ls palf base info", KR(ret), K(job));
324}
325}
326return ret;
327}
328
329int ObRestoreScheduler::assign_pool_list(
330const char *str,
331common::ObIArray<ObString> &pool_list)
332{
333int ret = OB_SUCCESS;
334char *item_str = NULL;
335char *save_ptr = NULL;
336while (OB_SUCC(ret)) {
337item_str = strtok_r((NULL == item_str ? const_cast<char *>(str) : NULL), ",", &save_ptr);
338if (NULL != item_str) {
339ObString pool(item_str);
340if (OB_FAIL(pool_list.push_back(pool))) {
341LOG_WARN("push_back failed", K(ret), K(pool));
342}
343} else {
344break;
345}
346}
347return ret;
348}
349
350int ObRestoreScheduler::check_locality_valid(
351const share::schema::ZoneLocalityIArray &locality)
352{
353int ret = OB_SUCCESS;
354int64_t cnt = locality.count();
355if (cnt <= 0) {
356ret = OB_INVALID_ARGUMENT;
357LOG_WARN("invalid cnt", KR(ret), K(cnt));
358} else {
359for (int64_t i = 0; OB_SUCC(ret) && i < cnt; i++) {
360const share::ObZoneReplicaAttrSet &attr = locality.at(i);
361if (attr.is_specific_readonly_replica()
362|| attr.is_allserver_readonly_replica()
363|| attr.get_encryption_logonly_replica_num() > 0) {
364ret = OB_NOT_SUPPORTED;
365LOG_WARN("locality with readonly/encrytion_logonly replica is not supported",
366KR(ret), K(locality));
367} else if (attr.is_mixed_locality()) {
368ret = OB_NOT_SUPPORTED;
369LOG_WARN("mixed locality is not supported", KR(ret), K(locality));
370} else if (attr.is_specific_replica_attr()) {
371ret = OB_NOT_SUPPORTED;
372LOG_WARN("locality with memstore_percent is not supported", KR(ret), K(locality));
373}
374}
375}
376return ret;
377}
378
379
380int ObRestoreScheduler::check_tenant_can_restore_(const uint64_t tenant_id)
381{
382int ret = OB_SUCCESS;
383if (OB_UNLIKELY(!inited_)) {
384ret = OB_NOT_INIT;
385LOG_WARN("not inited", KR(ret));
386} else if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id)) {
387ret = OB_INVALID_ARGUMENT;
388LOG_WARN("tenant id invalid", KR(ret), K(tenant_id));
389} else if (OB_FAIL(restore_service_->check_stop())) {
390LOG_WARN("restore scheduler stopped", KR(ret));
391} else if (GCONF.in_upgrade_mode()) {
392// 2. check in upgrade mode
393ret = OB_OP_NOT_ALLOW;
394LOG_WARN("[RESTORE] cluster is upgrading, try recycle job",
395KR(ret), K(tenant_id));
396}
397return ret;
398
399}
400
401//restore pre :modify parameters
402int ObRestoreScheduler::restore_pre(const ObPhysicalRestoreJob &job_info)
403{
404int ret = OB_SUCCESS;
405if (!inited_) {
406ret = OB_NOT_INIT;
407LOG_WARN("not inited", K(ret));
408} else if (OB_INVALID_TENANT_ID == tenant_id_
409|| OB_SYS_TENANT_ID == tenant_id_) {
410ret = OB_INVALID_ARGUMENT;
411LOG_WARN("invalid tenant id", K(ret), K(tenant_id_));
412} else if (OB_FAIL(restore_service_->check_stop())) {
413LOG_WARN("restore scheduler stopped", K(ret));
414} else if (OB_FAIL(restore_root_key(job_info))) {
415LOG_WARN("fail to restore root key", K(ret));
416} else if (OB_FAIL(restore_keystore(job_info))) {
417LOG_WARN("fail to restore keystore", K(ret), K(job_info));
418} else {
419if (OB_FAIL(fill_restore_statistics(job_info))) {
420LOG_WARN("fail to fill restore statistics", K(ret), K(job_info));
421}
422int tmp_ret = OB_SUCCESS;
423if (OB_SUCCESS != (tmp_ret = try_update_job_status(*sql_proxy_, ret, job_info))) {
424LOG_WARN("fail to update job status", K(ret), K(tmp_ret), K(job_info));
425}
426}
427LOG_INFO("[RESTORE] restore pre", K(ret), K(job_info));
428return ret;
429}
430
431int ObRestoreScheduler::fill_restore_statistics(const share::ObPhysicalRestoreJob &job_info)
432{
433int ret = OB_SUCCESS;
434ObRestoreProgressPersistInfo restore_progress_info;
435restore_progress_info.key_.job_id_ = job_info.get_job_id();
436restore_progress_info.key_.tenant_id_ = job_info.get_tenant_id();
437restore_progress_info.restore_scn_ = job_info.get_restore_scn();
438int64_t idx = job_info.get_multi_restore_path_list().get_backup_set_path_list().count() - 1;
439ObBackupDataLSAttrDesc ls_info;
440if (idx < 0) {
441ret = OB_ERR_UNEXPECTED;
442LOG_WARN("invalid job info", K(ret), K(idx), K(job_info));
443} else {
444storage::ObBackupDataStore store;
445storage::ObExternBackupSetInfoDesc backup_set_info;
446const share::ObBackupSetPath &backup_set_path = job_info.get_multi_restore_path_list().get_backup_set_path_list().at(idx);
447if (OB_FAIL(store.init(backup_set_path.ptr()))) {
448LOG_WARN("fail to init backup data store", K(backup_set_path));
449} else if (OB_FAIL(store.read_backup_set_info(backup_set_info))) {
450LOG_WARN("fail to read backup set info", K(ret));
451} else if (OB_FAIL(store.read_ls_attr_info(backup_set_info.backup_set_file_.meta_turn_id_, ls_info))) {
452LOG_WARN("fail to read ls attr info", K(ret));
453} else {
454restore_progress_info.ls_count_ = ls_info.ls_attr_array_.count();
455restore_progress_info.tablet_count_ = backup_set_info.backup_set_file_.stats_.finish_tablet_count_;
456restore_progress_info.total_bytes_ = backup_set_info.backup_set_file_.stats_.output_bytes_;
457}
458}
459if (OB_SUCC(ret)) {
460share::ObRestorePersistHelper helper;
461if (OB_FAIL(helper.init(job_info.get_tenant_id(), share::OBCG_STORAGE /*group_id*/))) {
462LOG_WARN("fail to init heler", K(ret));
463} else if (OB_FAIL(helper.insert_initial_restore_progress(*sql_proxy_, restore_progress_info))) {
464LOG_WARN("fail to insert initail ls restore progress", K(ret));
465}
466}
467return ret;
468}
469
470int ObRestoreScheduler::convert_tde_parameters(
471const ObPhysicalRestoreJob &job_info)
472{
473int ret = OB_SUCCESS;
474#ifdef OB_BUILD_TDE_SECURITY
475uint64_t tenant_id = tenant_id_;
476if (!inited_) {
477ret = OB_NOT_INIT;
478LOG_WARN("not inited", K(ret));
479} else if (OB_INVALID_TENANT_ID == tenant_id
480|| OB_SYS_TENANT_ID == tenant_id) {
481ret = OB_INVALID_ARGUMENT;
482LOG_WARN("invalid tenant id", K(ret), K(tenant_id));
483} else if (OB_FAIL(restore_service_->check_stop())) {
484LOG_WARN("restore scheduler stopped", K(ret));
485} else if (job_info.get_kms_dest().empty()) {
486// do nothing
487} else {
488ObArenaAllocator allocator;
489int64_t affected_row = 0;
490ObString tde_method;
491ObString kms_info;
492ObSqlString sql;
493// set tde_method
494if (OB_FAIL(ObMasterKeyUtil::restore_encrypt_params(allocator, job_info.get_kms_dest(),
495job_info.get_kms_encrypt_key(), tde_method, kms_info))) {
496LOG_WARN("failed to restore encrypt params", K(ret));
497} else if (OB_UNLIKELY(tde_method.empty())) {
498ret = OB_INVALID_ARGUMENT;
499LOG_WARN("tde_method is empty", K(ret));
500} else if (!job_info.get_kms_info().empty()) {
501kms_info = job_info.get_kms_info();
502}
503if (OB_FAIL(ret)) {
504} else if (!ObTdeMethodUtil::is_valid(tde_method)) {
505// do nothing
506} else if (OB_FAIL(ObRestoreCommonUtil::set_tde_parameters(sql_proxy_, rpc_proxy_,
507tenant_id, tde_method, kms_info))) {
508LOG_WARN("failed to set_tde_parameters", KR(ret), K(tenant_id), K(tde_method));
509}
510}
511#endif
512return ret;
513}
514
515int ObRestoreScheduler::restore_root_key(const share::ObPhysicalRestoreJob &job_info)
516{
517int ret = OB_SUCCESS;
518#ifdef OB_BUILD_TDE_SECURITY
519int64_t idx = job_info.get_multi_restore_path_list().get_backup_set_path_list().count() - 1;
520if (idx < 0) {
521ret = OB_ERR_UNEXPECTED;
522LOG_WARN("invalid job info", K(ret), K(idx), K(job_info));
523} else if (OB_ISNULL(srv_rpc_proxy_) || OB_ISNULL(sql_proxy_)) {
524ret = OB_ERR_UNEXPECTED;
525LOG_WARN("unexpected null svr rpc proxy or sql proxy", K(ret));
526} else {
527storage::ObBackupDataStore store;
528const share::ObBackupSetPath &backup_set_path = job_info.get_multi_restore_path_list().get_backup_set_path_list().at(idx);
529ObRootKey root_key;
530if (OB_FAIL(store.init(backup_set_path.ptr()))) {
531LOG_WARN("fail to init backup data store", K(ret));
532} else if (OB_FAIL(store.read_root_key_info(tenant_id_))) {
533LOG_WARN("fail to read root key info", K(ret));
534} else if (OB_FAIL(ObMasterKeyGetter::instance().get_root_key(tenant_id_, root_key))) {
535LOG_WARN("fail to get root key", K(ret));
536} else if (obrpc::RootKeyType::INVALID == root_key.key_type_) {
537// do nothing
538} else if (OB_FAIL(ObRestoreCommonUtil::notify_root_key(srv_rpc_proxy_, sql_proxy_, tenant_id_, root_key))) {
539LOG_WARN("failed to notify root key", KR(ret), K(tenant_id_));
540}
541}
542#endif
543return ret;
544}
545
546int ObRestoreScheduler::restore_keystore(const share::ObPhysicalRestoreJob &job_info)
547{
548int ret = OB_SUCCESS;
549#ifdef OB_BUILD_TDE_SECURITY
550const int64_t DEFAULT_TIMEOUT = 10 * 1000 * 1000L;
551ObUnitTableOperator unit_operator;
552common::ObArray<ObUnit> units;
553ObArray<int> return_code_array;
554obrpc::ObRestoreKeyArg arg;
555if (job_info.get_kms_dest().empty()) {
556// do nothing
557} else if (OB_ISNULL(srv_rpc_proxy_) || OB_ISNULL(sql_proxy_)) {
558ret = OB_ERR_UNEXPECTED;
559LOG_WARN("unexpected null svr rpc proxy or sql proxy", K(ret));
560} else if (OB_FAIL(unit_operator.init(*sql_proxy_))) {
561LOG_WARN("failed to init unit operator", KR(ret));
562} else if (OB_FAIL(unit_operator.get_units_by_tenant(tenant_id_, units))) {
563LOG_WARN("failed to get tenant unit", KR(ret), K_(tenant_id));
564} else {
565ObRestoreKeyProxy proxy(*srv_rpc_proxy_, &obrpc::ObSrvRpcProxy::restore_key);
566arg.tenant_id_ = job_info.get_tenant_id();
567arg.backup_dest_ = job_info.get_kms_dest();
568arg.encrypt_key_ = job_info.get_kms_encrypt_key();
569for (int64_t i = 0; OB_SUCC(ret) && i < units.count(); i++) {
570const ObUnit &unit = units.at(i);
571if (OB_FAIL(proxy.call(unit.server_, DEFAULT_TIMEOUT, arg))) {
572LOG_WARN("failed to send rpc", KR(ret));
573}
574}
575
576int tmp_ret = OB_SUCCESS;
577if (OB_TMP_FAIL(proxy.wait_all(return_code_array))) {
578LOG_WARN("wait batch result failed", KR(tmp_ret), KR(ret));
579ret = OB_SUCC(ret) ? tmp_ret : ret;
580} else if (OB_FAIL(ret)) {
581} else if (OB_FAIL(proxy.check_return_cnt(return_code_array.count()))) {
582LOG_WARN("fail to check return cnt", KR(ret), "return_cnt", return_code_array.count());
583} else {
584for (int64_t i = 0; OB_SUCC(ret) && i < return_code_array.count(); i++) {
585ret = return_code_array.at(i);
586const ObAddr &addr = proxy.get_dests().at(i);
587if (OB_FAIL(ret)) {
588LOG_WARN("failed to restore key", KR(ret), K(addr));
589}
590}
591}
592}
593#endif
594return ret;
595}
596
597int ObRestoreScheduler::post_check(const ObPhysicalRestoreJob &job_info)
598{
599int ret = OB_SUCCESS;
600DEBUG_SYNC(BEFORE_PHYSICAL_RESTORE_POST_CHECK);
601bool sync_satisfied = true;
602
603if (!inited_) {
604ret = OB_NOT_INIT;
605LOG_WARN("not inited", K(ret));
606} else if (OB_INVALID_TENANT_ID == tenant_id_
607|| OB_SYS_TENANT_ID == tenant_id_) {
608ret = OB_INVALID_ARGUMENT;
609LOG_WARN("invalid tenant id", K(ret), K(tenant_id_));
610} else if (OB_FAIL(restore_service_->check_stop())) {
611LOG_WARN("restore scheduler stopped", K(ret));
612} else if (OB_FAIL(ObRestoreCommonUtil::try_update_tenant_role(sql_proxy_, tenant_id_,
613job_info.get_restore_scn(), false /*is_clone*/, sync_satisfied))) {
614LOG_WARN("failed to try update tenant role", KR(ret), K(tenant_id_), K(job_info));
615} else if (!sync_satisfied) {
616ret = OB_NEED_WAIT;
617LOG_WARN("tenant sync scn not equal to restore scn, need wait", KR(ret), K(job_info));
618}
619
620if (FAILEDx(ObRestoreCommonUtil::process_schema(sql_proxy_, tenant_id_))) {
621LOG_WARN("failed to process schema", KR(ret));
622}
623
624if (FAILEDx(convert_tde_parameters(job_info))) {
625LOG_WARN("fail to convert parameters", K(ret), K(job_info));
626}
627
628if (OB_SUCC(ret)) {
629int tmp_ret = OB_SUCCESS;
630if (OB_SUCCESS != (tmp_ret = try_update_job_status(*sql_proxy_, ret, job_info))) {
631LOG_WARN("fail to update job status", K(ret), K(tmp_ret), K(job_info));
632}
633}
634LOG_INFO("[RESTORE] post check", K(ret), K(job_info));
635return ret;
636}
637
638int ObRestoreScheduler::restore_finish(const ObPhysicalRestoreJob &job_info)
639{
640int ret = OB_SUCCESS;
641
642if (!inited_) {
643ret = OB_NOT_INIT;
644LOG_WARN("not init", K(ret));
645} else if (OB_FAIL(restore_service_->check_stop())) {
646LOG_WARN("restore scheduler stopped", K(ret));
647} else if (OB_FAIL(ObRestoreUtil::recycle_restore_job(tenant_id_, *sql_proxy_,
648job_info))) {
649LOG_WARN("finish restore tasks failed", K(job_info), K(ret), K(tenant_id_));
650} else {
651LOG_INFO("[RESTORE] restore tenant success", K(ret), K(job_info));
652}
653ROOTSERVICE_EVENT_ADD("physical_restore", "restore_finish",
654"restore_stauts", job_info.get_status(),
655"tenant", job_info.get_tenant_name());
656return ret;
657}
658
659int ObRestoreScheduler::tenant_restore_finish(const ObPhysicalRestoreJob &job_info)
660{
661int ret = OB_SUCCESS;
662ObHisRestoreJobPersistInfo history_info;
663bool restore_tenant_exist = true;
664if (!inited_) {
665ret = OB_NOT_INIT;
666LOG_WARN("not init", K(ret));
667} else if (OB_FAIL(restore_service_->check_stop())) {
668LOG_WARN("restore scheduler stopped", K(ret));
669} else if (OB_FAIL(try_get_tenant_restore_history_(job_info, history_info, restore_tenant_exist))) {
670LOG_WARN("failed to get user tenant restory info", KR(ret), K(job_info));
671} else if (restore_tenant_exist && OB_FAIL(reset_restore_concurrency_(job_info.get_tenant_id(), job_info))) {
672LOG_WARN("failed to reset restore concurrency", K(ret), K(job_info));
673} else if (share::PHYSICAL_RESTORE_SUCCESS == job_info.get_status()) {
674//restore success
675} else {
676int tmp_ret = OB_SUCCESS;
677ObRestoreFailureChecker checker;
678bool is_concurrent_with_clean = false;
679if (OB_TMP_FAIL(checker.init(job_info))) {
680LOG_WARN("failed to init restore failure checker", K(tmp_ret), K(job_info));
681} else if (OB_TMP_FAIL(checker.check_is_concurrent_with_clean(is_concurrent_with_clean))) {
682LOG_WARN("failed to check is clean concurrency failure", K(tmp_ret));
683}
684if (OB_SUCC(ret) && is_concurrent_with_clean) {
685int64_t pos = 0;
686if (OB_FAIL(databuff_printf(history_info.comment_.ptr(), history_info.comment_.capacity(), pos,
687"%s;", "physical restore run concurrently with backup data clean, please check backup and archive jobs"))) {
688if (OB_SIZE_OVERFLOW == ret) {
689ret = OB_SUCCESS;
690} else {
691LOG_WARN("failed to databuff printf comment", K(ret));
692}
693}
694}
695}
696
697if (FAILEDx(ObRestoreUtil::recycle_restore_job(*sql_proxy_,
698job_info, history_info))) {
699LOG_WARN("finish restore tasks failed", KR(ret), K(job_info), K(history_info), K(tenant_id_));
700} else {
701LOG_INFO("[RESTORE] restore tenant finish", K(ret), K(job_info));
702}
703ROOTSERVICE_EVENT_ADD("physical_restore", "restore_finish",
704"restore_status", job_info.get_status(),
705"tenant", job_info.get_tenant_name());
706return ret;
707}
708
709int ObRestoreScheduler::try_get_tenant_restore_history_(
710const ObPhysicalRestoreJob &job_info,
711ObHisRestoreJobPersistInfo &history_info,
712bool &restore_tenant_exist)
713{
714int ret = OB_SUCCESS;
715restore_tenant_exist = true;
716ObHisRestoreJobPersistInfo user_history_info;
717const uint64_t restore_tenant_id = job_info.get_tenant_id();
718if (!inited_) {
719ret = OB_NOT_INIT;
720LOG_WARN("not inited", KR(ret));
721} else if (OB_FAIL(restore_service_->check_stop())) {
722LOG_WARN("restore scheduler stopped", KR(ret));
723} else if (OB_FAIL(ObRestoreCommonUtil::check_tenant_is_existed(schema_service_,
724restore_tenant_id, restore_tenant_exist))) {
725LOG_WARN("fail to check tenant_is_existed", KR(ret), K(restore_tenant_id), K(job_info));
726}
727if (OB_FAIL(ret)) {
728} else if (!restore_tenant_exist) {
729if (OB_FAIL(history_info.init_with_job(job_info))) {
730LOG_WARN("failed to init with job", KR(ret), K(job_info));
731}
732} else if (OB_FAIL(ObRestoreUtil::get_user_restore_job_history(
733*sql_proxy_, job_info.get_tenant_id(),
734job_info.get_restore_key().tenant_id_, job_info.get_job_id(),
735user_history_info))) {
736LOG_WARN("failed to get user restore job history", KR(ret), K(job_info));
737} else if (OB_FAIL(history_info.init_initiator_job_history(job_info, user_history_info))) {
738LOG_WARN("failed to init restore job history", KR(ret), K(job_info), K(user_history_info));
739}
740return ret;
741}
742
743/*
744* 1. Physical restore is not allowed when cluster is in upgrade mode or is standby.
745* 2. Physical restore jobs will be recycled asynchronously when restore tenant has been dropped.
746* 3. Physical restore jobs will be used to avoid duplicate tenant_name when tenant is creating.
747*/
748int ObRestoreScheduler::try_recycle_job(const ObPhysicalRestoreJob &job)
749{
750int ret = OB_SUCCESS;
751ObSchemaGetterGuard schema_guard;
752bool is_dropped = false;
753int failed_ret = OB_SUCCESS;
754DEBUG_SYNC(BEFORE_RECYCLE_PHYSICAL_RESTORE_JOB);
755
756if (!inited_) {
757ret = OB_NOT_INIT;
758LOG_WARN("not inited", KR(ret));
759} else if (OB_FAIL(check_tenant_can_restore_(tenant_id_))) {
760LOG_WARN("tenant cannot restore", KR(ret), K(tenant_id_));
761} else if (OB_FAIL(schema_service_->get_tenant_schema_guard(OB_SYS_TENANT_ID, schema_guard))) {
762LOG_WARN("fail to get tenant schema guard", KR(ret));
763} else if (OB_SUCCESS != schema_guard.check_formal_guard()) {
764// skip
765} else if (OB_INVALID_TENANT_ID == job.get_tenant_id()) {
766//restore tenant may be failed to create, will to restore failed
767} else if (OB_FAIL(schema_guard.check_if_tenant_has_been_dropped(job.get_tenant_id(), is_dropped))) {
768LOG_WARN("fail to get tenant id", KR(ret), K(job));
769} else if (!is_dropped) {
770// skip
771} else {
772// 3. tenant has been dropped
773failed_ret = OB_TENANT_NOT_EXIST;
774LOG_WARN("[RESTORE] tenant has been dropped, try recycle job",
775KR(ret), K(tenant_id_));
776}
777if (OB_SUCC(ret) && OB_SUCCESS != failed_ret) {
778int tmp_ret = OB_SUCCESS;
779if (OB_SUCCESS != (tmp_ret = try_update_job_status(*sql_proxy_, failed_ret, job))) {
780LOG_WARN("fail to update job status", KR(ret), K(tmp_ret), K(failed_ret), K(job));
781}
782}
783return ret;
784}
785
786int ObRestoreScheduler::try_update_job_status(
787common::ObISQLClient &sql_client,
788int return_ret,
789const ObPhysicalRestoreJob &job,
790share::PhysicalRestoreMod mod)
791{
792int ret = OB_SUCCESS;
793ObPhysicalRestoreTableOperator restore_op;
794if (!inited_) {
795ret = OB_NOT_INIT;
796LOG_WARN("not inited", K(ret));
797} else if (OB_FAIL(restore_service_->check_stop())) {
798LOG_WARN("restore scheduler stopped", K(ret));
799} else if (OB_FAIL(restore_op.init(&sql_client, tenant_id_, share::OBCG_STORAGE /*group_id*/))) {
800LOG_WARN("fail init", K(ret), K(tenant_id_));
801} else {
802PhysicalRestoreStatus next_status = get_next_status(return_ret, job.get_status());
803const common::ObCurTraceId::TraceId trace_id = *ObCurTraceId::get_trace_id();
804
805if (PHYSICAL_RESTORE_FAIL == next_status && OB_LS_RESTORE_FAILED != return_ret
806&& OB_FAIL(restore_op.update_job_error_info(job.get_job_id(), return_ret, mod, trace_id, self_addr_))) {
807// if restore failed at wait ls, observer has record error info,
808// rs no need to record error info again.
809LOG_WARN("fail to update job error info", K(ret), K(job), K(return_ret), K(mod), K(tenant_id_));
810} else if (OB_FAIL(restore_op.update_job_status(job.get_job_id(), next_status))) {
811LOG_WARN("fail update job status", K(ret), K(job), K(next_status), K(tenant_id_));
812} else {
813//can not be zero
814restore_service_->wakeup();
815LOG_INFO("[RESTORE] switch job status", K(ret), K(job), K(next_status));
816(void)record_rs_event(job, next_status);
817}
818}
819return ret;
820}
821
822void ObRestoreScheduler::record_rs_event(
823const ObPhysicalRestoreJob &job,
824const PhysicalRestoreStatus status)
825{
826const char *status_str = ObPhysicalRestoreTableOperator::get_restore_status_str(
827static_cast<PhysicalRestoreStatus>(status));
828ROOTSERVICE_EVENT_ADD("physical_restore", "change_restore_status",
829"job_id", job.get_job_id(),
830"tenant", job.get_tenant_name(),
831"status", status_str);
832}
833
834PhysicalRestoreStatus ObRestoreScheduler::get_sys_next_status(
835PhysicalRestoreStatus current_status)
836{
837PhysicalRestoreStatus next_status = PHYSICAL_RESTORE_MAX_STATUS;
838switch (current_status) {
839case PHYSICAL_RESTORE_CREATE_TENANT : {
840next_status = PHYSICAL_RESTORE_WAIT_TENANT_RESTORE_FINISH;
841break;
842}
843case PHYSICAL_RESTORE_WAIT_TENANT_RESTORE_FINISH : {
844next_status = PHYSICAL_RESTORE_SUCCESS;
845break;
846}
847default : {
848// do nothing
849}
850}
851return next_status;
852}
853
854
855
856PhysicalRestoreStatus ObRestoreScheduler::get_next_status(
857int return_ret,
858PhysicalRestoreStatus current_status)
859{
860PhysicalRestoreStatus next_status = PHYSICAL_RESTORE_MAX_STATUS;
861if (OB_SUCCESS != return_ret) {
862next_status = PHYSICAL_RESTORE_FAIL;
863} else if (is_sys_tenant(MTL_ID())) {
864next_status = get_sys_next_status(current_status);
865} else {
866switch (current_status) {
867case PHYSICAL_RESTORE_PRE : {
868next_status = PHYSICAL_RESTORE_CREATE_INIT_LS;
869break;
870}
871case PHYSICAL_RESTORE_CREATE_INIT_LS : {
872next_status = PHYSICAL_RESTORE_WAIT_CONSISTENT_SCN;
873break;
874}
875case PHYSICAL_RESTORE_WAIT_CONSISTENT_SCN : {
876next_status = PHYSICAL_RESTORE_WAIT_LS;
877break;
878}
879case PHYSICAL_RESTORE_WAIT_LS : {
880next_status = PHYSICAL_RESTORE_POST_CHECK;
881break;
882}
883case PHYSICAL_RESTORE_POST_CHECK : {
884next_status = PHYSICAL_RESTORE_UPGRADE;
885break;
886}
887case PHYSICAL_RESTORE_UPGRADE : {
888next_status = PHYSICAL_RESTORE_SUCCESS;
889break;
890}
891default : {
892// do nothing
893}
894}
895}
896return next_status;
897}
898
899int ObRestoreScheduler::restore_upgrade(const ObPhysicalRestoreJob &job_info)
900{
901int ret = OB_SUCCESS;
902DEBUG_SYNC(BEFORE_PHYSICAL_RESTORE_UPGRADE_PRE);
903if (!inited_) {
904ret = OB_NOT_INIT;
905LOG_WARN("not inited", KR(ret));
906} else if (OB_INVALID_TENANT_ID == tenant_id_
907|| OB_SYS_TENANT_ID == tenant_id_) {
908ret = OB_INVALID_ARGUMENT;
909LOG_WARN("invalid tenant id", KR(ret), K(tenant_id_));
910} else if (OB_FAIL(restore_service_->check_stop())) {
911LOG_WARN("restore scheduler stopped", KR(ret));
912} else {
913if (OB_SUCC(ret)) {
914int tmp_ret = OB_SUCCESS;
915if (OB_SUCCESS != (tmp_ret = try_update_job_status(*sql_proxy_, ret, job_info))) {
916LOG_WARN("fail to update job status", K(ret), K(tmp_ret), K(job_info));
917}
918}
919}
920LOG_INFO("[RESTORE] upgrade pre finish", KR(ret), K(job_info));
921return ret;
922}
923
924int ObRestoreScheduler::restore_init_ls(const share::ObPhysicalRestoreJob &job_info)
925{
926int ret = OB_SUCCESS;
927DEBUG_SYNC(BEFORE_PHYSICAL_RESTORE_INIT_LS);
928ObSchemaGetterGuard schema_guard;
929const share::schema::ObTenantSchema *tenant_schema = NULL;
930const common::ObSArray<share::ObBackupSetPath> &backup_set_path_array =
931job_info.get_multi_restore_path_list().get_backup_set_path_list();
932const common::ObSArray<share::ObBackupPathString> &log_path_array = job_info.get_multi_restore_path_list().get_log_path_list();
933if (OB_UNLIKELY(!inited_)) {
934ret = OB_NOT_INIT;
935LOG_WARN("not inited", KR(ret));
936} else if (OB_FAIL(check_tenant_can_restore_(tenant_id_))) {
937LOG_WARN("tenant can not restore", KR(ret), K(tenant_id_));
938} else if (OB_ISNULL(sql_proxy_)) {
939ret = OB_ERR_UNEXPECTED;
940LOG_WARN("sql proxy is null", KR(ret), KP(sql_proxy_));
941} else if (OB_FAIL(schema_service_->get_tenant_schema_guard(
942OB_SYS_TENANT_ID, schema_guard))) {
943LOG_WARN("fail to get tenant schema guard", KR(ret));
944} else if (OB_FAIL(schema_guard.get_tenant_info(tenant_id_, tenant_schema))) {
945LOG_WARN("fail to get tenant schema", KR(ret), K(job_info));
946} else if (OB_ISNULL(tenant_schema) || !tenant_schema->is_restore_tenant_status()) {
947ret = OB_ERR_UNEXPECTED;
948LOG_WARN("tenant not exist or tenant is not in physical restore status", KR(ret),
949K(tenant_schema));
950} else if (OB_UNLIKELY(0 == backup_set_path_array.count())) {
951ret = OB_ERR_UNEXPECTED;
952LOG_WARN("backup piece path not expected", KR(ret), K(job_info));
953} else {
954const int64_t backup_path_count = backup_set_path_array.count();
955const ObString &backup_set_path = backup_set_path_array.at(backup_path_count - 1).ptr();
956storage::ObBackupDataLSAttrDesc backup_ls_attr;
957ObLogRestoreSourceMgr restore_source_mgr;
958storage::ObBackupDataStore store;
959if (OB_FAIL(store.init(backup_set_path.ptr()))) {
960LOG_WARN("fail to ini backup extern mgr", K(ret));
961} else if (OB_FAIL(store.read_ls_attr_info(backup_ls_attr))) {
962LOG_WARN("failed to read ls info", KR(ret));
963} else {
964const SCN &sync_scn = backup_ls_attr.backup_scn_;
965ObLSRecoveryStatOperator ls_recovery;
966const uint64_t exec_tenant_id = get_private_table_exec_tenant_id(tenant_id_);
967START_TRANSACTION(sql_proxy_, exec_tenant_id)
968LOG_INFO("start to create ls and set sync scn", K(sync_scn), K(backup_ls_attr), KR(ret));
969if (FAILEDx(ls_recovery.update_sys_ls_sync_scn(tenant_id_, trans, sync_scn))) {
970LOG_WARN("failed to update sync ls sync scn", KR(ret), K(sync_scn));
971}
972END_TRANSACTION(trans)
973}
974if (FAILEDx(create_all_ls_(*tenant_schema, backup_ls_attr.ls_attr_array_))) {
975LOG_WARN("failed to create all ls", KR(ret), K(backup_ls_attr), KPC(tenant_schema));
976} else if (OB_FAIL(wait_all_ls_created_(*tenant_schema, job_info))) {
977LOG_WARN("failed to wait all ls created", KR(ret), KPC(tenant_schema));
978} else if (OB_FAIL(finish_create_ls_(*tenant_schema, backup_ls_attr.ls_attr_array_))) {
979LOG_WARN("failed to finish create ls", KR(ret), KPC(tenant_schema));
980} else if (OB_FAIL(restore_source_mgr.init(tenant_id_, sql_proxy_))) {
981LOG_WARN("failed to init restore_source_mgr", KR(ret));
982} else if (1 == log_path_array.count()
983&& OB_FAIL(restore_source_mgr.add_location_source(job_info.get_restore_scn(), log_path_array.at(0).str()))) {
984LOG_WARN("failed to add log restore source", KR(ret), K(job_info), K(log_path_array));
985}
986}
987
988#ifdef ERRSIM
989ret = OB_E(EventTable::EN_RESTORE_CREATE_LS_FAILED) OB_SUCCESS;
990#endif
991
992TenantRestoreStatus tenant_restore_status;
993if (OB_FAIL(ret)) {
994int tmp_ret = OB_SUCCESS;
995if (OB_TMP_FAIL(check_all_ls_restore_to_consistent_scn_finish_(tenant_id_, tenant_restore_status))) {
996LOG_WARN("failed to check all ls restore to consistent scn finish", K(ret));
997}
998}
999
1000if (OB_SUCC(ret) || tenant_restore_status.is_failed()) {
1001int tmp_ret = OB_SUCCESS;
1002if (OB_SUCCESS != (tmp_ret = try_update_job_status(*sql_proxy_, ret, job_info))) {
1003tmp_ret = OB_SUCC(ret) ? tmp_ret : ret;
1004LOG_WARN("fail to update job status", K(ret), K(tmp_ret), K(job_info));
1005}
1006}
1007if (OB_FAIL(ret)) {
1008restore_service_->wakeup();
1009}
1010LOG_INFO("[RESTORE] create init ls", KR(ret), K(job_info));
1011
1012return ret;
1013}
1014
1015int ObRestoreScheduler::set_restore_to_target_scn_(
1016common::ObMySQLTransaction &trans, const share::ObPhysicalRestoreJob &job_info, const share::SCN &scn)
1017{
1018int ret = OB_SUCCESS;
1019const uint64_t tenant_id = job_info.get_tenant_id();
1020ObAllTenantInfo tenant_info;
1021ObLSRecoveryStatOperator ls_recovery_operator;
1022ObLSRecoveryStat sys_ls_recovery;
1023ObLogRestoreSourceMgr restore_source_mgr;
1024if (OB_FAIL(ObAllTenantInfoProxy::load_tenant_info(tenant_id, &trans, true/*for_update*/, tenant_info))) {
1025LOG_WARN("failed to load all tenant info", KR(ret), "tenant_id", job_info.get_tenant_id());
1026} else if (OB_FAIL(ls_recovery_operator.get_ls_recovery_stat(tenant_id, share::SYS_LS,
1027true /*for_update*/, sys_ls_recovery, trans))) {
1028LOG_WARN("failed to get ls recovery stat", KR(ret), K(tenant_id));
1029} else if (scn < tenant_info.get_sync_scn() || scn < sys_ls_recovery.get_sync_scn()) {
1030ret = OB_INVALID_ARGUMENT;
1031LOG_WARN("recover before tenant sync_scn or SYS LS sync_scn is not allow", KR(ret), K(tenant_info),
1032K(tenant_id), K(scn), K(sys_ls_recovery));
1033} else if (tenant_info.get_recovery_until_scn() == scn) {
1034LOG_INFO("recovery_until_scn is same with original", K(tenant_info), K(tenant_id), K(scn));
1035} else if (OB_FAIL(restore_source_mgr.init(tenant_id, &trans))) {
1036LOG_WARN("failed to init restore_source_mgr", KR(ret), K(tenant_id), K(scn));
1037} else if (OB_FAIL(restore_source_mgr.update_recovery_until_scn(scn))) {
1038LOG_WARN("failed to update_recovery_until_scn", KR(ret), K(tenant_id), K(scn));
1039} else if (OB_FAIL(ObAllTenantInfoProxy::update_tenant_recovery_until_scn(
1040tenant_id, trans, tenant_info.get_switchover_epoch(), scn))) {
1041LOG_WARN("failed to update_tenant_recovery_until_scn", KR(ret), K(tenant_id), K(scn));
1042} else {
1043LOG_INFO("succeed to set recover until scn", K(scn));
1044}
1045return ret;
1046}
1047int ObRestoreScheduler::create_all_ls_(
1048const share::schema::ObTenantSchema &tenant_schema,
1049const common::ObIArray<ObLSAttr> &ls_attr_array)
1050{
1051int ret = OB_SUCCESS;
1052if (OB_UNLIKELY(!inited_)) {
1053ret = OB_NOT_INIT;
1054LOG_WARN("not inited", KR(ret));
1055} else if (OB_FAIL(restore_service_->check_stop())) {
1056LOG_WARN("restore scheduler stopped", KR(ret));
1057} else if (OB_ISNULL(sql_proxy_)) {
1058ret = OB_ERR_UNEXPECTED;
1059LOG_WARN("sql proxy is null", KR(ret), KP(sql_proxy_));
1060} else if (OB_FAIL(ObRestoreCommonUtil::create_all_ls(sql_proxy_, tenant_id_, tenant_schema, ls_attr_array))) {
1061LOG_WARN("fail to create all ls", KR(ret), K(tenant_id_), K(tenant_schema), K(ls_attr_array));
1062}
1063return ret;
1064}
1065
1066int ObRestoreScheduler::wait_all_ls_created_(const share::schema::ObTenantSchema &tenant_schema,
1067const share::ObPhysicalRestoreJob &job_info)
1068{
1069int ret = OB_SUCCESS;
1070if (OB_UNLIKELY(!inited_)) {
1071ret = OB_NOT_INIT;
1072LOG_WARN("not inited", KR(ret));
1073} else if (OB_FAIL(restore_service_->check_stop())) {
1074LOG_WARN("restore scheduler stopped", KR(ret));
1075} else if (OB_UNLIKELY(!tenant_schema.is_valid())) {
1076ret = OB_INVALID_ARGUMENT;
1077LOG_WARN("invalid argument", KR(ret), K(tenant_schema));
1078} else if (OB_ISNULL(sql_proxy_)) {
1079ret = OB_ERR_UNEXPECTED;
1080LOG_WARN("sql proxy is null", KR(ret), KP(sql_proxy_));
1081} else {
1082const uint64_t tenant_id = tenant_schema.get_tenant_id();
1083ObLSStatusOperator status_op;
1084ObLSStatusInfoArray ls_array;
1085palf::PalfBaseInfo palf_base_info;
1086ObLSRecoveryStat recovery_stat;
1087ObLSRecoveryStatOperator ls_recovery_operator;
1088
1089if (OB_FAIL(status_op.get_all_ls_status_by_order(tenant_id, ls_array,
1090*sql_proxy_))) {
1091LOG_WARN("failed to get all ls status", KR(ret), K(tenant_id));
1092}
1093for (int64_t i = 0; OB_SUCC(ret) && i < ls_array.count(); ++i) {
1094const ObLSStatusInfo &info = ls_array.at(i);
1095if (info.ls_is_creating()) {
1096recovery_stat.reset();
1097if (OB_FAIL(ls_recovery_operator.get_ls_recovery_stat(tenant_id, info.ls_id_,
1098false/*for_update*/, recovery_stat, *sql_proxy_))) {
1099LOG_WARN("failed to get ls recovery stat", KR(ret), K(tenant_id), K(info));
1100} else if (OB_FAIL(ObRestoreUtil::get_restore_ls_palf_base_info(
1101job_info, info.ls_id_, palf_base_info))) {
1102LOG_WARN("failed to get restore ls palf info", KR(ret), K(info),
1103K(job_info));
1104} else if (OB_FAIL(ObCommonLSService::do_create_user_ls(
1105tenant_schema, info, recovery_stat.get_create_scn(),
1106true, /*create with palf*/
1107palf_base_info, OB_INVALID_TENANT_ID/*source_tenant_id*/))) {
1108LOG_WARN("failed to create ls with palf", KR(ret), K(info), K(tenant_schema),
1109K(palf_base_info));
1110}
1111}
1112}// end for
1113LOG_INFO("[RESTORE] wait ls created", KR(ret), K(tenant_id), K(ls_array));
1114}
1115return ret;
1116}
1117
1118int ObRestoreScheduler::finish_create_ls_(
1119const share::schema::ObTenantSchema &tenant_schema,
1120const common::ObIArray<share::ObLSAttr> &ls_attr_array)
1121{
1122int ret = OB_SUCCESS;
1123if (OB_UNLIKELY(!inited_)) {
1124ret = OB_NOT_INIT;
1125LOG_WARN("not inited", KR(ret));
1126} else if (OB_FAIL(restore_service_->check_stop())) {
1127LOG_WARN("restore scheduler stopped", KR(ret));
1128} else if (OB_UNLIKELY(!tenant_schema.is_valid())) {
1129ret = OB_INVALID_ARGUMENT;
1130LOG_WARN("invalid argument", KR(ret), K(tenant_schema));
1131} else if (OB_ISNULL(sql_proxy_)) {
1132ret = OB_ERR_UNEXPECTED;
1133LOG_WARN("sql proxy is null", KR(ret), KP(sql_proxy_));
1134} else if (OB_FAIL(ObRestoreCommonUtil::finish_create_ls(sql_proxy_, tenant_schema, ls_attr_array))) {
1135LOG_WARN("fail to finish create ls", KR(ret), K(tenant_schema), K(ls_attr_array));
1136}
1137return ret;
1138}
1139
1140int ObRestoreScheduler::restore_wait_to_consistent_scn(const share::ObPhysicalRestoreJob &job_info)
1141{
1142int ret = OB_SUCCESS;
1143TenantRestoreStatus tenant_restore_status;
1144const uint64_t tenant_id = job_info.get_tenant_id();
1145const ObTenantSchema *tenant_schema = NULL;
1146ObSchemaGetterGuard schema_guard;
1147bool is_replay_finish = false;
1148DEBUG_SYNC(BEFORE_WAIT_RESTORE_TO_CONSISTENT_SCN);
1149if (OB_UNLIKELY(!inited_)) {
1150ret = OB_NOT_INIT;
1151LOG_WARN("not inited", KR(ret));
1152} else if (OB_FAIL(check_tenant_can_restore_(tenant_id))) {
1153LOG_WARN("failed to check tenant can restore", KR(ret), K(tenant_id));
1154} else if (OB_ISNULL(sql_proxy_)) {
1155ret = OB_ERR_UNEXPECTED;
1156LOG_WARN("sql proxy is null", KR(ret), KP(sql_proxy_));
1157} else if (OB_FAIL(schema_service_->get_tenant_schema_guard(
1158OB_SYS_TENANT_ID, schema_guard))) {
1159LOG_WARN("fail to get tenant schema guard", KR(ret));
1160} else if (OB_FAIL(schema_guard.get_tenant_info(tenant_id, tenant_schema))) {
1161LOG_WARN("fail to get tenant schema", KR(ret), K(tenant_id));
1162} else if (OB_ISNULL(tenant_schema) || !tenant_schema->is_restore()) {
1163ret = OB_ERR_UNEXPECTED;
1164LOG_WARN("tenant not exist or tenant is not in physical restore status", KR(ret),
1165KPC(tenant_schema));
1166} else if (OB_FAIL(check_all_ls_restore_to_consistent_scn_finish_(tenant_id, tenant_restore_status))) {
1167LOG_WARN("fail to check all ls restore finish", KR(ret), K(job_info));
1168} else if (tenant_restore_status.is_finish()) {
1169LOG_INFO("[RESTORE] restore wait all ls restore to consistent scn done", K(tenant_id), K(tenant_restore_status));
1170int tmp_ret = OB_SUCCESS;
1171ObMySQLTransaction trans;
1172const uint64_t exec_tenant_id = gen_meta_tenant_id(job_info.get_tenant_id());
1173if (tenant_restore_status.is_failed()) {
1174ret = OB_LS_RESTORE_FAILED;
1175LOG_INFO("[RESTORE]restore wait all ls restore to consistent scn failed", K(ret));
1176if (OB_SUCCESS != (tmp_ret = try_update_job_status(*sql_proxy_, ret, job_info))) {
1177LOG_WARN("fail to update job status", KR(ret), K(job_info));
1178}
1179} else if (OB_FAIL(check_tenant_replay_to_consistent_scn(tenant_id, job_info.get_consistent_scn(), is_replay_finish))) {
1180LOG_WARN("fail to check tenant replay to consistent scn", K(ret));
1181} else if (!is_replay_finish) {
1182} else if (FALSE_IT(DEBUG_SYNC(AFTER_WAIT_RESTORE_TO_CONSISTENT_SCN))) {
1183} else if (OB_FAIL(trans.start(sql_proxy_, exec_tenant_id))) {
1184LOG_WARN("fail to start trans", K(ret));
1185} else if (OB_FAIL(set_restore_to_target_scn_(trans, job_info, job_info.get_restore_scn()))) {
1186LOG_WARN("fail to set restore to target scn", KR(ret));
1187} else if (OB_FAIL(try_update_job_status(trans, ret, job_info))) {
1188LOG_WARN("fail to update job status", KR(ret), K(job_info));
1189}
1190if (trans.is_started()) {
1191if (OB_SUCCESS != (tmp_ret = trans.end(OB_SUCC(ret)))) {
1192ret = OB_SUCC(ret) ? tmp_ret : ret;
1193LOG_WARN("fail to rollback trans", KR(ret), KR(tmp_ret));
1194}
1195}
1196}
1197return ret;
1198}
1199
1200
1201
1202int ObRestoreScheduler::check_tenant_replay_to_consistent_scn(const uint64_t tenant_id, const share::SCN &scn, bool &is_replay_finish)
1203{
1204int ret = OB_SUCCESS;
1205ObAllTenantInfo tenant_info;
1206if (OB_FAIL(ObAllTenantInfoProxy::load_tenant_info(tenant_id, sql_proxy_, false/*no update*/, tenant_info))) {
1207LOG_WARN("failed to load tenant info", K(ret));
1208} else if (tenant_info.get_recovery_until_scn() != scn) {
1209ret = OB_INVALID_ARGUMENT;
1210LOG_WARN("unexpected recovery until scn", K(ret), K(tenant_info), K(scn));
1211} else {
1212is_replay_finish = (tenant_info.get_recovery_until_scn() <= tenant_info.get_standby_scn());
1213LOG_INFO("[RESTORE]tenant replay to consistent_scn", K(is_replay_finish));
1214}
1215return ret;
1216}
1217
1218int ObRestoreScheduler::restore_wait_ls_finish(const share::ObPhysicalRestoreJob &job_info)
1219{
1220int ret = OB_SUCCESS;
1221DEBUG_SYNC(BEFORE_PHYSICAL_RESTORE_WAIT_LS_FINISH);
1222TenantRestoreStatus tenant_restore_status;
1223const uint64_t tenant_id = job_info.get_tenant_id();
1224const ObTenantSchema *tenant_schema = NULL;
1225ObSchemaGetterGuard schema_guard;
1226
1227if (OB_UNLIKELY(!inited_)) {
1228ret = OB_NOT_INIT;
1229LOG_WARN("not inited", KR(ret));
1230} else if (OB_FAIL(check_tenant_can_restore_(tenant_id))) {
1231LOG_WARN("failed to check tenant can restore", KR(ret), K(tenant_id));
1232} else if (OB_ISNULL(sql_proxy_)) {
1233ret = OB_ERR_UNEXPECTED;
1234LOG_WARN("sql proxy is null", KR(ret), KP(sql_proxy_));
1235} else if (OB_FAIL(schema_service_->get_tenant_schema_guard(
1236OB_SYS_TENANT_ID, schema_guard))) {
1237LOG_WARN("fail to get tenant schema guard", KR(ret));
1238} else if (OB_FAIL(schema_guard.get_tenant_info(tenant_id, tenant_schema))) {
1239LOG_WARN("fail to get tenant schema", KR(ret), K(tenant_id));
1240} else if (OB_ISNULL(tenant_schema) || !tenant_schema->is_restore_tenant_status()) {
1241ret = OB_ERR_UNEXPECTED;
1242LOG_WARN("tenant not exist or tenant is not in physical restore status", KR(ret),
1243KPC(tenant_schema));
1244} else if (OB_FAIL(check_all_ls_restore_finish_(tenant_id, tenant_restore_status))) {
1245LOG_WARN("failed to check all ls restore finish", KR(ret), K(job_info));
1246} else if (tenant_restore_status.is_finish()) {
1247LOG_INFO("[RESTORE] restore wait all ls finish done", K(tenant_id), K(tenant_restore_status));
1248int tmp_ret = OB_SUCCESS;
1249int tenant_restore_result = OB_LS_RESTORE_FAILED;
1250if (tenant_restore_status.is_success()) {
1251tenant_restore_result = OB_SUCCESS;
1252}
1253if (OB_SUCCESS != (tmp_ret = try_update_job_status(*sql_proxy_, tenant_restore_result, job_info))) {
1254LOG_WARN("fail to update job status", KR(ret), KR(tmp_ret), KR(tenant_restore_result), K(job_info));
1255}
1256}
1257return ret;
1258}
1259
1260int ObRestoreScheduler::check_all_ls_restore_finish_(
1261const uint64_t tenant_id,
1262TenantRestoreStatus &tenant_restore_status)
1263{
1264int ret = OB_SUCCESS;
1265if (is_sys_tenant(tenant_id) || is_meta_tenant(tenant_id)) {
1266ret = OB_INVALID_ARGUMENT;
1267LOG_WARN("invalid argument", KR(ret), K(tenant_id));
1268} else if (OB_ISNULL(sql_proxy_)) {
1269ret = OB_ERR_UNEXPECTED;
1270LOG_WARN("sql proxy is null", KR(ret), KP(sql_proxy_));
1271} else {
1272tenant_restore_status = TenantRestoreStatus::SUCCESS;
1273SMART_VAR(common::ObMySQLProxy::MySQLResult, res) {
1274ObSqlString sql;
1275common::sqlclient::ObMySQLResult *result = NULL;
1276if (OB_FAIL(sql.assign_fmt("select a.ls_id, b.restore_status from %s as a "
1277"left join %s as b on a.ls_id = b.ls_id",
1278OB_ALL_LS_STATUS_TNAME, OB_ALL_LS_META_TABLE_TNAME))) {
1279LOG_WARN("failed to assign sql", K(ret));
1280} else if (OB_FAIL(sql_proxy_->read(res, gen_meta_tenant_id(tenant_id), sql.ptr()))) {
1281LOG_WARN("execute sql failed", KR(ret), K(sql));
1282} else if (OB_ISNULL(result = res.get_result())) {
1283ret = OB_ERR_UNEXPECTED;
1284LOG_WARN("result is null", KR(ret), K(sql));
1285} else {
1286int64_t ls_id = 0;
1287share::ObLSRestoreStatus ls_restore_status;
1288int32_t restore_status = -1;
1289//TODO no ls in ls_meta
1290//if one of ls restore failed, make tenant restore failed
1291//
1292while (OB_SUCC(ret) && OB_SUCC(result->next())
1293&& !tenant_restore_status.is_failed()) {
1294EXTRACT_INT_FIELD_MYSQL(*result, "ls_id", ls_id, int64_t);
1295EXTRACT_INT_FIELD_MYSQL(*result, "restore_status", restore_status, int32_t);
1296
1297if (OB_FAIL(ret)) {
1298} else if (OB_FAIL(ls_restore_status.set_status(restore_status))) {
1299LOG_WARN("failed to set status", KR(ret), K(restore_status));
1300} else if (!ls_restore_status.is_in_restore_or_none() || ls_restore_status.is_failed()) {
1301//restore failed
1302tenant_restore_status = TenantRestoreStatus::FAILED;
1303} else if (!ls_restore_status.is_none()
1304&& tenant_restore_status.is_success()) {
1305tenant_restore_status = TenantRestoreStatus::IN_PROGRESS;
1306}
1307} // while
1308if (OB_ITER_END == ret) {
1309ret = OB_SUCCESS;
1310}
1311if (!tenant_restore_status.is_success()) {
1312LOG_INFO("check all ls restore not finish, just wait", KR(ret),
1313K(tenant_id), K(ls_id), K(tenant_restore_status));
1314}
1315}
1316}
1317}
1318return ret;
1319}
1320
1321int ObRestoreScheduler::check_all_ls_restore_to_consistent_scn_finish_(
1322const uint64_t tenant_id,
1323TenantRestoreStatus &tenant_restore_status)
1324{
1325int ret = OB_SUCCESS;
1326bool is_finished = false;
1327bool is_success = false;
1328ObPhysicalRestoreTableOperator restore_op;
1329if (OB_FAIL(restore_op.init(sql_proxy_, tenant_id, share::OBCG_STORAGE/*group_id*/))) {
1330LOG_WARN("fail init", K(ret), K(tenant_id_));
1331} else if (OB_FAIL(restore_op.check_finish_restore_to_consistent_scn(is_finished, is_success))) {
1332LOG_WARN("fail to check finish restore to consistent_scn", K(ret), K(tenant_id));
1333} else if (!is_finished) {
1334tenant_restore_status = TenantRestoreStatus::IN_PROGRESS;
1335} else if (is_success) {
1336tenant_restore_status = TenantRestoreStatus::SUCCESS;
1337} else {
1338tenant_restore_status = TenantRestoreStatus::FAILED;
1339}
1340
1341if (OB_FAIL(ret)) {
1342} else if (!tenant_restore_status.is_success()) {
1343LOG_INFO("check all ls restore to consistent_scn not finish, just wait", KR(ret),
1344K(tenant_id), K(tenant_restore_status));
1345}
1346
1347return ret;
1348}
1349
1350int ObRestoreScheduler::restore_wait_tenant_finish(const share::ObPhysicalRestoreJob &job_info)
1351{
1352int ret = OB_SUCCESS;
1353DEBUG_SYNC(BEFORE_WAIT_RESTORE_TENANT_FINISH);
1354//read tenant restore status from __all_restore_job_history
1355ObPhysicalRestoreTableOperator restore_op;
1356ObPhysicalRestoreJob tenant_job;
1357ObHisRestoreJobPersistInfo user_job_history;
1358if (OB_UNLIKELY(!inited_)) {
1359ret = OB_NOT_INIT;
1360LOG_WARN("not inited", KR(ret));
1361} else if (OB_FAIL(restore_service_->check_stop())) {
1362LOG_WARN("restore scheduler stopped", KR(ret));
1363} else if (OB_FAIL(restore_op.init(sql_proxy_, tenant_id_, share::OBCG_STORAGE /*group_id*/))) {
1364LOG_WARN("fail init", K(ret), K(tenant_id_));
1365} else if (OB_FAIL(ObRestoreUtil::get_user_restore_job_history(
1366*sql_proxy_, job_info.get_tenant_id(),
1367job_info.get_restore_key().tenant_id_, job_info.get_job_id(),
1368user_job_history))) {
1369LOG_WARN("failed to get user restore job", KR(ret), K(job_info));
1370} else if (user_job_history.is_restore_success()) {
1371const int64_t tenant_id = job_info.get_tenant_id();
1372ObSchemaGetterGuard schema_guard;
1373const ObTenantSchema *tenant_schema = NULL;
1374
1375if (OB_FAIL(schema_service_->get_tenant_schema_guard(tenant_id, schema_guard))) {
1376LOG_WARN("fail to get tenant schema guard", K(ret), K(tenant_id));
1377} else if (OB_FAIL(schema_guard.get_tenant_info(tenant_id, tenant_schema))) {
1378LOG_WARN("fail to get tenant schema", K(ret), K(tenant_id));
1379} else if (OB_ISNULL(tenant_schema)) {
1380ret = OB_ERR_UNEXPECTED;
1381LOG_WARN("tenant schema is null", K(ret), K(tenant_id));
1382} else if (tenant_schema->is_restore_tenant_status() || tenant_schema->is_normal()) {
1383if (tenant_schema->is_restore_tenant_status()) {
1384const int64_t DEFAULT_TIMEOUT = GCONF.internal_sql_execute_timeout;
1385// try finish restore status
1386obrpc::ObCreateTenantEndArg arg;
1387arg.tenant_id_ = tenant_id;
1388arg.exec_tenant_id_ = OB_SYS_TENANT_ID;
1389if (OB_FAIL(restore_service_->check_stop())) {
1390LOG_WARN("restore scheduler stopped", K(ret));
1391} else if (OB_FAIL(rpc_proxy_->timeout(DEFAULT_TIMEOUT)
1392.create_tenant_end(arg))) {
1393LOG_WARN("fail to create tenant end", K(ret), K(arg), K(DEFAULT_TIMEOUT));
1394}
1395}
1396if (OB_SUCC(ret)) {
1397int tmp_ret = OB_SUCCESS;
1398if (OB_SUCCESS != (tmp_ret = try_update_job_status(*sql_proxy_, ret, job_info))) {
1399LOG_WARN("fail to update job status", K(ret), K(tmp_ret),
1400K(job_info));
1401}
1402}
1403} else {
1404ret = OB_STATE_NOT_MATCH;
1405LOG_WARN("tenant status not match", K(ret), KPC(tenant_schema));
1406}
1407} else {
1408//restore failed
1409int tmp_ret = OB_SUCCESS;
1410ret = OB_ERROR;
1411if (OB_SUCCESS != (tmp_ret = try_update_job_status(*sql_proxy_, ret, job_info))) {
1412LOG_WARN("fail to update job status", K(ret), K(tmp_ret),
1413K(job_info));
1414}
1415}
1416
1417return ret;
1418}
1419
1420int ObRestoreScheduler::reset_schema_status(const uint64_t tenant_id, common::ObMySQLProxy *sql_proxy)
1421{
1422int ret = OB_SUCCESS;
1423if (OB_UNLIKELY(!is_user_tenant(tenant_id))) {
1424ret = OB_INVALID_ARGUMENT;
1425LOG_WARN("invalid argument", KR(ret), K(tenant_id));
1426} else if (OB_ISNULL(sql_proxy)) {
1427ret = OB_ERR_UNEXPECTED;
1428LOG_WARN("sql proxy is null", KR(ret), KP(sql_proxy));
1429} else {
1430ObSchemaStatusProxy proxy(*sql_proxy);
1431ObRefreshSchemaStatus schema_status(tenant_id, OB_INVALID_TIMESTAMP, OB_INVALID_VERSION);
1432if (OB_FAIL(proxy.init())) {
1433LOG_WARN("failed to init schema proxy", KR(ret));
1434} else if (OB_FAIL(proxy.set_tenant_schema_status(schema_status))) {
1435LOG_WARN("failed to update schema status", KR(ret), K(schema_status));
1436}
1437}
1438return ret;
1439}
1440
1441int ObRestoreScheduler::may_update_restore_concurrency_(const uint64_t new_tenant_id, const share::ObPhysicalRestoreJob &job_info)
1442{
1443int ret = OB_SUCCESS;
1444double cpu_count = 0;
1445int64_t ha_high_thread_score = 0;
1446omt::ObTenantConfigGuard tenant_config(TENANT_CONF(new_tenant_id));
1447// restore concurrency controls the number of threads used by restore dag.
1448// if cpu number is less than 10, use the default value.
1449// if cpu number is between 10 ~ 100, let concurrency equals to the cpu number.
1450// if cpu number is exceed 100, let concurrency equals to 100.
1451const int64_t LOW_CPU_LIMIT = 10;
1452const int64_t MAX_CPU_LIMIT = 100;
1453if (!job_info.is_valid()) {
1454ret = OB_INVALID_ARGUMENT;
1455LOG_WARN("get invalid args", K(ret), K(job_info));
1456} else if (tenant_config.is_valid() && OB_FALSE_IT(ha_high_thread_score = tenant_config->ha_high_thread_score)) {
1457} else if (0 != ha_high_thread_score) {
1458LOG_INFO("ha high thread score has been set", K(ha_high_thread_score));
1459} else if (OB_FAIL(ObRestoreUtil::get_restore_tenant_cpu_count(*sql_proxy_, new_tenant_id, cpu_count))) {
1460LOG_WARN("failed to get restore tenant cpu count", K(ret), K(new_tenant_id));
1461} else {
1462int64_t concurrency = job_info.get_concurrency();
1463if (LOW_CPU_LIMIT < cpu_count && MAX_CPU_LIMIT >= cpu_count) {
1464concurrency = std::max(static_cast<int64_t>(cpu_count), concurrency);
1465} else if (MAX_CPU_LIMIT < cpu_count) {
1466concurrency = MAX_CPU_LIMIT;
1467}
1468if (OB_FAIL(update_restore_concurrency_(job_info.get_tenant_name(), new_tenant_id, concurrency))) {
1469LOG_WARN("failed to update restore concurrency", K(ret), K(job_info));
1470}
1471}
1472return ret;
1473}
1474
1475int ObRestoreScheduler::reset_restore_concurrency_(const uint64_t new_tenant_id, const share::ObPhysicalRestoreJob &job_info)
1476{
1477int ret = OB_SUCCESS;
1478const int64_t concurrency = 0;
1479const ObString &tenant_name = job_info.get_tenant_name();
1480if (!job_info.is_valid()) {
1481ret = OB_INVALID_ARGUMENT;
1482LOG_WARN("get invalid args", K(ret), K(job_info));
1483} else if (OB_FAIL(update_restore_concurrency_(tenant_name, new_tenant_id, concurrency))) {
1484LOG_WARN("failed to update restore concurrency", K(ret), K(job_info));
1485}
1486return ret;
1487}
1488
1489int ObRestoreScheduler::update_restore_concurrency_(const common::ObString &tenant_name,
1490const uint64_t tenant_id, const int64_t concurrency)
1491{
1492int ret = OB_SUCCESS;
1493ObSqlString sql;
1494int64_t affected_rows = 0;
1495if (OB_ISNULL(sql_proxy_)) {
1496ret = OB_ERR_UNEXPECTED;
1497LOG_WARN("sql proxy is null", K(ret));
1498} else if (OB_FAIL(sql.append_fmt(
1499"ALTER SYSTEM SET ha_high_thread_score = %ld TENANT = '%.*s'",
1500concurrency, tenant_name.length(), tenant_name.ptr()))) {
1501LOG_WARN("failed to append fmt", K(ret), K(tenant_name));
1502} else if (OB_FAIL(sql_proxy_->write(sql.ptr(), affected_rows))) {
1503LOG_WARN("failed to write sql", K(ret), K(sql));
1504} else {
1505LOG_INFO("update restore concurrency", K(tenant_name), K(concurrency), K(sql));
1506}
1507return ret;
1508}
1509
1510} // end namespace rootserver
1511} // end namespace oceanbase
1512