oceanbase

Форк
0
/
ob_import_table_job_scheduler.cpp 
920 строк · 38.4 Кб
1
/**
2
 * Copyright (c) 2021 OceanBase
3
 * OceanBase CE is licensed under Mulan PubL v2.
4
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
5
 * You may obtain a copy of Mulan PubL v2 at:
6
 *          http://license.coscl.org.cn/MulanPubL-2.0
7
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
 * See the Mulan PubL v2 for more details.
11
 */
12
#define USING_LOG_PREFIX RS
13

14
#include "ob_import_table_job_scheduler.h"
15
#include "ob_recover_table_initiator.h"
16
#include "lib/mysqlclient/ob_mysql_transaction.h"
17
#include "storage/ddl/ob_ddl_server_client.h"
18
#include "share/backup/ob_backup_struct.h"
19
#include "rootserver/ddl_task/ob_ddl_task.h"
20
#include "sql/engine/cmd/ob_ddl_executor_util.h"
21
#include "share/ob_ddl_error_message_table_operator.h"
22
#include "rootserver/ob_rs_event_history_table_operator.h"
23
#include "share/restore/ob_import_util.h"
24
#include "rootserver/restore/ob_restore_service.h"
25

26
using namespace oceanbase;
27
using namespace rootserver;
28
using namespace common;
29
using namespace share;
30

31
// Creates a scheduler in the "not initialized" state: no tenant bound, no
// schema service or SQL proxy attached. init() has to succeed before
// do_work() performs any scheduling.
ObImportTableJobScheduler::ObImportTableJobScheduler()
  : is_inited_(false), tenant_id_(OB_INVALID_TENANT_ID),
    schema_service_(nullptr), sql_proxy_(nullptr),
    job_helper_(), task_helper_()
{
}
39

40
int ObImportTableJobScheduler::init(
41
    share::schema::ObMultiVersionSchemaService &schema_service,
42
    common::ObMySQLProxy &sql_proxy)
43
{
44
  int ret = OB_SUCCESS;
45
  const uint64_t tenant_id = gen_user_tenant_id(MTL_ID());
46
  if (IS_INIT) {
47
    ret = OB_INIT_TWICE;
48
    LOG_WARN("ObImportTableJobScheduler init twice", K(ret));
49
  } else if (OB_FAIL(job_helper_.init(tenant_id))) {
50
    LOG_WARN("failed to init table op", K(ret), K(tenant_id));
51
  } else if (OB_FAIL(task_helper_.init(tenant_id))) {
52
    LOG_WARN("failed to init table op", K(ret), K(tenant_id));
53
  } else {
54
    schema_service_ = &schema_service;
55
    sql_proxy_ = &sql_proxy;
56
    tenant_id_ = tenant_id;
57
    is_inited_ = true;
58
  }
59
  return ret;
60
}
61

62
void ObImportTableJobScheduler::wakeup_()
63
{
64
  ObRestoreService *restore_service = nullptr;
65
  if (OB_ISNULL(restore_service = MTL(ObRestoreService *))) {
66
    LOG_ERROR_RET(OB_ERR_UNEXPECTED, "restore service must not be null");
67
  } else {
68
    restore_service->wakeup();
69
  }
70
}
71

72
void ObImportTableJobScheduler::do_work()
73
{
74
  int ret = OB_SUCCESS;
75
  uint64_t data_version = 0;
76
  ObArray<share::ObImportTableJob> jobs;
77
  if (IS_NOT_INIT) {
78
    ret = OB_NOT_INIT;
79
    LOG_WARN("not init ObImportTableJobScheduler", K(ret));
80
  } else if (is_sys_tenant(tenant_id_)) {
81
    // no import table job in sys tenant
82
  } else if (OB_FAIL(check_compatible_())) {
83
    LOG_WARN("check compatible failed", K(ret));
84
  } else if (OB_FAIL(job_helper_.get_all_import_table_jobs(*sql_proxy_, jobs))) {
85
    LOG_WARN("failed to get recover all recover table job", K(ret));
86
  } else {
87
    ObCurTraceId::init(GCTX.self_addr());
88
    ARRAY_FOREACH(jobs, i) {
89
      ObImportTableJob &job = jobs.at(i);
90
      if (!job.is_valid()) {
91
        ret = OB_ERR_UNEXPECTED;
92
        LOG_WARN("recover table job is not valid", K(ret), K(job));
93
      } else if (is_user_tenant(job.get_tenant_id())) {
94
        process_(job);
95
      } else {
96
        ret = OB_ERR_UNEXPECTED;
97
        LOG_WARN("invalid tenant", K(ret), K(job));
98
      }
99
    }
100
  }
101
}
102

103
int ObImportTableJobScheduler::check_compatible_() const
104
{
105
  int ret = OB_SUCCESS;
106
  uint64_t data_version = 0;
107
  if (OB_FAIL(GET_MIN_DATA_VERSION(tenant_id_, data_version))) {
108
    LOG_WARN("fail to get data version", K(ret), K_(tenant_id));
109
  } else if (data_version < DATA_VERSION_4_2_1_0) {
110
    ret = OB_OP_NOT_ALLOW;
111
    LOG_WARN("min data version is smaller than v4.2.1", K(ret), K_(tenant_id), K(data_version));
112
  } else if (OB_FAIL(GET_MIN_DATA_VERSION(gen_meta_tenant_id(tenant_id_), data_version))) {
113
    LOG_WARN("fail to get data version", K(ret), "tenant_id", gen_meta_tenant_id(tenant_id_));
114
  } else if (data_version < DATA_VERSION_4_2_1_0) {
115
    ret = OB_OP_NOT_ALLOW;
116
    LOG_WARN("min data version is smaller than v4.2.1", K(ret), K_(tenant_id), K(data_version));
117
  }
118
  return ret;
119
}
120

