oceanbase

Форк
0
/
ob_restore_util.cpp 
1283 строки · 54.1 Кб
1
/**
2
 * Copyright (c) 2021 OceanBase
3
 * OceanBase CE is licensed under Mulan PubL v2.
4
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
5
 * You may obtain a copy of Mulan PubL v2 at:
6
 *          http://license.coscl.org.cn/MulanPubL-2.0
7
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
 * See the Mulan PubL v2 for more details.
11
 */
12

13
#define USING_LOG_PREFIX RS_RESTORE
14

15
#include "ob_restore_util.h"
16
#include "lib/lock/ob_mutex.h"
17
#include "share/restore/ob_restore_uri_parser.h"
18
#include "share/schema/ob_schema_mgr.h"
19
#include "share/schema/ob_schema_getter_guard.h"
20
#include "share/backup/ob_backup_struct.h"
21
#include "share/backup/ob_backup_io_adapter.h"
22
#include "share/backup/ob_backup_path.h"
23
#include "rootserver/ob_rs_event_history_table_operator.h"
24
#include "storage/backup/ob_backup_restore_util.h"
25
#include "share/backup/ob_archive_store.h"
26
#include "storage/backup/ob_backup_data_store.h"
27
#include "share/restore/ob_restore_persist_helper.h"//ObRestorePersistHelper ObRestoreProgressPersistInfo
28
#include "logservice/palf/palf_base_info.h"//PalfBaseInfo
29
#include "storage/ls/ob_ls_meta_package.h"//ls_meta
30
#include "share/backup/ob_archive_path.h"
31
#include "share/ob_upgrade_utils.h"
32
#include "share/ob_unit_table_operator.h"
33

34
using namespace oceanbase::common;
35
using namespace oceanbase;
36
using namespace oceanbase::share;
37
using namespace oceanbase::share::schema;
38
using namespace oceanbase::rootserver;
39

40
/*-------------- physical restore --------------------------*/
41
int ObRestoreUtil::fill_physical_restore_job(
42
    const int64_t job_id,
43
    const obrpc::ObPhysicalRestoreTenantArg &arg,
44
    ObPhysicalRestoreJob &job)
45
{
46
  int ret = OB_SUCCESS;
47

48
  if (job_id < 0 || !arg.is_valid()) {
49
    ret = OB_INVALID_ARGUMENT;
50
    LOG_WARN("invalid arg", K(ret), K(job_id), K(arg));
51
  } else {
52
    job.reset();
53
    job.init_restore_key(OB_SYS_TENANT_ID, job_id); 
54
    job.set_status(PhysicalRestoreStatus::PHYSICAL_RESTORE_CREATE_TENANT);
55
    job.set_tenant_name(arg.tenant_name_);
56
    job.set_initiator_job_id(arg.initiator_job_id_);
57
    job.set_initiator_tenant_id(arg.initiator_tenant_id_);
58
    if (OB_FAIL(job.set_description(arg.description_))) {
59
      LOG_WARN("fail to set description", K(ret));
60
    }
61

62
    // check restore option
63
    if (OB_SUCC(ret)) {
64
      if (OB_FAIL(ObPhysicalRestoreOptionParser::parse(arg.restore_option_, job))) {
65
        LOG_WARN("fail to parse restore_option", K(ret), K(arg), K(job_id));
66
      } else if (OB_FAIL(job.set_restore_option(arg.restore_option_))){
67
        LOG_WARN("failed to set restore option", KR(ret), K(arg));
68
      } else if (job.get_kms_encrypt()) {
69
        if (OB_FAIL(job.set_kms_info(arg.kms_info_))) {
70
          LOG_WARN("failed to fill kms info", KR(ret), K(arg));
71
        }
72
      }
73
    }
74

75
    if (OB_SUCC(ret)) {
76
      if (OB_FAIL(fill_backup_info_(arg, job))) {
77
        LOG_WARN("failed to fill backup info", KR(ret), K(arg), K(job));
78
      }
79
    }
80

81
    if (OB_SUCC(ret)) {
82
      if (OB_FAIL(fill_encrypt_info_(arg, job))) {
83
        LOG_WARN("failed to fill encrypt info", KR(ret), K(arg), K(job));
84
      }
85
    }
86

87
    if (FAILEDx(job.set_passwd_array(arg.passwd_array_))) {
88
      LOG_WARN("failed to copy passwd array", K(ret), K(arg));
89
    }
90

91
    if (OB_SUCC(ret)) {
92
      for (int64_t i = 0; OB_SUCC(ret) && i < arg.table_items_.count(); i++) {
93
        const obrpc::ObTableItem &item = arg.table_items_.at(i);
94
        if (OB_FAIL(job.get_white_list().add_table_item(item))) {
95
          LOG_WARN("fail to add table item", KR(ret), K(item));
96
        }
97
      }
98
    }
99
  }
100

101
  LOG_INFO("finish fill_physical_restore_job", K(job_id), K(arg), K(job));
102
  return ret;
103
}
104

105
int ObRestoreUtil::record_physical_restore_job(
106
    common::ObISQLClient &sql_client,
107
    const ObPhysicalRestoreJob &job)
108
{
109
  int ret = OB_SUCCESS;
110
  if (!job.is_valid()) {
111
    ret = OB_INVALID_ARGUMENT;
112
    LOG_WARN("invalid arg", K(ret), K(job));
113
  } else {
114
    bool has_job = false;
115
    ObPhysicalRestoreTableOperator restore_op;
116
    if (OB_FAIL(check_has_physical_restore_job(sql_client,
117
                                               job.get_tenant_name(),
118
                                               has_job))) {
119
      LOG_WARN("fail to check if job exist", K(ret), K(job));
120
    } else if (has_job) {
121
      ret = OB_RESTORE_IN_PROGRESS;
122
      LOG_WARN("restore tenant job already exist", K(ret), K(job));
123
    } else if (OB_FAIL(restore_op.init(&sql_client, OB_SYS_TENANT_ID, share::OBCG_STORAGE /*group_id*/))) {
124
      LOG_WARN("fail init restore op", K(ret));
125
    } else if (OB_FAIL(restore_op.insert_job(job))) {
126
      LOG_WARN("fail insert job and partitions", K(ret), K(job));
127
    }
128
  }
129
  return ret;
130
}
131

132
int ObRestoreUtil::insert_user_tenant_restore_job(
133
             common::ObISQLClient &sql_client,
134
             const ObString &tenant_name,
135
             const int64_t user_tenant_id)
136
{
137
  int ret = OB_SUCCESS;
138
  if (OB_UNLIKELY(!is_user_tenant(user_tenant_id))) {
139
    ret = OB_INVALID_ARGUMENT;
140
    LOG_WARN("not user tenant", KR(ret), K(user_tenant_id));
141
  } else {
142
    ObPhysicalRestoreTableOperator restore_op;
143
    ObPhysicalRestoreJob initaitor_job_info;
144
    ObPhysicalRestoreJob job_info;
145
    if (OB_FAIL(restore_op.init(&sql_client, OB_SYS_TENANT_ID, share::OBCG_STORAGE /*group_id*/))) {
146
      LOG_WARN("failed to init restore op", KR(ret), K(user_tenant_id));
147
    } else if (OB_FAIL(restore_op.get_job_by_tenant_name(
148
            tenant_name, initaitor_job_info))) {
149
      LOG_WARN("failed to get job by tenant name", KR(ret), K(tenant_name));
150
    } else if (OB_FAIL(job_info.assign(initaitor_job_info))) {
151
      LOG_WARN("failed to assign job info", KR(ret), K(initaitor_job_info));
152
    } else {
153
      ObMySQLTransaction trans;
154
      //TODO get tenant job_id, use tenant
155
      const int64_t job_id = initaitor_job_info.get_job_id();
156
      job_info.init_restore_key(user_tenant_id, job_id);
157
      job_info.set_tenant_id(user_tenant_id);
158
      job_info.set_status(share::PHYSICAL_RESTORE_PRE);
159
      job_info.set_initiator_job_id(job_info.get_job_id());
160
      job_info.set_initiator_tenant_id(OB_SYS_TENANT_ID);
161
      ObPhysicalRestoreTableOperator user_restore_op;
162
      ObRestorePersistHelper restore_persist_op;
163
      ObRestoreProgressPersistInfo persist_info;
164
      persist_info.key_.tenant_id_ = user_tenant_id;
165
      persist_info.key_.job_id_ = job_info.get_job_id();
166
      persist_info.restore_scn_ = job_info.get_restore_scn();
167
      const uint64_t exec_tenant_id = gen_meta_tenant_id(user_tenant_id);
168
      if (OB_FAIL(trans.start(&sql_client, exec_tenant_id))) {
169
        LOG_WARN("failed to start trans", KR(ret), K(exec_tenant_id));
170
      } else if (OB_FAIL(user_restore_op.init(&trans, user_tenant_id, share::OBCG_STORAGE /*group_id*/))) {
171
        LOG_WARN("failed to init restore op", KR(ret), K(user_tenant_id));
172
      } else if (OB_FAIL(restore_persist_op.init(user_tenant_id, share::OBCG_STORAGE /*group_id*/))) {
173
        LOG_WARN("failed to init restore persist op", KR(ret), K(user_tenant_id));
174
      } else if (OB_FAIL(user_restore_op.insert_job(job_info))) {
175
        LOG_WARN("failed to insert job", KR(ret), K(job_info));
176
      } else if (OB_FAIL(restore_persist_op.insert_initial_restore_progress(trans, persist_info))) {
177
        LOG_WARN("failed to insert persist info", KR(ret), K(persist_info));
178
      }
179
      if (trans.is_started()) {
180
        int temp_ret = OB_SUCCESS;
181
        bool commit = OB_SUCC(ret);
182
        if (OB_SUCCESS != (temp_ret = trans.end(commit))) {
183
          ret = (OB_SUCC(ret)) ? temp_ret : ret;
184
          LOG_WARN("trans end failed", KR(ret), KR(temp_ret), K(commit));
185
        }
186
      }
187
    }
188
  }
189
  return ret;
190
}
191

