oceanbase

Форк
0
/
ob_restore_scheduler.cpp 
1511 строк · 60.6 Кб
1
/**
2
 * Copyright (c) 2021 OceanBase
3
 * OceanBase CE is licensed under Mulan PubL v2.
4
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
5
 * You may obtain a copy of Mulan PubL v2 at:
6
 *          http://license.coscl.org.cn/MulanPubL-2.0
7
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
 * See the Mulan PubL v2 for more details.
11
 */
12

13
#define USING_LOG_PREFIX RS_RESTORE
14

15
#include "ob_restore_scheduler.h"
16
#include "rootserver/ob_ddl_service.h"
17
#include "rootserver/ob_rs_async_rpc_proxy.h"
18
#include "rootserver/ob_rs_event_history_table_operator.h"
19
#include "rootserver/ob_unit_manager.h"//convert_pool_name_lis
20
#include "rootserver/ob_ls_service_helper.h"//create_new_ls_in_trans
21
#include "rootserver/ob_common_ls_service.h"//do_create_user_ls
22
#include "rootserver/ob_tenant_role_transition_service.h"
23
#include "share/ob_schema_status_proxy.h"
24
#include "share/schema/ob_schema_utils.h"
25
#include "share/schema/ob_schema_mgr.h"
26
#include "share/ob_upgrade_utils.h"
27
#include "lib/mysqlclient/ob_mysql_transaction.h" //ObMySQLTransaction
28
#include "share/ls/ob_ls_status_operator.h" //ObLSStatusOperator
29
#include "share/ls/ob_ls_operator.h"//ObLSAttr
30
#include "storage/backup/ob_backup_data_store.h"//ObBackupDataLSAttrDesc
31
#include "share/restore/ob_physical_restore_info.h"//ObPhysicalRestoreInfo
32
#include "share/restore/ob_physical_restore_table_operator.h"//ObPhysicalRestoreTableOperator
33
#include "share/ob_tenant_info_proxy.h"//ObAllTenantInfo
34
#include "share/restore/ob_log_restore_source_mgr.h"
35
#include "share/ls/ob_ls_recovery_stat_operator.h"//ObLSRecoveryStatOperator
36
#include "share/ob_rpc_struct.h"
37
#include "share/ob_primary_standby_service.h"
38
#include "logservice/palf/log_define.h"//scn
39
#include "share/scn.h"
40
#include "ob_restore_service.h"
41
#ifdef OB_BUILD_TDE_SECURITY
42
#include "share/ob_master_key_getter.h"
43
#endif
44

45
namespace oceanbase
46
{
47
namespace rootserver
48
{
49
using namespace common;
50
using namespace share;
51
using namespace share::schema;
52
using namespace obrpc;
53
using namespace palf;
54

55
// Builds a scheduler in the "not initialised" state: every dependency pointer
// is null and tenant_id_ is OB_INVALID_TENANT_ID until init() fills them in.
ObRestoreScheduler::ObRestoreScheduler()
  : inited_(false),
    schema_service_(nullptr),
    sql_proxy_(nullptr),
    rpc_proxy_(nullptr),
    srv_rpc_proxy_(nullptr),
    lst_operator_(nullptr),
    restore_service_(nullptr),
    self_addr_(),
    tenant_id_(OB_INVALID_TENANT_ID)
{}
63

64
// The destructor performs no cleanup of its own.
ObRestoreScheduler::~ObRestoreScheduler() {}
67

68
// Wires the scheduler to the services published in GCTX and to the given
// restore service.
//
// @param restore_service  its address is stored in restore_service_.
// @return OB_SUCCESS on success,
//         OB_INIT_TWICE if the scheduler was already initialised,
//         OB_INVALID_ARGUMENT if any required GCTX pointer is NULL.
int ObRestoreScheduler::init(ObRestoreService &restore_service)
{
  int ret = OB_SUCCESS;
  if (inited_) {
    ret = OB_INIT_TWICE;
    LOG_WARN("init twice", KR(ret));
    return ret;
  }
  if (OB_ISNULL(GCTX.schema_service_)
      || OB_ISNULL(GCTX.sql_proxy_)
      || OB_ISNULL(GCTX.rs_rpc_proxy_)
      || OB_ISNULL(GCTX.srv_rpc_proxy_)
      || OB_ISNULL(GCTX.lst_operator_)) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid argument", KR(ret), KP(GCTX.schema_service_), KP(GCTX.sql_proxy_),
        KP(GCTX.rs_rpc_proxy_), KP(GCTX.srv_rpc_proxy_), KP(GCTX.lst_operator_));
    return ret;
  }

  // Dependencies taken from the global context.
  schema_service_ = GCTX.schema_service_;
  sql_proxy_ = GCTX.sql_proxy_;
  rpc_proxy_ = GCTX.rs_rpc_proxy_;
  srv_rpc_proxy_ = GCTX.srv_rpc_proxy_;
  lst_operator_ = GCTX.lst_operator_;
  restore_service_ = &restore_service;
  self_addr_ = GCTX.self_addr();

  // The sys tenant keeps its own id; any other MTL id is mapped through
  // gen_user_tenant_id().
  if (is_sys_tenant(MTL_ID())) {
    tenant_id_ = MTL_ID();
  } else {
    tenant_id_ = gen_user_tenant_id(MTL_ID());
  }
  inited_ = true;
  return ret;
}
93
void ObRestoreScheduler::do_work()
94
{
95
  LOG_INFO("[RESTORE] restore scheduler start");
96
  int ret = OB_SUCCESS;
97
  if (!inited_) {
98
    ret = OB_NOT_INIT;
99
    LOG_WARN("not inited", K(ret));
100
  } else {
101
    ObCurTraceId::init(GCTX.self_addr());
102
    LOG_INFO("[RESTORE] try process restore job");
103
    ObArray<ObPhysicalRestoreJob> job_infos;
104
    ObPhysicalRestoreTableOperator restore_op;
105
    if (OB_FAIL(restore_op.init(sql_proxy_, tenant_id_, share::OBCG_STORAGE /*group_id*/))) {
106
      LOG_WARN("fail init", K(ret), K(tenant_id_));
107
    } else if (OB_FAIL(restore_op.get_jobs(job_infos))) {
108
      LOG_WARN("fail to get jobs", KR(ret), K(tenant_id_));
109
    } else {
110
      FOREACH_CNT_X(job_info, job_infos, !restore_service_->has_set_stop()) { // ignore ret
111
        if (OB_ISNULL(job_info)) {
112
          ret = OB_ERR_UNEXPECTED;
113
          LOG_WARN("job info is null", K(ret));
114
        } else if (is_sys_tenant(tenant_id_)) {
115
          if (OB_FAIL(process_sys_restore_job(*job_info))) {
116
            LOG_WARN("failed to process sys restore job", KR(ret), KPC(job_info));
117
          }
118
        } else if (OB_FAIL(process_restore_job(*job_info))) {
119
          LOG_WARN("fail to process restore job", K(ret), KPC(job_info));
120
        }
121
      }
122
    }
123
    ret = OB_SUCCESS;
124
    restore_service_->idle();
125
  }
126
  LOG_INFO("[RESTORE] restore scheduler quit");
127
  return;
128
}
129

130
int ObRestoreScheduler::process_sys_restore_job(const ObPhysicalRestoreJob &job)
131
{
132
  int ret = OB_SUCCESS;
133
  if (!inited_) {
134
    ret = OB_NOT_INIT;
135
    LOG_WARN("not inited", K(ret));
136
  } else if (OB_UNLIKELY(!is_sys_tenant(MTL_ID()))) {
137
    ret = OB_ERR_UNEXPECTED;
138
    LOG_WARN("not sys tenant", KR(ret));
139
  } else {
140
    switch (job.get_status()) {
141
      case PHYSICAL_RESTORE_CREATE_TENANT:
142
        ret = restore_tenant(job);
143
        break;
144
      case PHYSICAL_RESTORE_WAIT_TENANT_RESTORE_FINISH:
145
        ret = restore_wait_tenant_finish(job);
146
        break;
147
      case PHYSICAL_RESTORE_SUCCESS:
148
        ret = tenant_restore_finish(job);
149
        break;
150
      case PHYSICAL_RESTORE_FAIL:
151
        ret = tenant_restore_finish(job);
152
        break;
153
      default:
154
        ret = OB_ERR_UNEXPECTED;
155
        LOG_WARN("status not match", K(ret), K(job));
156
        break;
157
    }
158
    if (PHYSICAL_RESTORE_FAIL != job.get_status()) {
159
      int tmp_ret = OB_SUCCESS;
160
      if (OB_SUCCESS != (tmp_ret = try_recycle_job(job))) {
161
        LOG_WARN("fail to recycle job", K(tmp_ret), K(job));
162
      }
163
    }
164
    LOG_INFO("[RESTORE] doing restore", K(ret), K(job));
165
  }
166
  return ret;
167
}
168

169

170
int ObRestoreScheduler::process_restore_job(const ObPhysicalRestoreJob &job)
171
{
172
  int ret = OB_SUCCESS;
173
  if (!inited_) {
174
    ret = OB_NOT_INIT;
175
    LOG_WARN("not inited", K(ret));
176
  } else if (OB_UNLIKELY(is_sys_tenant(MTL_ID()))) {
177
    ret = OB_ERR_UNEXPECTED;
178
    LOG_WARN("not sys tenant", KR(ret));
179
  } else {
180
    switch (job.get_status()) {
181
      case PHYSICAL_RESTORE_PRE:
182
        ret = restore_pre(job);
183
        break;
184
      case PHYSICAL_RESTORE_CREATE_INIT_LS:
185
        ret = restore_init_ls(job);
186
        break;
187
      case PHYSICAL_RESTORE_WAIT_CONSISTENT_SCN:
188
        ret = restore_wait_to_consistent_scn(job);
189
        break;
190
      case PHYSICAL_RESTORE_WAIT_LS:
191
        ret = restore_wait_ls_finish(job);
192
        break;
193
      case PHYSICAL_RESTORE_POST_CHECK:
194
        ret = post_check(job);
195
        break;
196
      case PHYSICAL_RESTORE_UPGRADE:
197
        ret = restore_upgrade(job);
198
        break;
199
      case PHYSICAL_RESTORE_SUCCESS:
200
        ret = restore_finish(job);
201
        break;
202
      case PHYSICAL_RESTORE_FAIL:
203
        ret = restore_finish(job);
204
        break;
205
      default:
206
        ret = OB_ERR_UNEXPECTED;
207
        LOG_WARN("status not match", K(ret), K(job));
208
        break;
209
    }
210
    //TODO, table restore
211
    LOG_INFO("[RESTORE] doing restore", K(ret), K(job));
212
  }
213
  return ret;
214
}
215

