oceanbase

Форк
0
/
ob_recover_table_initiator.cpp 
522 строки · 24.7 Кб
1
/**
2
 * Copyright (c) 2021 OceanBase
3
 * OceanBase CE is licensed under Mulan PubL v2.
4
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
5
 * You may obtain a copy of Mulan PubL v2 at:
6
 *          http://license.coscl.org.cn/MulanPubL-2.0
7
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
 * See the Mulan PubL v2 for more details.
11
 */
12

13
#define USING_LOG_PREFIX RS
14

15
#include "ob_recover_table_initiator.h"
16
#include "lib/hash/ob_hashset.h"
17
#include "lib/charset/ob_charset.h"
18
#include "share/ob_rpc_struct.h"
19
#include "share/backup/ob_backup_data_table_operator.h"
20
#include "rootserver/ob_rs_event_history_table_operator.h"
21
#include "ob_restore_util.h"
22
#include "share/restore/ob_recover_table_persist_helper.h"
23
#include "sql/parser/parse_node.h"
24
#include "rootserver/ddl_task/ob_ddl_task.h"
25
#include "share/restore/ob_import_table_persist_helper.h"
26

27
using namespace oceanbase;
28
using namespace share::schema;
29
using namespace common;
30
using namespace obrpc;
31
using namespace rootserver;
32
using namespace share;
33

34
int ObRecoverTableInitiator::init(
35
    share::schema::ObMultiVersionSchemaService *schema_service, common::ObMySQLProxy *sql_proxy)
36
{
37
  int ret = OB_SUCCESS;
38
  if (IS_INIT) {
39
    ret = OB_INIT_TWICE;
40
    LOG_WARN("ObRecoverTableInitiator init twice", K(ret));
41
  } else if (OB_ISNULL(schema_service) || OB_ISNULL(schema_service)) {
42
    ret = OB_INVALID_ARGUMENT;
43
    LOG_WARN("schema service and sql prxoy must not be null", K(ret));
44
  } else {
45
    schema_service_ = schema_service;
46
    sql_proxy_ = sql_proxy;
47
    is_inited_ = true;
48
  }
49
  return ret;
50
}
51

52
// Handles a recover-table request.
//
// Validates the initiator state and the argument, runs the pre-checks, and then
// dispatches on arg.action_: INITIATE starts a new job, CANCEL cancels one.
// Any other action value is ignored and the current ret is returned unchanged.
int ObRecoverTableInitiator::initiate_recover_table(const obrpc::ObRecoverTableArg &arg)
{
  int ret = OB_SUCCESS;
  if (IS_NOT_INIT) {
    ret = OB_NOT_INIT;
    LOG_WARN("ObRecoverTableInitiator is not init", K(ret));
  } else if (!arg.is_valid()) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid ObRecoverTableArg", K(ret), K(arg));
  } else if (OB_FAIL(check_before_initiate_(arg))) {
    LOG_WARN("failed to check before initiate", K(ret));
  } else {
    switch (arg.action_) {
      case obrpc::ObRecoverTableArg::Action::INITIATE: {
        if (OB_FAIL(start_recover_table_(arg))) {
          LOG_WARN("failed to start recover table", K(ret), K(arg));
        }
        break;
      }
      case obrpc::ObRecoverTableArg::Action::CANCEL: {
        if (OB_FAIL(cancel_recover_table_(arg))) {
          LOG_WARN("failed to cancel recover table", K(ret), K(arg));
        }
        break;
      }
      default: {
        // No handler for other actions.
        break;
      }
    }
  }
  return ret;
}
74

75
int ObRecoverTableInitiator::is_recover_job_exist(const uint64_t target_tenant_id, bool &is_exist) const
76
{
77
  int ret = OB_SUCCESS;
78
  share::ObRecoverTablePersistHelper table_op;
79
  if (IS_NOT_INIT) {
80
    ret = OB_NOT_INIT;
81
    LOG_WARN("ObRecoverTableInitiator is not init", K(ret));
82
  } else if (!is_user_tenant(target_tenant_id)) {
83
    ret = OB_INVALID_ARGUMENT;
84
    LOG_WARN("invalid target_tenant_id", K(ret), K(target_tenant_id));
85
  } else if (OB_FAIL(table_op.init(OB_SYS_TENANT_ID))) {
86
    LOG_WARN("failed to init sys table op", K(ret));
87
  } else if (OB_FAIL(table_op.is_recover_table_job_exist(*sql_proxy_, target_tenant_id, is_exist))) {
88
    LOG_WARN("failed to check recover table job exist", K(ret), K(target_tenant_id));
89
  }
90
  return ret;
91
}
92