192

193
int ObRestoreUtil::check_has_physical_restore_job(
194
    common::ObISQLClient &sql_client,
195
    const ObString &tenant_name,
196
    bool &has_job)
197
{
198
  int ret = OB_SUCCESS;
199
  ObArray<ObPhysicalRestoreJob> jobs;
200
  has_job = false;
201
  ObPhysicalRestoreTableOperator restore_op;
202
  if (OB_FAIL(restore_op.init(&sql_client, OB_SYS_TENANT_ID, share::OBCG_STORAGE /*group_id*/))) {
203
    LOG_WARN("fail init restore op", K(ret));
204
  } else if (OB_FAIL(restore_op.get_jobs(jobs))) {
205
    LOG_WARN("fail get jobs", K(ret));
206
  } else {
207
    int64_t len = common::OB_MAX_TENANT_NAME_LENGTH_STORE;
208
    FOREACH_CNT_X(job, jobs, !has_job) {
209
      if (0 == job->get_tenant_name().case_compare(tenant_name)) {
210
        //nocase compare
211
        has_job = true;
212
      }
213
    }
214
  }
215
  return ret;
216
}
217

218
int ObRestoreUtil::fill_backup_info_(
219
    const obrpc::ObPhysicalRestoreTenantArg &arg,
220
    share::ObPhysicalRestoreJob &job)
221
{
222
  int ret = OB_SUCCESS;
223
  const bool has_multi_url = arg.multi_uri_.length() > 0;
224
  LOG_INFO("start fill backup path", K(arg));
225
  if (has_multi_url) {
226
    if(OB_FAIL(fill_multi_backup_path(arg, job))) {
227
      LOG_WARN("failed to fill multi backup path", K(ret), K(arg));
228
    }
229
  } else {
230
    if (OB_FAIL(fill_compat_backup_path(arg, job))) {
231
      LOG_WARN("failed to fill compat backup path", K(ret), K(arg));
232
    }
233
  }
234
  FLOG_INFO("finish fill backup path", K(arg), K(job));
235
  return ret;
236
}
237

238
int ObRestoreUtil::fill_multi_backup_path(
239
    const obrpc::ObPhysicalRestoreTenantArg &arg,
240
    share::ObPhysicalRestoreJob &job)
241
{
242
  int ret = OB_SUCCESS;
243
  // TODO: use restore preview url
244
  return ret;
245
}
246

247
int ObRestoreUtil::get_encrypt_backup_dest_format_str(
248
    const ObArray<ObString> &original_dest_list,
249
    common::ObArenaAllocator &allocator,
250
    common::ObString &encrypt_dest_str)
251
{
252
  int ret = OB_SUCCESS;
253
  char *buf = NULL;
254
  int64_t length = OB_MAX_BACKUP_DEST_LENGTH * original_dest_list.count();
255
  if (0 == original_dest_list.count()) {
256
    ret = OB_INVALID_ARGUMENT;
257
    LOG_WARN("get invalid argument", KR(ret), K(original_dest_list));
258
  } else if (OB_ISNULL(buf = reinterpret_cast<char *>(allocator.alloc(length)))) {
259
    ret = OB_ALLOCATE_MEMORY_FAILED;
260
    LOG_WARN("allocate memory failed", KR(ret));
261
  } else {
262
    ObBackupDest dest;
263
    char encrypt_str[OB_MAX_BACKUP_DEST_LENGTH] = { 0 };
264
    int64_t pos = 0;
265
    for (int i = 0; OB_SUCC(ret) && i < original_dest_list.count(); i++) {
266
      const common::ObString &item = original_dest_list.at(i);
267
      if (OB_FAIL(dest.set_without_decryption(item))) {
268
        LOG_WARN("failed to push back", KR(ret), K(item));
269
      } else if (OB_FAIL(dest.get_backup_dest_str(encrypt_str, sizeof(encrypt_str)))) {
270
        LOG_WARN("failed to get backup dest str", KR(ret), K(item));
271
      } else if (OB_FAIL(databuff_printf(buf, length, pos, "%s%s", 0 == i ? "" : ",", encrypt_str))) {
272
        LOG_WARN("failed to append uri", KR(ret), K(encrypt_str), K(pos), K(buf)); 
273
      }
274
    }
275
    if (OB_FAIL(ret)) {
276
    } else if (strlen(buf) <= 0) {
277
      ret = OB_ERR_UNEXPECTED;
278
      LOG_WARN("unexpected format str", KR(ret), K(buf)); 
279
    } else {
280
      encrypt_dest_str.assign_ptr(buf, strlen(buf));
281
      LOG_DEBUG("get format encrypt backup dest str", KR(ret), K(encrypt_dest_str));
282
    }
283
  }
284

285
  return ret;
286
}
287

288
int ObRestoreUtil::fill_compat_backup_path(
289
    const obrpc::ObPhysicalRestoreTenantArg &arg,
290
    share::ObPhysicalRestoreJob &job)
291
{
292
  int ret = OB_SUCCESS;
293
  ObArenaAllocator allocator;
294
  ObArray<ObString> tenant_path_array;
295
  ObArray<ObRestoreBackupSetBriefInfo> backup_set_list;
296
  ObArray<ObRestoreLogPieceBriefInfo> backup_piece_list;
297
  ObArray<ObBackupPathString> log_path_list;
298
  ObString tenant_dest_list;
299
  int64_t last_backup_set_idx = -1;
300
  bool restore_using_compl_log = false;
301
  share::SCN restore_scn;
302
  if (!arg.multi_uri_.empty()) {
303
    ret = OB_INVALID_ARGUMENT;
304
    LOG_WARN("invalid args", K(ret), K(arg));
305
  } else if (OB_FAIL(ObPhysicalRestoreUriParser::parse(arg.uri_, allocator, tenant_path_array))) {
306
    LOG_WARN("fail to parse uri", K(ret), K(arg));
307
  } else if (OB_FAIL(get_encrypt_backup_dest_format_str(tenant_path_array, allocator, tenant_dest_list))) {
308
    LOG_WARN("failed to convert uri", K(ret), K(arg), K(tenant_path_array)); 
309
  } else if (OB_FAIL(job.set_backup_dest(tenant_dest_list))) {
310
    LOG_WARN("failed to copy backup dest", K(ret), K(arg));
311
  } else if (OB_FAIL(check_restore_using_complement_log_(tenant_path_array, restore_using_compl_log))) {
312
    LOG_WARN("failed to check only contain backup set", K(ret), K(tenant_path_array));
313
  } else if (OB_FAIL(fill_restore_scn_(
314
      arg.restore_scn_, arg.restore_timestamp_, arg.with_restore_scn_, tenant_path_array, arg.passwd_array_,
315
      restore_using_compl_log, restore_scn))) {
316
    LOG_WARN("fail to fill restore scn", K(ret), K(arg), K(tenant_path_array));
317
  } else if (OB_FALSE_IT(job.set_restore_scn(restore_scn))) {
318
  } else if (OB_FAIL(get_restore_source(restore_using_compl_log, tenant_path_array, arg.passwd_array_, job.get_restore_scn(),
319
      backup_set_list, backup_piece_list, log_path_list))) {
320
    LOG_WARN("fail to get restore source", K(ret), K(tenant_path_array), K(arg));
321
  } else if (OB_FAIL(do_fill_backup_path_(backup_set_list, backup_piece_list, log_path_list, job))) {
322
    LOG_WARN("fail to do fill backup path", K(backup_set_list), K(backup_piece_list), K(log_path_list));
323
  } else if (OB_FALSE_IT(last_backup_set_idx = backup_set_list.count() - 1)) {
324
  } else if (last_backup_set_idx < 0) {
325
    ret = OB_ERR_UNEXPECTED;
326
    LOG_WARN("invalid idx", K(ret), K(last_backup_set_idx), K(backup_set_list));
327
  } else if (OB_FAIL(do_fill_backup_info_(backup_set_list.at(last_backup_set_idx).backup_set_path_, job))) {
328
    LOG_WARN("fail to do fill backup info");
329
  }
330
  return ret;
331
}
332

333
int ObRestoreUtil::fill_restore_scn_(
334
    const share::SCN &src_scn,
335
    const ObString &timestamp,
336
    const bool with_restore_scn,
337
    const ObIArray<ObString> &tenant_path_array,
338
    const common::ObString &passwd,
339
    const bool restore_using_compl_log,
340
    share::SCN &restore_scn)