121
// Drives a single import table job one step according to its current status.
//
// Before dispatching, the source tenant is checked: if it has not been
// dropped, its schema must be refreshed to the newest version, otherwise the
// step is postponed (OB_SCHEMA_EAGAIN is returned without a warning).
//
// @param job  the job to advance; handlers may update it in place
// @return OB_SUCCESS, OB_ERR_SYS for an unknown status, or the error of the
//         schema check / status handler.
int ObImportTableJobScheduler::process_(share::ObImportTableJob &job)
{
  int ret = OB_SUCCESS;
  bool is_dropped = false;
  if (OB_FAIL(schema_service_->check_if_tenant_has_been_dropped(job.get_src_tenant_id(), is_dropped))) {
    LOG_WARN("failed to check if tenant has been dropped", K(ret), "tenant_id", job.get_src_tenant_id());
  } else if (!is_dropped && OB_FAIL(wait_src_tenant_schema_refreshed_(job.get_src_tenant_id()))) {
    // OB_SCHEMA_EAGAIN only means "not refreshed yet", so it is not logged here.
    if (OB_SCHEMA_EAGAIN != ret) {
      LOG_WARN("failed to wait src tenant schema refreshed", K(ret), K(job));
    }
  } else {
    switch(job.get_status()) {
      case ObImportTableJobStatus::INIT: {
        if (OB_FAIL(gen_import_table_task_(job))) {
          LOG_WARN("failed to gen import table task", K(ret), K(job));
        }
        break;
      }
      case ObImportTableJobStatus::IMPORT_TABLE: {
        if (OB_FAIL(deal_with_import_table_task_(job))) {
          LOG_WARN("failed to deal with import table task", K(ret), K(job));
        }
        break;
      }
      case ObImportTableJobStatus::RECONSTRUCT_REF_CONSTRAINT: {
        if (OB_FAIL(reconstruct_ref_constraint_(job))) {
          LOG_WARN("failed to deal with reconstruct ref constraint", K(ret), K(job));
        }
        break;
      }
      case ObImportTableJobStatus::CANCELING: {
        if (OB_FAIL(canceling_(job))) {
          LOG_WARN("failed to cancel", K(ret), K(job));
        }
        break;
      }
      case ObImportTableJobStatus::IMPORT_FINISH: {
        if (OB_FAIL(finish_(job))) {
          LOG_WARN("failed to finish", K(ret), K(job));
        }
        break;
      }
      default: {
        ret = OB_ERR_SYS;
        LOG_WARN("invalid import job status", K(ret), K(job));
        break;
      }
    }
  }
  return ret;
}
172

173
int ObImportTableJobScheduler::wait_src_tenant_schema_refreshed_(const uint64_t tenant_id)
174
{
175
  // Only if the aux tenant schema refreshed to newest, then we can confirm src table exist or not.
176
  int ret = OB_SUCCESS;
177
  int64_t max_schema_version = OB_INVALID_VERSION;
178
  ObSchemaService *sql_schema_service = nullptr;
179
  ObRefreshSchemaStatus status;
180
  status.tenant_id_ = tenant_id;
181
  MTL_SWITCH (OB_SYS_TENANT_ID) {
182
    if (OB_ISNULL(schema_service_)) {
183
      ret = OB_ERR_UNEXPECTED;
184
      LOG_WARN("schema_service_ is null", K(ret));
185
    } else if (OB_ISNULL(sql_schema_service = schema_service_->get_schema_service())) {
186
      ret = OB_ERR_UNEXPECTED;
187
      LOG_WARN("sql_schema_service is null", K(ret));
188
    } else if (OB_FAIL(sql_schema_service->fetch_schema_version(status, *sql_proxy_, max_schema_version))) {
189
      LOG_WARN("fail to fetch max schema version", K(ret), K(tenant_id), K(status));
190
    } else {
191
      int64_t refreshed_schema_version = 0;
192
      if (OB_FAIL(schema_service_->get_tenant_refreshed_schema_version(
193
        tenant_id, refreshed_schema_version))) {
194
        LOG_WARN("get refreshed schema version failed", K(ret), K(tenant_id));
195
      } else if (!ObSchemaService::is_formal_version(refreshed_schema_version) || refreshed_schema_version < max_schema_version) {
196
        ret = OB_SCHEMA_EAGAIN;
197
        if (REACH_TIME_INTERVAL(1000L * 1000L)) {
198
          LOG_WARN("tenant schema not refreshed to the newest version", K(ret), K(tenant_id), K(max_schema_version), K(refreshed_schema_version));
199
        }
200
      }
201
    }
202
  }
203

204
  return ret;
205
}
206

207
// Handles the RECONSTRUCT_REF_CONSTRAINT status: loads the job's tasks,
// stamps the job end time and advances the job to its next status.
//
// @param job  job being advanced; its end timestamp is updated in place
// @return OB_SUCCESS, OB_ERR_UNEXPECTED if the next status is invalid, or the
//         error from loading tasks / advancing the status.
int ObImportTableJobScheduler::reconstruct_ref_constraint_(share::ObImportTableJob &job)
{
  int ret = OB_SUCCESS;
  // NOTE(review): the loaded tasks are not used after the fetch; only the
  // success of the read gates the status change.
  ObArray<share::ObImportTableTask> import_tasks;
  ObImportTableJobStatus next_status = ObImportTableJobStatus::get_next_status(job.get_status());
  LOG_INFO("[IMPORT_TABLE]start reconstruct ref constraint", K(job));
  if (OB_FAIL(get_import_table_tasks_(job, import_tasks))) {
    LOG_WARN("failed to get import table task", K(ret));
  } else if (!next_status.is_valid()) {
    // Same guard the other status-advancing steps apply before persisting.
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("invalid import job status", K(ret), K(next_status));
  } else if (OB_FALSE_IT(job.set_end_ts(ObTimeUtility::current_time()))) {
  } else if (OB_FAIL(advance_status_(*sql_proxy_, job, next_status))) {
    LOG_WARN("failed to advance status", K(ret), K(job), K(next_status));
  } else {
    LOG_INFO("[IMPORT_TABLE]finish reconstruct ref constraint", K(job));
    ROOTSERVICE_EVENT_ADD("import_table", "reconstruct_ref_constraint",
                          "tenant_id", job.get_tenant_id(),
                          "job_id", job.get_job_id());
  }
  return ret;
}
226

227
// Handles the IMPORT_FINISH status: moves the job's tasks and then the job
// itself to their history tables and records a rootservice event.
//
// @param job  the finished job
// @return OB_SUCCESS or the error from either move operation.
int ObImportTableJobScheduler::finish_(const share::ObImportTableJob &job)
{
  int ret = OB_SUCCESS;
  const uint64_t owner_tenant_id = job.get_tenant_id();
  const int64_t finished_job_id = job.get_job_id();

  // Tasks go first, the job record follows.
  if (OB_FAIL(task_helper_.move_import_task_to_history(*sql_proxy_, owner_tenant_id, finished_job_id))) {
    LOG_WARN("failed to move import task to history", K(ret),
             "tenant_id", owner_tenant_id, "job_id", finished_job_id);
  }

  if (OB_FAIL(ret)) {
  } else if (OB_FAIL(job_helper_.move_import_job_to_history(*sql_proxy_, owner_tenant_id, finished_job_id))) {
    LOG_WARN("failed to move import job to history", K(ret),
             "tenant_id", owner_tenant_id, "job_id", finished_job_id);
  } else {
    LOG_INFO("[IMPORT_TABLE]import table job finish", K(job));
    ROOTSERVICE_EVENT_ADD("import_table", "import table finish",
                          "tenant_id", job.get_tenant_id(),
                          "job_id", job.get_job_id());
  }
  return ret;
}
244