216
// restore_tenant is not reentrant
217
int ObRestoreScheduler::restore_tenant(const ObPhysicalRestoreJob &job_info)
218
{
219
  int ret = OB_SUCCESS;
220
  ObCreateTenantArg arg;
221
  //the pool list of job_info is obstring without '\0'
222
  ObSqlString pool_list;
223
  UInt64 tenant_id = OB_INVALID_TENANT_ID;
224
  DEBUG_SYNC(BEFORE_PHYSICAL_RESTORE_TENANT);
225
  int64_t timeout =  GCONF._ob_ddl_timeout;
226
  if (!inited_) {
227
    ret = OB_NOT_INIT;
228
    LOG_WARN("not inited", K(ret));
229
  } else if (OB_FAIL(restore_service_->check_stop())) {
230
    LOG_WARN("restore scheduler stopped", K(ret));
231
  } else if (OB_INVALID_TENANT_ID != job_info.get_tenant_id()) {
232
    // restore_tenant can only be executed once.
233
    // only update job status 
234
  } else if (OB_FAIL(pool_list.assign(job_info.get_pool_list()))) {
235
    LOG_WARN("failed to assign pool list", KR(ret), K(job_info));
236
  } else if (OB_FAIL(fill_create_tenant_arg(job_info, pool_list, arg))) {
237
    LOG_WARN("fail to fill create tenant arg", K(ret), K(pool_list), K(job_info));
238
  } else if (OB_FAIL(rpc_proxy_->timeout(timeout).create_tenant(arg, tenant_id))) {
239
    LOG_WARN("fail to create tenant", K(ret), K(arg));
240
  } else {
241
    ObPhysicalRestoreTableOperator restore_op;
242
    const int64_t job_id = job_info.get_job_id();
243
    const uint64_t new_tenant_id = tenant_id;
244
    if (OB_FAIL(restore_op.init(sql_proxy_, tenant_id_, share::OBCG_STORAGE /*group_id*/))) {
245
      LOG_WARN("fail init", K(ret), K(tenant_id_));
246
    } else if (OB_FAIL(restore_op.update_restore_option(
247
            job_id, "tenant_id", new_tenant_id))) {
248
      LOG_WARN("update restore option", K(ret), K(new_tenant_id), K(job_id), K(tenant_id_));
249
    } else if (OB_FAIL(may_update_restore_concurrency_(new_tenant_id, job_info))) {
250
      LOG_WARN("failed to update restore concurrency", K(ret), K(new_tenant_id), K(job_info));
251
    } else {
252
      restore_service_->wakeup();
253
    }
254
  }
255
  int tmp_ret = OB_SUCCESS;
256
  if (OB_SUCCESS != (tmp_ret = try_update_job_status(*sql_proxy_, ret, job_info))) {
257
    LOG_WARN("fail to update job status", K(ret), K(tmp_ret), K(job_info));
258
  }
259
  LOG_INFO("[RESTORE] restore tenant", K(ret), K(arg), K(job_info));
260
  return ret;
261
}
262

263
int ObRestoreScheduler::fill_create_tenant_arg(
264
    const ObPhysicalRestoreJob &job,
265
    const ObSqlString &pool_list,
266
    ObCreateTenantArg &arg)
267
{
268
  int ret = OB_SUCCESS;
269
  ObSchemaGetterGuard schema_guard;
270
  if (!inited_) {
271
    ret = OB_NOT_INIT;
272
    LOG_WARN("not inited", K(ret));
273
  } else if (OB_FAIL(restore_service_->check_stop())) {
274
    LOG_WARN("restore scheduler stopped", K(ret));
275
  } else if (OB_FAIL(schema_service_->get_tenant_schema_guard(
276
                     OB_SYS_TENANT_ID, schema_guard))) {
277
    LOG_WARN("fail to get tenant schema guard", K(ret));
278
  } else if(lib::Worker::CompatMode::ORACLE != job.get_compat_mode() && lib::Worker::CompatMode::MYSQL != job.get_compat_mode()) {
279
    ret = OB_INVALID_ARGUMENT;
280
    LOG_WARN("invalid compat mode", K(ret));
281
  } else {
282
    /*
283
     * restore_tenant will only run trans one when create tenant.
284
     * Consider the following tenant options:
285
     * 1) need backup: tenant_name,compatibility_mode
286
     * 2) need backup and replace(maybe): zone_list,primary_zone,locality,previous_locality
287
     * 3) not backup yet:locked,default_tablegroup_id,info  TODO: (yanmu.ztl)
288
     * 4) no need to backup:drop_tenant_time,status,collation_type
289
     * 6) abandoned: replica_num,read_only,rewrite_merge_version,logonly_replica_num,
290
     *                storage_format_version,storage_format_work_version
291
     */
292
     ObCompatibilityMode mode = lib::Worker::CompatMode::ORACLE == job.get_compat_mode() ?
293
                                ObCompatibilityMode::ORACLE_MODE :
294
                                ObCompatibilityMode::MYSQL_MODE;
295
     arg.exec_tenant_id_ = OB_SYS_TENANT_ID;
296
     arg.tenant_schema_.set_tenant_name(job.get_tenant_name());
297
     arg.tenant_schema_.set_compatibility_mode(mode);
298
     arg.if_not_exist_ = false;
299
     arg.is_restore_ = true;
300
     //  create tmp tenant for recover table
301
     arg.is_tmp_tenant_for_recover_ = job.get_recover_table();
302
     // Physical restore is devided into 2 stages. Recover to 'consistent_scn' which was recorded during
303
     // data backup first, then to user specified scn.
304
     arg.recovery_until_scn_ = job.get_consistent_scn();
305
     arg.compatible_version_ = job.get_source_data_version();
306
     if (OB_FAIL(assign_pool_list(pool_list.ptr(), arg.pool_list_))) {
307
       LOG_WARN("fail to get pool list", K(ret), K(pool_list));
308
     }
309

310
     if (OB_SUCC(ret)) {
311
       ObTenantSchema &tenant_schema = arg.tenant_schema_;
312
       const ObString& locality_str = job.get_locality();
313
       const ObString &primary_zone = job.get_primary_zone();
314
       if (!primary_zone.empty()) {
315
         // specific primary_zone
316
         tenant_schema.set_primary_zone(primary_zone);
317
       }
318
       if (!locality_str.empty()) {
319
         tenant_schema.set_locality(locality_str);
320
       }
321
     }
322
     if (FAILEDx(ObRestoreUtil::get_restore_ls_palf_base_info(job, SYS_LS, arg.palf_base_info_))) {
323
       LOG_WARN("failed to get sys ls palf base info", KR(ret), K(job));
324
     }
325
  }
326
  return ret;
327
}
328

329
int ObRestoreScheduler::assign_pool_list(
330
    const char *str,
331
    common::ObIArray<ObString> &pool_list)
332
{
333
  int ret = OB_SUCCESS;
334
  char *item_str = NULL;
335
  char *save_ptr = NULL;
336
  while (OB_SUCC(ret)) {
337
    item_str = strtok_r((NULL == item_str ? const_cast<char *>(str) : NULL), ",", &save_ptr);
338
    if (NULL != item_str) {
339
      ObString pool(item_str);
340
      if (OB_FAIL(pool_list.push_back(pool))) {
341
        LOG_WARN("push_back failed", K(ret), K(pool));
342
      }
343
    } else {
344
      break;
345
    }
346
  }
347
  return ret;
348
}
349

350
int ObRestoreScheduler::check_locality_valid(
351
    const share::schema::ZoneLocalityIArray &locality)
352
{
353
  int ret = OB_SUCCESS;
354
  int64_t cnt = locality.count();
355
  if (cnt <= 0) {
356
    ret = OB_INVALID_ARGUMENT;
357
    LOG_WARN("invalid cnt", KR(ret), K(cnt));
358
  } else {
359
    for (int64_t i = 0; OB_SUCC(ret) && i < cnt; i++) {
360
      const share::ObZoneReplicaAttrSet &attr = locality.at(i);
361
      if (attr.is_specific_readonly_replica()
362
          || attr.is_allserver_readonly_replica()
363
          || attr.get_encryption_logonly_replica_num() > 0) {
364
        ret = OB_NOT_SUPPORTED;
365
        LOG_WARN("locality with readonly/encrytion_logonly replica is not supported",
366
                 KR(ret), K(locality));
367
      } else if (attr.is_mixed_locality()) {
368
        ret = OB_NOT_SUPPORTED;
369
        LOG_WARN("mixed locality is not supported", KR(ret), K(locality));
370
      } else if (attr.is_specific_replica_attr()) {
371
        ret = OB_NOT_SUPPORTED;
372
        LOG_WARN("locality with memstore_percent is not supported", KR(ret), K(locality));
373
      }
374
    }
375
  }
376
  return ret;
377
}
378

379

380
int ObRestoreScheduler::check_tenant_can_restore_(const uint64_t tenant_id)
381
{
382
  int ret = OB_SUCCESS;
383
  if (OB_UNLIKELY(!inited_)) {
384
    ret = OB_NOT_INIT;
385
    LOG_WARN("not inited", KR(ret));
386
  } else if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id)) {
387
    ret = OB_INVALID_ARGUMENT;
388
    LOG_WARN("tenant id invalid", KR(ret), K(tenant_id));
389
  } else if (OB_FAIL(restore_service_->check_stop())) {
390
    LOG_WARN("restore scheduler stopped", KR(ret));
391
  } else if (GCONF.in_upgrade_mode()) {
392
    // 2. check in upgrade mode
393
    ret = OB_OP_NOT_ALLOW;
394
    LOG_WARN("[RESTORE] cluster is upgrading, try recycle job",
395
             KR(ret), K(tenant_id));
396
  }
397
  return ret;
398

399
}
400

401
//restore pre :modify parameters
402
int ObRestoreScheduler::restore_pre(const ObPhysicalRestoreJob &job_info)
403
{
404
  int ret = OB_SUCCESS;
405
  if (!inited_) {
406
    ret = OB_NOT_INIT;
407
    LOG_WARN("not inited", K(ret));
408
  } else if (OB_INVALID_TENANT_ID == tenant_id_
409
             || OB_SYS_TENANT_ID == tenant_id_) {
410
    ret = OB_INVALID_ARGUMENT;
411
    LOG_WARN("invalid tenant id", K(ret), K(tenant_id_));
412
  } else if (OB_FAIL(restore_service_->check_stop())) {
413
    LOG_WARN("restore scheduler stopped", K(ret));
414
  } else if (OB_FAIL(restore_root_key(job_info))) {
415
    LOG_WARN("fail to restore root key", K(ret));
416
  } else if (OB_FAIL(restore_keystore(job_info))) {
417
    LOG_WARN("fail to restore keystore", K(ret), K(job_info));
418
  } else {
419
    if (OB_FAIL(fill_restore_statistics(job_info))) {
420
      LOG_WARN("fail to fill restore statistics", K(ret), K(job_info));
421
    }
422
    int tmp_ret = OB_SUCCESS;
423
    if (OB_SUCCESS != (tmp_ret = try_update_job_status(*sql_proxy_, ret, job_info))) {
424
      LOG_WARN("fail to update job status", K(ret), K(tmp_ret), K(job_info));
425
    }
426
  }
427
  LOG_INFO("[RESTORE] restore pre", K(ret), K(job_info));
428
  return ret;
429
}
430