341
{
342
  int ret = OB_SUCCESS;
343
  if (tenant_path_array.empty()) {
344
    ret = OB_INVALID_ARGUMENT;
345
    LOG_WARN("invalid argument", K(ret), K(tenant_path_array));
346
  } else if (with_restore_scn) {
347
    // restore scn which is specified by user
348
    restore_scn = src_scn;
349
  } else if (!with_restore_scn) {
350
    if (restore_using_compl_log) {
351
      SCN min_restore_scn = SCN::min_scn();
352
      ARRAY_FOREACH_X(tenant_path_array, i, cnt, OB_SUCC(ret)) {
353
        const ObString &tenant_path = tenant_path_array.at(i);
354
        storage::ObBackupDataStore store;
355
        share::ObBackupDest backup_dest;
356
        ObBackupFormatDesc format_desc;
357
        share::ObBackupSetFileDesc backup_set_file_desc;
358
        if (OB_FAIL(backup_dest.set(tenant_path.ptr()))) {
359
          LOG_WARN("fail to set backup dest", K(ret), K(tenant_path));
360
        } else if (OB_FAIL(store.init(backup_dest))) {
361
          LOG_WARN("failed to init backup store", K(ret), K(tenant_path));
362
        } else if (OB_FAIL(store.read_format_file(format_desc))) {
363
          LOG_WARN("failed to read format file", K(ret), K(store));
364
        } else if (ObBackupDestType::DEST_TYPE_BACKUP_DATA != format_desc.dest_type_) {
365
          LOG_INFO("skip log dir", K(tenant_path), K(format_desc));
366
        } else if (OB_FAIL(store.get_max_backup_set_file_info(passwd, backup_set_file_desc))) {
367
          LOG_WARN("fail to get backup set array", K(ret));
368
        } else {
369
          min_restore_scn = backup_set_file_desc.min_restore_scn_;
370
        }
371
      }
372
      if (OB_SUCC(ret)) {
373
        if (SCN::min_scn() == min_restore_scn) {
374
          ret = OB_ERR_UNEXPECTED;
375
          LOG_WARN("invalid min restore scn, do not find available backup tenant path to restore", K(ret), K(tenant_path_array));
376
        } else {
377
          restore_scn = min_restore_scn;
378
        }
379
      }
380
    } else if (!timestamp.empty()) {
381
      common::ObTimeZoneInfoWrap time_zone_wrap;
382
      if (OB_FAIL(get_backup_sys_time_zone_(tenant_path_array, time_zone_wrap))) {
383
        LOG_WARN("failed to get backup sys time zone", K(ret), K(tenant_path_array));
384
      } else if (OB_FAIL(convert_restore_timestamp_to_scn_(timestamp, time_zone_wrap, restore_scn))) {
385
        LOG_WARN("failed to convert restore timestamp to scn", K(ret));
386
      }
387
    } else {
388
      int64_t round_id = 0;
389
      int64_t piece_id = 0;
390
      SCN max_checkpoint_scn = SCN::min_scn();
391
      // restore to max checkpoint scn of log
392
      ARRAY_FOREACH_X(tenant_path_array, i, cnt, OB_SUCC(ret)) {
393
        const ObString &tenant_path = tenant_path_array.at(i);
394
        ObArchiveStore store;
395
        ObBackupDest dest;
396
        ObBackupFormatDesc format_desc;
397
        SCN cur_max_checkpoint_scn = SCN::min_scn();
398
        if (OB_FAIL(dest.set(tenant_path))) {
399
          LOG_WARN("fail to set dest", K(ret), K(tenant_path));
400
        } else if (OB_FAIL(store.init(dest))) {
401
          LOG_WARN("failed to init archive store", K(ret), K(tenant_path));
402
        } else if (OB_FAIL(store.read_format_file(format_desc))) {
403
          LOG_WARN("failed to read format file", K(ret), K(tenant_path));
404
        } else if (ObBackupDestType::TYPE::DEST_TYPE_ARCHIVE_LOG != format_desc.dest_type_) {
405
          LOG_INFO("skip data dir", K(tenant_path), K(format_desc));
406
        } else if (OB_FAIL(store.get_max_checkpoint_scn(format_desc.dest_id_, round_id, piece_id, cur_max_checkpoint_scn))) {
407
          LOG_WARN("fail to get max checkpoint scn", K(ret), K(format_desc));
408
        } else {
409
          max_checkpoint_scn = std::max(max_checkpoint_scn, cur_max_checkpoint_scn);
410
        }
411
      }
412
      if (OB_SUCC(ret)) {
413
        if (SCN::min_scn() == max_checkpoint_scn) {
414
          ret = OB_ERR_UNEXPECTED;
415
          LOG_WARN("invalid max checkpoint scn, no archvie tenant path", K(ret), K(tenant_path_array));
416
        } else {
417
          restore_scn = max_checkpoint_scn;
418
        }
419
      }
420
    }
421
  } 
422
  return ret;
423
}
424

425
int ObRestoreUtil::fill_encrypt_info_(
426
    const obrpc::ObPhysicalRestoreTenantArg &arg,
427
    share::ObPhysicalRestoreJob &job)
428
{
429
  int ret = OB_SUCCESS;
430
#ifdef OB_BUILD_TDE_SECURITY
431
  ObArenaAllocator allocator;
432
  ObArray<ObString> kms_path_array;
433
  ObString kms_dest_str;
434
  ObBackupDest dest;
435
  ObBackupIoAdapter util;
436
  bool is_exist = false;
437
  if (OB_FAIL(job.set_encrypt_key(arg.encrypt_key_))) {
438
    LOG_WARN("failed to fill encrypt key", KR(ret), K(arg));
439
  } else if (arg.kms_uri_.empty()) {
440
    // do nothing
441
  } else if (OB_FAIL(ObPhysicalRestoreUriParser::parse(arg.kms_uri_, allocator, kms_path_array))) {
442
    LOG_WARN("fail to parse uri", K(ret), K(arg));
443
  } else if (OB_FAIL(get_encrypt_backup_dest_format_str(kms_path_array, allocator, kms_dest_str))) {
444
    LOG_WARN("failed to convert uri", K(ret), K(arg), K(kms_path_array));
445
  } else if (OB_FAIL(dest.set(kms_dest_str))) {
446
    LOG_WARN("failed to set dest", K(ret));
447
  } else if (OB_FAIL(util.is_exist(dest.get_root_path(), dest.get_storage_info(), is_exist))) {
448
    LOG_WARN("failed to check file is exists", K(ret));
449
  } else if (OB_UNLIKELY(!is_exist)) {
450
    ret = OB_BACKUP_FILE_NOT_EXIST;
451
    LOG_WARN("kms backup file is not exist", K(ret));
452
  } else if (OB_FAIL(job.set_kms_dest(kms_dest_str))) {
453
    LOG_WARN("failed to copy kms dest", K(ret), K(arg));
454
  } else if (OB_FAIL(job.set_kms_encrypt_key(arg.kms_encrypt_key_))) {
455
    LOG_WARN("failed to fill kms encrypt key", KR(ret), K(arg));
456
  }
457
#endif
458
  return ret;
459
}
460

461
int ObRestoreUtil::get_restore_source(
462
    const bool restore_using_compl_log,
463
    const ObIArray<ObString>& tenant_path_array,
464
    const common::ObString &passwd_array,
465
    const SCN &restore_scn,
466
    ObIArray<ObRestoreBackupSetBriefInfo> &backup_set_list,
467
    ObIArray<ObRestoreLogPieceBriefInfo> &backup_piece_list,
468
    ObIArray<ObBackupPathString> &log_path_list)
469
{
470
  int ret = OB_SUCCESS;
471
  SCN restore_start_scn = SCN::min_scn();
472
  if (OB_FAIL(get_restore_backup_set_array_(tenant_path_array, passwd_array, restore_scn,
473
      restore_start_scn, backup_set_list))) {
474
    LOG_WARN("fail to get restore backup set array", K(ret), K(tenant_path_array), K(restore_scn));
475
  } else if (!restore_using_compl_log && OB_FAIL(get_restore_log_piece_array_(
476
      tenant_path_array, restore_start_scn, restore_scn, backup_piece_list, log_path_list))) {
477
    LOG_WARN("fail to get restore log piece array", K(ret), K(tenant_path_array), K(restore_start_scn),
478
        K(restore_scn));
479
  } else if (restore_using_compl_log && OB_FAIL(get_restore_log_array_for_complement_log_(
480
      backup_set_list, restore_start_scn, restore_scn, backup_piece_list, log_path_list))) {
481
    LOG_WARN("fail to get restore log piece array", K(ret), K(backup_set_list), K(restore_start_scn), K(restore_scn));
482
  } else if (backup_set_list.empty() || backup_piece_list.empty() || log_path_list.empty()) {
483
    ret = OB_ERR_UNEXPECTED;
484
    LOG_WARN("no backup set path or log piece can be used to restore", K(ret),
485
        K(tenant_path_array), K(backup_set_list), K(backup_piece_list), K(log_path_list), K(restore_start_scn),
486
        K(restore_scn));
487
  }
488
  return ret;
489
}
490

491
int ObRestoreUtil::check_restore_using_complement_log_(
492
    const ObIArray<ObString> &tenant_path_array,
493
    bool &restore_using_compl_log)