93
// Builds a recover-table job (status PREPARE) plus the physical restore job for
// the auxiliary tenant from arg, and persists both via insert_sys_job_().
// A rootservice event is recorded whether or not the steps above succeeded.
int ObRecoverTableInitiator::start_recover_table_(const obrpc::ObRecoverTableArg &arg)
{
  int ret = OB_SUCCESS;
  share::ObRecoverTableJob job;
  ObPhysicalRestoreJob physical_restore_job;

  job.set_status(share::ObRecoverTableStatus::PREPARE);
  if (OB_FAIL(job.set_target_tenant_name(arg.tenant_name_))) {
    LOG_WARN("failed to set traget tenant name", K(ret));
  } else {
    job.set_target_tenant_id(arg.tenant_id_);
    if (OB_FAIL(job.set_description(arg.restore_tenant_arg_.description_))) {
      LOG_WARN("failed to set description", K(ret));
    } else if (OB_FAIL(fill_aux_tenant_restore_info_(arg, job, physical_restore_job))) {
      LOG_WARN("failed to fill aux tenant resetore info", K(ret), K(arg));
    } else if (OB_FAIL(fill_recover_table_arg_(arg, job))) {
      LOG_WARN("failed to fill recover table arg", K(ret));
    } else if (OB_FAIL(insert_sys_job_(job, physical_restore_job))) {
      LOG_WARN("failed to insert sys recover table job", K(ret));
    } else {
      LOG_INFO("initiate recover table succeed", K(ret), K(job));
    }
  }

  // The local names below are passed through K(), so they must stay as they are.
  uint64_t tenant_id = arg.tenant_id_;
  int64_t job_id = job.get_job_id();
  share::ObTaskId trace_id(*ObCurTraceId::get_trace_id());
  ROOTSERVICE_EVENT_ADD("recover_table", "start_recover_table", K(tenant_id), K(job_id), K(ret), K(trace_id));
  return ret;
}
119
// Force-cancels the import job and the recover job of arg.tenant_id_ inside one
// transaction started on the tenant's meta tenant. The transaction is committed
// only if every step succeeded; a rootservice event records the final result.
int ObRecoverTableInitiator::cancel_recover_table_(const obrpc::ObRecoverTableArg &arg)
{
  int ret = OB_SUCCESS;
  ObMySQLTransaction trans;
  share::ObRecoverTablePersistHelper recover_job_helper;
  share::ObImportTableJobPersistHelper import_job_helper;
  uint64_t exec_tenant_id = gen_meta_tenant_id(arg.tenant_id_);

  if (OB_FAIL(recover_job_helper.init(arg.tenant_id_))) {
    LOG_WARN("failed to init helper", K(ret), K(arg));
  } else if (OB_FAIL(import_job_helper.init(arg.tenant_id_))) {
    LOG_WARN("failed to init helper", K(ret), K(arg));
  } else if (OB_FAIL(trans.start(sql_proxy_, exec_tenant_id))) {
    LOG_WARN("failed to start trans", K(ret), K(exec_tenant_id));
  } else if (OB_FAIL(import_job_helper.force_cancel_import_job(trans))) {
    LOG_WARN("failed to force cancel import job", K(ret), K(arg));
  } else if (OB_FAIL(recover_job_helper.force_cancel_recover_job(trans))) {
    LOG_WARN("failed to force cancel recover job", K(ret), K(arg));
  }

  if (trans.is_started()) {
    int tmp_ret = OB_SUCCESS;
    const bool need_commit = OB_SUCC(ret);
    if (OB_TMP_FAIL(trans.end(need_commit))) {
      // Keep the first error; only surface the end-trans failure if nothing failed before.
      if (OB_SUCC(ret)) {
        ret = tmp_ret;
      }
      LOG_WARN("failed to end trans", K(ret));
    }
  }

  ROOTSERVICE_EVENT_ADD("recover_table", "cancel_recover_table", "tenant_id", arg.tenant_id_, "result", ret);
  return ret;
}
148