245
// Handles the INIT status: generates the import table tasks of a job,
// persists them inside one transaction on the meta tenant and advances the
// job to its next status.
//
// When task generation fails with an error that is not retryable, the job is
// moved straight to IMPORT_FINISH with the failure recorded as its result
// (unless a result comment is already set). The generation error is still
// returned to the caller.
//
// @param job  job being processed; totals, end time and result may be updated
// @return OB_SUCCESS or the first error encountered.
int ObImportTableJobScheduler::gen_import_table_task_(share::ObImportTableJob &job)
{
  int ret = OB_SUCCESS;
  ObImportTableTaskGenerator generator;
  ObArray<oceanbase::share::ObImportTableTask> import_tasks;
  ObMySQLTransaction trans;
  uint64_t meta_tenant_id = gen_meta_tenant_id(job.get_tenant_id());
  DEBUG_SYNC(BEFORE_GENERATE_IMPORT_TABLE_TASK);
  if (OB_FAIL(generator.init(*schema_service_, *sql_proxy_))) {
    LOG_WARN("failed to init import task generator", K(ret));
  } else if (OB_FAIL(generator.gen_import_task(job, import_tasks))) {
    LOG_WARN("failed to gen import table task", K(ret), K(job));
    if (!ObImportTableUtil::can_retrieable_err(ret)) {
      // Permanent failure: finish the job now. Secondary errors go to tmp_ret
      // so that ret keeps the original generation error.
      int tmp_ret = OB_SUCCESS;
      ObImportTableJobStatus next_status(ObImportTableJobStatus::IMPORT_FINISH);
      job.set_end_ts(ObTimeUtility::current_time());

      if (!job.get_result().is_comment_setted()) {
        share::ObTaskId trace_id(*ObCurTraceId::get_trace_id());
        ObImportResult result;
        if (OB_TMP_FAIL(result.set_result(ret, trace_id, GCONF.self_addr_))) {
          LOG_WARN("failed to set result", K(ret), K(tmp_ret));
        } else {
          job.set_result(result);
        }
      }

      if (OB_TMP_FAIL(advance_status_(*sql_proxy_, job, next_status))) {
        LOG_WARN("failed to advance status", K(ret), K(tmp_ret));
      }
    }
  } else if (OB_FAIL(trans.start(sql_proxy_, meta_tenant_id))) {
    LOG_WARN("failed to start trans", K(ret), K(meta_tenant_id));
  } else {
    // Persist every task and accumulate the job totals along the way.
    ARRAY_FOREACH(import_tasks, i) {
      const ObImportTableTask &task = import_tasks.at(i);
      if (OB_FAIL(persist_import_table_task_(trans, task))) {
        LOG_WARN("failed to persist import table task", K(ret), K(task));
      } else {
        job.set_total_bytes(job.get_total_bytes() + task.get_total_bytes());
        job.set_total_table_count(job.get_total_table_count() + 1);
      }
    }

    ObImportTableJobStatus next_status = ObImportTableJobStatus::get_next_status(job.get_status());
    if (OB_FAIL(ret)) {
    // NOTE(review): statistics are written through sql_proxy_, not trans, so
    // they are not covered by the rollback below - confirm this is intended.
    } else if (OB_FAIL(job_helper_.report_import_job_statistics(*sql_proxy_, job))) {
      LOG_WARN("failed to report import job statistics", K(ret));
    } else if (!next_status.is_valid()) {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("error import table job status is unexpected", K(ret), K(next_status));
    } else if (OB_FAIL(advance_status_(trans, job, next_status))) {
      LOG_WARN("failed to advance to next status", K(ret));
    }

    if (OB_SUCC(ret)) {
      if (OB_FAIL(trans.end(true))) {
        LOG_WARN("failed to commit", K(ret));
      } else {
        LOG_INFO("[IMPORT_TABLE] succeed generate import table task", K(import_tasks), K(next_status));
        ROOTSERVICE_EVENT_ADD("import_table", "generate import table task",
                              "tenant_id", job.get_tenant_id(),
                              "job_id", job.get_job_id(),
                              "task_count", import_tasks.count());
      }
    } else {
      // Roll back; the rollback error must not hide the original failure.
      int tmp_ret = OB_SUCCESS;
      if (OB_TMP_FAIL(trans.end(false))) {
        LOG_WARN("failed to roll back", K(ret), K(tmp_ret));
      }
    }
  }
  return ret;
}
319

320
// Handles the IMPORT_TABLE status: fetches one unfinished task of the job and
// processes it; once no unfinished task remains, runs the post-import step.
//
// @param job  job whose tasks are being driven
// @return OB_SUCCESS or the error from the task lookup / processing step.
int ObImportTableJobScheduler::deal_with_import_table_task_(share::ObImportTableJob &job)
{
  int ret = OB_SUCCESS;
  bool no_task_left = false;
  ObImportTableTask pending_task;
  if (OB_FAIL(task_helper_.get_one_not_finish_task_by_initiator(*sql_proxy_, job, no_task_left, pending_task))) {
    LOG_WARN("failed to get import table task", K(ret), K(job));
  } else if (no_task_left) {
    if (OB_FAIL(do_after_import_all_table_(job))) {
      LOG_WARN("failed to do after import all table", K(ret), K(job));
    }
  } else if (OB_FAIL(process_import_table_task_(pending_task))) {
    LOG_WARN("failed to process import table task", K(ret), "task", pending_task);
  }
  return ret;
}
336

337
// Drives a single import table task one step by handing it to a temporary
// ObImportTableTaskScheduler.
//
// @param task  task to process; the task scheduler keeps a pointer to it for
//              the duration of this call
// @return OB_SUCCESS, or the error from initializing or running the task
//         scheduler (the result of process() is propagated instead of being
//         dropped).
int ObImportTableJobScheduler::process_import_table_task_(share::ObImportTableTask &task)
{
  int ret = OB_SUCCESS;
  ObImportTableTaskScheduler task_mgr;
  if (OB_FAIL(task_mgr.init(*schema_service_, *sql_proxy_, task))) {
    LOG_WARN("failed to init task mgr", K(ret));
  } else if (OB_FAIL(task_mgr.process())) {
    LOG_WARN("failed to process import table task", K(ret), K(task));
  }
  return ret;
}
348