494
{
495
  int ret = OB_SUCCESS;
496
  restore_using_compl_log = true;
497
  if (tenant_path_array.empty()) {
498
    ret = OB_INVALID_ARGUMENT;
499
    LOG_WARN("invalid argument", K(ret), K(tenant_path_array));
500
  } else {
501
    ARRAY_FOREACH_X(tenant_path_array, i, cnt, OB_SUCC(ret)) {
502
      const ObString &tenant_path = tenant_path_array.at(i);
503
      storage::ObBackupDataStore store;
504
      share::ObBackupDest backup_dest;
505
      ObBackupFormatDesc format_desc;
506
      if (OB_FAIL(backup_dest.set(tenant_path.ptr()))) {
507
        LOG_WARN("fail to set backup dest", K(ret), K(tenant_path));
508
      } else if (OB_FAIL(store.init(backup_dest))) {
509
        LOG_WARN("failed to init backup store", K(ret), K(tenant_path));
510
      } else if (OB_FAIL(store.read_format_file(format_desc))) {
511
        LOG_WARN("failed to read format file", K(ret), K(store));
512
      } else if (ObBackupDestType::DEST_TYPE_ARCHIVE_LOG == format_desc.dest_type_) {
513
        restore_using_compl_log = false;
514
        LOG_INFO("not only contain backup data path", K(tenant_path), K(format_desc));
515
        break;
516
      }
517
    }
518
  }
519
  return ret;
520
}
521

522
int ObRestoreUtil::get_restore_backup_set_array_(
523
    const ObIArray<ObString> &tenant_path_array,
524
    const common::ObString &passwd_array,
525
    const SCN &restore_scn,
526
    SCN &restore_start_scn,
527
    ObIArray<ObRestoreBackupSetBriefInfo> &backup_set_list)
528
{
529
  int ret = OB_SUCCESS;
530
  if (tenant_path_array.empty()) {
531
    ret = OB_INVALID_ARGUMENT;
532
    LOG_WARN("invaldi argument", K(ret), K(tenant_path_array));
533
  } else {
534
    ARRAY_FOREACH_X(tenant_path_array, i, cnt, OB_SUCC(ret)) {
535
      const ObString &tenant_path = tenant_path_array.at(i);
536
      storage::ObBackupDataStore store;
537
      share::ObBackupDest backup_dest;
538
      ObBackupFormatDesc format_desc;
539
      if (OB_FAIL(backup_dest.set(tenant_path.ptr()))) {
540
        LOG_WARN("fail to set backup dest", K(ret), K(tenant_path));
541
      } else if (OB_FAIL(store.init(backup_dest))) {
542
        LOG_WARN("failed to init backup store", K(ret), K(tenant_path));
543
      } else if (OB_FAIL(store.read_format_file(format_desc))) {
544
        LOG_WARN("failed to read format file", K(ret), K(store));
545
      } else if (ObBackupDestType::DEST_TYPE_BACKUP_DATA != format_desc.dest_type_) {
546
        LOG_INFO("skip log dir", K(tenant_path), K(format_desc));
547
      } else if (!backup_set_list.empty()) {
548
        ret = OB_NOT_SUPPORTED;
549
        LOG_WARN("It is not support to restore from multiple tenant backup paths", K(ret));
550
        LOG_USER_ERROR(OB_NOT_SUPPORTED, "It is not support to restore from multiple tenant backup paths.");
551
      } else if (OB_FAIL(store.get_backup_set_array(passwd_array, restore_scn, restore_start_scn, backup_set_list))) {
552
        LOG_WARN("fail to get backup set array", K(ret));
553
      }
554
    }
555
  }
556
  return ret;
557
}
558

559
int ObRestoreUtil::get_restore_backup_piece_list_(
560
    const ObBackupDest &dest,
561
    const ObArray<share::ObRestoreLogPieceBriefInfo> &piece_array,
562
    ObIArray<ObRestoreLogPieceBriefInfo> &backup_piece_list)
563
{
564
  int ret = OB_SUCCESS; 
565
  if (!dest.is_valid()) {
566
    ret = OB_INVALID_ARGUMENT;
567
    LOG_WARN("dest is invalid", K(ret), K(dest));
568
  } else {
569
    for (int64_t j = 0; OB_SUCC(ret) && j < piece_array.count(); ++j) {
570
      const share::ObRestoreLogPieceBriefInfo &piece_path = piece_array.at(j);
571
      ObRestoreLogPieceBriefInfo backup_piece_path;
572
      backup_piece_path.piece_id_ = piece_path.piece_id_;
573
      backup_piece_path.start_scn_ = piece_path.start_scn_;
574
      backup_piece_path.checkpoint_scn_ = piece_path.checkpoint_scn_;
575
      ObBackupDest piece_dest;
576
      if (OB_FAIL(piece_dest.set(piece_path.piece_path_.ptr(), dest.get_storage_info()))) {
577
        LOG_WARN("fail to set piece dest", K(ret), K(piece_path), K(dest)); 
578
      } else if (OB_FAIL(piece_dest.get_backup_dest_str(backup_piece_path.piece_path_.ptr(), backup_piece_path.piece_path_.capacity()))) {
579
        LOG_WARN("fail to get piece dest str", K(ret), K(piece_dest));
580
      } else if (OB_FAIL(backup_piece_list.push_back(backup_piece_path))) {
581
        LOG_WARN("fail to push backup piece list", K(ret));
582
      }
583
    }
584
  }
585

586
  return ret;
587
}
588

589
int ObRestoreUtil::get_restore_backup_piece_list_(
590
    const ObBackupDest &dest,
591
    const ObArray<share::ObPieceKey> &piece_array,
592
    ObIArray<ObRestoreLogPieceBriefInfo> &backup_piece_list)
593
{
594
  int ret = OB_SUCCESS;
595
  if (!dest.is_valid()) {
596
    ret = OB_INVALID_ARGUMENT;
597
    LOG_WARN("dest is invalid", K(ret), K(dest));
598
  } else {
599
    for (int64_t j = 0; OB_SUCC(ret) && j < piece_array.count(); ++j) {
600
      const share::ObPieceKey &piece_key = piece_array.at(j);
601
      ObRestoreLogPieceBriefInfo backup_piece_path;
602
      backup_piece_path.piece_id_ = piece_key.piece_id_;
603
      ObBackupPath backup_path;
604
      ObBackupDest piece_dest;
605
      if (OB_FAIL(ObArchivePathUtil::get_piece_dir_path(dest, piece_key.dest_id_,
606
          piece_key.round_id_, piece_key.piece_id_, backup_path))) {
607
        LOG_WARN("failed to get piece dir path", K(ret), K(dest), K(piece_key));
608
      } else if (OB_FAIL(piece_dest.set(backup_path.get_ptr(), dest.get_storage_info()))) {
609
        LOG_WARN("fail to set piece dest", K(ret), K(backup_path), K(dest));
610
      } else if (OB_FAIL(piece_dest.get_backup_dest_str(backup_piece_path.piece_path_.ptr(), backup_piece_path.piece_path_.capacity()))) {
611
        LOG_WARN("fail to get piece dest str", K(ret), K(piece_dest));
612
      } else if (OB_FAIL(backup_piece_list.push_back(backup_piece_path))) {
613
        LOG_WARN("fail to push backup piece list", K(ret));
614
      }
615
    }
616
  }
617
  return ret;
618
}
619

620
int ObRestoreUtil::get_restore_log_path_list_(
621
    const ObBackupDest &dest,
622
    ObIArray<share::ObBackupPathString> &log_path_list)
623
{
624
  int ret = OB_SUCCESS;
625
  ObBackupPathString log_path;
626
  if (!dest.is_valid()) {
627
    ret = OB_INVALID_ARGUMENT;
628
    LOG_WARN("dest is invalid", K(ret), K(dest));
629
  } else if (OB_FAIL(dest.get_backup_dest_str(log_path.ptr(), log_path.capacity()))) {
630
    LOG_WARN("fail to get backup dest str", K(ret), K(dest));
631
  } else if (OB_FAIL(log_path_list.push_back(log_path))) {
632
    LOG_WARN("fail to push backup log path", K(ret), K(log_path));
633
  }
634
  return ret;
635
}
636

637
int ObRestoreUtil::get_restore_log_piece_array_(
638
    const ObIArray<ObString> &tenant_path_array,
639
    const SCN &restore_start_scn,
640
    const SCN &restore_end_scn,
641
    ObIArray<ObRestoreLogPieceBriefInfo> &backup_piece_list,
642
    ObIArray<share::ObBackupPathString> &log_path_list)