149
int ObRecoverTableInitiator::insert_sys_job_(
150
    share::ObRecoverTableJob &job, share::ObPhysicalRestoreJob &physical_restore_job)
151
{
152
  int ret = OB_SUCCESS;
153
  ObMySQLTransaction trans;
154
  int64_t job_id = -1;
155
  if (OB_FAIL(trans.start(sql_proxy_, OB_SYS_TENANT_ID))) {
156
    LOG_WARN("failed to start trans", K(ret));
157
  } else {
158
    share::ObRecoverTablePersistHelper helper;
159
    if (OB_FAIL(ObLSBackupInfoOperator::get_next_job_id(trans, OB_SYS_TENANT_ID, job_id))) {
160
      LOG_WARN("failed to get next job_id", K(ret));
161
    } else if (OB_FALSE_IT(job.set_tenant_id(OB_SYS_TENANT_ID))) {
162
    } else if (OB_FALSE_IT(job.set_initiator_tenant_id(OB_SYS_TENANT_ID))) {
163
    } else if (OB_FALSE_IT(job.set_job_id(job_id))) {
164
    } else if (OB_FALSE_IT(job.set_initiator_job_id(0/*sys job default value*/))) {
165
    } else if (OB_FALSE_IT(job.set_start_ts(ObTimeUtility::current_time()))) {
166
    } else if (OB_FAIL(helper.init(OB_SYS_TENANT_ID))) {
167
      LOG_WARN("failed to init sys table op", K(ret));
168
    } else if (OB_FAIL(helper.insert_recover_table_job(trans, job))) {
169
      LOG_WARN("failed to insert initital recover table job", K(ret), K(job));
170
    }
171

172
    if (FAILEDx(RS_JOB_CREATE_EXT(job_id, RESTORE_TENANT, trans, "sql_text", "restore aux tenant"))) {
173
      LOG_WARN("failed to get job id", K(ret));
174
    } else if (OB_FALSE_IT(physical_restore_job.init_restore_key(OB_SYS_TENANT_ID, job_id))) {
175
    } else if (OB_FALSE_IT(physical_restore_job.set_restore_start_ts(ObTimeUtility::current_time()))) {
176
    } else if (OB_FALSE_IT(physical_restore_job.set_initiator_job_id(job.get_job_id()))) {
177
    } else if (OB_FALSE_IT(physical_restore_job.set_initiator_tenant_id(OB_SYS_TENANT_ID))) {
178
    } else if (OB_FALSE_IT(physical_restore_job.set_recover_table(true))) {
179
    } else if (OB_FAIL(ObRestoreUtil::record_physical_restore_job(trans, physical_restore_job))) {
180
      LOG_WARN("failed to record physical restore job", K(ret));
181
    }
182
    if (OB_SUCC(ret)) {
183
      if (OB_FAIL(trans.end(true))) {
184
        LOG_WARN("failed to commit trans", K(ret));
185
      }
186
    } else {
187
      int tmp_ret = OB_SUCCESS;
188
      if (OB_SUCCESS != (tmp_ret = trans.end(false))) {
189
        LOG_WARN("failed to rollback trans", K(tmp_ret));
190
      }
191
    }
192
  }
193
  return ret;
194
}
195