431
// Reads LS count, tablet count and total bytes from the last backup set of the
// job and inserts them as the initial restore progress row.
//
// @param job_info  restore job; its last backup set path is the data source.
// @return OB_SUCCESS, OB_ERR_UNEXPECTED when the job has no backup set path,
//         or the error of the failing read/insert step.
int ObRestoreScheduler::fill_restore_statistics(const share::ObPhysicalRestoreJob &job_info)
{
  int ret = OB_SUCCESS;
  ObRestoreProgressPersistInfo restore_progress_info;
  restore_progress_info.key_.job_id_ = job_info.get_job_id();
  restore_progress_info.key_.tenant_id_ = job_info.get_tenant_id();
  restore_progress_info.restore_scn_ = job_info.get_restore_scn();
  // Index of the last backup set in the restore path list.
  int64_t idx = job_info.get_multi_restore_path_list().get_backup_set_path_list().count() - 1;
  ObBackupDataLSAttrDesc ls_info;
  if (idx < 0) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("invalid job info", K(ret), K(idx), K(job_info));
  } else {
    storage::ObBackupDataStore store;
    storage::ObExternBackupSetInfoDesc backup_set_info;
    const share::ObBackupSetPath &backup_set_path = job_info.get_multi_restore_path_list().get_backup_set_path_list().at(idx);
    if (OB_FAIL(store.init(backup_set_path.ptr()))) {
      // Log the error code as well; every sibling failure log here does.
      LOG_WARN("fail to init backup data store", K(ret), K(backup_set_path));
    } else if (OB_FAIL(store.read_backup_set_info(backup_set_info))) {
      LOG_WARN("fail to read backup set info", K(ret));
    } else if (OB_FAIL(store.read_ls_attr_info(backup_set_info.backup_set_file_.meta_turn_id_, ls_info))) {
      LOG_WARN("fail to read ls attr info", K(ret));
    } else {
      restore_progress_info.ls_count_ = ls_info.ls_attr_array_.count();
      restore_progress_info.tablet_count_ = backup_set_info.backup_set_file_.stats_.finish_tablet_count_;
      restore_progress_info.total_bytes_ = backup_set_info.backup_set_file_.stats_.output_bytes_;
    }
  }
  if (OB_SUCC(ret)) {
    share::ObRestorePersistHelper helper;
    if (OB_FAIL(helper.init(job_info.get_tenant_id(), share::OBCG_STORAGE /*group_id*/))) {
      LOG_WARN("fail to init heler", K(ret));
    } else if (OB_FAIL(helper.insert_initial_restore_progress(*sql_proxy_, restore_progress_info))) {
      LOG_WARN("fail to insert initail ls restore progress", K(ret));
    }
  }
  return ret;
}
469

470
int ObRestoreScheduler::convert_tde_parameters(
471
    const ObPhysicalRestoreJob &job_info)
472
{
473
  int ret = OB_SUCCESS;
474
#ifdef OB_BUILD_TDE_SECURITY
475
  uint64_t tenant_id = tenant_id_;
476
  if (!inited_) {
477
    ret = OB_NOT_INIT;
478
    LOG_WARN("not inited", K(ret));
479
  } else if (OB_INVALID_TENANT_ID == tenant_id
480
             || OB_SYS_TENANT_ID == tenant_id) {
481
    ret = OB_INVALID_ARGUMENT;
482
    LOG_WARN("invalid tenant id", K(ret), K(tenant_id));
483
  } else if (OB_FAIL(restore_service_->check_stop())) {
484
    LOG_WARN("restore scheduler stopped", K(ret));
485
  } else if (job_info.get_kms_dest().empty()) {
486
    // do nothing
487
  } else {
488
    ObArenaAllocator allocator;
489
    int64_t affected_row = 0;
490
    ObString tde_method;
491
    ObString kms_info;
492
    ObSqlString sql;
493
    // set tde_method
494
    if (OB_FAIL(ObMasterKeyUtil::restore_encrypt_params(allocator, job_info.get_kms_dest(),
495
                                          job_info.get_kms_encrypt_key(), tde_method, kms_info))) {
496
      LOG_WARN("failed to restore encrypt params", K(ret));
497
    } else if (OB_UNLIKELY(tde_method.empty())) {
498
      ret = OB_INVALID_ARGUMENT;
499
      LOG_WARN("tde_method is empty", K(ret));
500
    } else if (!job_info.get_kms_info().empty()) {
501
      kms_info = job_info.get_kms_info();
502
    }
503
    if (OB_FAIL(ret)) {
504
    } else if (!ObTdeMethodUtil::is_valid(tde_method)) {
505
      // do nothing
506
    } else if (OB_FAIL(ObRestoreCommonUtil::set_tde_parameters(sql_proxy_, rpc_proxy_,
507
                                    tenant_id, tde_method, kms_info))) {
508
      LOG_WARN("failed to set_tde_parameters", KR(ret), K(tenant_id), K(tde_method));
509
    }
510
  }
511
#endif
512
  return ret;
513
}
514

515
// Reads the root key of tenant_id_ from the last backup set of the job and,
// unless its type is INVALID, passes it to ObRestoreCommonUtil::notify_root_key().
// Compiled to a no-op returning OB_SUCCESS unless OB_BUILD_TDE_SECURITY is
// defined.
//
// @param job_info  restore job; its last backup set path is the data source.
// @return OB_SUCCESS, OB_ERR_UNEXPECTED when the job has no backup set path or
//         a proxy pointer is NULL, or the error of the failing step.
int ObRestoreScheduler::restore_root_key(const share::ObPhysicalRestoreJob &job_info)
{
  int ret = OB_SUCCESS;
#ifdef OB_BUILD_TDE_SECURITY
  // Index of the last backup set in the restore path list.
  int64_t idx = job_info.get_multi_restore_path_list().get_backup_set_path_list().count() - 1;
  if (idx < 0) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("invalid job info", K(ret), K(idx), K(job_info));
    return ret;
  }
  if (OB_ISNULL(srv_rpc_proxy_) || OB_ISNULL(sql_proxy_)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("unexpected null svr rpc proxy or sql proxy", K(ret));
    return ret;
  }

  storage::ObBackupDataStore store;
  const share::ObBackupSetPath &backup_set_path =
      job_info.get_multi_restore_path_list().get_backup_set_path_list().at(idx);
  ObRootKey root_key;
  if (OB_FAIL(store.init(backup_set_path.ptr()))) {
    LOG_WARN("fail to init backup data store", K(ret));
  } else if (OB_FAIL(store.read_root_key_info(tenant_id_))) {
    LOG_WARN("fail to read root key info", K(ret));
  } else if (OB_FAIL(ObMasterKeyGetter::instance().get_root_key(tenant_id_, root_key))) {
    LOG_WARN("fail to get root key", K(ret));
  } else if (obrpc::RootKeyType::INVALID != root_key.key_type_) {
    // Only a key with a valid type is notified.
    if (OB_FAIL(ObRestoreCommonUtil::notify_root_key(srv_rpc_proxy_, sql_proxy_, tenant_id_, root_key))) {
      LOG_WARN("failed to notify root key", KR(ret), K(tenant_id_));
    }
  }
#endif
  return ret;
}
545

546
// Sends the restore_key RPC to the server of every unit of tenant_id_ and
// collects the results. Compiled to a no-op returning OB_SUCCESS unless
// OB_BUILD_TDE_SECURITY is defined; also a no-op when the job has no kms
// destination.
//
// @param job_info  restore job providing tenant id, kms_dest and kms_encrypt_key.
// @return OB_SUCCESS, OB_ERR_UNEXPECTED when a proxy pointer is NULL, or the
//         first error seen while sending, waiting for or checking the RPCs.
int ObRestoreScheduler::restore_keystore(const share::ObPhysicalRestoreJob &job_info)
{
  int ret = OB_SUCCESS;
#ifdef OB_BUILD_TDE_SECURITY
  const int64_t DEFAULT_TIMEOUT = 10 * 1000 * 1000L;
  ObUnitTableOperator unit_operator;
  common::ObArray<ObUnit> units;
  ObArray<int> return_code_array;
  obrpc::ObRestoreKeyArg arg;
  if (job_info.get_kms_dest().empty()) {
    // do nothing
  } else if (OB_ISNULL(srv_rpc_proxy_) || OB_ISNULL(sql_proxy_)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("unexpected null svr rpc proxy or sql proxy", K(ret));
  } else if (OB_FAIL(unit_operator.init(*sql_proxy_))) {
    LOG_WARN("failed to init unit operator", KR(ret));
  } else if (OB_FAIL(unit_operator.get_units_by_tenant(tenant_id_, units))) {
    LOG_WARN("failed to get tenant unit", KR(ret), K_(tenant_id));
  } else {
    ObRestoreKeyProxy proxy(*srv_rpc_proxy_, &obrpc::ObSrvRpcProxy::restore_key);
    arg.tenant_id_ = job_info.get_tenant_id();
    arg.backup_dest_ = job_info.get_kms_dest();
    arg.encrypt_key_ = job_info.get_kms_encrypt_key();

    // Issue one request per unit; stop sending at the first failure.
    for (int64_t i = 0; OB_SUCC(ret) && i < units.count(); ++i) {
      if (OB_FAIL(proxy.call(units.at(i).server_, DEFAULT_TIMEOUT, arg))) {
        LOG_WARN("failed to send rpc", KR(ret));
      }
    }

    // wait_all() is called even when sending failed; an earlier error in ret
    // takes precedence over the wait error.
    int tmp_ret = OB_SUCCESS;
    if (OB_TMP_FAIL(proxy.wait_all(return_code_array))) {
      LOG_WARN("wait batch result failed", KR(tmp_ret), KR(ret));
      if (OB_SUCC(ret)) {
        ret = tmp_ret;
      }
    } else if (OB_FAIL(ret)) {
    } else if (OB_FAIL(proxy.check_return_cnt(return_code_array.count()))) {
      LOG_WARN("fail to check return cnt", KR(ret), "return_cnt", return_code_array.count());
    } else {
      // Report the first server that returned an error.
      for (int64_t i = 0; OB_SUCC(ret) && i < return_code_array.count(); ++i) {
        const ObAddr &addr = proxy.get_dests().at(i);
        if (OB_FAIL(return_code_array.at(i))) {
          LOG_WARN("failed to restore key", KR(ret), K(addr));
        }
      }
    }
  }
#endif
  return ret;
}
596