349
// Runs once every task of the job has finished: loads all tasks, refreshes
// the job statistics from them and advances the job to its next status.
//
// @param job  job being advanced; its result and counters are updated
// @return OB_SUCCESS, OB_ERR_UNEXPECTED when the next status is invalid, or
//         the error from any of the steps.
int ObImportTableJobScheduler::do_after_import_all_table_(share::ObImportTableJob &job)
{
  int ret = OB_SUCCESS;
  common::ObArray<share::ObImportTableTask> all_tasks;
  ObImportTableJobStatus following_status = ObImportTableJobStatus::get_next_status(job.get_status());

  // Step 1: collect tasks and fold their outcome into the job.
  if (OB_FAIL(get_import_table_tasks_(job, all_tasks))) {
    LOG_WARN("failed to get import table tasks", K(ret), K(job));
  } else if (!following_status.is_valid()) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("invalid import job status", K(ret), "next_status", following_status);
  } else if (OB_FAIL(update_statistic_(all_tasks, job))) {
    LOG_WARN("failed to update statistic", K(ret));
  }

  // Step 2: move the job forward.
  if (OB_FAIL(ret)) {
  } else if (OB_FAIL(advance_status_(*sql_proxy_, job, following_status))) {
    LOG_WARN("failed to advance to next status", K(ret));
  } else {
    LOG_INFO("[IMPORT_TABLE]importing table finished", "import_tasks", all_tasks, "next_status", following_status);
    ROOTSERVICE_EVENT_ADD("import_table", "import table task finish",
                          "tenant_id", job.get_tenant_id(),
                          "job_id", job.get_job_id(),
                          "succeed_import_table_count", job.get_finished_table_count(),
                          "failed_import_table_count", job.get_failed_table_count());
  }
  return ret;
}
373

374
int ObImportTableJobScheduler::update_statistic_(
375
    common::ObIArray<share::ObImportTableTask> &import_tasks, share::ObImportTableJob &job)
376
{
377
  int ret = OB_SUCCESS;
378
  int64_t succeed_task_cnt = 0;
379
  int64_t failed_task_cnt = 0;
380
  ObImportResult::Comment comment;
381
  int64_t pos = 0;
382
  ARRAY_FOREACH(import_tasks, i) {
383
    const ObImportTableTask &task = import_tasks.at(i);
384
    if (task.get_result().is_succeed()) {
385
      succeed_task_cnt++;
386
    } else {
387
      failed_task_cnt++;
388
    }
389
  }
390
  ObImportResult result;
391

392
  if (OB_FAIL(databuff_printf(comment.ptr(), comment.capacity(), pos,
393
    "import succeed table count: %ld, failed table count: %ld", succeed_task_cnt, failed_task_cnt))) {
394
    if (OB_SIZE_OVERFLOW == ret) {
395
      ret = OB_SUCCESS;
396
    } else {
397
      LOG_WARN("failed to databuff_printf", K(ret));
398
    }
399
  }
400

401
  result.set_result(true, comment);
402
  job.set_result(result);
403
  job.set_finished_table_count(succeed_task_cnt);
404
  job.set_failed_table_count(failed_task_cnt);
405

406
  if (FAILEDx(job_helper_.report_statistics(*sql_proxy_, job))) {
407
    LOG_WARN("failed to report statistics", K(ret));
408
  }
409
  return ret;
410
}
411

412
// Handles the CANCELING status.
//
// Every unfinished task of the job gets its redefinition DDL aborted (when a
// DDL task record or DDL error message exists for it), is marked with an
// OB_CANCELED result and moved to FINISH. Afterwards the job itself receives
// an OB_CANCELED result and is advanced to IMPORT_FINISH.
// A missing task set (OB_ENTRY_NOT_EXIST) is not treated as an error.
//
// @param job  job being cancelled; its end time and result are updated
// @return OB_SUCCESS or the first error encountered.
int ObImportTableJobScheduler::canceling_(share::ObImportTableJob &job)
{
  int ret = OB_SUCCESS;
  LOG_INFO("[IMPORT_TABLE]cancel import table job", K(job));
  ObArray<share::ObImportTableTask> tasks_of_job;
  if (OB_FAIL(get_import_table_tasks_(job, tasks_of_job))) {
    if (OB_ENTRY_NOT_EXIST != ret) {
      LOG_WARN("failed to get import table task", K(ret));
    } else {
      ret = OB_SUCCESS;
    }
  } else {
    ObImportTableTaskStatus finished_status(ObImportTableTaskStatus::FINISH);
    ARRAY_FOREACH(tasks_of_job, idx) {
      ObImportTableTask &task = tasks_of_job.at(idx);
      obrpc::ObAbortRedefTableArg arg;
      arg.task_id_ = task.get_task_id();
      arg.tenant_id_ = task.get_tenant_id();
      bool ddl_exists = false;
      if (task.get_status().is_finish()) {
        // finished tasks are left as they are
      } else if (OB_FAIL(check_import_ddl_task_exist_(task, ddl_exists))) {
        LOG_WARN("failed to check import ddl task", K(ret));
      } else if (ddl_exists && OB_FAIL(ObDDLServerClient::abort_redef_table(arg))) {
        LOG_WARN("failed to abort redef table", K(ret), K(arg));
      } else {
        LOG_INFO("[IMPORT_TABLE]cancel import table task", K(arg));
        share::ObTaskId cur_trace_id(*ObCurTraceId::get_trace_id());
        ObImportResult canceled_result;
        if (OB_FAIL(canceled_result.set_result(OB_CANCELED, cur_trace_id, GCONF.self_addr_))) {
          LOG_WARN("failed to set result", K(ret));
        } else {
          task.set_result(canceled_result);
          if (OB_FAIL(task_helper_.advance_status(*sql_proxy_, task, finished_status))) {
            LOG_WARN("failed to cancel import task", K(ret), K(task));
          } else {
            LOG_INFO("[IMPORT_TABLE]succeed cancel import table task", K(arg));
          }
        }
      }
    }
  }

  // Only after all tasks were handled is the job itself closed.
  if (OB_SUCC(ret)) {
    share::ObTaskId cur_trace_id(*ObCurTraceId::get_trace_id());
    ObImportResult canceled_result;
    ObImportTableJobStatus job_final_status(ObImportTableJobStatus::IMPORT_FINISH);
    job.set_end_ts(ObTimeUtility::current_time());
    if (OB_FAIL(canceled_result.set_result(OB_CANCELED, cur_trace_id, GCONF.self_addr_))) {
      LOG_WARN("failed to set result", K(ret));
    } else {
      job.set_result(canceled_result);
      if (OB_FAIL(advance_status_(*sql_proxy_, job, job_final_status))) {
        LOG_WARN("failed to advance status", K(ret));
      } else {
        LOG_INFO("[IMPORT_TABLE]succeed to cancel import table job", K(job));
        ROOTSERVICE_EVENT_ADD("import_table", "cancel import table task",
                        "tenant_id", job.get_tenant_id(),
                        "job_id", job.get_job_id());
      }
    }
  }
  return ret;
}
471