196
// Pre-checks a recover-table request: arg.tenant_id_ must be a user tenant and
// must match the tenant id that the sys schema guard resolves for
// arg.tenant_name_.
//
// @return OB_INVALID_ARGUMENT for a non-user tenant, OB_ERR_UNEXPECTED when the
//         id and the name do not match, or the schema lookup error.
int ObRecoverTableInitiator::check_before_initiate_(const obrpc::ObRecoverTableArg &arg)
{
  int ret = OB_SUCCESS;
  if (!is_user_tenant(arg.tenant_id_)) {
    ret = OB_INVALID_ARGUMENT;
    LOG_USER_ERROR(OB_INVALID_ARGUMENT, "TENANT, it must be user tenant");
    LOG_WARN("invlaid tenant id, it must be user tenant", K(ret), K(arg.tenant_id_));
  } else {
    schema::ObSchemaGetterGuard sys_guard;
    uint64_t resolved_tenant_id = 0;
    if (OB_FAIL(schema_service_->get_tenant_schema_guard(OB_SYS_TENANT_ID, sys_guard))) {
      LOG_WARN("failed to get sys schema guard", K(ret));
    } else if (OB_FAIL(sys_guard.get_tenant_id(arg.tenant_name_, resolved_tenant_id))) {
      LOG_WARN("failed to get tenant id", K(ret), K(arg.tenant_name_));
    } else if (resolved_tenant_id != arg.tenant_id_) {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("arg tenant id and tenant name must be couple", K(ret), K(arg));
    }
  }
  return ret;
}
215

216
// Generates the auxiliary tenant name "AUX_RECOVER$<current_time>" and stores
// it in the job.
int ObRecoverTableInitiator::fill_aux_tenant_name_(share::ObRecoverTableJob &job)
{
  int ret = OB_SUCCESS;
  // Local name is passed through K() below, keep it as is.
  char aux_tenant_name[OB_MAX_TENANT_NAME_LENGTH] = "";
  if (OB_FAIL(databuff_printf(aux_tenant_name,
                              OB_MAX_TENANT_NAME_LENGTH,
                              "AUX_RECOVER$%ld",
                              ObTimeUtility::current_time()))) {
    LOG_WARN("failed to generate aux tenant name", K(ret));
  } else {
    const ObString generated_name(aux_tenant_name);
    if (OB_FAIL(job.set_aux_tenant_name(generated_name))) {
      LOG_WARN("failed to set aux tenant name", K(ret), K(aux_tenant_name));
    }
  }
  return ret;
}
227

228
int ObRecoverTableInitiator::fill_aux_tenant_restore_info_(
229
    const obrpc::ObRecoverTableArg &arg, share::ObRecoverTableJob &job, share::ObPhysicalRestoreJob &physical_restore_job)
230
{
231
  int ret = OB_SUCCESS;
232
  obrpc::ObPhysicalRestoreTenantArg tenant_restore_arg;
233
  if (OB_FAIL(fill_aux_tenant_name_(job))) {
234
    LOG_WARN("failed to fill aux tenant name", K(ret));
235
  } else if (OB_FAIL(tenant_restore_arg.assign(arg.restore_tenant_arg_))) {
236
    LOG_WARN("failed to assign tenant restore arg", K(ret), K(arg.restore_tenant_arg_));
237
  } else if (OB_FALSE_IT(tenant_restore_arg.tenant_name_ = job.get_aux_tenant_name())) {
238
  } else if (OB_FAIL(ObRestoreUtil::fill_physical_restore_job(1/*fake job id*/, tenant_restore_arg, physical_restore_job))) {
239
    LOG_WARN("failed to fill physical restore job", K(ret), K(tenant_restore_arg));
240
  } else if (OB_FALSE_IT(job.set_restore_scn(physical_restore_job.get_restore_scn()))) {
241
  } else if (OB_FAIL(job.set_restore_option(physical_restore_job.get_restore_option()))) {
242
    LOG_WARN("failed to set restore option", K(ret));
243
  } else if (OB_FAIL(job.set_backup_dest(physical_restore_job.get_backup_dest()))) {
244
    LOG_WARN("failed to set backup dest", K(ret));
245
  } else if (OB_FAIL(job.set_external_kms_info(physical_restore_job.get_kms_info()))) {
246
    LOG_WARN("failed to set kms info", K(ret));
247
  } else if (OB_FAIL(job.set_backup_passwd(physical_restore_job.get_passwd_array()))) {
248
    LOG_WARN("failed to set backup passwd", K(ret));
249
  } else if (OB_FAIL(job.get_multi_restore_path_list().assign(physical_restore_job.get_multi_restore_path_list()))) {
250
    LOG_WARN("faield to assign multi restore path", K(ret));
251
  }
252
  return ret;
253
}
254