597
int ObRestoreScheduler::post_check(const ObPhysicalRestoreJob &job_info)
598
{
599
  int ret = OB_SUCCESS;
600
  DEBUG_SYNC(BEFORE_PHYSICAL_RESTORE_POST_CHECK);
601
  bool sync_satisfied = true;
602

603
  if (!inited_) {
604
    ret = OB_NOT_INIT;
605
    LOG_WARN("not inited", K(ret));
606
  } else if (OB_INVALID_TENANT_ID == tenant_id_
607
             || OB_SYS_TENANT_ID == tenant_id_) {
608
    ret = OB_INVALID_ARGUMENT;
609
    LOG_WARN("invalid tenant id", K(ret), K(tenant_id_));
610
  } else if (OB_FAIL(restore_service_->check_stop())) {
611
    LOG_WARN("restore scheduler stopped", K(ret));
612
  } else if (OB_FAIL(ObRestoreCommonUtil::try_update_tenant_role(sql_proxy_, tenant_id_,
613
                  job_info.get_restore_scn(), false /*is_clone*/, sync_satisfied))) {
614
    LOG_WARN("failed to try update tenant role", KR(ret), K(tenant_id_), K(job_info));
615
  } else if (!sync_satisfied) {
616
    ret = OB_NEED_WAIT;
617
    LOG_WARN("tenant sync scn not equal to restore scn, need wait", KR(ret), K(job_info));
618
  }
619

620
  if (FAILEDx(ObRestoreCommonUtil::process_schema(sql_proxy_, tenant_id_))) {
621
    LOG_WARN("failed to process schema", KR(ret));
622
  }
623

624
  if (FAILEDx(convert_tde_parameters(job_info))) {
625
    LOG_WARN("fail to convert parameters", K(ret), K(job_info));
626
  }
627

628
  if (OB_SUCC(ret)) {
629
    int tmp_ret = OB_SUCCESS;
630
    if (OB_SUCCESS != (tmp_ret = try_update_job_status(*sql_proxy_, ret, job_info))) {
631
      LOG_WARN("fail to update job status", K(ret), K(tmp_ret), K(job_info));
632
    }
633
  }
634
  LOG_INFO("[RESTORE] post check", K(ret), K(job_info));
635
  return ret;
636
}
637

638
int ObRestoreScheduler::restore_finish(const ObPhysicalRestoreJob &job_info)
639
{
640
  int ret = OB_SUCCESS;
641

642
  if (!inited_) {
643
    ret = OB_NOT_INIT;
644
    LOG_WARN("not init", K(ret));
645
  } else if (OB_FAIL(restore_service_->check_stop())) {
646
    LOG_WARN("restore scheduler stopped", K(ret));
647
  } else if (OB_FAIL(ObRestoreUtil::recycle_restore_job(tenant_id_, *sql_proxy_,
648
                                                job_info))) {
649
    LOG_WARN("finish restore tasks failed", K(job_info), K(ret), K(tenant_id_));
650
  } else {
651
    LOG_INFO("[RESTORE] restore tenant success", K(ret), K(job_info));
652
  }
653
  ROOTSERVICE_EVENT_ADD("physical_restore", "restore_finish",
654
                        "restore_stauts", job_info.get_status(),
655
                        "tenant", job_info.get_tenant_name());
656
  return ret;
657
}
658

659
int ObRestoreScheduler::tenant_restore_finish(const ObPhysicalRestoreJob &job_info)
660
{
661
  int ret = OB_SUCCESS;
662
  ObHisRestoreJobPersistInfo history_info;
663
  bool restore_tenant_exist = true;
664
  if (!inited_) {
665
    ret = OB_NOT_INIT;
666
    LOG_WARN("not init", K(ret));
667
  } else if (OB_FAIL(restore_service_->check_stop())) {
668
    LOG_WARN("restore scheduler stopped", K(ret));
669
  } else if (OB_FAIL(try_get_tenant_restore_history_(job_info, history_info, restore_tenant_exist))) {
670
    LOG_WARN("failed to get user tenant restory info", KR(ret), K(job_info));
671
  } else if (restore_tenant_exist && OB_FAIL(reset_restore_concurrency_(job_info.get_tenant_id(), job_info))) {
672
    LOG_WARN("failed to reset restore concurrency", K(ret), K(job_info));
673
  } else if (share::PHYSICAL_RESTORE_SUCCESS == job_info.get_status()) {
674
    //restore success
675
  } else {
676
    int tmp_ret = OB_SUCCESS;
677
    ObRestoreFailureChecker checker;
678
    bool is_concurrent_with_clean = false;
679
    if (OB_TMP_FAIL(checker.init(job_info))) {
680
      LOG_WARN("failed to init restore failure checker", K(tmp_ret), K(job_info));
681
    } else if (OB_TMP_FAIL(checker.check_is_concurrent_with_clean(is_concurrent_with_clean))) {
682
      LOG_WARN("failed to check is clean concurrency failure", K(tmp_ret));
683
    }
684
    if (OB_SUCC(ret) && is_concurrent_with_clean) {
685
      int64_t pos = 0;
686
      if (OB_FAIL(databuff_printf(history_info.comment_.ptr(), history_info.comment_.capacity(), pos,
687
                                  "%s;", "physical restore run concurrently with backup data clean, please check backup and archive jobs"))) {
688
        if (OB_SIZE_OVERFLOW == ret) {
689
          ret = OB_SUCCESS;
690
        } else {
691
          LOG_WARN("failed to databuff printf comment", K(ret));
692
        }
693
      }
694
    }
695
  }
696

697
  if (FAILEDx(ObRestoreUtil::recycle_restore_job(*sql_proxy_,
698
                                                job_info, history_info))) {
699
    LOG_WARN("finish restore tasks failed", KR(ret), K(job_info), K(history_info), K(tenant_id_));
700
  } else {
701
    LOG_INFO("[RESTORE] restore tenant finish", K(ret), K(job_info));
702
  }
703
  ROOTSERVICE_EVENT_ADD("physical_restore", "restore_finish",
704
                        "restore_status", job_info.get_status(),
705
                        "tenant", job_info.get_tenant_name());
706
  return ret;
707
}
708

709
int ObRestoreScheduler::try_get_tenant_restore_history_(
710
    const ObPhysicalRestoreJob &job_info,
711
    ObHisRestoreJobPersistInfo &history_info,
712
    bool &restore_tenant_exist)
713
{
714
  int ret = OB_SUCCESS;
715
  restore_tenant_exist = true;
716
  ObHisRestoreJobPersistInfo user_history_info; 
717
  const uint64_t restore_tenant_id = job_info.get_tenant_id();
718
  if (!inited_) {
719
    ret = OB_NOT_INIT;
720
    LOG_WARN("not inited", KR(ret));
721
  } else if (OB_FAIL(restore_service_->check_stop())) {
722
    LOG_WARN("restore scheduler stopped", KR(ret));
723
  } else if (OB_FAIL(ObRestoreCommonUtil::check_tenant_is_existed(schema_service_,
724
                                            restore_tenant_id, restore_tenant_exist))) {
725
    LOG_WARN("fail to check tenant_is_existed", KR(ret), K(restore_tenant_id), K(job_info));
726
  }
727
  if (OB_FAIL(ret)) {
728
  } else if (!restore_tenant_exist) {
729
    if (OB_FAIL(history_info.init_with_job(job_info))) {
730
      LOG_WARN("failed to init with job", KR(ret), K(job_info));
731
    }
732
  } else if (OB_FAIL(ObRestoreUtil::get_user_restore_job_history(
733
                 *sql_proxy_, job_info.get_tenant_id(),
734
                 job_info.get_restore_key().tenant_id_, job_info.get_job_id(),
735
                 user_history_info))) {
736
    LOG_WARN("failed to get user restore job history", KR(ret), K(job_info));
737
  } else if (OB_FAIL(history_info.init_initiator_job_history(job_info, user_history_info))) {
738
    LOG_WARN("failed to init restore job history", KR(ret), K(job_info), K(user_history_info));
739
  }
740
  return ret;
741
}
742

743
/*
744
 * 1. Physical restore is not allowed when cluster is in upgrade mode or is standby.
745
 * 2. Physical restore jobs will be recycled asynchronously when restore tenant has been dropped.
746
 * 3. Physical restore jobs will be used to avoid duplicate tenant_name when tenant is creating.
747
 */
748
int ObRestoreScheduler::try_recycle_job(const ObPhysicalRestoreJob &job)
749
{
750
  int ret = OB_SUCCESS;
751
  ObSchemaGetterGuard schema_guard;
752
  bool is_dropped = false;
753
  int failed_ret = OB_SUCCESS;
754
  DEBUG_SYNC(BEFORE_RECYCLE_PHYSICAL_RESTORE_JOB);
755
  
756
  if (!inited_) {
757
    ret = OB_NOT_INIT;
758
    LOG_WARN("not inited", KR(ret));
759
  } else if (OB_FAIL(check_tenant_can_restore_(tenant_id_))) {
760
    LOG_WARN("tenant cannot restore", KR(ret), K(tenant_id_));
761
  } else if (OB_FAIL(schema_service_->get_tenant_schema_guard(OB_SYS_TENANT_ID, schema_guard))) {
762
    LOG_WARN("fail to get tenant schema guard", KR(ret));
763
  } else if (OB_SUCCESS != schema_guard.check_formal_guard()) {
764
    // skip
765
  } else if (OB_INVALID_TENANT_ID == job.get_tenant_id()) {
766
    //restore tenant may be failed to create, will to restore failed
767
  } else if (OB_FAIL(schema_guard.check_if_tenant_has_been_dropped(job.get_tenant_id(), is_dropped))) {
768
    LOG_WARN("fail to get tenant id", KR(ret), K(job));
769
  } else if (!is_dropped) {
770
    // skip
771
  } else {
772
    // 3. tenant has been dropped
773
    failed_ret = OB_TENANT_NOT_EXIST;
774
    LOG_WARN("[RESTORE] tenant has been dropped, try recycle job",
775
             KR(ret), K(tenant_id_));
776
  }
777
  if (OB_SUCC(ret) && OB_SUCCESS != failed_ret) {
778
    int tmp_ret = OB_SUCCESS;
779
    if (OB_SUCCESS != (tmp_ret = try_update_job_status(*sql_proxy_, failed_ret, job))) {
780
      LOG_WARN("fail to update job status", KR(ret), K(tmp_ret), K(failed_ret), K(job));
781
    }
782
  }
783
  return ret;
784
}
785

786
int ObRestoreScheduler::try_update_job_status(
787
    common::ObISQLClient &sql_client,
788
    int return_ret,
789
    const ObPhysicalRestoreJob &job,
790
    share::PhysicalRestoreMod mod)
791
{
792
  int ret = OB_SUCCESS;
793
  ObPhysicalRestoreTableOperator restore_op;
794
  if (!inited_) {
795
    ret = OB_NOT_INIT;
796
    LOG_WARN("not inited", K(ret));
797
  } else if (OB_FAIL(restore_service_->check_stop())) {
798
    LOG_WARN("restore scheduler stopped", K(ret));
799
  } else if (OB_FAIL(restore_op.init(&sql_client, tenant_id_, share::OBCG_STORAGE /*group_id*/))) {
800
    LOG_WARN("fail init", K(ret), K(tenant_id_));
801
  } else {
802
    PhysicalRestoreStatus next_status = get_next_status(return_ret, job.get_status());
803
    const common::ObCurTraceId::TraceId trace_id = *ObCurTraceId::get_trace_id();
804

805
    if (PHYSICAL_RESTORE_FAIL == next_status && OB_LS_RESTORE_FAILED != return_ret
806
        && OB_FAIL(restore_op.update_job_error_info(job.get_job_id(), return_ret, mod, trace_id, self_addr_))) {
807
      // if restore failed at wait ls, observer has record error info,
808
      // rs no need to record error info again.
809
      LOG_WARN("fail to update job error info", K(ret), K(job), K(return_ret), K(mod), K(tenant_id_));
810
    } else if (OB_FAIL(restore_op.update_job_status(job.get_job_id(), next_status))) {
811
      LOG_WARN("fail update job status", K(ret), K(job), K(next_status), K(tenant_id_));
812
    } else {
813
      //can not be zero
814
      restore_service_->wakeup();
815
      LOG_INFO("[RESTORE] switch job status", K(ret), K(job), K(next_status));
816
      (void)record_rs_event(job, next_status);
817
    }
818
  }
819
  return ret;
820
}
821