472
// Tells whether a DDL task was ever created for the given import task.
//
// The DDL task record is consulted first; if no record exists, the DDL error
// message table is queried, and a found message also counts as "exists".
//
// @param task      import task whose tenant id / task id identify the DDL task
// @param is_exist  out: true when a record or an error message was found
// @return OB_SUCCESS (also when nothing was found) or the lookup error.
int ObImportTableJobScheduler::check_import_ddl_task_exist_(const share::ObImportTableTask &task, bool &is_exist)
{
  int ret = OB_SUCCESS;
  is_exist = false;
  const uint64_t owner_tenant_id = task.get_tenant_id();
  const int64_t ddl_task_id = task.get_task_id();
  int64_t ignored_msg_len = 0;
  ObDDLErrorMessageTableOperator::ObBuildDDLErrorMessage ddl_err_msg;
  if (OB_FAIL(ObDDLTaskRecordOperator::check_task_id_exist(*sql_proxy_, owner_tenant_id, ddl_task_id, is_exist))) {
    LOG_WARN("failed to check task id exist", K(ret), "tenant_id", owner_tenant_id, "task_id", ddl_task_id);
  } else if (!is_exist) {
    // No task record: fall back to the error message table.
    ret = ObDDLErrorMessageTableOperator::get_ddl_error_message(owner_tenant_id,
                                                                ddl_task_id,
                                                                -1 /* target_object_id */,
                                                                ObAddr()/*unused addr*/,
                                                                false /* is_ddl_retry_task */,
                                                                *sql_proxy_,
                                                                ddl_err_msg,
                                                                ignored_msg_len);
    if (OB_SUCCESS == ret) {
      is_exist = true;
    } else if (OB_ENTRY_NOT_EXIST == ret) {
      ret = OB_SUCCESS;
    } else {
      LOG_WARN("failed to load ddl user error", K(ret), "tenant_id", owner_tenant_id, "task_id", ddl_task_id);
    }
  }
  return ret;
}
501

502
int ObImportTableJobScheduler::persist_import_table_task_(
503
    common::ObMySQLTransaction &trans, const share::ObImportTableTask &task)
504
{
505
  int ret = OB_SUCCESS;
506
  if (OB_FAIL(task_helper_.insert_import_table_task(trans, task))) {
507
    LOG_WARN("failed to get import table job", K(ret), K(task));
508
  } else {
509
    LOG_INFO("succeed to persist import table task", K(task));
510
  }
511
  return ret;
512
}
513

514
int ObImportTableJobScheduler::get_import_table_tasks_(
515
    const share::ObImportTableJob &job, common::ObIArray<share::ObImportTableTask> &import_tasks)
516
{
517
  int ret = OB_SUCCESS;
518
  if (OB_FAIL(task_helper_.get_all_import_table_tasks_by_initiator(*sql_proxy_, job, import_tasks))) {
519
    LOG_WARN("failed to get import table task", K(ret), K(job));
520
  }
521
  return ret;
522
}
523

524
int ObImportTableJobScheduler::advance_status_(
525
    common::ObISQLClient &sql_proxy, const share::ObImportTableJob &job, const share::ObImportTableJobStatus &next_status)
526
{
527
  int ret = OB_SUCCESS;
528
  if (OB_FAIL(job_helper_.advance_status(sql_proxy, job, next_status))) {
529
    LOG_WARN("failed to advance status", K(ret), K(job), K(next_status));
530
  } else {
531
    wakeup_();
532
  }
533
  return ret;
534
}
535

536
int ObImportTableTaskScheduler::init(share::schema::ObMultiVersionSchemaService &schema_service,
537
    common::ObMySQLProxy &sql_proxy, share::ObImportTableTask &task)
538
{
539
  int ret = OB_SUCCESS;
540
  if (IS_INIT) {
541
    ret = OB_INIT_TWICE;
542
    LOG_WARN("ObImportTableTaskScheduler init twice", K(ret));
543
  } else if (OB_FAIL(helper_.init(task.get_tenant_id()))) {
544
    LOG_WARN("failed to init recover table persist helper", K(ret));
545
  } else {
546
    schema_service_ = &schema_service;
547
    sql_proxy_ = &sql_proxy;
548
    import_task_ = &task;
549
    is_inited_ = true;
550
  }
551
  return ret;
552
}
553

554
void ObImportTableTaskScheduler::wakeup_() {
555
  ObRestoreService *restore_service = nullptr;
556
  if (OB_ISNULL(restore_service = MTL(ObRestoreService *))) {
557
    LOG_ERROR_RET(OB_ERR_UNEXPECTED, "restore service must not be null");
558
  } else {
559
    restore_service->wakeup();
560
  }
561
}
562

563
void ObImportTableTaskScheduler::reset()
564
{
565
  is_inited_ = false;
566
  schema_service_ = nullptr;
567
  sql_proxy_ = nullptr;
568
  import_task_ = nullptr;
569
}
570

571
int ObImportTableTaskScheduler::process()
572
{
573
  int ret = OB_SUCCESS;
574
  LOG_INFO("ready process import table task", KPC_(import_task));
575
  if (IS_NOT_INIT) {
576
    ret = OB_NOT_INIT;
577
    LOG_WARN("ObIImportTableTaskMgr not inited", K(ret));
578
  } else {
579
    const ObImportTableTaskStatus &status = import_task_->get_status();
580
    switch(status) {
581
      case ObImportTableTaskStatus::INIT: {
582
        if (OB_FAIL(init_())) {
583
          LOG_WARN("failed to do init work", K(ret), KPC_(import_task));
584
        }
585
        break;
586
      }
587
      case ObImportTableTaskStatus::DOING: {
588
        if (OB_FAIL(doing_())) {
589
          LOG_WARN("failed to do doing work", K(ret), KPC_(import_task));
590
        }
591
        break;
592
      }
593
      case ObImportTableTaskStatus::FINISH: {
594
        break; // do nothing
595
      }
596
      default: {
597
        ret = OB_ERR_SYS;
598
        LOG_WARN("invalid recover task status", K(ret));
599
      }
600
    }
601
  }
602
  return ret;
603
}
604