255
int ObRecoverTableInitiator::fill_recover_database(
256
    const share::ObImportArg &import_arg,
257
    share::ObImportTableArg &import_table_arg)
258
{
259
  //TODO(zeyong) move duplicate item checking logic to ObImportArg internal later.
260
  int ret = OB_SUCCESS;
261
  const share::ObImportDatabaseArray &db_array = import_arg.get_import_database_array();
262
  ARRAY_FOREACH(db_array.get_items(), i) {
263
    const share::ObImportDatabaseItem db_item = db_array.get_items().at(i);
264
    if (OB_FAIL(import_table_arg.add_database(db_item))) {
265
      LOG_WARN("failed to add database", K(ret));
266
    }
267
  }
268
  if (OB_SUCC(ret)) {
269
    LOG_INFO("succeed fill recover database", K(import_arg), K(db_array), K(import_table_arg.get_import_database_array()));
270
  }
271
  return ret;
272
}
273

274
int ObRecoverTableInitiator::fill_recover_table(
275
    const share::ObImportArg &import_arg,
276
    share::ObImportTableArg &import_table_arg)
277
{
278
  int ret = OB_SUCCESS;
279
  const share::ObImportTableArray &table_array = import_arg.get_import_table_array();
280
  bool is_dup = false;
281
  ObSqlString dup_item_str;
282
  ARRAY_FOREACH(table_array.get_items(), i) {
283
    const share::ObImportTableItem table_item = table_array.get_items().at(i);
284
    share::ObImportDatabaseItem db_item(table_item.mode_, table_item.database_name_.ptr(), table_item.database_name_.length());
285
    if (OB_FAIL(import_table_arg.check_database_dup(db_item, is_dup, dup_item_str))) {
286
      LOG_WARN("failed to check database dup", K(ret));
287
    } else if (is_dup) {
288
      ret = OB_NOT_SUPPORTED;
289
      LOG_WARN("duplicate database", K(table_item));
290
      LOG_USER_ERROR(OB_NOT_SUPPORTED, dup_item_str.ptr());
291
    } else if (OB_FAIL(import_table_arg.add_table(table_item))) {
292
      LOG_WARN("failed to add table", K(ret));
293
    }
294
  }
295
  if (OB_SUCC(ret)) {
296
    LOG_INFO("succeed fill recover table", K(import_arg), K(table_array), K(import_table_arg.get_import_table_array()));
297
  }
298
  return ret;
299
}
300

301
int ObRecoverTableInitiator::fill_recover_partition(
302
    const share::ObImportArg &import_arg,
303
    share::ObImportTableArg &import_table_arg)
304
{
305
  int ret = OB_SUCCESS;
306
  bool is_dup = true;
307
  ObSqlString dup_item_str;
308
  const share::ObImportPartitionArray &partition_array = import_arg.get_import_partition_array();
309
  ARRAY_FOREACH(partition_array.get_items(), i) {
310
    const share::ObImportPartitionItem partition_item = partition_array.get_items().at(i);
311
    share::ObImportDatabaseItem db_item(partition_item.mode_,
312
                                                partition_item.database_name_.ptr(),
313
                                                partition_item.database_name_.length());
314
    share::ObImportTableItem table_item(partition_item.mode_,
315
                                                partition_item.database_name_.ptr(),
316
                                                partition_item.database_name_.length(),
317
                                                partition_item.table_name_.ptr(),
318
                                                partition_item.table_name_.length());
319
    if (OB_FAIL(import_table_arg.check_database_dup(db_item, is_dup, dup_item_str))) {
320
      LOG_WARN("failed to check database dup", K(ret));
321
    } else if (is_dup) {
322
      ret = OB_NOT_SUPPORTED;
323
      LOG_WARN("duplicate database", K(table_item));
324
      LOG_USER_ERROR(OB_NOT_SUPPORTED, dup_item_str.ptr());
325
    } else if (OB_FAIL(import_table_arg.check_table_dup(table_item, is_dup, dup_item_str))) {
326
      LOG_WARN("failed to check table dup", K(ret));
327
    } else if (is_dup) {
328
      ret = OB_NOT_SUPPORTED;
329
      LOG_WARN("duplicate table", K(table_item));
330
      LOG_USER_ERROR(OB_NOT_SUPPORTED, dup_item_str.ptr());
331
    } else if (OB_FAIL(import_table_arg.add_partition(partition_item))) {
332
      LOG_WARN("failed to add partition", K(ret));
333
    }
334
  }
335
  if (OB_SUCC(ret)) {
336
    LOG_INFO("succeed fill recover partition", K(import_arg), K(partition_array), K(import_table_arg.get_import_partition_array()));
337
  }
338
  return ret;
339
}
340