643
{
644
  int ret = OB_SUCCESS;
645
  ObArray<share::ObRestoreLogPieceBriefInfo> piece_array;
646
  if (tenant_path_array.empty()) {
647
    ret = OB_INVALID_ARGUMENT;
648
    LOG_WARN("invaldi argument", K(ret), K(tenant_path_array));
649
  } else {
650
    for (int64_t i = 0; OB_SUCC(ret) && i < tenant_path_array.count(); ++i) {
651
      piece_array.reset();
652
      const ObString &tenant_path = tenant_path_array.at(i);
653
      ObArchiveStore store;
654
      ObBackupDest dest;
655
      ObBackupFormatDesc format_desc;
656
      if (OB_FAIL(dest.set(tenant_path))) {
657
        LOG_WARN("fail to set dest", K(ret), K(tenant_path));
658
      } else if (OB_FAIL(store.init(dest))) {
659
        LOG_WARN("failed to init archive store", K(ret), K(tenant_path));
660
      } else if (OB_FAIL(store.read_format_file(format_desc))) {
661
        LOG_WARN("failed to read format file", K(ret), K(tenant_path));
662
      } else if (ObBackupDestType::TYPE::DEST_TYPE_ARCHIVE_LOG != format_desc.dest_type_) {
663
        LOG_INFO("skip data dir", K(tenant_path), K(format_desc));
664
      } else if (OB_FAIL(store.get_piece_paths_in_range(restore_start_scn, restore_end_scn, piece_array))) {
665
        LOG_WARN("fail to get restore pieces", K(ret), K(restore_start_scn), K(restore_end_scn));
666
      } else if (OB_FAIL(get_restore_log_path_list_(dest, log_path_list))) {
667
        LOG_WARN("fail to get restore log path list", K(ret), K(dest));
668
      } else if (OB_FAIL(get_restore_backup_piece_list_(dest, piece_array, backup_piece_list))){
669
        LOG_WARN("fail to get restore backup piece list", K(ret), K(dest), K(piece_array));
670
      }
671
    }
672
  }
673
  return ret;
674
}
675

676
int ObRestoreUtil::get_restore_log_array_for_complement_log_(
677
    const ObIArray<ObRestoreBackupSetBriefInfo> &backup_set_list,
678
    const share::SCN &restore_start_scn,
679
    const share::SCN &restore_end_scn,
680
    ObIArray<share::ObRestoreLogPieceBriefInfo> &backup_piece_list,
681
    ObIArray<share::ObBackupPathString> &log_path_list)
682
{
683
  int ret = OB_SUCCESS;
684
  if (backup_set_list.empty()) {
685
    ret = OB_INVALID_ARGUMENT;
686
    LOG_WARN("invaldi argument", K(ret), K(backup_set_list));
687
  } else {
688
    const ObRestoreBackupSetBriefInfo &info = backup_set_list.at(backup_set_list.count() - 1);
689
    ObBackupDest dest;
690
    ObBackupDest compl_dest;
691
    ObArchiveStore archive_store;
692
    ObArray<ObPieceKey> piece_array;
693
    if (OB_FAIL(dest.set(info.backup_set_path_.str()))) {
694
      LOG_WARN("failed to set backup set path", K(ret), K(info));
695
    } else if (OB_FAIL(ObBackupPathUtil::construct_backup_complement_log_dest(dest, compl_dest))) {
696
      LOG_WARN("failed to construct backup complement log dest", K(ret), K(dest), K(info));
697
    } else if (OB_FAIL(archive_store.init(compl_dest))) {
698
        LOG_WARN("failed to init archive store", K(ret), K(compl_dest));
699
    } else if (OB_FAIL(get_restore_log_path_list_(compl_dest, log_path_list))) {
700
      LOG_WARN("fail to get restore log path list", K(ret), K(dest));
701
    } else if (OB_FAIL(archive_store.get_all_piece_keys(piece_array))) {
702
        LOG_WARN("fail to get restore pieces", K(ret), K(restore_start_scn), K(restore_end_scn));
703
    } else if (OB_FAIL(get_restore_backup_piece_list_(compl_dest, piece_array, backup_piece_list))){
704
        LOG_WARN("fail to get restore backup piece list", K(ret), K(dest), K(piece_array));
705
    } else {
706
      LOG_INFO("get restore log path list", K(backup_set_list), K(log_path_list));
707
    }
708
  }
709
  return ret;
710
}
711

712
int ObRestoreUtil::do_fill_backup_path_(
713
    const ObIArray<ObRestoreBackupSetBriefInfo> &backup_set_list,
714
    const ObIArray<ObRestoreLogPieceBriefInfo> &backup_piece_list,
715
    const ObIArray<ObBackupPathString> &log_path_list,
716
    share::ObPhysicalRestoreJob &job)
717
{
718
  int ret = OB_SUCCESS;
719
  if (backup_set_list.empty() || backup_piece_list.empty()) {
720
    ret = OB_INVALID_ARGUMENT;
721
    LOG_WARN("invalid argument", K(ret), K(backup_set_list), K(backup_piece_list));
722
  } else {
723
    ObArray<share::ObBackupPiecePath> backup_piece_path_list;
724
    for (int64_t i = 0; OB_SUCC(ret) && i < backup_piece_list.count(); ++i) {
725
      if (OB_FAIL(backup_piece_path_list.push_back(backup_piece_list.at(i).piece_path_))) {
726
        LOG_WARN("failed to push backup piece", K(ret));
727
      }
728
    }
729
    if (OB_FAIL(ret)) {
730
    } else if (OB_FAIL(job.get_multi_restore_path_list().set(backup_set_list, backup_piece_path_list, log_path_list))) {
731
      LOG_WARN("failed to set mutli restore path list", KR(ret));
732
    }
733
  }
734
  return ret;
735
}
736

737
int ObRestoreUtil::do_fill_backup_info_(
738
    const share::ObBackupSetPath & backup_set_path,
739
    share::ObPhysicalRestoreJob &job)
740
{
741
  int ret = OB_SUCCESS;
742
  storage::ObBackupDataStore store;
743
  ObBackupDataLSAttrDesc ls_info;
744
  HEAP_VARS_2((ObExternBackupSetInfoDesc, backup_set_info),
745
    (ObExternTenantLocalityInfoDesc, locality_info)) {
746
    if (backup_set_path.is_empty()) {
747
      ret = OB_INVALID_ARGUMENT;
748
      LOG_WARN("invalid argument", K(ret), K(backup_set_path));
749
    } else if (OB_FAIL(store.init(backup_set_path.ptr()))) {
750
      LOG_WARN("fail to init mgr", K(ret));
751
    } else if (OB_FAIL(store.read_backup_set_info(backup_set_info))) {
752
      LOG_WARN("fail to read backup set info", K(ret));
753
    } else if (OB_FAIL(store.read_tenant_locality_info(locality_info))) {
754
      LOG_WARN("fail to read locality info", K(ret));
755
    } else if (!backup_set_info.is_valid()) {
756
      ret = OB_ERR_UNEXPECTED;
757
      LOG_WARN("invalid backup set file", K(ret), K(backup_set_info));
758
    } else if (OB_FAIL(store.read_ls_attr_info(backup_set_info.backup_set_file_.meta_turn_id_, ls_info))) {
759
      LOG_WARN("failed to read ls attr info", K(ret), K(backup_set_info));
760
    } else if (OB_FAIL(check_backup_set_version_match_(backup_set_info.backup_set_file_))) {
761
      LOG_WARN("failed to check backup set version match", K(ret));
762
    } else if (OB_FAIL(job.set_backup_tenant_name(locality_info.tenant_name_.ptr()))) {
763
      LOG_WARN("fail to set backup tenant name", K(ret), "tenant name", locality_info.tenant_name_);
764
    } else if (OB_FAIL(job.set_backup_cluster_name(locality_info.cluster_name_.ptr()))) {
765
      LOG_WARN("fail to set backup cluster name", K(ret), "cluster name", locality_info.cluster_name_);
766
    } else {
767
      job.set_source_data_version(backup_set_info.backup_set_file_.tenant_compatible_);
768
      job.set_source_cluster_version(backup_set_info.backup_set_file_.cluster_version_);
769
      job.set_compat_mode(locality_info.compat_mode_);
770
      job.set_backup_tenant_id(backup_set_info.backup_set_file_.tenant_id_);
771
      // becuase of no consistent scn in 4.1.x backup set, using ls_info.backup_scn to set the restore consisitent scn
772
      // ls_info.backup_scn is the default replayable scn when create restore tenant,
773
      // so using it as the consistet scn can also make recovery service work normally
774
      const SCN &scn = backup_set_info.backup_set_file_.tenant_compatible_ < DATA_VERSION_4_2_0_0
775
                     ? ls_info.backup_scn_
776
                     : backup_set_info.backup_set_file_.consistent_scn_;
777
      job.set_consistent_scn(scn);
778
    }
779
  }
780
  return ret;
781
}
782

