oceanbase
1283 строки · 54.1 Кб
1/**
2* Copyright (c) 2021 OceanBase
3* OceanBase CE is licensed under Mulan PubL v2.
4* You can use this software according to the terms and conditions of the Mulan PubL v2.
5* You may obtain a copy of Mulan PubL v2 at:
6* http://license.coscl.org.cn/MulanPubL-2.0
7* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10* See the Mulan PubL v2 for more details.
11*/
12
13#define USING_LOG_PREFIX RS_RESTORE
14
15#include "ob_restore_util.h"
16#include "lib/lock/ob_mutex.h"
17#include "share/restore/ob_restore_uri_parser.h"
18#include "share/schema/ob_schema_mgr.h"
19#include "share/schema/ob_schema_getter_guard.h"
20#include "share/backup/ob_backup_struct.h"
21#include "share/backup/ob_backup_io_adapter.h"
22#include "share/backup/ob_backup_path.h"
23#include "rootserver/ob_rs_event_history_table_operator.h"
24#include "storage/backup/ob_backup_restore_util.h"
25#include "share/backup/ob_archive_store.h"
26#include "storage/backup/ob_backup_data_store.h"
27#include "share/restore/ob_restore_persist_helper.h"//ObRestorePersistHelper ObRestoreProgressPersistInfo
28#include "logservice/palf/palf_base_info.h"//PalfBaseInfo
29#include "storage/ls/ob_ls_meta_package.h"//ls_meta
30#include "share/backup/ob_archive_path.h"
31#include "share/ob_upgrade_utils.h"
32#include "share/ob_unit_table_operator.h"
33
34using namespace oceanbase::common;
35using namespace oceanbase;
36using namespace oceanbase::share;
37using namespace oceanbase::share::schema;
38using namespace oceanbase::rootserver;
39
40/*-------------- physical restore --------------------------*/
41int ObRestoreUtil::fill_physical_restore_job(
42const int64_t job_id,
43const obrpc::ObPhysicalRestoreTenantArg &arg,
44ObPhysicalRestoreJob &job)
45{
46int ret = OB_SUCCESS;
47
48if (job_id < 0 || !arg.is_valid()) {
49ret = OB_INVALID_ARGUMENT;
50LOG_WARN("invalid arg", K(ret), K(job_id), K(arg));
51} else {
52job.reset();
53job.init_restore_key(OB_SYS_TENANT_ID, job_id);
54job.set_status(PhysicalRestoreStatus::PHYSICAL_RESTORE_CREATE_TENANT);
55job.set_tenant_name(arg.tenant_name_);
56job.set_initiator_job_id(arg.initiator_job_id_);
57job.set_initiator_tenant_id(arg.initiator_tenant_id_);
58if (OB_FAIL(job.set_description(arg.description_))) {
59LOG_WARN("fail to set description", K(ret));
60}
61
62// check restore option
63if (OB_SUCC(ret)) {
64if (OB_FAIL(ObPhysicalRestoreOptionParser::parse(arg.restore_option_, job))) {
65LOG_WARN("fail to parse restore_option", K(ret), K(arg), K(job_id));
66} else if (OB_FAIL(job.set_restore_option(arg.restore_option_))){
67LOG_WARN("failed to set restore option", KR(ret), K(arg));
68} else if (job.get_kms_encrypt()) {
69if (OB_FAIL(job.set_kms_info(arg.kms_info_))) {
70LOG_WARN("failed to fill kms info", KR(ret), K(arg));
71}
72}
73}
74
75if (OB_SUCC(ret)) {
76if (OB_FAIL(fill_backup_info_(arg, job))) {
77LOG_WARN("failed to fill backup info", KR(ret), K(arg), K(job));
78}
79}
80
81if (OB_SUCC(ret)) {
82if (OB_FAIL(fill_encrypt_info_(arg, job))) {
83LOG_WARN("failed to fill encrypt info", KR(ret), K(arg), K(job));
84}
85}
86
87if (FAILEDx(job.set_passwd_array(arg.passwd_array_))) {
88LOG_WARN("failed to copy passwd array", K(ret), K(arg));
89}
90
91if (OB_SUCC(ret)) {
92for (int64_t i = 0; OB_SUCC(ret) && i < arg.table_items_.count(); i++) {
93const obrpc::ObTableItem &item = arg.table_items_.at(i);
94if (OB_FAIL(job.get_white_list().add_table_item(item))) {
95LOG_WARN("fail to add table item", KR(ret), K(item));
96}
97}
98}
99}
100
101LOG_INFO("finish fill_physical_restore_job", K(job_id), K(arg), K(job));
102return ret;
103}
104
105int ObRestoreUtil::record_physical_restore_job(
106common::ObISQLClient &sql_client,
107const ObPhysicalRestoreJob &job)
108{
109int ret = OB_SUCCESS;
110if (!job.is_valid()) {
111ret = OB_INVALID_ARGUMENT;
112LOG_WARN("invalid arg", K(ret), K(job));
113} else {
114bool has_job = false;
115ObPhysicalRestoreTableOperator restore_op;
116if (OB_FAIL(check_has_physical_restore_job(sql_client,
117job.get_tenant_name(),
118has_job))) {
119LOG_WARN("fail to check if job exist", K(ret), K(job));
120} else if (has_job) {
121ret = OB_RESTORE_IN_PROGRESS;
122LOG_WARN("restore tenant job already exist", K(ret), K(job));
123} else if (OB_FAIL(restore_op.init(&sql_client, OB_SYS_TENANT_ID, share::OBCG_STORAGE /*group_id*/))) {
124LOG_WARN("fail init restore op", K(ret));
125} else if (OB_FAIL(restore_op.insert_job(job))) {
126LOG_WARN("fail insert job and partitions", K(ret), K(job));
127}
128}
129return ret;
130}
131
132int ObRestoreUtil::insert_user_tenant_restore_job(
133common::ObISQLClient &sql_client,
134const ObString &tenant_name,
135const int64_t user_tenant_id)
136{
137int ret = OB_SUCCESS;
138if (OB_UNLIKELY(!is_user_tenant(user_tenant_id))) {
139ret = OB_INVALID_ARGUMENT;
140LOG_WARN("not user tenant", KR(ret), K(user_tenant_id));
141} else {
142ObPhysicalRestoreTableOperator restore_op;
143ObPhysicalRestoreJob initaitor_job_info;
144ObPhysicalRestoreJob job_info;
145if (OB_FAIL(restore_op.init(&sql_client, OB_SYS_TENANT_ID, share::OBCG_STORAGE /*group_id*/))) {
146LOG_WARN("failed to init restore op", KR(ret), K(user_tenant_id));
147} else if (OB_FAIL(restore_op.get_job_by_tenant_name(
148tenant_name, initaitor_job_info))) {
149LOG_WARN("failed to get job by tenant name", KR(ret), K(tenant_name));
150} else if (OB_FAIL(job_info.assign(initaitor_job_info))) {
151LOG_WARN("failed to assign job info", KR(ret), K(initaitor_job_info));
152} else {
153ObMySQLTransaction trans;
154//TODO get tenant job_id, use tenant
155const int64_t job_id = initaitor_job_info.get_job_id();
156job_info.init_restore_key(user_tenant_id, job_id);
157job_info.set_tenant_id(user_tenant_id);
158job_info.set_status(share::PHYSICAL_RESTORE_PRE);
159job_info.set_initiator_job_id(job_info.get_job_id());
160job_info.set_initiator_tenant_id(OB_SYS_TENANT_ID);
161ObPhysicalRestoreTableOperator user_restore_op;
162ObRestorePersistHelper restore_persist_op;
163ObRestoreProgressPersistInfo persist_info;
164persist_info.key_.tenant_id_ = user_tenant_id;
165persist_info.key_.job_id_ = job_info.get_job_id();
166persist_info.restore_scn_ = job_info.get_restore_scn();
167const uint64_t exec_tenant_id = gen_meta_tenant_id(user_tenant_id);
168if (OB_FAIL(trans.start(&sql_client, exec_tenant_id))) {
169LOG_WARN("failed to start trans", KR(ret), K(exec_tenant_id));
170} else if (OB_FAIL(user_restore_op.init(&trans, user_tenant_id, share::OBCG_STORAGE /*group_id*/))) {
171LOG_WARN("failed to init restore op", KR(ret), K(user_tenant_id));
172} else if (OB_FAIL(restore_persist_op.init(user_tenant_id, share::OBCG_STORAGE /*group_id*/))) {
173LOG_WARN("failed to init restore persist op", KR(ret), K(user_tenant_id));
174} else if (OB_FAIL(user_restore_op.insert_job(job_info))) {
175LOG_WARN("failed to insert job", KR(ret), K(job_info));
176} else if (OB_FAIL(restore_persist_op.insert_initial_restore_progress(trans, persist_info))) {
177LOG_WARN("failed to insert persist info", KR(ret), K(persist_info));
178}
179if (trans.is_started()) {
180int temp_ret = OB_SUCCESS;
181bool commit = OB_SUCC(ret);
182if (OB_SUCCESS != (temp_ret = trans.end(commit))) {
183ret = (OB_SUCC(ret)) ? temp_ret : ret;
184LOG_WARN("trans end failed", KR(ret), KR(temp_ret), K(commit));
185}
186}
187}
188}
189return ret;
190}
191
192
193int ObRestoreUtil::check_has_physical_restore_job(
194common::ObISQLClient &sql_client,
195const ObString &tenant_name,
196bool &has_job)
197{
198int ret = OB_SUCCESS;
199ObArray<ObPhysicalRestoreJob> jobs;
200has_job = false;
201ObPhysicalRestoreTableOperator restore_op;
202if (OB_FAIL(restore_op.init(&sql_client, OB_SYS_TENANT_ID, share::OBCG_STORAGE /*group_id*/))) {
203LOG_WARN("fail init restore op", K(ret));
204} else if (OB_FAIL(restore_op.get_jobs(jobs))) {
205LOG_WARN("fail get jobs", K(ret));
206} else {
207int64_t len = common::OB_MAX_TENANT_NAME_LENGTH_STORE;
208FOREACH_CNT_X(job, jobs, !has_job) {
209if (0 == job->get_tenant_name().case_compare(tenant_name)) {
210//nocase compare
211has_job = true;
212}
213}
214}
215return ret;
216}
217
218int ObRestoreUtil::fill_backup_info_(
219const obrpc::ObPhysicalRestoreTenantArg &arg,
220share::ObPhysicalRestoreJob &job)
221{
222int ret = OB_SUCCESS;
223const bool has_multi_url = arg.multi_uri_.length() > 0;
224LOG_INFO("start fill backup path", K(arg));
225if (has_multi_url) {
226if(OB_FAIL(fill_multi_backup_path(arg, job))) {
227LOG_WARN("failed to fill multi backup path", K(ret), K(arg));
228}
229} else {
230if (OB_FAIL(fill_compat_backup_path(arg, job))) {
231LOG_WARN("failed to fill compat backup path", K(ret), K(arg));
232}
233}
234FLOG_INFO("finish fill backup path", K(arg), K(job));
235return ret;
236}
237
238int ObRestoreUtil::fill_multi_backup_path(
239const obrpc::ObPhysicalRestoreTenantArg &arg,
240share::ObPhysicalRestoreJob &job)
241{
242int ret = OB_SUCCESS;
243// TODO: use restore preview url
244return ret;
245}
246
247int ObRestoreUtil::get_encrypt_backup_dest_format_str(
248const ObArray<ObString> &original_dest_list,
249common::ObArenaAllocator &allocator,
250common::ObString &encrypt_dest_str)
251{
252int ret = OB_SUCCESS;
253char *buf = NULL;
254int64_t length = OB_MAX_BACKUP_DEST_LENGTH * original_dest_list.count();
255if (0 == original_dest_list.count()) {
256ret = OB_INVALID_ARGUMENT;
257LOG_WARN("get invalid argument", KR(ret), K(original_dest_list));
258} else if (OB_ISNULL(buf = reinterpret_cast<char *>(allocator.alloc(length)))) {
259ret = OB_ALLOCATE_MEMORY_FAILED;
260LOG_WARN("allocate memory failed", KR(ret));
261} else {
262ObBackupDest dest;
263char encrypt_str[OB_MAX_BACKUP_DEST_LENGTH] = { 0 };
264int64_t pos = 0;
265for (int i = 0; OB_SUCC(ret) && i < original_dest_list.count(); i++) {
266const common::ObString &item = original_dest_list.at(i);
267if (OB_FAIL(dest.set_without_decryption(item))) {
268LOG_WARN("failed to push back", KR(ret), K(item));
269} else if (OB_FAIL(dest.get_backup_dest_str(encrypt_str, sizeof(encrypt_str)))) {
270LOG_WARN("failed to get backup dest str", KR(ret), K(item));
271} else if (OB_FAIL(databuff_printf(buf, length, pos, "%s%s", 0 == i ? "" : ",", encrypt_str))) {
272LOG_WARN("failed to append uri", KR(ret), K(encrypt_str), K(pos), K(buf));
273}
274}
275if (OB_FAIL(ret)) {
276} else if (strlen(buf) <= 0) {
277ret = OB_ERR_UNEXPECTED;
278LOG_WARN("unexpected format str", KR(ret), K(buf));
279} else {
280encrypt_dest_str.assign_ptr(buf, strlen(buf));
281LOG_DEBUG("get format encrypt backup dest str", KR(ret), K(encrypt_dest_str));
282}
283}
284
285return ret;
286}
287
288int ObRestoreUtil::fill_compat_backup_path(
289const obrpc::ObPhysicalRestoreTenantArg &arg,
290share::ObPhysicalRestoreJob &job)
291{
292int ret = OB_SUCCESS;
293ObArenaAllocator allocator;
294ObArray<ObString> tenant_path_array;
295ObArray<ObRestoreBackupSetBriefInfo> backup_set_list;
296ObArray<ObRestoreLogPieceBriefInfo> backup_piece_list;
297ObArray<ObBackupPathString> log_path_list;
298ObString tenant_dest_list;
299int64_t last_backup_set_idx = -1;
300bool restore_using_compl_log = false;
301share::SCN restore_scn;
302if (!arg.multi_uri_.empty()) {
303ret = OB_INVALID_ARGUMENT;
304LOG_WARN("invalid args", K(ret), K(arg));
305} else if (OB_FAIL(ObPhysicalRestoreUriParser::parse(arg.uri_, allocator, tenant_path_array))) {
306LOG_WARN("fail to parse uri", K(ret), K(arg));
307} else if (OB_FAIL(get_encrypt_backup_dest_format_str(tenant_path_array, allocator, tenant_dest_list))) {
308LOG_WARN("failed to convert uri", K(ret), K(arg), K(tenant_path_array));
309} else if (OB_FAIL(job.set_backup_dest(tenant_dest_list))) {
310LOG_WARN("failed to copy backup dest", K(ret), K(arg));
311} else if (OB_FAIL(check_restore_using_complement_log_(tenant_path_array, restore_using_compl_log))) {
312LOG_WARN("failed to check only contain backup set", K(ret), K(tenant_path_array));
313} else if (OB_FAIL(fill_restore_scn_(
314arg.restore_scn_, arg.restore_timestamp_, arg.with_restore_scn_, tenant_path_array, arg.passwd_array_,
315restore_using_compl_log, restore_scn))) {
316LOG_WARN("fail to fill restore scn", K(ret), K(arg), K(tenant_path_array));
317} else if (OB_FALSE_IT(job.set_restore_scn(restore_scn))) {
318} else if (OB_FAIL(get_restore_source(restore_using_compl_log, tenant_path_array, arg.passwd_array_, job.get_restore_scn(),
319backup_set_list, backup_piece_list, log_path_list))) {
320LOG_WARN("fail to get restore source", K(ret), K(tenant_path_array), K(arg));
321} else if (OB_FAIL(do_fill_backup_path_(backup_set_list, backup_piece_list, log_path_list, job))) {
322LOG_WARN("fail to do fill backup path", K(backup_set_list), K(backup_piece_list), K(log_path_list));
323} else if (OB_FALSE_IT(last_backup_set_idx = backup_set_list.count() - 1)) {
324} else if (last_backup_set_idx < 0) {
325ret = OB_ERR_UNEXPECTED;
326LOG_WARN("invalid idx", K(ret), K(last_backup_set_idx), K(backup_set_list));
327} else if (OB_FAIL(do_fill_backup_info_(backup_set_list.at(last_backup_set_idx).backup_set_path_, job))) {
328LOG_WARN("fail to do fill backup info");
329}
330return ret;
331}
332
333int ObRestoreUtil::fill_restore_scn_(
334const share::SCN &src_scn,
335const ObString ×tamp,
336const bool with_restore_scn,
337const ObIArray<ObString> &tenant_path_array,
338const common::ObString &passwd,
339const bool restore_using_compl_log,
340share::SCN &restore_scn)
341{
342int ret = OB_SUCCESS;
343if (tenant_path_array.empty()) {
344ret = OB_INVALID_ARGUMENT;
345LOG_WARN("invalid argument", K(ret), K(tenant_path_array));
346} else if (with_restore_scn) {
347// restore scn which is specified by user
348restore_scn = src_scn;
349} else if (!with_restore_scn) {
350if (restore_using_compl_log) {
351SCN min_restore_scn = SCN::min_scn();
352ARRAY_FOREACH_X(tenant_path_array, i, cnt, OB_SUCC(ret)) {
353const ObString &tenant_path = tenant_path_array.at(i);
354storage::ObBackupDataStore store;
355share::ObBackupDest backup_dest;
356ObBackupFormatDesc format_desc;
357share::ObBackupSetFileDesc backup_set_file_desc;
358if (OB_FAIL(backup_dest.set(tenant_path.ptr()))) {
359LOG_WARN("fail to set backup dest", K(ret), K(tenant_path));
360} else if (OB_FAIL(store.init(backup_dest))) {
361LOG_WARN("failed to init backup store", K(ret), K(tenant_path));
362} else if (OB_FAIL(store.read_format_file(format_desc))) {
363LOG_WARN("failed to read format file", K(ret), K(store));
364} else if (ObBackupDestType::DEST_TYPE_BACKUP_DATA != format_desc.dest_type_) {
365LOG_INFO("skip log dir", K(tenant_path), K(format_desc));
366} else if (OB_FAIL(store.get_max_backup_set_file_info(passwd, backup_set_file_desc))) {
367LOG_WARN("fail to get backup set array", K(ret));
368} else {
369min_restore_scn = backup_set_file_desc.min_restore_scn_;
370}
371}
372if (OB_SUCC(ret)) {
373if (SCN::min_scn() == min_restore_scn) {
374ret = OB_ERR_UNEXPECTED;
375LOG_WARN("invalid min restore scn, do not find available backup tenant path to restore", K(ret), K(tenant_path_array));
376} else {
377restore_scn = min_restore_scn;
378}
379}
380} else if (!timestamp.empty()) {
381common::ObTimeZoneInfoWrap time_zone_wrap;
382if (OB_FAIL(get_backup_sys_time_zone_(tenant_path_array, time_zone_wrap))) {
383LOG_WARN("failed to get backup sys time zone", K(ret), K(tenant_path_array));
384} else if (OB_FAIL(convert_restore_timestamp_to_scn_(timestamp, time_zone_wrap, restore_scn))) {
385LOG_WARN("failed to convert restore timestamp to scn", K(ret));
386}
387} else {
388int64_t round_id = 0;
389int64_t piece_id = 0;
390SCN max_checkpoint_scn = SCN::min_scn();
391// restore to max checkpoint scn of log
392ARRAY_FOREACH_X(tenant_path_array, i, cnt, OB_SUCC(ret)) {
393const ObString &tenant_path = tenant_path_array.at(i);
394ObArchiveStore store;
395ObBackupDest dest;
396ObBackupFormatDesc format_desc;
397SCN cur_max_checkpoint_scn = SCN::min_scn();
398if (OB_FAIL(dest.set(tenant_path))) {
399LOG_WARN("fail to set dest", K(ret), K(tenant_path));
400} else if (OB_FAIL(store.init(dest))) {
401LOG_WARN("failed to init archive store", K(ret), K(tenant_path));
402} else if (OB_FAIL(store.read_format_file(format_desc))) {
403LOG_WARN("failed to read format file", K(ret), K(tenant_path));
404} else if (ObBackupDestType::TYPE::DEST_TYPE_ARCHIVE_LOG != format_desc.dest_type_) {
405LOG_INFO("skip data dir", K(tenant_path), K(format_desc));
406} else if (OB_FAIL(store.get_max_checkpoint_scn(format_desc.dest_id_, round_id, piece_id, cur_max_checkpoint_scn))) {
407LOG_WARN("fail to get max checkpoint scn", K(ret), K(format_desc));
408} else {
409max_checkpoint_scn = std::max(max_checkpoint_scn, cur_max_checkpoint_scn);
410}
411}
412if (OB_SUCC(ret)) {
413if (SCN::min_scn() == max_checkpoint_scn) {
414ret = OB_ERR_UNEXPECTED;
415LOG_WARN("invalid max checkpoint scn, no archvie tenant path", K(ret), K(tenant_path_array));
416} else {
417restore_scn = max_checkpoint_scn;
418}
419}
420}
421}
422return ret;
423}
424
425int ObRestoreUtil::fill_encrypt_info_(
426const obrpc::ObPhysicalRestoreTenantArg &arg,
427share::ObPhysicalRestoreJob &job)
428{
429int ret = OB_SUCCESS;
430#ifdef OB_BUILD_TDE_SECURITY
431ObArenaAllocator allocator;
432ObArray<ObString> kms_path_array;
433ObString kms_dest_str;
434ObBackupDest dest;
435ObBackupIoAdapter util;
436bool is_exist = false;
437if (OB_FAIL(job.set_encrypt_key(arg.encrypt_key_))) {
438LOG_WARN("failed to fill encrypt key", KR(ret), K(arg));
439} else if (arg.kms_uri_.empty()) {
440// do nothing
441} else if (OB_FAIL(ObPhysicalRestoreUriParser::parse(arg.kms_uri_, allocator, kms_path_array))) {
442LOG_WARN("fail to parse uri", K(ret), K(arg));
443} else if (OB_FAIL(get_encrypt_backup_dest_format_str(kms_path_array, allocator, kms_dest_str))) {
444LOG_WARN("failed to convert uri", K(ret), K(arg), K(kms_path_array));
445} else if (OB_FAIL(dest.set(kms_dest_str))) {
446LOG_WARN("failed to set dest", K(ret));
447} else if (OB_FAIL(util.is_exist(dest.get_root_path(), dest.get_storage_info(), is_exist))) {
448LOG_WARN("failed to check file is exists", K(ret));
449} else if (OB_UNLIKELY(!is_exist)) {
450ret = OB_BACKUP_FILE_NOT_EXIST;
451LOG_WARN("kms backup file is not exist", K(ret));
452} else if (OB_FAIL(job.set_kms_dest(kms_dest_str))) {
453LOG_WARN("failed to copy kms dest", K(ret), K(arg));
454} else if (OB_FAIL(job.set_kms_encrypt_key(arg.kms_encrypt_key_))) {
455LOG_WARN("failed to fill kms encrypt key", KR(ret), K(arg));
456}
457#endif
458return ret;
459}
460
461int ObRestoreUtil::get_restore_source(
462const bool restore_using_compl_log,
463const ObIArray<ObString>& tenant_path_array,
464const common::ObString &passwd_array,
465const SCN &restore_scn,
466ObIArray<ObRestoreBackupSetBriefInfo> &backup_set_list,
467ObIArray<ObRestoreLogPieceBriefInfo> &backup_piece_list,
468ObIArray<ObBackupPathString> &log_path_list)
469{
470int ret = OB_SUCCESS;
471SCN restore_start_scn = SCN::min_scn();
472if (OB_FAIL(get_restore_backup_set_array_(tenant_path_array, passwd_array, restore_scn,
473restore_start_scn, backup_set_list))) {
474LOG_WARN("fail to get restore backup set array", K(ret), K(tenant_path_array), K(restore_scn));
475} else if (!restore_using_compl_log && OB_FAIL(get_restore_log_piece_array_(
476tenant_path_array, restore_start_scn, restore_scn, backup_piece_list, log_path_list))) {
477LOG_WARN("fail to get restore log piece array", K(ret), K(tenant_path_array), K(restore_start_scn),
478K(restore_scn));
479} else if (restore_using_compl_log && OB_FAIL(get_restore_log_array_for_complement_log_(
480backup_set_list, restore_start_scn, restore_scn, backup_piece_list, log_path_list))) {
481LOG_WARN("fail to get restore log piece array", K(ret), K(backup_set_list), K(restore_start_scn), K(restore_scn));
482} else if (backup_set_list.empty() || backup_piece_list.empty() || log_path_list.empty()) {
483ret = OB_ERR_UNEXPECTED;
484LOG_WARN("no backup set path or log piece can be used to restore", K(ret),
485K(tenant_path_array), K(backup_set_list), K(backup_piece_list), K(log_path_list), K(restore_start_scn),
486K(restore_scn));
487}
488return ret;
489}
490
491int ObRestoreUtil::check_restore_using_complement_log_(
492const ObIArray<ObString> &tenant_path_array,
493bool &restore_using_compl_log)
494{
495int ret = OB_SUCCESS;
496restore_using_compl_log = true;
497if (tenant_path_array.empty()) {
498ret = OB_INVALID_ARGUMENT;
499LOG_WARN("invalid argument", K(ret), K(tenant_path_array));
500} else {
501ARRAY_FOREACH_X(tenant_path_array, i, cnt, OB_SUCC(ret)) {
502const ObString &tenant_path = tenant_path_array.at(i);
503storage::ObBackupDataStore store;
504share::ObBackupDest backup_dest;
505ObBackupFormatDesc format_desc;
506if (OB_FAIL(backup_dest.set(tenant_path.ptr()))) {
507LOG_WARN("fail to set backup dest", K(ret), K(tenant_path));
508} else if (OB_FAIL(store.init(backup_dest))) {
509LOG_WARN("failed to init backup store", K(ret), K(tenant_path));
510} else if (OB_FAIL(store.read_format_file(format_desc))) {
511LOG_WARN("failed to read format file", K(ret), K(store));
512} else if (ObBackupDestType::DEST_TYPE_ARCHIVE_LOG == format_desc.dest_type_) {
513restore_using_compl_log = false;
514LOG_INFO("not only contain backup data path", K(tenant_path), K(format_desc));
515break;
516}
517}
518}
519return ret;
520}
521
522int ObRestoreUtil::get_restore_backup_set_array_(
523const ObIArray<ObString> &tenant_path_array,
524const common::ObString &passwd_array,
525const SCN &restore_scn,
526SCN &restore_start_scn,
527ObIArray<ObRestoreBackupSetBriefInfo> &backup_set_list)
528{
529int ret = OB_SUCCESS;
530if (tenant_path_array.empty()) {
531ret = OB_INVALID_ARGUMENT;
532LOG_WARN("invaldi argument", K(ret), K(tenant_path_array));
533} else {
534ARRAY_FOREACH_X(tenant_path_array, i, cnt, OB_SUCC(ret)) {
535const ObString &tenant_path = tenant_path_array.at(i);
536storage::ObBackupDataStore store;
537share::ObBackupDest backup_dest;
538ObBackupFormatDesc format_desc;
539if (OB_FAIL(backup_dest.set(tenant_path.ptr()))) {
540LOG_WARN("fail to set backup dest", K(ret), K(tenant_path));
541} else if (OB_FAIL(store.init(backup_dest))) {
542LOG_WARN("failed to init backup store", K(ret), K(tenant_path));
543} else if (OB_FAIL(store.read_format_file(format_desc))) {
544LOG_WARN("failed to read format file", K(ret), K(store));
545} else if (ObBackupDestType::DEST_TYPE_BACKUP_DATA != format_desc.dest_type_) {
546LOG_INFO("skip log dir", K(tenant_path), K(format_desc));
547} else if (!backup_set_list.empty()) {
548ret = OB_NOT_SUPPORTED;
549LOG_WARN("It is not support to restore from multiple tenant backup paths", K(ret));
550LOG_USER_ERROR(OB_NOT_SUPPORTED, "It is not support to restore from multiple tenant backup paths.");
551} else if (OB_FAIL(store.get_backup_set_array(passwd_array, restore_scn, restore_start_scn, backup_set_list))) {
552LOG_WARN("fail to get backup set array", K(ret));
553}
554}
555}
556return ret;
557}
558
559int ObRestoreUtil::get_restore_backup_piece_list_(
560const ObBackupDest &dest,
561const ObArray<share::ObRestoreLogPieceBriefInfo> &piece_array,
562ObIArray<ObRestoreLogPieceBriefInfo> &backup_piece_list)
563{
564int ret = OB_SUCCESS;
565if (!dest.is_valid()) {
566ret = OB_INVALID_ARGUMENT;
567LOG_WARN("dest is invalid", K(ret), K(dest));
568} else {
569for (int64_t j = 0; OB_SUCC(ret) && j < piece_array.count(); ++j) {
570const share::ObRestoreLogPieceBriefInfo &piece_path = piece_array.at(j);
571ObRestoreLogPieceBriefInfo backup_piece_path;
572backup_piece_path.piece_id_ = piece_path.piece_id_;
573backup_piece_path.start_scn_ = piece_path.start_scn_;
574backup_piece_path.checkpoint_scn_ = piece_path.checkpoint_scn_;
575ObBackupDest piece_dest;
576if (OB_FAIL(piece_dest.set(piece_path.piece_path_.ptr(), dest.get_storage_info()))) {
577LOG_WARN("fail to set piece dest", K(ret), K(piece_path), K(dest));
578} else if (OB_FAIL(piece_dest.get_backup_dest_str(backup_piece_path.piece_path_.ptr(), backup_piece_path.piece_path_.capacity()))) {
579LOG_WARN("fail to get piece dest str", K(ret), K(piece_dest));
580} else if (OB_FAIL(backup_piece_list.push_back(backup_piece_path))) {
581LOG_WARN("fail to push backup piece list", K(ret));
582}
583}
584}
585
586return ret;
587}
588
589int ObRestoreUtil::get_restore_backup_piece_list_(
590const ObBackupDest &dest,
591const ObArray<share::ObPieceKey> &piece_array,
592ObIArray<ObRestoreLogPieceBriefInfo> &backup_piece_list)
593{
594int ret = OB_SUCCESS;
595if (!dest.is_valid()) {
596ret = OB_INVALID_ARGUMENT;
597LOG_WARN("dest is invalid", K(ret), K(dest));
598} else {
599for (int64_t j = 0; OB_SUCC(ret) && j < piece_array.count(); ++j) {
600const share::ObPieceKey &piece_key = piece_array.at(j);
601ObRestoreLogPieceBriefInfo backup_piece_path;
602backup_piece_path.piece_id_ = piece_key.piece_id_;
603ObBackupPath backup_path;
604ObBackupDest piece_dest;
605if (OB_FAIL(ObArchivePathUtil::get_piece_dir_path(dest, piece_key.dest_id_,
606piece_key.round_id_, piece_key.piece_id_, backup_path))) {
607LOG_WARN("failed to get piece dir path", K(ret), K(dest), K(piece_key));
608} else if (OB_FAIL(piece_dest.set(backup_path.get_ptr(), dest.get_storage_info()))) {
609LOG_WARN("fail to set piece dest", K(ret), K(backup_path), K(dest));
610} else if (OB_FAIL(piece_dest.get_backup_dest_str(backup_piece_path.piece_path_.ptr(), backup_piece_path.piece_path_.capacity()))) {
611LOG_WARN("fail to get piece dest str", K(ret), K(piece_dest));
612} else if (OB_FAIL(backup_piece_list.push_back(backup_piece_path))) {
613LOG_WARN("fail to push backup piece list", K(ret));
614}
615}
616}
617return ret;
618}
619
620int ObRestoreUtil::get_restore_log_path_list_(
621const ObBackupDest &dest,
622ObIArray<share::ObBackupPathString> &log_path_list)
623{
624int ret = OB_SUCCESS;
625ObBackupPathString log_path;
626if (!dest.is_valid()) {
627ret = OB_INVALID_ARGUMENT;
628LOG_WARN("dest is invalid", K(ret), K(dest));
629} else if (OB_FAIL(dest.get_backup_dest_str(log_path.ptr(), log_path.capacity()))) {
630LOG_WARN("fail to get backup dest str", K(ret), K(dest));
631} else if (OB_FAIL(log_path_list.push_back(log_path))) {
632LOG_WARN("fail to push backup log path", K(ret), K(log_path));
633}
634return ret;
635}
636
637int ObRestoreUtil::get_restore_log_piece_array_(
638const ObIArray<ObString> &tenant_path_array,
639const SCN &restore_start_scn,
640const SCN &restore_end_scn,
641ObIArray<ObRestoreLogPieceBriefInfo> &backup_piece_list,
642ObIArray<share::ObBackupPathString> &log_path_list)
643{
644int ret = OB_SUCCESS;
645ObArray<share::ObRestoreLogPieceBriefInfo> piece_array;
646if (tenant_path_array.empty()) {
647ret = OB_INVALID_ARGUMENT;
648LOG_WARN("invaldi argument", K(ret), K(tenant_path_array));
649} else {
650for (int64_t i = 0; OB_SUCC(ret) && i < tenant_path_array.count(); ++i) {
651piece_array.reset();
652const ObString &tenant_path = tenant_path_array.at(i);
653ObArchiveStore store;
654ObBackupDest dest;
655ObBackupFormatDesc format_desc;
656if (OB_FAIL(dest.set(tenant_path))) {
657LOG_WARN("fail to set dest", K(ret), K(tenant_path));
658} else if (OB_FAIL(store.init(dest))) {
659LOG_WARN("failed to init archive store", K(ret), K(tenant_path));
660} else if (OB_FAIL(store.read_format_file(format_desc))) {
661LOG_WARN("failed to read format file", K(ret), K(tenant_path));
662} else if (ObBackupDestType::TYPE::DEST_TYPE_ARCHIVE_LOG != format_desc.dest_type_) {
663LOG_INFO("skip data dir", K(tenant_path), K(format_desc));
664} else if (OB_FAIL(store.get_piece_paths_in_range(restore_start_scn, restore_end_scn, piece_array))) {
665LOG_WARN("fail to get restore pieces", K(ret), K(restore_start_scn), K(restore_end_scn));
666} else if (OB_FAIL(get_restore_log_path_list_(dest, log_path_list))) {
667LOG_WARN("fail to get restore log path list", K(ret), K(dest));
668} else if (OB_FAIL(get_restore_backup_piece_list_(dest, piece_array, backup_piece_list))){
669LOG_WARN("fail to get restore backup piece list", K(ret), K(dest), K(piece_array));
670}
671}
672}
673return ret;
674}
675
676int ObRestoreUtil::get_restore_log_array_for_complement_log_(
677const ObIArray<ObRestoreBackupSetBriefInfo> &backup_set_list,
678const share::SCN &restore_start_scn,
679const share::SCN &restore_end_scn,
680ObIArray<share::ObRestoreLogPieceBriefInfo> &backup_piece_list,
681ObIArray<share::ObBackupPathString> &log_path_list)
682{
683int ret = OB_SUCCESS;
684if (backup_set_list.empty()) {
685ret = OB_INVALID_ARGUMENT;
686LOG_WARN("invaldi argument", K(ret), K(backup_set_list));
687} else {
688const ObRestoreBackupSetBriefInfo &info = backup_set_list.at(backup_set_list.count() - 1);
689ObBackupDest dest;
690ObBackupDest compl_dest;
691ObArchiveStore archive_store;
692ObArray<ObPieceKey> piece_array;
693if (OB_FAIL(dest.set(info.backup_set_path_.str()))) {
694LOG_WARN("failed to set backup set path", K(ret), K(info));
695} else if (OB_FAIL(ObBackupPathUtil::construct_backup_complement_log_dest(dest, compl_dest))) {
696LOG_WARN("failed to construct backup complement log dest", K(ret), K(dest), K(info));
697} else if (OB_FAIL(archive_store.init(compl_dest))) {
698LOG_WARN("failed to init archive store", K(ret), K(compl_dest));
699} else if (OB_FAIL(get_restore_log_path_list_(compl_dest, log_path_list))) {
700LOG_WARN("fail to get restore log path list", K(ret), K(dest));
701} else if (OB_FAIL(archive_store.get_all_piece_keys(piece_array))) {
702LOG_WARN("fail to get restore pieces", K(ret), K(restore_start_scn), K(restore_end_scn));
703} else if (OB_FAIL(get_restore_backup_piece_list_(compl_dest, piece_array, backup_piece_list))){
704LOG_WARN("fail to get restore backup piece list", K(ret), K(dest), K(piece_array));
705} else {
706LOG_INFO("get restore log path list", K(backup_set_list), K(log_path_list));
707}
708}
709return ret;
710}
711
712int ObRestoreUtil::do_fill_backup_path_(
713const ObIArray<ObRestoreBackupSetBriefInfo> &backup_set_list,
714const ObIArray<ObRestoreLogPieceBriefInfo> &backup_piece_list,
715const ObIArray<ObBackupPathString> &log_path_list,
716share::ObPhysicalRestoreJob &job)
717{
718int ret = OB_SUCCESS;
719if (backup_set_list.empty() || backup_piece_list.empty()) {
720ret = OB_INVALID_ARGUMENT;
721LOG_WARN("invalid argument", K(ret), K(backup_set_list), K(backup_piece_list));
722} else {
723ObArray<share::ObBackupPiecePath> backup_piece_path_list;
724for (int64_t i = 0; OB_SUCC(ret) && i < backup_piece_list.count(); ++i) {
725if (OB_FAIL(backup_piece_path_list.push_back(backup_piece_list.at(i).piece_path_))) {
726LOG_WARN("failed to push backup piece", K(ret));
727}
728}
729if (OB_FAIL(ret)) {
730} else if (OB_FAIL(job.get_multi_restore_path_list().set(backup_set_list, backup_piece_path_list, log_path_list))) {
731LOG_WARN("failed to set mutli restore path list", KR(ret));
732}
733}
734return ret;
735}
736
737int ObRestoreUtil::do_fill_backup_info_(
738const share::ObBackupSetPath & backup_set_path,
739share::ObPhysicalRestoreJob &job)
740{
741int ret = OB_SUCCESS;
742storage::ObBackupDataStore store;
743ObBackupDataLSAttrDesc ls_info;
744HEAP_VARS_2((ObExternBackupSetInfoDesc, backup_set_info),
745(ObExternTenantLocalityInfoDesc, locality_info)) {
746if (backup_set_path.is_empty()) {
747ret = OB_INVALID_ARGUMENT;
748LOG_WARN("invalid argument", K(ret), K(backup_set_path));
749} else if (OB_FAIL(store.init(backup_set_path.ptr()))) {
750LOG_WARN("fail to init mgr", K(ret));
751} else if (OB_FAIL(store.read_backup_set_info(backup_set_info))) {
752LOG_WARN("fail to read backup set info", K(ret));
753} else if (OB_FAIL(store.read_tenant_locality_info(locality_info))) {
754LOG_WARN("fail to read locality info", K(ret));
755} else if (!backup_set_info.is_valid()) {
756ret = OB_ERR_UNEXPECTED;
757LOG_WARN("invalid backup set file", K(ret), K(backup_set_info));
758} else if (OB_FAIL(store.read_ls_attr_info(backup_set_info.backup_set_file_.meta_turn_id_, ls_info))) {
759LOG_WARN("failed to read ls attr info", K(ret), K(backup_set_info));
760} else if (OB_FAIL(check_backup_set_version_match_(backup_set_info.backup_set_file_))) {
761LOG_WARN("failed to check backup set version match", K(ret));
762} else if (OB_FAIL(job.set_backup_tenant_name(locality_info.tenant_name_.ptr()))) {
763LOG_WARN("fail to set backup tenant name", K(ret), "tenant name", locality_info.tenant_name_);
764} else if (OB_FAIL(job.set_backup_cluster_name(locality_info.cluster_name_.ptr()))) {
765LOG_WARN("fail to set backup cluster name", K(ret), "cluster name", locality_info.cluster_name_);
766} else {
767job.set_source_data_version(backup_set_info.backup_set_file_.tenant_compatible_);
768job.set_source_cluster_version(backup_set_info.backup_set_file_.cluster_version_);
769job.set_compat_mode(locality_info.compat_mode_);
770job.set_backup_tenant_id(backup_set_info.backup_set_file_.tenant_id_);
771// becuase of no consistent scn in 4.1.x backup set, using ls_info.backup_scn to set the restore consisitent scn
772// ls_info.backup_scn is the default replayable scn when create restore tenant,
773// so using it as the consistet scn can also make recovery service work normally
774const SCN &scn = backup_set_info.backup_set_file_.tenant_compatible_ < DATA_VERSION_4_2_0_0
775? ls_info.backup_scn_
776: backup_set_info.backup_set_file_.consistent_scn_;
777job.set_consistent_scn(scn);
778}
779}
780return ret;
781}
782
783int ObRestoreUtil::check_backup_set_version_match_(share::ObBackupSetFileDesc &backup_file_desc)
784{
785int ret = OB_SUCCESS;
786uint64_t data_version = 0;
787if (!backup_file_desc.is_valid()) {
788ret = OB_INVALID_ARGUMENT;
789LOG_WARN("invalid argument", K(ret), K(backup_file_desc));
790} else if (!ObUpgradeChecker::check_cluster_version_exist(backup_file_desc.cluster_version_)) {
791ret = OB_INVALID_ARGUMENT;
792LOG_WARN("cluster version are not exist", K(ret));
793LOG_USER_ERROR(OB_INVALID_ARGUMENT, "cluster version of backup set");
794} else if (!ObUpgradeChecker::check_data_version_exist(backup_file_desc.tenant_compatible_)) {
795ret = OB_INVALID_ARGUMENT;
796LOG_WARN("data version are not exist", K(ret));
797LOG_USER_ERROR(OB_INVALID_ARGUMENT, "tenant compatible of backup set");
798} else if (GET_MIN_CLUSTER_VERSION() < backup_file_desc.cluster_version_) {
799ret = OB_OP_NOT_ALLOW;
800LOG_WARN("restore from higher cluster version is not allowed", K(ret));
801LOG_USER_ERROR(OB_OP_NOT_ALLOW, "restore from higher cluster version is");
802} else if (OB_FAIL(ObUpgradeChecker::get_data_version_by_cluster_version(GET_MIN_CLUSTER_VERSION(), data_version))) {
803LOG_WARN("failed to get data version", K(ret));
804} else if (data_version < backup_file_desc.tenant_compatible_) {
805ret = OB_OP_NOT_ALLOW;
806LOG_WARN("restore from higher data version is not allowed", K(ret), K(data_version), K(backup_file_desc.tenant_compatible_));
807LOG_USER_ERROR(OB_OP_NOT_ALLOW, "restore from higher data version is");
808} else if (backup_file_desc.tenant_compatible_ < DATA_VERSION_4_1_0_0 && data_version >= DATA_VERSION_4_1_0_0) {
809ret = OB_OP_NOT_ALLOW;
810LOG_WARN("restore from version 4.0 is not allowd", K(ret), K(backup_file_desc.tenant_compatible_), K(data_version));
811LOG_USER_ERROR(OB_OP_NOT_ALLOW, "restore from version 4.0 is");
812}
813return ret;
814}
815
816int ObRestoreUtil::recycle_restore_job(const uint64_t tenant_id,
817common::ObMySQLProxy &sql_proxy,
818const ObPhysicalRestoreJob &job_info)
819{
820int ret = OB_SUCCESS;
821ObMySQLTransaction trans;
822const int64_t job_id = job_info.get_job_id();
823const uint64_t exec_tenant_id = gen_meta_tenant_id(tenant_id);
824if (OB_UNLIKELY(!is_user_tenant(tenant_id))) {
825ret = OB_INVALID_ARGUMENT;
826LOG_WARN("invalid argument", KR(ret), K(exec_tenant_id));
827} else if (OB_FAIL(trans.start(&sql_proxy, exec_tenant_id))) {
828LOG_WARN("failed to start trans", KR(ret), K(exec_tenant_id));
829} else {
830ObPhysicalRestoreTableOperator restore_op;
831if (OB_FAIL(restore_op.init(&trans, tenant_id, share::OBCG_STORAGE /*group_id*/))) {
832LOG_WARN("failed to init restore op", KR(ret), K(tenant_id));
833} else if (OB_FAIL(restore_op.remove_job(job_id))) {
834LOG_WARN("failed to remove job", KR(ret), K(tenant_id), K(job_id));
835} else {
836ObHisRestoreJobPersistInfo history_info;
837ObRestoreProgressPersistInfo restore_progress;
838ObRestorePersistHelper persist_helper;
839ObRestoreJobPersistKey key;
840common::ObArray<share::ObLSRestoreProgressPersistInfo> ls_restore_progress_infos;
841key.tenant_id_ = tenant_id;
842key.job_id_ = job_info.get_job_id();
843if (OB_FAIL(persist_helper.init(tenant_id, share::OBCG_STORAGE /*group_id*/))) {
844LOG_WARN("failed to init persist helper", KR(ret), K(tenant_id));
845} else if (OB_FAIL(persist_helper.get_restore_process(
846trans, key, restore_progress))) {
847LOG_WARN("failed to get restore progress", KR(ret), K(key));
848} else if (OB_FAIL(history_info.init_with_job_process(
849job_info, restore_progress))) {
850LOG_WARN("failed to init history", KR(ret), K(job_info), K(restore_progress));
851} else if (history_info.is_restore_success()) { // restore succeed, no need to record comment
852} else if (OB_FAIL(persist_helper.get_all_ls_restore_progress(trans, ls_restore_progress_infos))) {
853LOG_WARN("failed to get ls restore progress", K(ret));
854} else {
855int64_t pos = 0;
856ARRAY_FOREACH_X(ls_restore_progress_infos, i, cnt, OB_SUCC(ret)) {
857const ObLSRestoreProgressPersistInfo &ls_restore_info = ls_restore_progress_infos.at(i);
858if (ls_restore_info.status_.is_failed()) {
859if (OB_FAIL(databuff_printf(history_info.comment_.ptr(), history_info.comment_.capacity(), pos,
860"%s;", ls_restore_info.comment_.ptr()))) {
861if (OB_SIZE_OVERFLOW == ret) {
862ret = OB_SUCCESS;
863break;
864} else {
865LOG_WARN("failed to databuff printf comment", K(ret));
866}
867}
868}
869}
870}
871if (OB_FAIL(ret)) {
872} else if (OB_FAIL(persist_helper.insert_restore_job_history(
873trans, history_info))) {
874LOG_WARN("failed to insert restore job history", KR(ret), K(history_info));
875}
876}
877}
878if (trans.is_started()) {
879int tmp_ret = OB_SUCCESS;
880if (OB_SUCCESS != (tmp_ret = trans.end(OB_SUCC(ret)))) {
881ret = OB_SUCC(ret) ? tmp_ret : ret;
882LOG_WARN("failed to end trans", KR(ret), K(tmp_ret));
883}
884}
885return ret;
886}
887
888int ObRestoreUtil::recycle_restore_job(common::ObMySQLProxy &sql_proxy,
889const share::ObPhysicalRestoreJob &job_info,
890const ObHisRestoreJobPersistInfo &history_info)
891{
892int ret = OB_SUCCESS;
893ObMySQLTransaction trans;
894const int64_t job_id = job_info.get_job_id();
895const int64_t tenant_id = job_info.get_restore_key().tenant_id_;
896const uint64_t exec_tenant_id = gen_meta_tenant_id(tenant_id);
897ObRestorePersistHelper persist_helper;
898if (OB_UNLIKELY(OB_INVALID_TENANT_ID == exec_tenant_id)) {
899ret = OB_INVALID_ARGUMENT;
900LOG_WARN("invalid argument", KR(ret), K(exec_tenant_id));
901} else if (OB_FAIL(trans.start(&sql_proxy, exec_tenant_id))) {
902LOG_WARN("failed to start trans", KR(ret), K(exec_tenant_id));
903} else if (OB_FAIL(persist_helper.init(tenant_id, share::OBCG_STORAGE /*group_id*/))) {
904LOG_WARN("failed to init persist helper", KR(ret));
905} else if (OB_FAIL(persist_helper.insert_restore_job_history(trans, history_info))) {
906LOG_WARN("failed to insert restore job history", KR(ret), K(history_info));
907} else {
908ObPhysicalRestoreTableOperator restore_op;
909if (OB_FAIL(restore_op.init(&trans, tenant_id, share::OBCG_STORAGE /*group_id*/))) {
910LOG_WARN("failed to init restore op", KR(ret), K(tenant_id));
911} else if (OB_FAIL(restore_op.remove_job(job_id))) {
912LOG_WARN("failed to remove job", KR(ret), K(tenant_id), K(job_id));
913} else if (is_sys_tenant(tenant_id)) {
914//finish __all_rootservice_job
915int tmp_ret = PHYSICAL_RESTORE_SUCCESS == job_info.get_status() ? OB_SUCCESS : OB_ERROR;
916if (OB_FAIL(RS_JOB_COMPLETE(job_id, tmp_ret, trans))) {
917LOG_WARN("failed to complete job", KR(ret), K(job_id));
918}
919}
920}
921if (trans.is_started()) {
922int tmp_ret = OB_SUCCESS;
923if (OB_SUCCESS != (tmp_ret = trans.end(OB_SUCC(ret)))) {
924ret = OB_SUCC(ret) ? tmp_ret : ret;
925LOG_WARN("failed to end trans", KR(ret), K(tmp_ret));
926}
927}
928return ret;
929}
930int ObRestoreUtil::get_user_restore_job_history(common::ObISQLClient &sql_client,
931const uint64_t user_tenant_id,
932const uint64_t initiator_tenant_id,
933const int64_t initiator_job_id,
934ObHisRestoreJobPersistInfo &history_info)
935{
936int ret = OB_SUCCESS;
937if (OB_UNLIKELY(!is_user_tenant(user_tenant_id)
938|| OB_INVALID_TENANT_ID == initiator_tenant_id
939|| 0 > initiator_job_id)) {
940ret = OB_INVALID_ARGUMENT;
941LOG_WARN("invalid argument", KR(ret), K(user_tenant_id),
942K(initiator_job_id), K(initiator_tenant_id));
943} else {
944ObRestorePersistHelper user_persist_helper;
945if (OB_FAIL(user_persist_helper.init(user_tenant_id, share::OBCG_STORAGE /*group_id*/))) {
946LOG_WARN("failed to init persist helper", KR(ret), K(user_tenant_id));
947} else if (OB_FAIL(user_persist_helper.get_restore_job_history(
948sql_client, initiator_job_id, initiator_tenant_id,
949history_info))) {
950LOG_WARN("failed to get restore progress", KR(ret), K(initiator_job_id), K(initiator_tenant_id));
951}
952}
953return ret;
954}
955
956int ObRestoreUtil::get_restore_ls_palf_base_info(
957const share::ObPhysicalRestoreJob &job_info, const ObLSID &ls_id,
958palf::PalfBaseInfo &palf_base_info)
959{
960int ret = OB_SUCCESS;
961storage::ObBackupDataStore store;
962const common::ObSArray<share::ObBackupSetPath> &backup_set_array =
963job_info.get_multi_restore_path_list().get_backup_set_path_list();
964const int64_t idx = backup_set_array.count() - 1;
965storage::ObLSMetaPackage ls_meta_package;
966if (idx < 0) {
967ret = OB_ERR_UNEXPECTED;
968LOG_WARN("backup_set_array can't empty", KR(ret), K(job_info));
969} else if (OB_FAIL(store.init(backup_set_array.at(idx).ptr()))) {
970LOG_WARN("fail to init backup data store", KR(ret));
971} else if (OB_FAIL(store.read_ls_meta_infos(ls_id, ls_meta_package))) {
972LOG_WARN("fail to read backup set info", KR(ret));
973} else if (!ls_meta_package.is_valid()) {
974ret = OB_INVALID_ARGUMENT;
975LOG_WARN("invalid backup set info", KR(ret), K(ls_meta_package));
976} else {
977palf_base_info = ls_meta_package.palf_meta_;
978LOG_INFO("[RESTORE] get restore ls palf base info", K(palf_base_info));
979}
980return ret;
981}
982
983int ObRestoreUtil::check_physical_restore_finish(
984common::ObISQLClient &proxy, const int64_t job_id, bool &is_finish, bool &is_failed) {
985int ret = OB_SUCCESS;
986is_failed = false;
987is_finish = false;
988ObSqlString sql;
989char status_str[OB_DEFAULT_STATUS_LENTH] = "";
990int64_t real_length = 0;
991HEAP_VAR(ObMySQLProxy::ReadResult, res) {
992common::sqlclient::ObMySQLResult *result = nullptr;
993int64_t cnt = 0;
994if (OB_FAIL(sql.assign_fmt("select status from %s where tenant_id=%lu and job_id=%ld",
995OB_ALL_RESTORE_JOB_HISTORY_TNAME, OB_SYS_TENANT_ID, job_id))) {
996LOG_WARN("failed to assign fmt", K(ret));
997} else if (OB_FAIL(proxy.read(res, OB_SYS_TENANT_ID, sql.ptr()))) {
998LOG_WARN("failed to exec sql", K(ret), K(sql));
999} else if (OB_ISNULL(result = res.get_result())) {
1000ret = OB_ERR_UNEXPECTED;
1001LOG_WARN("result is null", K(ret));
1002} else if (OB_FAIL(result->next())) {
1003if (OB_ITER_END == ret) {
1004ret = OB_SUCCESS;
1005} else {
1006LOG_WARN("failed to get next", K(ret), K(job_id));
1007}
1008} else {
1009EXTRACT_STRBUF_FIELD_MYSQL(*result, OB_STR_STATUS, status_str, OB_DEFAULT_STATUS_LENTH, real_length);
1010if (OB_SUCC(ret)) {
1011is_finish = true;
1012is_failed = 0 == STRCMP(status_str, "FAIL");
1013}
1014}
1015}
1016return ret;
1017}
1018
1019int ObRestoreUtil::get_restore_job_comment(
1020common::ObISQLClient &proxy, const int64_t job_id, char *buf, const int64_t buf_size)
1021{
1022int ret = OB_SUCCESS;
1023ObSqlString sql;
1024int real_length = 0;
1025HEAP_VAR(ObMySQLProxy::ReadResult, res) {
1026common::sqlclient::ObMySQLResult *result = nullptr;
1027int64_t cnt = 0;
1028if (OB_FAIL(sql.assign_fmt("select comment from %s where tenant_id=%lu and job_id=%ld",
1029OB_ALL_RESTORE_JOB_HISTORY_TNAME, OB_SYS_TENANT_ID, job_id))) {
1030LOG_WARN("failed to assign fmt", K(ret));
1031} else if (OB_FAIL(proxy.read(res, OB_SYS_TENANT_ID, sql.ptr()))) {
1032LOG_WARN("failed to exec sql", K(ret), K(sql));
1033} else if (OB_ISNULL(result = res.get_result())) {
1034ret = OB_ERR_UNEXPECTED;
1035LOG_WARN("result is null", K(ret));
1036} else if (OB_FAIL(result->next())) {
1037if (OB_ITER_END == ret) {
1038ret = OB_ENTRY_NOT_EXIST;
1039LOG_WARN("restore job comment not exist", K(ret));
1040} else {
1041LOG_WARN("failed to get next", K(ret), K(job_id));
1042}
1043} else {
1044EXTRACT_STRBUF_FIELD_MYSQL(*result, OB_STR_COMMENT, buf, buf_size, real_length);
1045}
1046}
1047return ret;
1048}
1049
1050int ObRestoreUtil::get_restore_tenant_cpu_count(
1051common::ObMySQLProxy &proxy, const uint64_t tenant_id, double &cpu_count)
1052{
1053int ret = OB_SUCCESS;
1054share::ObUnitTableOperator unit_op;
1055common::ObArray<share::ObResourcePool> pools;
1056common::ObArray<uint64_t> unit_config_ids;
1057common::ObArray<ObUnitConfig> configs;
1058if (OB_FAIL(unit_op.init(proxy))) {
1059LOG_WARN("failed to init proxy", K(ret));
1060} else if (OB_FAIL(unit_op.get_resource_pools(tenant_id, pools))) {
1061LOG_WARN("failed to get resource pool", K(ret), K(tenant_id));
1062}
1063ARRAY_FOREACH(pools, i) {
1064if (OB_FAIL(unit_config_ids.push_back(pools.at(i).unit_config_id_))) {
1065LOG_WARN("failed to push back unit config", K(ret));
1066}
1067}
1068if (FAILEDx(unit_op.get_unit_configs(unit_config_ids, configs))) {
1069LOG_WARN("failed to get unit configs", K(ret));
1070}
1071double max_cpu = OB_MAX_CPU_NUM;
1072ARRAY_FOREACH(configs, i) {
1073max_cpu = std::min(max_cpu, configs.at(i).max_cpu());
1074}
1075if (OB_SUCC(ret)) {
1076cpu_count = max_cpu;
1077}
1078return ret;
1079}
1080
1081int ObRestoreUtil::convert_restore_timestamp_to_scn_(
1082const ObString ×tamp,
1083const common::ObTimeZoneInfoWrap &time_zone_wrap,
1084share::SCN &scn)
1085{
1086int ret = OB_SUCCESS;
1087uint64_t scn_value = 0;
1088const ObTimeZoneInfo *time_zone_info = time_zone_wrap.get_time_zone_info();
1089if (timestamp.empty() || !time_zone_wrap.is_valid()) {
1090ret = OB_INVALID_ARGUMENT;
1091LOG_WARN("invalid time zone wrap", K(ret));
1092} else if (OB_FAIL(ObTimeConverter::str_to_scn_value(timestamp, time_zone_info, time_zone_info, ObTimeConverter::COMPAT_OLD_NLS_TIMESTAMP_FORMAT, true/*oracle mode*/, scn_value))) {
1093LOG_WARN("failed to str to scn value", K(ret), K(timestamp), K(time_zone_info));
1094} else if (OB_FAIL(scn.convert_for_sql(scn_value))) {
1095LOG_WARN("failed to convert for sql scn", K(ret), K(scn_value));
1096}
1097return ret;
1098}
1099
1100int ObRestoreUtil::get_backup_sys_time_zone_(
1101const ObIArray<ObString> &tenant_path_array,
1102common::ObTimeZoneInfoWrap &time_zone_wrap)
1103{
1104int ret = OB_SUCCESS;
1105ARRAY_FOREACH_X(tenant_path_array, i, cnt, OB_SUCC(ret)) {
1106const ObString &tenant_path = tenant_path_array.at(i);
1107storage::ObBackupDataStore store;
1108share::ObBackupDest backup_dest;
1109ObBackupFormatDesc format_desc;
1110if (OB_FAIL(backup_dest.set(tenant_path.ptr()))) {
1111LOG_WARN("fail to set backup dest", K(ret), K(tenant_path));
1112} else if (OB_FAIL(store.init(backup_dest))) {
1113LOG_WARN("failed to init backup store", K(ret), K(tenant_path));
1114} else if (OB_FAIL(store.read_format_file(format_desc))) {
1115LOG_WARN("failed to read format file", K(ret), K(store));
1116} else if (ObBackupDestType::DEST_TYPE_BACKUP_DATA != format_desc.dest_type_) {
1117LOG_INFO("skip log dir", K(tenant_path), K(format_desc));
1118} else if (OB_FAIL(store.get_backup_sys_time_zone_wrap(time_zone_wrap))) {
1119LOG_WARN("fail to get locality_info", K(ret));
1120} else {
1121break;
1122}
1123}
1124return ret;
1125}
1126
1127ObRestoreFailureChecker::ObRestoreFailureChecker()
1128: is_inited_(false),
1129job_()
1130{
1131}
1132
1133ObRestoreFailureChecker::~ObRestoreFailureChecker()
1134{
1135}
1136
1137int ObRestoreFailureChecker::init(const share::ObPhysicalRestoreJob &job)
1138{
1139int ret = OB_SUCCESS;
1140if (IS_INIT) {
1141ret = OB_INIT_TWICE;
1142LOG_WARN("restore failure checker init twice", K(ret));
1143} else if (!job.is_valid()) {
1144ret = OB_INVALID_ARGUMENT;
1145LOG_WARN("get invalid arg", K(ret), K(job));
1146} else if (OB_FAIL(job_.assign(job))) {
1147LOG_WARN("failed to assign job", K(ret), K(job));
1148} else {
1149is_inited_ = true;
1150}
1151return ret;
1152}
1153
1154int ObRestoreFailureChecker::check_is_concurrent_with_clean(bool &is_concurrent_with_clean)
1155{
1156int ret = OB_SUCCESS;
1157is_concurrent_with_clean = false;
1158if (IS_NOT_INIT) {
1159ret = OB_NOT_INIT;
1160LOG_WARN("[RESTORE_FAILURE_CHECKER]restore failure checker not do init", K(ret));
1161} else if (OB_FAIL(loop_path_list_(job_, is_concurrent_with_clean))) {
1162LOG_WARN("failed to loop path list", K(ret), K_(job));
1163}
1164FLOG_INFO("[RESTORE_FAILURE_CHECKER]check is concurrent with clean", K(ret), K(is_concurrent_with_clean), K_(job));
1165return ret;
1166}
1167
1168int ObRestoreFailureChecker::loop_path_list_(const share::ObPhysicalRestoreJob &job, bool &has_been_cleaned)
1169{
1170int ret = OB_SUCCESS;
1171has_been_cleaned = false;
1172ObBackupDest backup_tenant_dest;
1173const ObPhysicalRestoreBackupDestList& list = job.get_multi_restore_path_list();
1174const common::ObSArray<share::ObBackupSetPath> &backup_set_path_list = list.get_backup_set_path_list();
1175const common::ObSArray<share::ObBackupPiecePath> &backup_piece_path_list = list.get_backup_piece_path_list();
1176
1177ARRAY_FOREACH_X(backup_set_path_list, idx, cnt, OB_SUCC(ret) && !has_been_cleaned) {
1178backup_tenant_dest.reset();
1179const share::ObBackupSetPath &backup_set_path = backup_set_path_list.at(idx);
1180bool is_exist = true;
1181if (OB_FAIL(backup_tenant_dest.set(backup_set_path.ptr()))) {
1182LOG_WARN("failed to set backup tenant dest", K(ret), K(backup_set_path));
1183} else if (OB_FAIL(check_tenant_backup_set_infos_path_exist_(backup_tenant_dest, is_exist))) {
1184LOG_WARN("failed to check tenant backup set infos path exist", K(ret), K(backup_tenant_dest));
1185} else {
1186has_been_cleaned = !is_exist;
1187}
1188}
1189
1190ARRAY_FOREACH_X(backup_piece_path_list, idx, cnt, OB_SUCC(ret) && !has_been_cleaned) {
1191backup_tenant_dest.reset();
1192const share::ObBackupPiecePath &backup_piece_path = backup_piece_path_list.at(idx);
1193bool is_exist = true;
1194bool is_empty = false;
1195if (OB_FAIL(backup_tenant_dest.set(backup_piece_path.ptr()))) {
1196LOG_WARN("failed to set backup tenant dest", K(ret), K(backup_piece_path));
1197} else if (OB_FAIL(check_tenant_archive_piece_infos_path_exist_(backup_tenant_dest, is_exist))) {
1198LOG_WARN("failed to check archive piece infos path exist", K(ret), K(backup_tenant_dest));
1199} else if (OB_FAIL(check_checkpoint_dir_emtpy_(backup_tenant_dest, is_empty))) {
1200LOG_WARN("failed to check checkpoint dir empty", K(ret), K(backup_tenant_dest));
1201} else {
1202has_been_cleaned = !is_exist && is_empty;
1203}
1204}
1205return ret;
1206}
1207
1208// single_backup_set_info
1209int ObRestoreFailureChecker::check_tenant_backup_set_infos_path_exist_(
1210const share::ObBackupDest &backup_set_dest,
1211bool &is_exist)
1212{
1213int ret = OB_SUCCESS;
1214is_exist = false;
1215ObBackupPath backup_path;
1216if (OB_FAIL(ObBackupPathUtil::get_backup_set_info_path(backup_set_dest, backup_path))) {
1217LOG_WARN("failed to get backup set info path", K(ret), K(backup_set_dest));
1218} else if (OB_FAIL(check_path_exist_(backup_path, backup_set_dest.get_storage_info(), is_exist))) {
1219LOG_WARN("failed to check path exist", K(ret));
1220}
1221return ret;
1222}
1223
1224// tenant_archive_piece_infos
1225int ObRestoreFailureChecker::check_tenant_archive_piece_infos_path_exist_(
1226const share::ObBackupDest &backup_set_dest,
1227bool &is_exist)
1228{
1229int ret = OB_SUCCESS;
1230is_exist = false;
1231ObBackupPath backup_path;
1232if (OB_FAIL(ObArchivePathUtil::get_tenant_archive_piece_infos_file_path(backup_set_dest, backup_path))) {
1233LOG_WARN("failed to get tenant archive piece infos file path", K(ret), K(backup_set_dest));
1234} else if (OB_FAIL(check_path_exist_(backup_path, backup_set_dest.get_storage_info(), is_exist))) {
1235LOG_WARN("failed to check path exist", K(ret));
1236}
1237return ret;
1238}
1239
1240int ObRestoreFailureChecker::check_checkpoint_dir_emtpy_(
1241const share::ObBackupDest &backup_tenant_dest,
1242bool &is_empty)
1243{
1244int ret = OB_SUCCESS;
1245is_empty = false;
1246ObBackupPath backup_path;
1247if (OB_FAIL(ObArchivePathUtil::get_piece_checkpoint_dir_path(backup_tenant_dest, backup_path))) {
1248LOG_WARN("failed to get tenant archive piece infos file path", K(ret), K(backup_tenant_dest));
1249} else if (OB_FAIL(check_dir_empty_(backup_path, backup_tenant_dest.get_storage_info(), is_empty))) {
1250LOG_WARN("failed to check dir empty", K(ret));
1251}
1252return ret;
1253}
1254
1255int ObRestoreFailureChecker::check_path_exist_(
1256const share::ObBackupPath &backup_path,
1257const share::ObBackupStorageInfo *storage_info,
1258bool &is_exist)
1259{
1260int ret = OB_SUCCESS;
1261is_exist = false;
1262ObBackupIoAdapter util;
1263if (OB_FAIL(util.is_exist(backup_path.get_ptr(), storage_info, is_exist))) {
1264LOG_WARN("failed to check is exist", K(ret));
1265}
1266return ret;
1267}
1268
1269int ObRestoreFailureChecker::check_dir_empty_(
1270const share::ObBackupPath &backup_path,
1271const share::ObBackupStorageInfo *storage_info,
1272bool &is_empty)
1273{
1274int ret = OB_SUCCESS;
1275is_empty = false;
1276ObBackupIoAdapter util;
1277if (OB_FAIL(util.is_empty_directory(backup_path.get_ptr(), storage_info, is_empty))) {
1278LOG_WARN("fail to init store", K(ret), K(backup_path));
1279} else {
1280LOG_INFO("is empty dir", K(backup_path), K(is_empty));
1281}
1282return ret;
1283}
1284