341
int ObRecoverTableInitiator::fill_remap_database(
342
    const share::ObImportArg &import_arg,
343
    const share::ObImportTableArg &import_table_arg,
344
    share::ObImportRemapArg &import_remap_arg)
345
{
346
  int ret = OB_SUCCESS;
347
  bool is_dup = true;
348
  ObSqlString dup_item_str;
349
  const share::ObRemapDatabaseArray &remap_db_array = import_arg.get_remap_database_array();
350
  ARRAY_FOREACH(remap_db_array.get_remap_items(), i) {
351
    const share::ObRemapDatabaseItem remap_db_item = remap_db_array.get_remap_items().at(i);
352
    const share::ObImportDatabaseItem src_db_item(remap_db_item.src_.mode_,
353
                                                          remap_db_item.src_.name_.ptr(),
354
                                                          remap_db_item.src_.name_.length());
355
    const share::ObImportDatabaseItem target_db_item(remap_db_item.target_.mode_,
356
                                                             remap_db_item.target_.name_.ptr(),
357
                                                             remap_db_item.target_.name_.length());
358
    if (OB_FAIL(import_table_arg.check_database_dup(src_db_item, is_dup, dup_item_str))) {
359
      LOG_WARN("failed to check database dup", K(ret));
360
    } else if (!is_dup) {
361
      ret = OB_NOT_SUPPORTED;
362
      LOG_WARN("remap not exist database", K(src_db_item));
363
      LOG_USER_ERROR(OB_NOT_SUPPORTED, "remap not exist recover database");
364
    } else if (OB_FAIL(import_table_arg.check_database_dup(target_db_item, is_dup, dup_item_str))) {
365
      LOG_WARN("failed to check dup", K(ret));
366
    } else if (is_dup) {
367
      ret = OB_NOT_SUPPORTED;
368
      LOG_WARN("remap exist database", K(src_db_item));
369
      LOG_USER_ERROR(OB_NOT_SUPPORTED, dup_item_str.ptr());
370
    } else if (OB_FAIL(import_remap_arg.add_remap_database(remap_db_item))) {
371
      LOG_WARN("failed to add database", K(ret));
372
    }
373
  }
374
  if (OB_SUCC(ret)) {
375
    LOG_INFO("succeed fill remap database", K(import_arg), K(remap_db_array), K(import_remap_arg.get_remap_database_array()));
376
  }
377
  return ret;
378
}
379

380
int ObRecoverTableInitiator::fill_remap_table(
381
    const share::ObImportArg &import_arg,
382
    const share::ObImportTableArg &import_table_arg,
383
    share::ObImportRemapArg &import_remap_arg)