605
int ObImportTableTaskScheduler::init_()
606
{
607
  int ret = OB_SUCCESS;
608
  if (OB_FAIL(gen_import_ddl_task_())) {
609
    LOG_WARN("failed to generate import ddl task", K(ret), KPC_(import_task));
610
  }
611

612
  int tmp_ret = OB_SUCCESS;
613
  if (OB_TMP_FAIL(try_advance_status_(ret))) {
614
    LOG_WARN("failed to advance status", K(tmp_ret), K(ret));
615
    ret = OB_SUCC(ret) ? tmp_ret : ret;
616
  }
617
  return ret;
618
}
619

620
int ObImportTableTaskScheduler::doing_()
621
{
622
  int ret = OB_SUCCESS;
623
  bool is_finish = false;
624
  if (OB_FAIL(wait_import_ddl_task_finish_(is_finish))) {
625
    LOG_WARN("failed to do doing work", K(ret), KPC_(import_task));
626
  } else if (!is_finish) {
627
  } else if (OB_FAIL(try_advance_status_(ret))) {
628
    LOG_WARN("failed to advance status", K(ret));
629
  }
630
  return ret;
631
}
632

633
int ObImportTableTaskScheduler::try_advance_status_(const int err_code)
634
{
635
  int ret = OB_SUCCESS;
636
  if (OB_FAIL(err_code) && ObImportTableUtil::can_retrieable_err(err_code)) { // do nothing
637
  } else {
638

639
    share::ObImportTableTaskStatus next_status = import_task_->get_status().get_next_status(err_code);
640
    if (import_task_->get_result().is_succeed()) { // avoid to cover comment
641
      share::ObTaskId trace_id(*ObCurTraceId::get_trace_id());
642
      ObImportResult result;
643
      if (OB_FAIL(result.set_result(err_code, trace_id, GCONF.self_addr_))) {
644
        LOG_WARN("failed to set result", K(ret));
645
      } else if (OB_FALSE_IT(import_task_->set_result(result))) {
646
      }
647
    }
648
    if (FAILEDx(helper_.advance_status(*sql_proxy_, *import_task_, next_status))) {
649
      LOG_WARN("failed to advance status", K(ret), KPC_(import_task), K(next_status));
650
    } else {
651
      wakeup_();
652
    }
653
  }
654
  return ret;
655
}
656

657
int ObImportTableTaskScheduler::gen_import_ddl_task_()
658
{
659
  int ret = OB_SUCCESS;
660
  obrpc::ObRecoverRestoreTableDDLArg arg;
661
  bool is_exist = false;
662
  LOG_INFO("[IMPORT_TABLE]start to create import table", KPC_(import_task));
663
  if (OB_FAIL(check_import_ddl_task_exist_(is_exist))) {
664
    LOG_WARN("failed to check import ddl task", K(ret));
665
  } else if (is_exist) {
666
    LOG_INFO("[IMPORT_TABLE]import ddl task exist, skip it", KPC_(import_task), K(arg));
667
  } else if (OB_FAIL(construct_import_table_arg_(arg))) {
668
    LOG_WARN("failed to construct import table arg", K(ret));
669
  } else if (OB_FAIL(ObDDLServerClient::execute_recover_restore_table(arg))) {
670
    if (OB_ENTRY_EXIST == ret) {
671
      // old and new leader both execute import ddl at the same time.
672
      ret = OB_EAGAIN;
673
      LOG_WARN("import ddl task exist, try again", K(ret), K(arg));
674
    } else {
675
      LOG_WARN("fail to start import table", K(ret), K(arg));
676
    }
677
  } else {
678
    LOG_INFO("[IMPORT_TABLE]succeed execute_recover_restore_table", KPC_(import_task), K(arg));
679
  }
680
  return ret;
681
}
682

683
int ObImportTableTaskScheduler::check_import_ddl_task_exist_(bool &is_exist)
684
{
685
  int ret = OB_SUCCESS;
686
  is_exist = false;
687
  const uint64_t tenant_id = import_task_->get_tenant_id();
688
  const int64_t task_id = import_task_->get_task_id();
689
  int64_t unused_user_msg_len = 0;
690
  ObDDLErrorMessageTableOperator::ObBuildDDLErrorMessage error_message;
691
  if (OB_FAIL(ObDDLTaskRecordOperator::check_task_id_exist(*sql_proxy_, tenant_id, task_id, is_exist))) {
692
    LOG_WARN("failed to check task id exist", K(ret), K(tenant_id), K(task_id));
693
  } else if (is_exist) {
694
  } else if (OB_FAIL(ObDDLErrorMessageTableOperator::get_ddl_error_message(tenant_id,
695
                                                                      task_id,
696
                                                                      -1 /* target_object_id */,
697
                                                                      ObAddr()/*unused addr*/,
698
                                                                      false /* is_ddl_retry_task */,
699
                                                                      *sql_proxy_,
700
                                                                      error_message,
701
                                                                      unused_user_msg_len))) {
702
    if (OB_ENTRY_NOT_EXIST == ret) {
703
      ret = OB_SUCCESS;
704
    } else {
705
      LOG_WARN("failed to load ddl user error", K(ret), K(tenant_id), K(task_id));
706
    }
707
  } else {
708
    is_exist = true;
709
  }
710
  return ret;
711
}
712