783
int ObRestoreUtil::check_backup_set_version_match_(share::ObBackupSetFileDesc &backup_file_desc)
784
{
785
  int ret = OB_SUCCESS;
786
  uint64_t data_version = 0;
787
  if (!backup_file_desc.is_valid()) {
788
    ret = OB_INVALID_ARGUMENT;
789
    LOG_WARN("invalid argument", K(ret), K(backup_file_desc));
790
  } else if (!ObUpgradeChecker::check_cluster_version_exist(backup_file_desc.cluster_version_)) {
791
    ret = OB_INVALID_ARGUMENT;
792
    LOG_WARN("cluster version are not exist", K(ret));
793
    LOG_USER_ERROR(OB_INVALID_ARGUMENT, "cluster version of backup set");
794
  } else if (!ObUpgradeChecker::check_data_version_exist(backup_file_desc.tenant_compatible_)) {
795
    ret = OB_INVALID_ARGUMENT;
796
    LOG_WARN("data version are not exist", K(ret));
797
    LOG_USER_ERROR(OB_INVALID_ARGUMENT, "tenant compatible of backup set");
798
  } else if (GET_MIN_CLUSTER_VERSION() < backup_file_desc.cluster_version_) {
799
    ret = OB_OP_NOT_ALLOW;
800
    LOG_WARN("restore from higher cluster version is not allowed", K(ret));
801
    LOG_USER_ERROR(OB_OP_NOT_ALLOW, "restore from higher cluster version is");
802
  } else if (OB_FAIL(ObUpgradeChecker::get_data_version_by_cluster_version(GET_MIN_CLUSTER_VERSION(), data_version))) {
803
    LOG_WARN("failed to get data version", K(ret));
804
  } else if (data_version < backup_file_desc.tenant_compatible_) {
805
    ret = OB_OP_NOT_ALLOW;
806
    LOG_WARN("restore from higher data version is not allowed", K(ret), K(data_version), K(backup_file_desc.tenant_compatible_));
807
    LOG_USER_ERROR(OB_OP_NOT_ALLOW, "restore from higher data version is");
808
  } else if (backup_file_desc.tenant_compatible_ < DATA_VERSION_4_1_0_0 && data_version >= DATA_VERSION_4_1_0_0) {
809
    ret = OB_OP_NOT_ALLOW;
810
    LOG_WARN("restore from version 4.0 is not allowd", K(ret), K(backup_file_desc.tenant_compatible_), K(data_version));
811
    LOG_USER_ERROR(OB_OP_NOT_ALLOW, "restore from version 4.0 is");
812
  }
813
  return ret;
814
}
815

816
int ObRestoreUtil::recycle_restore_job(const uint64_t tenant_id,
817
                               common::ObMySQLProxy &sql_proxy,
818
                               const ObPhysicalRestoreJob &job_info)
819
{
820
  int ret = OB_SUCCESS;
821
  ObMySQLTransaction trans;
822
  const int64_t job_id = job_info.get_job_id();
823
  const uint64_t exec_tenant_id = gen_meta_tenant_id(tenant_id);
824
  if (OB_UNLIKELY(!is_user_tenant(tenant_id))) {
825
    ret = OB_INVALID_ARGUMENT;
826
    LOG_WARN("invalid argument", KR(ret), K(exec_tenant_id));
827
  } else if (OB_FAIL(trans.start(&sql_proxy, exec_tenant_id))) {
828
    LOG_WARN("failed to start trans", KR(ret), K(exec_tenant_id));
829
  } else {
830
    ObPhysicalRestoreTableOperator restore_op;
831
    if (OB_FAIL(restore_op.init(&trans, tenant_id, share::OBCG_STORAGE /*group_id*/))) {
832
      LOG_WARN("failed to init restore op", KR(ret), K(tenant_id));
833
    } else if (OB_FAIL(restore_op.remove_job(job_id))) {
834
      LOG_WARN("failed to remove job", KR(ret), K(tenant_id), K(job_id));
835
    } else {
836
      ObHisRestoreJobPersistInfo history_info;
837
      ObRestoreProgressPersistInfo restore_progress;
838
      ObRestorePersistHelper persist_helper;
839
      ObRestoreJobPersistKey key;
840
      common::ObArray<share::ObLSRestoreProgressPersistInfo> ls_restore_progress_infos;
841
      key.tenant_id_ = tenant_id;
842
      key.job_id_ = job_info.get_job_id();
843
      if (OB_FAIL(persist_helper.init(tenant_id, share::OBCG_STORAGE /*group_id*/))) {
844
        LOG_WARN("failed to init persist helper", KR(ret), K(tenant_id));
845
      } else if (OB_FAIL(persist_helper.get_restore_process(
846
                     trans, key, restore_progress))) {
847
        LOG_WARN("failed to get restore progress", KR(ret), K(key));
848
      } else if (OB_FAIL(history_info.init_with_job_process(
849
                     job_info, restore_progress))) {
850
        LOG_WARN("failed to init history", KR(ret), K(job_info), K(restore_progress));
851
      } else if (history_info.is_restore_success()) { // restore succeed, no need to record comment
852
      } else if (OB_FAIL(persist_helper.get_all_ls_restore_progress(trans, ls_restore_progress_infos))) {
853
        LOG_WARN("failed to get ls restore progress", K(ret));
854
      } else {
855
        int64_t pos = 0;
856
        ARRAY_FOREACH_X(ls_restore_progress_infos, i, cnt, OB_SUCC(ret)) {
857
          const ObLSRestoreProgressPersistInfo &ls_restore_info = ls_restore_progress_infos.at(i);
858
          if (ls_restore_info.status_.is_failed()) {
859
            if (OB_FAIL(databuff_printf(history_info.comment_.ptr(), history_info.comment_.capacity(), pos,
860
                                        "%s;", ls_restore_info.comment_.ptr()))) {
861
              if (OB_SIZE_OVERFLOW == ret) {
862
                ret = OB_SUCCESS;
863
                break;
864
              } else {
865
                LOG_WARN("failed to databuff printf comment", K(ret));
866
              }
867
            }
868
          }
869
        }
870
      }
871
      if (OB_FAIL(ret)) {
872
      } else if (OB_FAIL(persist_helper.insert_restore_job_history(
873
                     trans, history_info))) {
874
        LOG_WARN("failed to insert restore job history", KR(ret), K(history_info));
875
      }
876
    }
877
  }
878
  if (trans.is_started()) {
879
    int tmp_ret = OB_SUCCESS;
880
    if (OB_SUCCESS != (tmp_ret = trans.end(OB_SUCC(ret)))) {
881
      ret = OB_SUCC(ret) ? tmp_ret : ret;
882
      LOG_WARN("failed to end trans", KR(ret), K(tmp_ret));
883
    }
884
  }
885
  return ret;
886
}
887

888
int ObRestoreUtil::recycle_restore_job(common::ObMySQLProxy &sql_proxy,
889
                          const share::ObPhysicalRestoreJob &job_info,
890
                          const ObHisRestoreJobPersistInfo &history_info)
891
{
892
  int ret = OB_SUCCESS;
893
  ObMySQLTransaction trans;
894
  const int64_t job_id = job_info.get_job_id();
895
  const int64_t tenant_id = job_info.get_restore_key().tenant_id_;
896
  const uint64_t exec_tenant_id = gen_meta_tenant_id(tenant_id);
897
  ObRestorePersistHelper persist_helper;
898
  if (OB_UNLIKELY(OB_INVALID_TENANT_ID == exec_tenant_id)) {
899
    ret = OB_INVALID_ARGUMENT;
900
    LOG_WARN("invalid argument", KR(ret), K(exec_tenant_id));
901
  } else if (OB_FAIL(trans.start(&sql_proxy, exec_tenant_id))) {
902
    LOG_WARN("failed to start trans", KR(ret), K(exec_tenant_id));
903
  } else if (OB_FAIL(persist_helper.init(tenant_id, share::OBCG_STORAGE /*group_id*/))) {
904
    LOG_WARN("failed to init persist helper", KR(ret));
905
   } else if (OB_FAIL(persist_helper.insert_restore_job_history(trans, history_info))) {
906
    LOG_WARN("failed to insert restore job history", KR(ret), K(history_info));
907
  } else {
908
    ObPhysicalRestoreTableOperator restore_op;
909
    if (OB_FAIL(restore_op.init(&trans, tenant_id, share::OBCG_STORAGE /*group_id*/))) {
910
      LOG_WARN("failed to init restore op", KR(ret), K(tenant_id));
911
    } else if (OB_FAIL(restore_op.remove_job(job_id))) {
912
      LOG_WARN("failed to remove job", KR(ret), K(tenant_id), K(job_id));
913
    } else if (is_sys_tenant(tenant_id)) {
914
      //finish __all_rootservice_job
915
      int tmp_ret = PHYSICAL_RESTORE_SUCCESS == job_info.get_status() ? OB_SUCCESS : OB_ERROR;
916
      if (OB_FAIL(RS_JOB_COMPLETE(job_id, tmp_ret, trans))) {
917
        LOG_WARN("failed to complete job", KR(ret), K(job_id));
918
      }
919
    }
920
  }
921
  if (trans.is_started()) {
922
    int tmp_ret = OB_SUCCESS;
923
    if (OB_SUCCESS != (tmp_ret = trans.end(OB_SUCC(ret)))) {
924
      ret = OB_SUCC(ret) ? tmp_ret : ret;
925
      LOG_WARN("failed to end trans", KR(ret), K(tmp_ret));
926
    }
927
  }
928
  return ret;
929
}
930
int ObRestoreUtil::get_user_restore_job_history(common::ObISQLClient &sql_client,
931
                                          const uint64_t user_tenant_id,
932
                                         const uint64_t initiator_tenant_id,
933
                                         const int64_t initiator_job_id,
934
                                         ObHisRestoreJobPersistInfo &history_info)
935
{
936
  int ret = OB_SUCCESS;
937
  if (OB_UNLIKELY(!is_user_tenant(user_tenant_id)
938
                  || OB_INVALID_TENANT_ID == initiator_tenant_id
939
                  || 0 > initiator_job_id)) {
940
    ret = OB_INVALID_ARGUMENT;
941
    LOG_WARN("invalid argument", KR(ret), K(user_tenant_id),
942
    K(initiator_job_id), K(initiator_tenant_id));
943
  } else {
944
    ObRestorePersistHelper user_persist_helper;
945
    if (OB_FAIL(user_persist_helper.init(user_tenant_id, share::OBCG_STORAGE /*group_id*/))) {
946
      LOG_WARN("failed to init persist helper", KR(ret), K(user_tenant_id));
947
    } else if (OB_FAIL(user_persist_helper.get_restore_job_history(
948
                   sql_client, initiator_job_id, initiator_tenant_id,
949
                   history_info))) {
950
      LOG_WARN("failed to get restore progress", KR(ret), K(initiator_job_id), K(initiator_tenant_id));
951
    }
952
  }
953
  return ret;
954
}
955