822
void ObRestoreScheduler::record_rs_event(
823
  const ObPhysicalRestoreJob &job,
824
  const PhysicalRestoreStatus status)
825
{
826
  const char *status_str = ObPhysicalRestoreTableOperator::get_restore_status_str(
827
                             static_cast<PhysicalRestoreStatus>(status));
828
  ROOTSERVICE_EVENT_ADD("physical_restore", "change_restore_status",
829
                        "job_id", job.get_job_id(),
830
                        "tenant", job.get_tenant_name(),
831
                        "status", status_str);
832
}
833

834
// Status transition used for the sys tenant:
//   CREATE_TENANT -> WAIT_TENANT_RESTORE_FINISH -> SUCCESS.
// Any other current status maps to PHYSICAL_RESTORE_MAX_STATUS.
PhysicalRestoreStatus ObRestoreScheduler::get_sys_next_status(
  PhysicalRestoreStatus current_status)
{
  PhysicalRestoreStatus next_status = PHYSICAL_RESTORE_MAX_STATUS;
  if (PHYSICAL_RESTORE_CREATE_TENANT == current_status) {
    next_status = PHYSICAL_RESTORE_WAIT_TENANT_RESTORE_FINISH;
  } else if (PHYSICAL_RESTORE_WAIT_TENANT_RESTORE_FINISH == current_status) {
    next_status = PHYSICAL_RESTORE_SUCCESS;
  }
  return next_status;
}
853

854

855

856
// Maps the outcome of the current restore stage to the next job status.
// Any non-success return_ret yields PHYSICAL_RESTORE_FAIL. Otherwise the sys
// tenant follows get_sys_next_status(), and every other tenant follows
//   PRE -> CREATE_INIT_LS -> WAIT_CONSISTENT_SCN -> WAIT_LS -> POST_CHECK
//       -> UPGRADE -> SUCCESS.
// A current status outside that chain maps to PHYSICAL_RESTORE_MAX_STATUS.
PhysicalRestoreStatus ObRestoreScheduler::get_next_status(
  int return_ret,
  PhysicalRestoreStatus current_status)
{
  PhysicalRestoreStatus next_status = PHYSICAL_RESTORE_MAX_STATUS;
  if (OB_SUCCESS != return_ret) {
    next_status = PHYSICAL_RESTORE_FAIL;
  } else if (is_sys_tenant(MTL_ID())) {
    next_status = get_sys_next_status(current_status);
  } else if (PHYSICAL_RESTORE_PRE == current_status) {
    next_status = PHYSICAL_RESTORE_CREATE_INIT_LS;
  } else if (PHYSICAL_RESTORE_CREATE_INIT_LS == current_status) {
    next_status = PHYSICAL_RESTORE_WAIT_CONSISTENT_SCN;
  } else if (PHYSICAL_RESTORE_WAIT_CONSISTENT_SCN == current_status) {
    next_status = PHYSICAL_RESTORE_WAIT_LS;
  } else if (PHYSICAL_RESTORE_WAIT_LS == current_status) {
    next_status = PHYSICAL_RESTORE_POST_CHECK;
  } else if (PHYSICAL_RESTORE_POST_CHECK == current_status) {
    next_status = PHYSICAL_RESTORE_UPGRADE;
  } else if (PHYSICAL_RESTORE_UPGRADE == current_status) {
    next_status = PHYSICAL_RESTORE_SUCCESS;
  }
  return next_status;
}
898

899
int ObRestoreScheduler::restore_upgrade(const ObPhysicalRestoreJob &job_info)
900
{
901
  int ret = OB_SUCCESS;
902
  DEBUG_SYNC(BEFORE_PHYSICAL_RESTORE_UPGRADE_PRE);
903
  if (!inited_) {
904
    ret = OB_NOT_INIT;
905
    LOG_WARN("not inited", KR(ret));
906
  } else if (OB_INVALID_TENANT_ID == tenant_id_
907
             || OB_SYS_TENANT_ID == tenant_id_) {
908
    ret = OB_INVALID_ARGUMENT;
909
    LOG_WARN("invalid tenant id", KR(ret), K(tenant_id_));
910
  } else if (OB_FAIL(restore_service_->check_stop())) {
911
    LOG_WARN("restore scheduler stopped", KR(ret));
912
  } else {
913
    if (OB_SUCC(ret)) {
914
      int tmp_ret = OB_SUCCESS;
915
      if (OB_SUCCESS != (tmp_ret = try_update_job_status(*sql_proxy_, ret, job_info))) {
916
        LOG_WARN("fail to update job status", K(ret), K(tmp_ret), K(job_info));
917
      }
918
    }
919
  }
920
  LOG_INFO("[RESTORE] upgrade pre finish", KR(ret), K(job_info));
921
  return ret;
922
}
923

924
// Handles the CREATE_INIT_LS stage of a tenant restore.
// Steps performed here:
//   1. validate scheduler state, the tenant schema (must be in restore tenant
//      status) and that at least one backup set path is present;
//   2. read the LS attribute info from the last backup set path and write its
//      backup_scn_ as the SYS LS sync scn inside a transaction;
//   3. create all LS, finish their creation, and add the log restore source
//      when exactly one log path is given;
//   4. update the job status when everything succeeded, or when the
//      restore-to-consistent-scn check reports a failed tenant restore.
// Returns the error of the first failing step. Failures of the consistent-scn
// check and of the job status update are only logged.
int ObRestoreScheduler::restore_init_ls(const share::ObPhysicalRestoreJob &job_info)
{
  int ret = OB_SUCCESS;
  DEBUG_SYNC(BEFORE_PHYSICAL_RESTORE_INIT_LS);
  ObSchemaGetterGuard schema_guard;
  const share::schema::ObTenantSchema *tenant_schema = NULL;
  const common::ObSArray<share::ObBackupSetPath> &backup_set_path_array =
    job_info.get_multi_restore_path_list().get_backup_set_path_list();
  const common::ObSArray<share::ObBackupPathString> &log_path_array = job_info.get_multi_restore_path_list().get_log_path_list();
  if (OB_UNLIKELY(!inited_)) {
    ret = OB_NOT_INIT;
    LOG_WARN("not inited", KR(ret));
  } else if (OB_FAIL(check_tenant_can_restore_(tenant_id_))) {
    LOG_WARN("tenant can not restore", KR(ret), K(tenant_id_));
  } else if (OB_ISNULL(sql_proxy_)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("sql proxy is null", KR(ret), KP(sql_proxy_));
  } else if (OB_FAIL(schema_service_->get_tenant_schema_guard(
          OB_SYS_TENANT_ID, schema_guard))) {
    LOG_WARN("fail to get tenant schema guard", KR(ret));
  } else if (OB_FAIL(schema_guard.get_tenant_info(tenant_id_, tenant_schema))) {
    LOG_WARN("fail to get tenant schema", KR(ret), K(job_info));
  } else if (OB_ISNULL(tenant_schema) || !tenant_schema->is_restore_tenant_status()) {
    ret = OB_ERR_UNEXPECTED;
    // KPC prints the schema content (and tolerates NULL), matching the sibling stages
    LOG_WARN("tenant not exist or tenant is not in physical restore status", KR(ret),
        KPC(tenant_schema));
  } else if (OB_UNLIKELY(0 == backup_set_path_array.count())) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("backup piece path not expected", KR(ret), K(job_info));
  } else {
    // the last backup set path is the one the LS attribute info is read from
    const int64_t backup_path_count = backup_set_path_array.count();
    const ObString &backup_set_path = backup_set_path_array.at(backup_path_count - 1).ptr();
    storage::ObBackupDataLSAttrDesc backup_ls_attr;
    ObLogRestoreSourceMgr restore_source_mgr;
    storage::ObBackupDataStore store;
    if (OB_FAIL(store.init(backup_set_path.ptr()))) {
      LOG_WARN("fail to ini backup extern mgr", K(ret));
    } else if (OB_FAIL(store.read_ls_attr_info(backup_ls_attr))) {
      LOG_WARN("failed to read ls info", KR(ret));
    } else {
      const SCN &sync_scn = backup_ls_attr.backup_scn_;
      ObLSRecoveryStatOperator ls_recovery;
      const uint64_t exec_tenant_id = get_private_table_exec_tenant_id(tenant_id_);
      START_TRANSACTION(sql_proxy_, exec_tenant_id)
      LOG_INFO("start to create ls and set sync scn", K(sync_scn), K(backup_ls_attr), KR(ret));
      if (FAILEDx(ls_recovery.update_sys_ls_sync_scn(tenant_id_, trans, sync_scn))) {
        LOG_WARN("failed to update sync ls sync scn", KR(ret), K(sync_scn));
      }
      END_TRANSACTION(trans)
    }
    if (FAILEDx(create_all_ls_(*tenant_schema, backup_ls_attr.ls_attr_array_))) {
      LOG_WARN("failed to create all ls", KR(ret), K(backup_ls_attr), KPC(tenant_schema));
    } else if (OB_FAIL(wait_all_ls_created_(*tenant_schema, job_info))) {
      LOG_WARN("failed to wait all ls created", KR(ret), KPC(tenant_schema));
    } else if (OB_FAIL(finish_create_ls_(*tenant_schema, backup_ls_attr.ls_attr_array_))) {
      LOG_WARN("failed to finish create ls", KR(ret), KPC(tenant_schema));
    } else if (OB_FAIL(restore_source_mgr.init(tenant_id_, sql_proxy_))) {
      LOG_WARN("failed to init restore_source_mgr", KR(ret));
    } else if (1 == log_path_array.count()
      && OB_FAIL(restore_source_mgr.add_location_source(job_info.get_restore_scn(), log_path_array.at(0).str()))) {
      LOG_WARN("failed to add log restore source", KR(ret), K(job_info), K(log_path_array));
    }
  }

#ifdef ERRSIM
    // error injection: in ERRSIM builds ret is overwritten by the injected value
    ret = OB_E(EventTable::EN_RESTORE_CREATE_LS_FAILED) OB_SUCCESS;
#endif

  TenantRestoreStatus tenant_restore_status;
  if (OB_FAIL(ret)) {
    int tmp_ret = OB_SUCCESS;
    if (OB_TMP_FAIL(check_all_ls_restore_to_consistent_scn_finish_(tenant_id_, tenant_restore_status))) {
      // log the error of the check itself (tmp_ret), not only the stage error
      LOG_WARN("failed to check all ls restore to consistent scn finish", K(ret), K(tmp_ret));
    }
  }

  if (OB_SUCC(ret) || tenant_restore_status.is_failed()) {
    int tmp_ret = OB_SUCCESS;
    if (OB_SUCCESS != (tmp_ret = try_update_job_status(*sql_proxy_, ret, job_info))) {
      // keep tmp_ret untouched so the log shows both the stage result (ret)
      // and the real status-update error (tmp_ret)
      LOG_WARN("fail to update job status", K(ret), K(tmp_ret), K(job_info));
    }
  }
  if (OB_FAIL(ret)) {
    restore_service_->wakeup();
  }
  LOG_INFO("[RESTORE] create init ls", KR(ret), K(job_info));

  return ret;
}
1014