713
// Fills the DDL argument used to create the target table of the bound task.
//
// Steps:
//   1. Under the sys tenant context, look up the source table schema in the
//      source tenant (OB_TABLE_NOT_EXIST when it is missing).
//   2. Derive the target table schema from the source schema.
//   3. Fill ids, the target tenant's time zone info and the source tenant's
//      NLS date / timestamp / timestamp-tz formats (deep-copied into arg).
//
// @param arg  out: argument to populate
// @return OB_SUCCESS or the first error encountered.
int ObImportTableTaskScheduler::construct_import_table_arg_(obrpc::ObRecoverRestoreTableDDLArg &arg)
{
  int ret = OB_SUCCESS;
  ObSchemaGetterGuard src_tenant_guard;
  const ObTableSchema *src_table_schema = nullptr;
  ObFixedLengthString<common::OB_MAX_TIMESTAMP_TZ_LENGTH> time_zone;
  MTL_SWITCH(OB_SYS_TENANT_ID) {
    if (OB_FAIL(schema_service_->get_tenant_schema_guard(import_task_->get_src_tenant_id(), src_tenant_guard))) {
      LOG_WARN("failed to get tenant schema guard", K(ret), KPC_(import_task));
    } else if (OB_FAIL(src_tenant_guard.get_table_schema(import_task_->get_src_tenant_id(),
                                                        import_task_->get_src_database(),
                                                        import_task_->get_src_table(),
                                                        false,
                                                        src_table_schema))) {
      LOG_WARN("failed to get table schema", K(ret), KPC_(import_task));
    } else if (OB_ISNULL(src_table_schema)) {
      ret = OB_TABLE_NOT_EXIST;
      LOG_WARN("src table not exist", K(ret), KPC_(import_task));
    }
  }
  if (OB_FAIL(ret)) {
    // The lookup above already logged its failure; src_table_schema must not
    // be dereferenced on this path.
  } else if (OB_FAIL(construct_import_table_schema_(*src_table_schema, arg.target_schema_))) {
    LOG_WARN("failed to construct import table schema", K(ret));
  } else {
    arg.src_tenant_id_ = src_table_schema->get_tenant_id();
    arg.src_table_id_ = src_table_schema->get_table_id();
    arg.ddl_task_id_ = import_task_->get_task_id();
    arg.exec_tenant_id_ = import_task_->get_tenant_id();
    const ObSysVarSchema *data_format_schema = nullptr;
    const ObSysVarSchema *nls_timestamp_format = nullptr;
    const ObSysVarSchema *nls_timestamp_tz_format = nullptr;
    if (OB_FAIL(share::ObBackupUtils::get_tenant_sys_time_zone_wrap(import_task_->get_tenant_id(),
                                                                    time_zone,
                                                                    arg.tz_info_wrap_))) {
      LOG_WARN("failed to get tenant sys timezone wrap", K(ret), KPC_(import_task));
    } else if (OB_FAIL(src_tenant_guard.get_tenant_system_variable(import_task_->get_src_tenant_id(),
                                                                    share::SYS_VAR_NLS_DATE_FORMAT,
                                                                    data_format_schema))) {
      LOG_WARN("fail to get tenant system variable", K(ret));
    } else if (OB_FAIL(src_tenant_guard.get_tenant_system_variable(import_task_->get_src_tenant_id(),
                                                                    share::SYS_VAR_NLS_TIMESTAMP_FORMAT,
                                                                    nls_timestamp_format))) {
      LOG_WARN("fail to get tenant system variable", K(ret));
    } else if (OB_FAIL(src_tenant_guard.get_tenant_system_variable(import_task_->get_src_tenant_id(),
                                                                    share::SYS_VAR_NLS_TIMESTAMP_TZ_FORMAT,
                                                                    nls_timestamp_tz_format))) {
      LOG_WARN("fail to get tenant system variable", K(ret));
    } else if (OB_ISNULL(data_format_schema) || OB_ISNULL(nls_timestamp_format) || OB_ISNULL(nls_timestamp_tz_format)) {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("var schema must not be null", K(ret), KP(data_format_schema), KP(nls_timestamp_format), KP(nls_timestamp_tz_format));
    } else if (OB_FAIL(ob_write_string(arg.allocator_, data_format_schema->get_value(), arg.nls_formats_[ObNLSFormatEnum::NLS_DATE]))) {
      LOG_WARN("deep copy failed", K(ret), K(data_format_schema->get_value()));
    } else if (OB_FAIL(ob_write_string(arg.allocator_, nls_timestamp_format->get_value(), arg.nls_formats_[ObNLSFormatEnum::NLS_TIMESTAMP]))) {
      LOG_WARN("deep copy failed", K(ret), K(nls_timestamp_format->get_value()));
    } else if (OB_FAIL(ob_write_string(arg.allocator_, nls_timestamp_tz_format->get_value(), arg.nls_formats_[ObNLSFormatEnum::NLS_TIMESTAMP_TZ]))) {
      LOG_WARN("deep copy failed", K(ret), K(nls_timestamp_tz_format->get_value()));
    } else {
      arg.tz_info_ =  arg.tz_info_wrap_.get_tz_info_offset();
    }
  }
  return ret;
}
774

775
int ObImportTableTaskScheduler::construct_import_table_schema_(
776
    const share::schema::ObTableSchema &src_table_schema, share::schema::ObTableSchema &target_table_schema)
777
{
778
  int ret = OB_SUCCESS;
779
  ObSchemaGetterGuard target_tenant_guard;
780
  if (OB_FAIL(schema_service_->get_tenant_schema_guard(import_task_->get_tenant_id(), target_tenant_guard))) {
781
    LOG_WARN("failed to get tenant schema guard", K(ret), KPC_(import_task));
782
  } else if (OB_FAIL(target_table_schema.assign(src_table_schema))) {
783
    LOG_WARN("failed to assign target table schema", K(ret));
784
  } else {
785
    target_table_schema.set_tenant_id(import_task_->get_tenant_id());
786
    target_table_schema.set_table_name(import_task_->get_target_table());
787
    target_table_schema.set_table_id(OB_INVALID_ID);
788

789
    target_table_schema.set_data_table_id(0);
790
    target_table_schema.clear_constraint();
791
    target_table_schema.clear_foreign_key_infos();
792
    target_table_schema.set_table_state_flag(ObTableStateFlag::TABLE_STATE_NORMAL);
793
    target_table_schema.set_mlog_tid(OB_INVALID_ID); // mlog (if exists) will be discarded
794

795
    uint64_t database_id = OB_INVALID_ID;
796
    if (OB_FAIL(target_tenant_guard.get_database_id(import_task_->get_tenant_id(),
797
                                                    import_task_->get_target_database(),
798
                                                    database_id))) {
799
      LOG_WARN("failed to get database id", K(ret), KPC_(import_task));
800
    } else if (OB_INVALID_ID == database_id) {
801
      ret = OB_ERR_BAD_DATABASE;
802
      LOG_WARN("invalid target database name", K(ret), K(database_id), KPC_(import_task));
803
    } else {
804
      target_table_schema.set_database_id(database_id);
805
    }
806

807
    uint64_t table_group_id = OB_INVALID_ID;
808
    if (import_task_->get_target_tablegroup().empty()) {
809
    } else if (FAILEDx(target_tenant_guard.get_tablegroup_id(import_task_->get_tenant_id(),
810
                                                             import_task_->get_target_tablegroup(),
811
                                                             table_group_id))) {
812
      LOG_WARN("failed to get table group id", K(ret), KPC_(import_task));
813
    } else if (OB_INVALID_ID == table_group_id) {
814
      ret = OB_TABLEGROUP_NOT_EXIST;
815
      LOG_WARN("invalid target tablegroup id", K(ret), K(table_group_id), KPC_(import_task));
816
    } else {
817
      target_table_schema.set_tablegroup_id(table_group_id);
818
    }
819

820
    const schema::ObTablespaceSchema *schema = nullptr;
821
    if (import_task_->get_target_tablespace().empty()) {
822
    } else if (FAILEDx(target_tenant_guard.get_tablespace_schema_with_name(import_task_->get_tenant_id(),
823
                                                                           import_task_->get_target_tablespace(),
824
                                                                           schema))) {
825
      LOG_WARN("failed to get tablespace schema", K(ret), KPC_(import_task));
826
    } else if (OB_ISNULL(schema)) {
827
      ret = OB_TABLESPACE_NOT_EXIST;
828
      LOG_WARN("tablespace must not be null", K(ret), KPC_(import_task));
829
    } else if (OB_FAIL(target_table_schema.set_encryption_str(schema->get_encryption_name()))) {
830
      LOG_WARN("failed to set encryption str", K(ret));
831
    } else if (OB_FAIL(target_table_schema.set_encrypt_key(schema->get_encrypt_key()))) {
832
      LOG_WARN("failed to set encrypt key", K(ret));
833
    } else {
834
      target_table_schema.set_master_key_id(schema->get_master_key_id());
835
      target_table_schema.set_tablespace_id(schema->get_tablespace_id());
836
    }
837
  }
838
  return ret;
839
}
840