956
int ObRestoreUtil::get_restore_ls_palf_base_info(
957
    const share::ObPhysicalRestoreJob &job_info, const ObLSID &ls_id,
958
    palf::PalfBaseInfo &palf_base_info)
959
{
960
  int ret = OB_SUCCESS;
961
  storage::ObBackupDataStore store;
962
  const common::ObSArray<share::ObBackupSetPath> &backup_set_array = 
963
    job_info.get_multi_restore_path_list().get_backup_set_path_list();
964
  const int64_t idx = backup_set_array.count() - 1;
965
  storage::ObLSMetaPackage ls_meta_package;
966
  if (idx < 0) {
967
    ret = OB_ERR_UNEXPECTED;
968
    LOG_WARN("backup_set_array can't empty", KR(ret), K(job_info));
969
  } else if (OB_FAIL(store.init(backup_set_array.at(idx).ptr()))) {
970
    LOG_WARN("fail to init backup data store", KR(ret));
971
  } else if (OB_FAIL(store.read_ls_meta_infos(ls_id, ls_meta_package))) {
972
    LOG_WARN("fail to read backup set info", KR(ret));
973
  } else if (!ls_meta_package.is_valid()) {
974
    ret = OB_INVALID_ARGUMENT;
975
    LOG_WARN("invalid backup set info", KR(ret), K(ls_meta_package));
976
  } else {
977
    palf_base_info = ls_meta_package.palf_meta_;
978
    LOG_INFO("[RESTORE] get restore ls palf base info", K(palf_base_info));
979
  }
980
  return ret;
981
}
982

983
int ObRestoreUtil::check_physical_restore_finish(
984
    common::ObISQLClient &proxy, const int64_t job_id, bool &is_finish, bool &is_failed) {
985
  int ret = OB_SUCCESS;
986
  is_failed = false;
987
  is_finish = false;
988
  ObSqlString sql;
989
  char status_str[OB_DEFAULT_STATUS_LENTH] = "";
990
  int64_t real_length = 0;
991
  HEAP_VAR(ObMySQLProxy::ReadResult, res) {
992
    common::sqlclient::ObMySQLResult *result = nullptr;
993
    int64_t cnt = 0;
994
    if (OB_FAIL(sql.assign_fmt("select status from %s where tenant_id=%lu and job_id=%ld",
995
        OB_ALL_RESTORE_JOB_HISTORY_TNAME, OB_SYS_TENANT_ID, job_id))) {
996
      LOG_WARN("failed to assign fmt", K(ret));
997
    } else if (OB_FAIL(proxy.read(res, OB_SYS_TENANT_ID, sql.ptr()))) {
998
      LOG_WARN("failed to exec sql", K(ret), K(sql));
999
    } else if (OB_ISNULL(result = res.get_result())) {
1000
      ret = OB_ERR_UNEXPECTED;
1001
      LOG_WARN("result is null", K(ret));
1002
    } else if (OB_FAIL(result->next())) {
1003
      if (OB_ITER_END == ret) {
1004
        ret = OB_SUCCESS;
1005
      } else {
1006
        LOG_WARN("failed to get next", K(ret), K(job_id));
1007
      }
1008
    } else {
1009
      EXTRACT_STRBUF_FIELD_MYSQL(*result, OB_STR_STATUS, status_str, OB_DEFAULT_STATUS_LENTH, real_length);
1010
      if (OB_SUCC(ret)) {
1011
        is_finish = true;
1012
        is_failed = 0 == STRCMP(status_str, "FAIL");
1013
      }
1014
    }
1015
  }
1016
  return ret;
1017
}
1018

1019
int ObRestoreUtil::get_restore_job_comment(
1020
    common::ObISQLClient &proxy, const int64_t job_id, char *buf, const int64_t buf_size)
1021
{
1022
  int ret = OB_SUCCESS;
1023
  ObSqlString sql;
1024
  int real_length = 0;
1025
  HEAP_VAR(ObMySQLProxy::ReadResult, res) {
1026
    common::sqlclient::ObMySQLResult *result = nullptr;
1027
    int64_t cnt = 0;
1028
    if (OB_FAIL(sql.assign_fmt("select comment from %s where tenant_id=%lu and job_id=%ld",
1029
        OB_ALL_RESTORE_JOB_HISTORY_TNAME, OB_SYS_TENANT_ID, job_id))) {
1030
      LOG_WARN("failed to assign fmt", K(ret));
1031
    } else if (OB_FAIL(proxy.read(res, OB_SYS_TENANT_ID, sql.ptr()))) {
1032
      LOG_WARN("failed to exec sql", K(ret), K(sql));
1033
    } else if (OB_ISNULL(result = res.get_result())) {
1034
      ret = OB_ERR_UNEXPECTED;
1035
      LOG_WARN("result is null", K(ret));
1036
    } else if (OB_FAIL(result->next())) {
1037
      if (OB_ITER_END == ret) {
1038
        ret = OB_ENTRY_NOT_EXIST;
1039
        LOG_WARN("restore job comment not exist", K(ret));
1040
      } else {
1041
        LOG_WARN("failed to get next", K(ret), K(job_id));
1042
      }
1043
    } else {
1044
      EXTRACT_STRBUF_FIELD_MYSQL(*result, OB_STR_COMMENT, buf, buf_size, real_length);
1045
    }
1046
  }
1047
  return ret;
1048
}
1049

1050
int ObRestoreUtil::get_restore_tenant_cpu_count(
1051
    common::ObMySQLProxy &proxy, const uint64_t tenant_id, double &cpu_count)
1052
{
1053
  int ret = OB_SUCCESS;
1054
  share::ObUnitTableOperator unit_op;
1055
  common::ObArray<share::ObResourcePool> pools;
1056
  common::ObArray<uint64_t> unit_config_ids;
1057
  common::ObArray<ObUnitConfig> configs;
1058
  if (OB_FAIL(unit_op.init(proxy))) {
1059
    LOG_WARN("failed to init proxy", K(ret));
1060
  } else if (OB_FAIL(unit_op.get_resource_pools(tenant_id, pools))) {
1061
    LOG_WARN("failed to get resource pool", K(ret), K(tenant_id));
1062
  }
1063
  ARRAY_FOREACH(pools, i) {
1064
    if (OB_FAIL(unit_config_ids.push_back(pools.at(i).unit_config_id_))) {
1065
      LOG_WARN("failed to push back unit config", K(ret));
1066
    }
1067
  }
1068
  if (FAILEDx(unit_op.get_unit_configs(unit_config_ids, configs))) {
1069
    LOG_WARN("failed to get unit configs", K(ret));
1070
  }
1071
  double max_cpu = OB_MAX_CPU_NUM;
1072
  ARRAY_FOREACH(configs, i) {
1073
    max_cpu = std::min(max_cpu, configs.at(i).max_cpu());
1074
  }
1075
  if (OB_SUCC(ret)) {
1076
    cpu_count = max_cpu;
1077
  }
1078
  return ret;
1079
}
1080

1081
int ObRestoreUtil::convert_restore_timestamp_to_scn_(
1082
    const ObString &timestamp,
1083
    const common::ObTimeZoneInfoWrap &time_zone_wrap,
1084
    share::SCN &scn)
1085
{
1086
  int ret = OB_SUCCESS;
1087
  uint64_t scn_value = 0;
1088
  const ObTimeZoneInfo *time_zone_info = time_zone_wrap.get_time_zone_info();
1089
  if (timestamp.empty() || !time_zone_wrap.is_valid()) {
1090
    ret = OB_INVALID_ARGUMENT;
1091
    LOG_WARN("invalid time zone wrap", K(ret));
1092
  } else if (OB_FAIL(ObTimeConverter::str_to_scn_value(timestamp, time_zone_info, time_zone_info, ObTimeConverter::COMPAT_OLD_NLS_TIMESTAMP_FORMAT, true/*oracle mode*/, scn_value))) {
1093
    LOG_WARN("failed to str to scn value", K(ret), K(timestamp), K(time_zone_info));
1094
  } else if (OB_FAIL(scn.convert_for_sql(scn_value))) {
1095
    LOG_WARN("failed to convert for sql scn", K(ret), K(scn_value));
1096
  }
1097
  return ret;
1098
}
1099

1100
int ObRestoreUtil::get_backup_sys_time_zone_(
1101
    const ObIArray<ObString> &tenant_path_array,
1102
    common::ObTimeZoneInfoWrap &time_zone_wrap)