1015
int ObRestoreScheduler::set_restore_to_target_scn_(
1016
    common::ObMySQLTransaction &trans, const share::ObPhysicalRestoreJob &job_info, const share::SCN &scn)
1017
{
1018
  int ret = OB_SUCCESS;
1019
  const uint64_t tenant_id = job_info.get_tenant_id();
1020
  ObAllTenantInfo tenant_info;
1021
  ObLSRecoveryStatOperator ls_recovery_operator;
1022
  ObLSRecoveryStat sys_ls_recovery;
1023
  ObLogRestoreSourceMgr restore_source_mgr;
1024
  if (OB_FAIL(ObAllTenantInfoProxy::load_tenant_info(tenant_id, &trans, true/*for_update*/, tenant_info))) {
1025
    LOG_WARN("failed to load all tenant info", KR(ret), "tenant_id", job_info.get_tenant_id());
1026
  } else if (OB_FAIL(ls_recovery_operator.get_ls_recovery_stat(tenant_id, share::SYS_LS,
1027
                     true /*for_update*/, sys_ls_recovery, trans))) {
1028
    LOG_WARN("failed to get ls recovery stat", KR(ret), K(tenant_id));
1029
  } else if (scn < tenant_info.get_sync_scn() || scn < sys_ls_recovery.get_sync_scn()) {
1030
    ret = OB_INVALID_ARGUMENT;
1031
    LOG_WARN("recover before tenant sync_scn or SYS LS sync_scn is not allow", KR(ret), K(tenant_info),
1032
             K(tenant_id), K(scn), K(sys_ls_recovery));
1033
  } else if (tenant_info.get_recovery_until_scn() == scn) {
1034
    LOG_INFO("recovery_until_scn is same with original", K(tenant_info), K(tenant_id), K(scn));
1035
  } else if (OB_FAIL(restore_source_mgr.init(tenant_id, &trans))) {
1036
    LOG_WARN("failed to init restore_source_mgr", KR(ret), K(tenant_id), K(scn));
1037
  } else if (OB_FAIL(restore_source_mgr.update_recovery_until_scn(scn))) {
1038
    LOG_WARN("failed to update_recovery_until_scn", KR(ret), K(tenant_id), K(scn));
1039
  } else if (OB_FAIL(ObAllTenantInfoProxy::update_tenant_recovery_until_scn(
1040
                  tenant_id, trans, tenant_info.get_switchover_epoch(), scn))) {
1041
    LOG_WARN("failed to update_tenant_recovery_until_scn", KR(ret), K(tenant_id), K(scn));
1042
  } else {
1043
    LOG_INFO("succeed to set recover until scn", K(scn));
1044
  }
1045
  return ret;
1046
}
1047
int ObRestoreScheduler::create_all_ls_(
1048
    const share::schema::ObTenantSchema &tenant_schema,
1049
    const common::ObIArray<ObLSAttr> &ls_attr_array)
1050
{
1051
  int ret = OB_SUCCESS;
1052
  if (OB_UNLIKELY(!inited_)) {
1053
    ret = OB_NOT_INIT;
1054
    LOG_WARN("not inited", KR(ret));
1055
  } else if (OB_FAIL(restore_service_->check_stop())) {
1056
    LOG_WARN("restore scheduler stopped", KR(ret));
1057
  } else if (OB_ISNULL(sql_proxy_)) {
1058
    ret = OB_ERR_UNEXPECTED;
1059
    LOG_WARN("sql proxy is null", KR(ret), KP(sql_proxy_));
1060
  } else if (OB_FAIL(ObRestoreCommonUtil::create_all_ls(sql_proxy_, tenant_id_, tenant_schema, ls_attr_array))) {
1061
    LOG_WARN("fail to create all ls", KR(ret), K(tenant_id_), K(tenant_schema), K(ls_attr_array));
1062
  }
1063
  return ret;
1064
}
1065

1066
// Walks all LS status rows of the tenant (as returned by
// get_all_ls_status_by_order) and, for each LS that is still in the creating
// state, reads its recovery stat and the restore palf base info, then issues
// do_create_user_ls() with "create with palf" enabled.
// Stops at the first error; the summary LOG_INFO is emitted after the loop
// regardless of the result.
int ObRestoreScheduler::wait_all_ls_created_(const share::schema::ObTenantSchema &tenant_schema,
      const share::ObPhysicalRestoreJob &job_info)
{
  int ret = OB_SUCCESS;
  if (OB_UNLIKELY(!inited_)) {
    ret = OB_NOT_INIT;
    LOG_WARN("not inited", KR(ret));
  } else if (OB_FAIL(restore_service_->check_stop())) {
    LOG_WARN("restore scheduler stopped", KR(ret));
  } else if (OB_UNLIKELY(!tenant_schema.is_valid())) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid argument", KR(ret), K(tenant_schema));
  } else if (OB_ISNULL(sql_proxy_)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("sql proxy is null", KR(ret), KP(sql_proxy_));
  } else {
    const uint64_t tenant_id = tenant_schema.get_tenant_id();
    ObLSStatusOperator status_op;
    ObLSStatusInfoArray ls_array;
    palf::PalfBaseInfo palf_base_info;
    ObLSRecoveryStat recovery_stat;
    ObLSRecoveryStatOperator ls_recovery_operator;

    if (OB_FAIL(status_op.get_all_ls_status_by_order(tenant_id, ls_array, *sql_proxy_))) {
      LOG_WARN("failed to get all ls status", KR(ret), K(tenant_id));
    }
    for (int64_t idx = 0; OB_SUCC(ret) && idx < ls_array.count(); ++idx) {
      const ObLSStatusInfo &info = ls_array.at(idx);
      if (!info.ls_is_creating()) {
        // only LS that are still being created need work here
        continue;
      }
      recovery_stat.reset();
      if (OB_FAIL(ls_recovery_operator.get_ls_recovery_stat(
              tenant_id, info.ls_id_, false/*for_update*/, recovery_stat, *sql_proxy_))) {
        LOG_WARN("failed to get ls recovery stat", KR(ret), K(tenant_id), K(info));
      } else if (OB_FAIL(ObRestoreUtil::get_restore_ls_palf_base_info(
                     job_info, info.ls_id_, palf_base_info))) {
        LOG_WARN("failed to get restore ls palf info", KR(ret), K(info),
                 K(job_info));
      } else if (OB_FAIL(ObCommonLSService::do_create_user_ls(
                     tenant_schema,
                     info,
                     recovery_stat.get_create_scn(),
                     true, /*create with palf*/
                     palf_base_info,
                     OB_INVALID_TENANT_ID/*source_tenant_id*/))) {
        LOG_WARN("failed to create ls with palf", KR(ret), K(info), K(tenant_schema),
                 K(palf_base_info));
      }
    } // end for
    LOG_INFO("[RESTORE] wait ls created", KR(ret), K(tenant_id), K(ls_array));
  }
  return ret;
}
1117

1118
int ObRestoreScheduler::finish_create_ls_(
1119
    const share::schema::ObTenantSchema &tenant_schema,
1120
    const common::ObIArray<share::ObLSAttr> &ls_attr_array)
1121
{
1122
  int ret = OB_SUCCESS;
1123
  if (OB_UNLIKELY(!inited_)) {
1124
    ret = OB_NOT_INIT;
1125
    LOG_WARN("not inited", KR(ret));
1126
  } else if (OB_FAIL(restore_service_->check_stop())) {
1127
    LOG_WARN("restore scheduler stopped", KR(ret));
1128
  } else if (OB_UNLIKELY(!tenant_schema.is_valid())) {
1129
    ret = OB_INVALID_ARGUMENT;
1130
    LOG_WARN("invalid argument", KR(ret), K(tenant_schema));
1131
  } else if (OB_ISNULL(sql_proxy_)) {
1132
    ret = OB_ERR_UNEXPECTED;
1133
    LOG_WARN("sql proxy is null", KR(ret), KP(sql_proxy_));
1134
  } else if (OB_FAIL(ObRestoreCommonUtil::finish_create_ls(sql_proxy_, tenant_schema, ls_attr_array))) {
1135
    LOG_WARN("fail to finish create ls", KR(ret), K(tenant_schema), K(ls_attr_array));
1136
  }
1137
  return ret;
1138
}
1139

1140
// Handles the WAIT_CONSISTENT_SCN stage of a tenant restore.
// After the usual validation it checks whether all LS finished restoring to
// the consistent scn:
//   - not finished yet: nothing is done and OB_SUCCESS is returned;
//   - finished with failure: the job is moved to the failed state and
//     OB_LS_RESTORE_FAILED is returned (a failure to update the job status is
//     only logged);
//   - finished successfully and the tenant has replayed to the consistent
//     scn: within one transaction the recovery target is set to the job's
//     restore scn and the job status is advanced.
// The transaction is committed only when ret is still OB_SUCCESS.
int ObRestoreScheduler::restore_wait_to_consistent_scn(const share::ObPhysicalRestoreJob &job_info)
{
  int ret = OB_SUCCESS;
  TenantRestoreStatus tenant_restore_status;
  const uint64_t tenant_id = job_info.get_tenant_id();
  const ObTenantSchema *tenant_schema = NULL;
  ObSchemaGetterGuard schema_guard;
  bool is_replay_finish = false;
  DEBUG_SYNC(BEFORE_WAIT_RESTORE_TO_CONSISTENT_SCN);
  if (OB_UNLIKELY(!inited_)) {
    ret = OB_NOT_INIT;
    LOG_WARN("not inited", KR(ret));
  } else if (OB_FAIL(check_tenant_can_restore_(tenant_id))) {
    LOG_WARN("failed to check tenant can restore", KR(ret), K(tenant_id));
  } else if (OB_ISNULL(sql_proxy_)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("sql proxy is null", KR(ret), KP(sql_proxy_));
  } else if (OB_FAIL(schema_service_->get_tenant_schema_guard(
          OB_SYS_TENANT_ID, schema_guard))) {
    LOG_WARN("fail to get tenant schema guard", KR(ret));
  } else if (OB_FAIL(schema_guard.get_tenant_info(tenant_id, tenant_schema))) {
    LOG_WARN("fail to get tenant schema", KR(ret), K(tenant_id));
  } else if (OB_ISNULL(tenant_schema) || !tenant_schema->is_restore()) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("tenant not exist or tenant is not in physical restore status", KR(ret),
               KPC(tenant_schema));
  } else if (OB_FAIL(check_all_ls_restore_to_consistent_scn_finish_(tenant_id, tenant_restore_status))) {
    LOG_WARN("fail to check all ls restore finish", KR(ret), K(job_info));
  } else if (tenant_restore_status.is_finish()) {
    LOG_INFO("[RESTORE] restore wait all ls restore to consistent scn done", K(tenant_id), K(tenant_restore_status));
    int tmp_ret = OB_SUCCESS;
    ObMySQLTransaction trans;
    const uint64_t exec_tenant_id = gen_meta_tenant_id(tenant_id);
    if (tenant_restore_status.is_failed()) {
      ret = OB_LS_RESTORE_FAILED;
      LOG_INFO("[RESTORE]restore wait all ls restore to consistent scn failed", K(ret));
      if (OB_SUCCESS != (tmp_ret = try_update_job_status(*sql_proxy_, ret, job_info))) {
        // ret holds the stage result here, so the update error must be logged via tmp_ret
        LOG_WARN("fail to update job status", KR(ret), KR(tmp_ret), K(job_info));
      }
    } else if (OB_FAIL(check_tenant_replay_to_consistent_scn(tenant_id, job_info.get_consistent_scn(), is_replay_finish))) {
      LOG_WARN("fail to check tenant replay to consistent scn", K(ret));
    } else if (!is_replay_finish) {
      // replay has not reached the consistent scn yet; retry on a later round
    } else if (FALSE_IT(DEBUG_SYNC(AFTER_WAIT_RESTORE_TO_CONSISTENT_SCN))) {
    } else if (OB_FAIL(trans.start(sql_proxy_, exec_tenant_id))) {
      LOG_WARN("fail to start trans", K(ret));
    } else if (OB_FAIL(set_restore_to_target_scn_(trans, job_info, job_info.get_restore_scn()))) {
      LOG_WARN("fail to set restore to target scn", KR(ret));
    } else if (OB_FAIL(try_update_job_status(trans, ret, job_info))) {
      LOG_WARN("fail to update job status", KR(ret), K(job_info));
    }
    if (trans.is_started()) {
      // commit when everything succeeded, roll back otherwise
      if (OB_SUCCESS != (tmp_ret = trans.end(OB_SUCC(ret)))) {
        ret = OB_SUCC(ret) ? tmp_ret : ret;
        LOG_WARN("fail to rollback trans", KR(ret), KR(tmp_ret));
      }
    }
  }
  return ret;
}
1199