841
int ObImportTableTaskScheduler::wait_import_ddl_task_finish_(bool &is_finish)
842
{
843
  int ret = OB_SUCCESS;
844
  int64_t unused_user_msg_len = 0;
845
  ObDDLErrorMessageTableOperator::ObBuildDDLErrorMessage error_message;
846
  uint64_t tenant_id = import_task_->get_tenant_id();
847
  int64_t task_id = import_task_->get_task_id();
848
  is_finish = false;
849
  if (OB_FAIL(ObDDLErrorMessageTableOperator::get_ddl_error_message(tenant_id,
850
                                                                    task_id,
851
                                                                    -1 /* target_object_id */,
852
                                                                    ObAddr()/*unused addr*/,
853
                                                                    false /* is_ddl_retry_task */,
854
                                                                    *sql_proxy_,
855
                                                                    error_message,
856
                                                                    unused_user_msg_len))) {
857
    if (OB_ENTRY_NOT_EXIST == ret) {
858
      ret = OB_SUCCESS;
859
      if(REACH_TIME_INTERVAL(120 * 1000 * 1000)) {
860
        LOG_WARN("[IMPORT_TABLE]import ddl task does not finish, retry again", K(tenant_id), K(task_id));
861
      }
862
    } else {
863
      LOG_WARN("failed to load ddl user error", K(ret), K(tenant_id), K(task_id));
864
    }
865
  } else if (OB_SUCCESS != error_message.ret_code_) {
866
    ObImportResult result;
867
    if (OB_FAIL(result.set_result(false, error_message.user_message_))) {
868
      LOG_WARN("failed to set result", K(ret), K(error_message));
869
    } else {
870
      import_task_->set_result(result);
871
      is_finish = true;
872
      LOG_INFO("[IMPORT_TABLE]import table failed", KPC_(import_task), K(error_message));
873
    }
874
  } else if (OB_FAIL(statistics_import_results_())) {
875
    LOG_WARN("failed to statistics import result", K(ret));
876
  } else if (OB_FAIL(helper_.report_import_task_statistics(*sql_proxy_, *import_task_))) {
877
    LOG_WARN("failed to report import task statistics", K(ret), KPC_(import_task));
878
  } else {
879
    is_finish = true;
880
    LOG_INFO("[IMPORT_TABLE]import table succeed", KPC_(import_task), K(error_message));
881
  }
882
  return ret;
883
}
884

885
int ObImportTableTaskScheduler::statistics_import_results_()
886
{
887
  int ret = OB_SUCCESS;
888
  int tmp_ret = OB_SUCCESS;
889
  ObSchemaGetterGuard guard;
890
  const ObTableSchema * table_schema = nullptr;
891
  const int64_t tenant_id = import_task_->get_tenant_id();
892
  const ObString &db_name = import_task_->get_target_database();
893
  const ObString &table_name = import_task_->get_target_table();
894
  if (OB_FAIL(schema_service_->get_tenant_schema_guard(tenant_id, guard))) {
895
    LOG_WARN("failed get tenant schema guard", K(ret), K(tenant_id));
896
  } else if (OB_FAIL(guard.get_table_schema(tenant_id, db_name, table_name, false/*no index*/, table_schema))) {
897
    LOG_WARN("failed to get table schema", K(ret), K(tenant_id), K(db_name), K(table_name));
898
  } else if (OB_ISNULL(table_schema)) {
899
    ret = OB_TABLE_NOT_EXIST;
900
    LOG_WARN("table is not exist", K(tenant_id), K(db_name), K(table_name));
901
    ObImportResult::Comment comment;
902
    if (OB_TMP_FAIL(databuff_printf(comment.ptr(), comment.capacity(),
903
      "table %.*s has been deleted by user", table_name.length(), table_name.ptr()))) {
904
      LOG_WARN("failed to databuff printf", K(ret), K(tmp_ret));
905
    } else {
906
      import_task_->get_result().set_result(true, comment);
907
    }
908
  } else {
909
    import_task_->set_completion_ts(ObTimeUtility::current_time());
910
    import_task_->set_imported_index_count(table_schema->get_simple_index_infos().count());
911
    import_task_->set_failed_index_count(import_task_->get_total_index_count() - import_task_->get_imported_index_count());
912
    import_task_->set_imported_constraint_count(table_schema->get_constraint_count());
913
    import_task_->set_failed_constraint_count(import_task_->get_total_constraint_count() - import_task_->get_imported_constraint_count());
914
    import_task_->set_imported_ref_constraint_count(table_schema->get_foreign_key_infos().count());
915
    import_task_->set_failed_ref_constraint_count(import_task_->get_total_ref_constraint_count() - import_task_->get_imported_ref_constraint_count());
916
    import_task_->set_imported_trigger_count(table_schema->get_trigger_list().count());
917
    import_task_->set_failed_trigger_count(import_task_->get_total_trigger_count() - import_task_->get_imported_trigger_count());
918
  }
919
  return ret;
920
}
921

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.