1103
{
1104
  int ret = OB_SUCCESS;
1105
  ARRAY_FOREACH_X(tenant_path_array, i, cnt, OB_SUCC(ret)) {
1106
    const ObString &tenant_path = tenant_path_array.at(i);
1107
    storage::ObBackupDataStore store;
1108
    share::ObBackupDest backup_dest;
1109
    ObBackupFormatDesc format_desc;
1110
    if (OB_FAIL(backup_dest.set(tenant_path.ptr()))) {
1111
      LOG_WARN("fail to set backup dest", K(ret), K(tenant_path));
1112
    } else if (OB_FAIL(store.init(backup_dest))) {
1113
      LOG_WARN("failed to init backup store", K(ret), K(tenant_path));
1114
    } else if (OB_FAIL(store.read_format_file(format_desc))) {
1115
      LOG_WARN("failed to read format file", K(ret), K(store));
1116
    } else if (ObBackupDestType::DEST_TYPE_BACKUP_DATA != format_desc.dest_type_) {
1117
      LOG_INFO("skip log dir", K(tenant_path), K(format_desc));
1118
    } else if (OB_FAIL(store.get_backup_sys_time_zone_wrap(time_zone_wrap))) {
1119
      LOG_WARN("fail to get locality_info", K(ret));
1120
    } else {
1121
      break;
1122
    }
1123
  }
1124
  return ret;
1125
}
1126

1127
ObRestoreFailureChecker::ObRestoreFailureChecker()
1128
  : is_inited_(false),
1129
    job_()
1130
{
1131
}
1132

1133
ObRestoreFailureChecker::~ObRestoreFailureChecker()
1134
{
1135
}
1136

1137
int ObRestoreFailureChecker::init(const share::ObPhysicalRestoreJob &job)
1138
{
1139
  int ret = OB_SUCCESS;
1140
  if (IS_INIT) {
1141
    ret = OB_INIT_TWICE;
1142
    LOG_WARN("restore failure checker init twice", K(ret));
1143
  } else if (!job.is_valid()) {
1144
    ret = OB_INVALID_ARGUMENT;
1145
    LOG_WARN("get invalid arg", K(ret), K(job));
1146
  } else if (OB_FAIL(job_.assign(job))) {
1147
    LOG_WARN("failed to assign job", K(ret), K(job));
1148
  } else {
1149
    is_inited_ = true;
1150
  }
1151
  return ret;
1152
}
1153

1154
int ObRestoreFailureChecker::check_is_concurrent_with_clean(bool &is_concurrent_with_clean)
1155
{
1156
  int ret = OB_SUCCESS;
1157
  is_concurrent_with_clean = false;
1158
  if (IS_NOT_INIT) {
1159
    ret = OB_NOT_INIT;
1160
    LOG_WARN("[RESTORE_FAILURE_CHECKER]restore failure checker not do init", K(ret));
1161
  } else if (OB_FAIL(loop_path_list_(job_, is_concurrent_with_clean))) {
1162
    LOG_WARN("failed to loop path list", K(ret), K_(job));
1163
  }
1164
  FLOG_INFO("[RESTORE_FAILURE_CHECKER]check is concurrent with clean", K(ret), K(is_concurrent_with_clean), K_(job));
1165
  return ret;
1166
}
1167

1168
int ObRestoreFailureChecker::loop_path_list_(const share::ObPhysicalRestoreJob &job, bool &has_been_cleaned)
1169
{
1170
  int ret = OB_SUCCESS;
1171
  has_been_cleaned = false;
1172
  ObBackupDest backup_tenant_dest;
1173
  const ObPhysicalRestoreBackupDestList& list = job.get_multi_restore_path_list();
1174
  const common::ObSArray<share::ObBackupSetPath> &backup_set_path_list = list.get_backup_set_path_list();
1175
  const common::ObSArray<share::ObBackupPiecePath> &backup_piece_path_list = list.get_backup_piece_path_list();
1176

1177
  ARRAY_FOREACH_X(backup_set_path_list, idx, cnt, OB_SUCC(ret) && !has_been_cleaned) {
1178
    backup_tenant_dest.reset();
1179
    const share::ObBackupSetPath &backup_set_path = backup_set_path_list.at(idx);
1180
    bool is_exist = true;
1181
    if (OB_FAIL(backup_tenant_dest.set(backup_set_path.ptr()))) {
1182
      LOG_WARN("failed to set backup tenant dest", K(ret), K(backup_set_path));
1183
    } else if (OB_FAIL(check_tenant_backup_set_infos_path_exist_(backup_tenant_dest, is_exist))) {
1184
      LOG_WARN("failed to check tenant backup set infos path exist", K(ret), K(backup_tenant_dest));
1185
    } else {
1186
      has_been_cleaned = !is_exist;
1187
    }
1188
  }
1189

1190
  ARRAY_FOREACH_X(backup_piece_path_list, idx, cnt, OB_SUCC(ret) && !has_been_cleaned) {
1191
    backup_tenant_dest.reset();
1192
    const share::ObBackupPiecePath &backup_piece_path = backup_piece_path_list.at(idx);
1193
    bool is_exist = true;
1194
    bool is_empty = false;
1195
    if (OB_FAIL(backup_tenant_dest.set(backup_piece_path.ptr()))) {
1196
      LOG_WARN("failed to set backup tenant dest", K(ret), K(backup_piece_path));
1197
    } else if (OB_FAIL(check_tenant_archive_piece_infos_path_exist_(backup_tenant_dest, is_exist))) {
1198
      LOG_WARN("failed to check archive piece infos path exist", K(ret), K(backup_tenant_dest));
1199
    } else if (OB_FAIL(check_checkpoint_dir_emtpy_(backup_tenant_dest, is_empty))) {
1200
      LOG_WARN("failed to check checkpoint dir empty", K(ret), K(backup_tenant_dest));
1201
    } else {
1202
      has_been_cleaned = !is_exist && is_empty;
1203
    }
1204
  }
1205
  return ret;
1206
}
1207

1208
// single_backup_set_info
1209
int ObRestoreFailureChecker::check_tenant_backup_set_infos_path_exist_(
1210
    const share::ObBackupDest &backup_set_dest,
1211
    bool &is_exist)
1212
{
1213
  int ret = OB_SUCCESS;
1214
  is_exist = false;
1215
  ObBackupPath backup_path;
1216
  if (OB_FAIL(ObBackupPathUtil::get_backup_set_info_path(backup_set_dest, backup_path))) {
1217
    LOG_WARN("failed to get backup set info path", K(ret), K(backup_set_dest));
1218
  } else if (OB_FAIL(check_path_exist_(backup_path, backup_set_dest.get_storage_info(), is_exist))) {
1219
    LOG_WARN("failed to check path exist", K(ret));
1220
  }
1221
  return ret;
1222
}
1223

1224
// tenant_archive_piece_infos
1225
int ObRestoreFailureChecker::check_tenant_archive_piece_infos_path_exist_(
1226
    const share::ObBackupDest &backup_set_dest,
1227
    bool &is_exist)
1228
{
1229
  int ret = OB_SUCCESS;
1230
  is_exist = false;
1231
  ObBackupPath backup_path;
1232
  if (OB_FAIL(ObArchivePathUtil::get_tenant_archive_piece_infos_file_path(backup_set_dest, backup_path))) {
1233
    LOG_WARN("failed to get tenant archive piece infos file path", K(ret), K(backup_set_dest));
1234
  } else if (OB_FAIL(check_path_exist_(backup_path, backup_set_dest.get_storage_info(), is_exist))) {
1235
    LOG_WARN("failed to check path exist", K(ret));
1236
  }
1237
  return ret;
1238
}
1239

1240
int ObRestoreFailureChecker::check_checkpoint_dir_emtpy_(
1241
    const share::ObBackupDest &backup_tenant_dest,
1242
    bool &is_empty)
1243
{
1244
  int ret = OB_SUCCESS;
1245
  is_empty = false;
1246
  ObBackupPath backup_path;
1247
  if (OB_FAIL(ObArchivePathUtil::get_piece_checkpoint_dir_path(backup_tenant_dest, backup_path))) {
1248
    LOG_WARN("failed to get tenant archive piece infos file path", K(ret), K(backup_tenant_dest));
1249
  } else if (OB_FAIL(check_dir_empty_(backup_path, backup_tenant_dest.get_storage_info(), is_empty))) {
1250
    LOG_WARN("failed to check dir empty", K(ret));
1251
  }
1252
  return ret;
1253
}
1254

1255
int ObRestoreFailureChecker::check_path_exist_(
1256
    const share::ObBackupPath &backup_path,
1257
    const share::ObBackupStorageInfo *storage_info,
1258
    bool &is_exist)
1259
{
1260
  int ret = OB_SUCCESS;
1261
  is_exist = false;
1262
  ObBackupIoAdapter util;
1263
  if (OB_FAIL(util.is_exist(backup_path.get_ptr(), storage_info, is_exist))) {
1264
    LOG_WARN("failed to check is exist", K(ret));
1265
  }
1266
  return ret;
1267
}
1268

1269
int ObRestoreFailureChecker::check_dir_empty_(
1270
    const share::ObBackupPath &backup_path,
1271
    const share::ObBackupStorageInfo *storage_info,
1272
    bool &is_empty)
1273
{
1274
  int ret = OB_SUCCESS;
1275
  is_empty = false;
1276
  ObBackupIoAdapter util;
1277
  if (OB_FAIL(util.is_empty_directory(backup_path.get_ptr(), storage_info, is_empty))) {
1278
    LOG_WARN("fail to init store", K(ret), K(backup_path));
1279
  } else {
1280
    LOG_INFO("is empty dir", K(backup_path), K(is_empty));
1281
  }
1282
  return ret;
1283
}
1284

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.