1200

1201

1202
// Loads the tenant info (without locking) and reports through
// is_replay_finish whether the tenant's standby scn has reached its
// recovery_until_scn. Returns OB_INVALID_ARGUMENT when recovery_until_scn
// differs from the given scn. is_replay_finish is only written on success.
int ObRestoreScheduler::check_tenant_replay_to_consistent_scn(const uint64_t tenant_id, const share::SCN &scn, bool &is_replay_finish)
{
  int ret = OB_SUCCESS;
  ObAllTenantInfo tenant_info;
  if (OB_FAIL(ObAllTenantInfoProxy::load_tenant_info(
          tenant_id, sql_proxy_, false/*no update*/, tenant_info))) {
    LOG_WARN("failed to load tenant info", K(ret));
  }
  if (OB_SUCC(ret)) {
    if (tenant_info.get_recovery_until_scn() != scn) {
      ret = OB_INVALID_ARGUMENT;
      LOG_WARN("unexpected recovery until scn", K(ret), K(tenant_info), K(scn));
    } else {
      is_replay_finish = (tenant_info.get_recovery_until_scn() <= tenant_info.get_standby_scn());
      LOG_INFO("[RESTORE]tenant replay to consistent_scn", K(is_replay_finish));
    }
  }
  return ret;
}
1217

1218
// Handles the WAIT_LS stage of a tenant restore.
// Once check_all_ls_restore_finish_() reports a finished state, the job status
// is updated with OB_SUCCESS when the tenant restore succeeded and with
// OB_LS_RESTORE_FAILED otherwise. A failure to update the job status is only
// logged. While the restore is still in progress nothing is done.
int ObRestoreScheduler::restore_wait_ls_finish(const share::ObPhysicalRestoreJob &job_info)
{
  int ret = OB_SUCCESS;
  DEBUG_SYNC(BEFORE_PHYSICAL_RESTORE_WAIT_LS_FINISH);
  TenantRestoreStatus tenant_restore_status;
  const uint64_t tenant_id = job_info.get_tenant_id();
  const ObTenantSchema *tenant_schema = NULL;
  ObSchemaGetterGuard schema_guard;

  if (OB_UNLIKELY(!inited_)) {
    ret = OB_NOT_INIT;
    LOG_WARN("not inited", KR(ret));
  } else if (OB_FAIL(check_tenant_can_restore_(tenant_id))) {
    LOG_WARN("failed to check tenant can restore", KR(ret), K(tenant_id));
  } else if (OB_ISNULL(sql_proxy_)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("sql proxy is null", KR(ret), KP(sql_proxy_));
  } else if (OB_FAIL(schema_service_->get_tenant_schema_guard(OB_SYS_TENANT_ID, schema_guard))) {
    LOG_WARN("fail to get tenant schema guard", KR(ret));
  } else if (OB_FAIL(schema_guard.get_tenant_info(tenant_id, tenant_schema))) {
    LOG_WARN("fail to get tenant schema", KR(ret), K(tenant_id));
  } else if (OB_ISNULL(tenant_schema) || !tenant_schema->is_restore_tenant_status()) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("tenant not exist or tenant is not in physical restore status", KR(ret),
               KPC(tenant_schema));
  } else if (OB_FAIL(check_all_ls_restore_finish_(tenant_id, tenant_restore_status))) {
    LOG_WARN("failed to check all ls restore finish", KR(ret), K(job_info));
  } else if (tenant_restore_status.is_finish()) {
    LOG_INFO("[RESTORE] restore wait all ls finish done", K(tenant_id), K(tenant_restore_status));
    // translate the aggregated LS result into the code fed to the status machine
    const int tenant_restore_result =
        tenant_restore_status.is_success() ? OB_SUCCESS : OB_LS_RESTORE_FAILED;
    const int tmp_ret = try_update_job_status(*sql_proxy_, tenant_restore_result, job_info);
    if (OB_SUCCESS != tmp_ret) {
      LOG_WARN("fail to update job status", KR(ret), KR(tmp_ret), KR(tenant_restore_result), K(job_info));
    }
  }
  return ret;
}
1259

1260
int ObRestoreScheduler::check_all_ls_restore_finish_(
1261
    const uint64_t tenant_id,
1262
    TenantRestoreStatus &tenant_restore_status)
1263
{
1264
  int ret = OB_SUCCESS;
1265
  if (is_sys_tenant(tenant_id) || is_meta_tenant(tenant_id)) {
1266
    ret = OB_INVALID_ARGUMENT;
1267
    LOG_WARN("invalid argument", KR(ret), K(tenant_id));
1268
  } else if (OB_ISNULL(sql_proxy_)) {
1269
    ret = OB_ERR_UNEXPECTED;
1270
    LOG_WARN("sql proxy is null", KR(ret), KP(sql_proxy_));
1271
  } else {
1272
    tenant_restore_status = TenantRestoreStatus::SUCCESS;
1273
    SMART_VAR(common::ObMySQLProxy::MySQLResult, res) {
1274
      ObSqlString sql;
1275
      common::sqlclient::ObMySQLResult *result = NULL;
1276
      if (OB_FAIL(sql.assign_fmt("select a.ls_id, b.restore_status from %s as a "
1277
              "left join %s as b on a.ls_id = b.ls_id",
1278
              OB_ALL_LS_STATUS_TNAME, OB_ALL_LS_META_TABLE_TNAME))) {
1279
        LOG_WARN("failed to assign sql", K(ret));
1280
      } else if (OB_FAIL(sql_proxy_->read(res, gen_meta_tenant_id(tenant_id), sql.ptr()))) {
1281
        LOG_WARN("execute sql failed", KR(ret), K(sql));
1282
      } else if (OB_ISNULL(result = res.get_result())) {
1283
        ret = OB_ERR_UNEXPECTED;
1284
        LOG_WARN("result is null", KR(ret), K(sql));
1285
      } else {
1286
        int64_t ls_id = 0;
1287
        share::ObLSRestoreStatus ls_restore_status;
1288
        int32_t restore_status = -1;
1289
        //TODO no ls in ls_meta
1290
        //if one of ls restore failed, make tenant restore failed
1291
        //
1292
        while (OB_SUCC(ret) && OB_SUCC(result->next())
1293
            && !tenant_restore_status.is_failed()) {
1294
          EXTRACT_INT_FIELD_MYSQL(*result, "ls_id", ls_id, int64_t);
1295
          EXTRACT_INT_FIELD_MYSQL(*result, "restore_status", restore_status, int32_t);
1296

1297
          if (OB_FAIL(ret)) {
1298
          } else if (OB_FAIL(ls_restore_status.set_status(restore_status))) {
1299
            LOG_WARN("failed to set status", KR(ret), K(restore_status));
1300
          } else if (!ls_restore_status.is_in_restore_or_none() || ls_restore_status.is_failed()) {
1301
            //restore failed
1302
            tenant_restore_status = TenantRestoreStatus::FAILED;
1303
          } else if (!ls_restore_status.is_none()
1304
                     && tenant_restore_status.is_success()) {
1305
            tenant_restore_status = TenantRestoreStatus::IN_PROGRESS;
1306
          }
1307
        } // while
1308
        if (OB_ITER_END == ret) {
1309
          ret = OB_SUCCESS;
1310
        }
1311
        if (!tenant_restore_status.is_success()) {
1312
          LOG_INFO("check all ls restore not finish, just wait", KR(ret),
1313
              K(tenant_id), K(ls_id), K(tenant_restore_status));
1314
        }
1315
      }
1316
    }
1317
  }
1318
  return ret;
1319
}
1320

1321
int ObRestoreScheduler::check_all_ls_restore_to_consistent_scn_finish_(
1322
    const uint64_t tenant_id,
1323
    TenantRestoreStatus &tenant_restore_status)
1324
{
1325
  int ret = OB_SUCCESS;
1326
  bool is_finished = false;
1327
  bool is_success = false;
1328
  ObPhysicalRestoreTableOperator restore_op;
1329
  if (OB_FAIL(restore_op.init(sql_proxy_, tenant_id, share::OBCG_STORAGE/*group_id*/))) {
1330
    LOG_WARN("fail init", K(ret), K(tenant_id_));
1331
  } else if (OB_FAIL(restore_op.check_finish_restore_to_consistent_scn(is_finished, is_success))) {
1332
    LOG_WARN("fail to check finish restore to consistent_scn", K(ret), K(tenant_id));
1333
  } else if (!is_finished) {
1334
    tenant_restore_status = TenantRestoreStatus::IN_PROGRESS;
1335
  } else if (is_success) {
1336
    tenant_restore_status = TenantRestoreStatus::SUCCESS;
1337
  } else {
1338
    tenant_restore_status = TenantRestoreStatus::FAILED;
1339
  }
1340

1341
  if (OB_FAIL(ret)) {
1342
  } else if (!tenant_restore_status.is_success()) {
1343
    LOG_INFO("check all ls restore to consistent_scn not finish, just wait", KR(ret),
1344
        K(tenant_id), K(tenant_restore_status));
1345
  }
1346

1347
  return ret;
1348
}
1349