384
{
385
  int ret = OB_SUCCESS;
386
  bool is_dup = true;
387
  ObSqlString dup_item_str;
388
  const share::ObRemapTableArray &remap_table_array = import_arg.get_remap_table_array();
389
  ARRAY_FOREACH(remap_table_array.get_remap_items(), i) {
390
    const share::ObRemapTableItem remap_table_item = remap_table_array.get_remap_items().at(i);
391
    const share::ObImportTableItem src_table_item(remap_table_item.src_.mode_,
392
                                                          remap_table_item.src_.database_name_.ptr(),
393
                                                          remap_table_item.src_.database_name_.length(),
394
                                                          remap_table_item.src_.table_name_.ptr(),
395
                                                          remap_table_item.src_.table_name_.length());
396
    const share::ObImportTableItem target_table_item(remap_table_item.target_.mode_,
397
                                                             remap_table_item.target_.database_name_.ptr(),
398
                                                             remap_table_item.target_.database_name_.length(),
399
                                                             remap_table_item.target_.table_name_.ptr(),
400
                                                             remap_table_item.target_.table_name_.length());
401
    if (OB_FAIL(import_table_arg.check_table_dup(src_table_item, is_dup, dup_item_str))) {
402
      LOG_WARN("failed to check table dup", K(ret));
403
    } else if (!is_dup) {
404
      ret = OB_NOT_SUPPORTED;
405
      LOG_WARN("remap not exist table", K(src_table_item));
406
      LOG_USER_ERROR(OB_NOT_SUPPORTED, "remap not exist recover table");
407
    } else if (OB_FAIL(import_table_arg.check_table_dup(target_table_item, is_dup, dup_item_str))) {
408
    } else if (is_dup) {
409
      ret = OB_NOT_SUPPORTED;
410
      LOG_WARN("remap exist table", K(target_table_item));
411
      LOG_USER_ERROR(OB_NOT_SUPPORTED, dup_item_str.ptr());
412
    } else if (OB_FAIL(import_remap_arg.add_remap_table(remap_table_item))) {
413
      LOG_WARN("failed to add remap table", K(ret));
414
    }
415
  }
416
  if (OB_SUCC(ret)) {
417
    LOG_INFO("succeed fill remap table", K(import_arg), K(remap_table_array), K(import_remap_arg.get_remap_table_array()));
418
  }
419
  return ret;
420
}
421

422
int ObRecoverTableInitiator::fill_remap_partition(
423
    const share::ObImportArg &import_arg,
424
    const share::ObImportTableArg &import_table_arg,
425
    share::ObImportRemapArg &import_remap_arg)
426
{
427
  int ret = OB_SUCCESS;
428
  bool is_dup = true;
429
  ObSqlString dup_item_str;
430
  const share::ObRemapPartitionArray &remap_partition_array = import_arg.get_remap_partition_array();
431
  ARRAY_FOREACH(remap_partition_array.get_remap_items(), i) {
432
    const share::ObRemapPartitionItem remap_part_item = remap_partition_array.get_remap_items().at(i);
433
    const share::ObImportPartitionItem src_part_item(remap_part_item.src_.mode_,
434
                                                          remap_part_item.src_.database_name_.ptr(),
435
                                                          remap_part_item.src_.database_name_.length(),
436
                                                          remap_part_item.src_.table_name_.ptr(),
437
                                                          remap_part_item.src_.table_name_.length(),
438
                                                          remap_part_item.src_.partition_name_.ptr(),
439
                                                          remap_part_item.src_.partition_name_.length());
440
    const share::ObImportTableItem target_table_item(remap_part_item.target_.mode_,
441
                                                             remap_part_item.target_.database_name_.ptr(),
442
                                                             remap_part_item.target_.database_name_.length(),
443
                                                             remap_part_item.target_.table_name_.ptr(),
444
                                                             remap_part_item.target_.table_name_.length());
445
    if (OB_FAIL(import_table_arg.check_partion_dup(src_part_item, is_dup, dup_item_str))) {
446
      LOG_WARN("failed to check dup", K(ret));
447
    } else if (!is_dup) {
448
      ret = OB_NOT_SUPPORTED;
449
      LOG_WARN("remap not exist partition", K(src_part_item));
450
      LOG_USER_ERROR(OB_NOT_SUPPORTED, "remap not exist recover partition");
451
    } else if (OB_FAIL(import_table_arg.check_table_dup(target_table_item, is_dup, dup_item_str))) {
452
      LOG_WARN("failed to check dup", K(ret));
453
    } else if (is_dup) {
454
      ret = OB_NOT_SUPPORTED;
455
      LOG_WARN("remap exist partition", K(target_table_item));
456
      LOG_USER_ERROR(OB_NOT_SUPPORTED, dup_item_str.ptr());
457
    } else if (OB_FAIL(import_remap_arg.add_remap_parition(remap_part_item))) {
458
      LOG_WARN("failed to add remap partition", K(ret));
459
    }
460
  }
461
  return ret;
462
}
463