1350
// Handles the WAIT_TENANT_RESTORE_FINISH stage.
// The user restore job history is read for the job:
//   - history reports success: the tenant schema must be in restore tenant
//     status or normal status (otherwise OB_STATE_NOT_MATCH). When still in
//     restore status, create_tenant_end is sent over RPC first. The job
//     status is then advanced; a failure to update it is only logged.
//   - history does not report success: the job is moved to the failed state
//     and OB_ERROR is returned.
int ObRestoreScheduler::restore_wait_tenant_finish(const share::ObPhysicalRestoreJob &job_info)
{
  int ret = OB_SUCCESS;
  DEBUG_SYNC(BEFORE_WAIT_RESTORE_TENANT_FINISH);
  // read tenant restore status from __all_restore_job_history
  ObPhysicalRestoreTableOperator restore_op;
  ObHisRestoreJobPersistInfo user_job_history;
  if (OB_UNLIKELY(!inited_)) {
    ret = OB_NOT_INIT;
    LOG_WARN("not inited", KR(ret));
  } else if (OB_FAIL(restore_service_->check_stop())) {
    LOG_WARN("restore scheduler stopped", KR(ret));
  } else if (OB_FAIL(restore_op.init(sql_proxy_, tenant_id_, share::OBCG_STORAGE /*group_id*/))) {
    LOG_WARN("fail init", K(ret), K(tenant_id_));
  } else if (OB_FAIL(ObRestoreUtil::get_user_restore_job_history(
                 *sql_proxy_, job_info.get_tenant_id(),
                 job_info.get_restore_key().tenant_id_, job_info.get_job_id(),
                 user_job_history))) {
    LOG_WARN("failed to get user restore job", KR(ret), K(job_info));
  } else if (user_job_history.is_restore_success()) {
    // tenant ids are unsigned 64-bit throughout this file
    const uint64_t tenant_id = job_info.get_tenant_id();
    ObSchemaGetterGuard schema_guard;
    const ObTenantSchema *tenant_schema = NULL;

    if (OB_FAIL(schema_service_->get_tenant_schema_guard(tenant_id, schema_guard))) {
      LOG_WARN("fail to get tenant schema guard", K(ret), K(tenant_id));
    } else if (OB_FAIL(schema_guard.get_tenant_info(tenant_id, tenant_schema))) {
      LOG_WARN("fail to get tenant schema", K(ret), K(tenant_id));
    } else if (OB_ISNULL(tenant_schema)) {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("tenant schema is null", K(ret), K(tenant_id));
    } else if (tenant_schema->is_restore_tenant_status() || tenant_schema->is_normal()) {
      if (tenant_schema->is_restore_tenant_status()) {
        const int64_t DEFAULT_TIMEOUT = GCONF.internal_sql_execute_timeout;
        // try finish restore status
        obrpc::ObCreateTenantEndArg arg;
        arg.tenant_id_ = tenant_id;
        arg.exec_tenant_id_ = OB_SYS_TENANT_ID;
        if (OB_FAIL(restore_service_->check_stop())) {
          LOG_WARN("restore scheduler stopped", K(ret));
        } else if (OB_FAIL(rpc_proxy_->timeout(DEFAULT_TIMEOUT)
                               .create_tenant_end(arg))) {
          LOG_WARN("fail to create tenant end", K(ret), K(arg), K(DEFAULT_TIMEOUT));
        }
      }
      if (OB_SUCC(ret)) {
        int tmp_ret = OB_SUCCESS;
        if (OB_SUCCESS != (tmp_ret = try_update_job_status(*sql_proxy_, ret, job_info))) {
          LOG_WARN("fail to update job status", K(ret), K(tmp_ret),
                   K(job_info));
        }
      }
    } else {
      ret = OB_STATE_NOT_MATCH;
      LOG_WARN("tenant status not match", K(ret), KPC(tenant_schema));
    }
  } else {
    // restore failed
    int tmp_ret = OB_SUCCESS;
    ret = OB_ERROR;
    if (OB_SUCCESS != (tmp_ret = try_update_job_status(*sql_proxy_, ret, job_info))) {
      LOG_WARN("fail to update job status", K(ret), K(tmp_ret),
          K(job_info));
    }
  }

  return ret;
}
1419

1420
int ObRestoreScheduler::reset_schema_status(const uint64_t tenant_id, common::ObMySQLProxy *sql_proxy)
1421
{
1422
  int ret = OB_SUCCESS;
1423
  if (OB_UNLIKELY(!is_user_tenant(tenant_id))) {
1424
    ret = OB_INVALID_ARGUMENT;
1425
    LOG_WARN("invalid argument", KR(ret), K(tenant_id));
1426
  } else if (OB_ISNULL(sql_proxy)) {
1427
    ret = OB_ERR_UNEXPECTED;
1428
    LOG_WARN("sql proxy is null", KR(ret), KP(sql_proxy));
1429
  } else {
1430
    ObSchemaStatusProxy proxy(*sql_proxy);
1431
    ObRefreshSchemaStatus schema_status(tenant_id, OB_INVALID_TIMESTAMP, OB_INVALID_VERSION);
1432
    if (OB_FAIL(proxy.init())) {
1433
      LOG_WARN("failed to init schema proxy", KR(ret));
1434
    } else if (OB_FAIL(proxy.set_tenant_schema_status(schema_status))) {
1435
      LOG_WARN("failed to update schema status", KR(ret), K(schema_status));
1436
    }
1437
  }
1438
  return ret;
1439
}
1440

1441
// Raise the restore concurrency (tenant parameter ha_high_thread_score) of the
// tenant being restored, unless that parameter has already been set.
//
// @param new_tenant_id  id of the tenant being restored; used to read its
//                       tenant config and its cpu count.
// @param job_info       restore job; supplies the tenant name and the
//                       concurrency recorded in the job.
// @return OB_SUCCESS when the parameter was updated or was already non-zero;
//         OB_INVALID_ARGUMENT when job_info is invalid;
//         OB_ERR_UNEXPECTED when sql_proxy_ is null;
//         otherwise the error from the cpu-count lookup or from the update.
int ObRestoreScheduler::may_update_restore_concurrency_(const uint64_t new_tenant_id, const share::ObPhysicalRestoreJob &job_info)
{
  int ret = OB_SUCCESS;
  double cpu_count = 0;
  int64_t ha_high_thread_score = 0;
  omt::ObTenantConfigGuard tenant_config(TENANT_CONF(new_tenant_id));
  // restore concurrency controls the number of threads used by restore dag.
  // if cpu number is not greater than 10, keep the concurrency recorded in the job.
  // if cpu number is in (10, 100], use the larger of the cpu number and the
  // concurrency recorded in the job.
  // if cpu number exceeds 100, use 100.
  const int64_t LOW_CPU_LIMIT = 10;
  const int64_t MAX_CPU_LIMIT = 100;
  if (!job_info.is_valid()) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("get invalid args", K(ret), K(job_info));
  } else if (tenant_config.is_valid() && OB_FALSE_IT(ha_high_thread_score = tenant_config->ha_high_thread_score)) {
    // never taken: this branch only loads the current value when the tenant
    // config is available; otherwise ha_high_thread_score stays 0.
  } else if (0 != ha_high_thread_score) {
    // the parameter already has a non-zero value, leave it untouched.
    LOG_INFO("ha high thread score has been set", K(ha_high_thread_score));
  } else if (OB_ISNULL(sql_proxy_)) {
    // guard the dereference below, consistent with update_restore_concurrency_.
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("sql proxy is null", K(ret));
  } else if (OB_FAIL(ObRestoreUtil::get_restore_tenant_cpu_count(*sql_proxy_, new_tenant_id, cpu_count))) {
    LOG_WARN("failed to get restore tenant cpu count", K(ret), K(new_tenant_id));
  } else {
    int64_t concurrency = job_info.get_concurrency();
    if (LOW_CPU_LIMIT < cpu_count && MAX_CPU_LIMIT >= cpu_count) {
      // fractional cpu counts are truncated before the comparison.
      concurrency = std::max(static_cast<int64_t>(cpu_count), concurrency);
    } else if (MAX_CPU_LIMIT < cpu_count) {
      concurrency = MAX_CPU_LIMIT;
    }
    if (OB_FAIL(update_restore_concurrency_(job_info.get_tenant_name(), new_tenant_id, concurrency))) {
      LOG_WARN("failed to update restore concurrency", K(ret), K(job_info));
    }
  }
  return ret;
}
1474

1475
// Set the restore concurrency of the restored tenant back to 0 by delegating
// to update_restore_concurrency_().
//
// @param new_tenant_id  id of the restored tenant, forwarded unchanged.
// @param job_info       restore job; must be valid, supplies the tenant name.
// @return OB_SUCCESS, OB_INVALID_ARGUMENT for an invalid job, or the error
//         returned by update_restore_concurrency_().
int ObRestoreScheduler::reset_restore_concurrency_(const uint64_t new_tenant_id, const share::ObPhysicalRestoreJob &job_info)
{
  int ret = OB_SUCCESS;
  const int64_t reset_value = 0;
  if (job_info.is_valid()) {
    if (OB_FAIL(update_restore_concurrency_(job_info.get_tenant_name(), new_tenant_id, reset_value))) {
      LOG_WARN("failed to update restore concurrency", K(ret), K(job_info));
    }
  } else {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("get invalid args", K(ret), K(job_info));
  }
  return ret;
}
1488

1489
// Issue "ALTER SYSTEM SET ha_high_thread_score = <concurrency> TENANT = '<name>'"
// through sql_proxy_.
//
// @param tenant_name  name placed in the TENANT clause of the statement.
// @param tenant_id    not used by this function.
// @param concurrency  value assigned to ha_high_thread_score.
// @return OB_SUCCESS; OB_ERR_UNEXPECTED when sql_proxy_ is null; otherwise the
//         error from building or executing the statement.
int ObRestoreScheduler::update_restore_concurrency_(const common::ObString &tenant_name,
    const uint64_t tenant_id, const int64_t concurrency)
{
  int ret = OB_SUCCESS;
  ObSqlString sql;

  if (OB_ISNULL(sql_proxy_)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("sql proxy is null", K(ret));
  }

  // Build the statement text.
  if (OB_SUCC(ret)) {
    if (OB_FAIL(sql.append_fmt("ALTER SYSTEM SET ha_high_thread_score = %ld TENANT = '%.*s'",
                               concurrency, tenant_name.length(), tenant_name.ptr()))) {
      LOG_WARN("failed to append fmt", K(ret), K(tenant_name));
    }
  }

  // Execute it; the affected-row count is required by write() but not inspected.
  if (OB_SUCC(ret)) {
    int64_t ignored_rows = 0;
    if (OB_FAIL(sql_proxy_->write(sql.ptr(), ignored_rows))) {
      LOG_WARN("failed to write sql", K(ret), K(sql));
    } else {
      LOG_INFO("update restore concurrency", K(tenant_name), K(concurrency), K(sql));
    }
  }

  return ret;
}
1509

1510
} // end namespace rootserver
1511
} // end namespace oceanbase
1512

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.