464
int ObRecoverTableInitiator::fill_remap_tablespace(
465
    const share::ObImportArg &import_arg,
466
    share::ObImportRemapArg &import_remap_arg)
467
{
468
  int ret = OB_SUCCESS;
469
  const share::ObRemapTablespaceArray &remap_tablespace_array = import_arg.get_remap_tablespace_array();
470
  ARRAY_FOREACH(remap_tablespace_array.get_remap_items(), i) {
471
    const share::ObRemapTablespaceItem remap_tablespace_item = remap_tablespace_array.get_remap_items().at(i);
472
    if (OB_FAIL(import_remap_arg.add_remap_tablespace(remap_tablespace_item))) {
473
      LOG_WARN("failed to add tablespace", K(ret));
474
    }
475
  }
476
  return ret;
477
}
478

479
int ObRecoverTableInitiator::fill_remap_tablegroup(
480
    const share::ObImportArg &import_arg,
481
    share::ObImportRemapArg &import_remap_arg)
482
{
483
  int ret = OB_SUCCESS;
484
  const share::ObRemapTablegroupArray &remap_tablegroup_array = import_arg.get_remap_tablegroup_array();
485
  ARRAY_FOREACH(remap_tablegroup_array.get_remap_items(), i) {
486
    const share::ObRemapTablegroupItem remap_tablegroup_item = remap_tablegroup_array.get_remap_items().at(i);
487
    if (OB_FAIL(import_remap_arg.add_remap_tablegroup(remap_tablegroup_item))) {
488
      LOG_WARN("failed to add tablespace", K(ret));
489
    }
490
  }
491
  return ret;
492
}
493

494

495
int ObRecoverTableInitiator::fill_recover_table_arg_(
496
    const obrpc::ObRecoverTableArg &arg, share::ObRecoverTableJob &job)
497
{
498
  int ret = OB_SUCCESS;
499
  share::ObImportTableArg &import_table_arg = job.get_import_arg().get_import_table_arg();
500
  share::ObImportRemapArg &import_remap_arg = job.get_import_arg().get_remap_table_arg();
501
  LOG_INFO("succeed fill arg", K(arg), K(import_table_arg), K(import_remap_arg));
502
  if (arg.import_arg_.get_import_table_arg().is_import_all()) {
503
    import_table_arg.set_import_all();
504
  } else if (OB_FAIL(fill_recover_database(arg.import_arg_, import_table_arg))) {
505
    LOG_WARN("failed to recover database", K(ret), K(arg.import_arg_));
506
  } else if (OB_FAIL(fill_recover_table(arg.import_arg_, import_table_arg))) {
507
    LOG_WARN("failed to recover table", K(ret), K(arg.import_arg_));
508
  } else if (OB_FAIL(fill_recover_partition(arg.import_arg_, import_table_arg))) {
509
    LOG_WARN("failed to recover partition", K(ret), K(arg.import_arg_));
510
  } else if (OB_FAIL(fill_remap_database(arg.import_arg_, import_table_arg, import_remap_arg))) {
511
    LOG_WARN("failed to remap database", K(ret), K(arg.import_arg_));
512
  } else if (OB_FAIL(fill_remap_table(arg.import_arg_, import_table_arg, import_remap_arg))) {
513
    LOG_WARN("failed to remap table", K(ret), K(arg.import_arg_));
514
  } else if (OB_FAIL(fill_remap_partition(arg.import_arg_, import_table_arg, import_remap_arg))) {
515
    LOG_WARN("failed to remap partition", K(ret), K(arg.import_arg_));
516
  } else if (OB_FAIL(fill_remap_tablespace(arg.import_arg_, import_remap_arg))) {
517
    LOG_WARN("failed to remap tablespace", K(ret), K(arg.import_arg_));
518
  } else if (OB_FAIL(fill_remap_tablegroup(arg.import_arg_, import_remap_arg))) {
519
    LOG_WARN("failed to remap tablegroup", K(ret), K(arg.import_arg_));
520
  }
521
  return ret;
522
}

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.