oceanbase
920 строк · 38.4 Кб
1/**
2* Copyright (c) 2021 OceanBase
3* OceanBase CE is licensed under Mulan PubL v2.
4* You can use this software according to the terms and conditions of the Mulan PubL v2.
5* You may obtain a copy of Mulan PubL v2 at:
6* http://license.coscl.org.cn/MulanPubL-2.0
7* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10* See the Mulan PubL v2 for more details.
11*/
12#define USING_LOG_PREFIX RS
13
14#include "ob_import_table_job_scheduler.h"
15#include "ob_recover_table_initiator.h"
16#include "lib/mysqlclient/ob_mysql_transaction.h"
17#include "storage/ddl/ob_ddl_server_client.h"
18#include "share/backup/ob_backup_struct.h"
19#include "rootserver/ddl_task/ob_ddl_task.h"
20#include "sql/engine/cmd/ob_ddl_executor_util.h"
21#include "share/ob_ddl_error_message_table_operator.h"
22#include "rootserver/ob_rs_event_history_table_operator.h"
23#include "share/restore/ob_import_util.h"
24#include "rootserver/restore/ob_restore_service.h"
25
26using namespace oceanbase;
27using namespace rootserver;
28using namespace common;
29using namespace share;
30
31ObImportTableJobScheduler::ObImportTableJobScheduler()
32: is_inited_(false),
33tenant_id_(OB_INVALID_TENANT_ID),
34schema_service_(nullptr),
35sql_proxy_(nullptr),
36job_helper_(),
37task_helper_()
38{}
39
40int ObImportTableJobScheduler::init(
41share::schema::ObMultiVersionSchemaService &schema_service,
42common::ObMySQLProxy &sql_proxy)
43{
44int ret = OB_SUCCESS;
45const uint64_t tenant_id = gen_user_tenant_id(MTL_ID());
46if (IS_INIT) {
47ret = OB_INIT_TWICE;
48LOG_WARN("ObImportTableJobScheduler init twice", K(ret));
49} else if (OB_FAIL(job_helper_.init(tenant_id))) {
50LOG_WARN("failed to init table op", K(ret), K(tenant_id));
51} else if (OB_FAIL(task_helper_.init(tenant_id))) {
52LOG_WARN("failed to init table op", K(ret), K(tenant_id));
53} else {
54schema_service_ = &schema_service;
55sql_proxy_ = &sql_proxy;
56tenant_id_ = tenant_id;
57is_inited_ = true;
58}
59return ret;
60}
61
62void ObImportTableJobScheduler::wakeup_()
63{
64ObRestoreService *restore_service = nullptr;
65if (OB_ISNULL(restore_service = MTL(ObRestoreService *))) {
66LOG_ERROR_RET(OB_ERR_UNEXPECTED, "restore service must not be null");
67} else {
68restore_service->wakeup();
69}
70}
71
72void ObImportTableJobScheduler::do_work()
73{
74int ret = OB_SUCCESS;
75uint64_t data_version = 0;
76ObArray<share::ObImportTableJob> jobs;
77if (IS_NOT_INIT) {
78ret = OB_NOT_INIT;
79LOG_WARN("not init ObImportTableJobScheduler", K(ret));
80} else if (is_sys_tenant(tenant_id_)) {
81// no import table job in sys tenant
82} else if (OB_FAIL(check_compatible_())) {
83LOG_WARN("check compatible failed", K(ret));
84} else if (OB_FAIL(job_helper_.get_all_import_table_jobs(*sql_proxy_, jobs))) {
85LOG_WARN("failed to get recover all recover table job", K(ret));
86} else {
87ObCurTraceId::init(GCTX.self_addr());
88ARRAY_FOREACH(jobs, i) {
89ObImportTableJob &job = jobs.at(i);
90if (!job.is_valid()) {
91ret = OB_ERR_UNEXPECTED;
92LOG_WARN("recover table job is not valid", K(ret), K(job));
93} else if (is_user_tenant(job.get_tenant_id())) {
94process_(job);
95} else {
96ret = OB_ERR_UNEXPECTED;
97LOG_WARN("invalid tenant", K(ret), K(job));
98}
99}
100}
101}
102
103int ObImportTableJobScheduler::check_compatible_() const
104{
105int ret = OB_SUCCESS;
106uint64_t data_version = 0;
107if (OB_FAIL(GET_MIN_DATA_VERSION(tenant_id_, data_version))) {
108LOG_WARN("fail to get data version", K(ret), K_(tenant_id));
109} else if (data_version < DATA_VERSION_4_2_1_0) {
110ret = OB_OP_NOT_ALLOW;
111LOG_WARN("min data version is smaller than v4.2.1", K(ret), K_(tenant_id), K(data_version));
112} else if (OB_FAIL(GET_MIN_DATA_VERSION(gen_meta_tenant_id(tenant_id_), data_version))) {
113LOG_WARN("fail to get data version", K(ret), "tenant_id", gen_meta_tenant_id(tenant_id_));
114} else if (data_version < DATA_VERSION_4_2_1_0) {
115ret = OB_OP_NOT_ALLOW;
116LOG_WARN("min data version is smaller than v4.2.1", K(ret), K_(tenant_id), K(data_version));
117}
118return ret;
119}
120
121int ObImportTableJobScheduler::process_(share::ObImportTableJob &job)
122{
123int ret = OB_SUCCESS;
124bool is_dropped = false;
125if (OB_FAIL(schema_service_->check_if_tenant_has_been_dropped(job.get_src_tenant_id(), is_dropped))) {
126LOG_WARN("failed to check if tenant has been dropped", K(ret), "tenant_id", job.get_src_tenant_id());
127} else if (!is_dropped && OB_FAIL(wait_src_tenant_schema_refreshed_(job.get_src_tenant_id()))) {
128if (OB_SCHEMA_EAGAIN != ret) {
129LOG_WARN("failed to wait src tenant schema refreshed", K(ret), K(job));
130}
131} else {
132switch(job.get_status()) {
133case ObImportTableJobStatus::INIT: {
134if (OB_FAIL(gen_import_table_task_(job))) {
135LOG_WARN("failed to gen import table task", K(ret), K(job));
136}
137break;
138}
139case ObImportTableJobStatus::IMPORT_TABLE: {
140if (OB_FAIL(deal_with_import_table_task_(job))) {
141LOG_WARN("failed to deal with import table task", K(ret), K(job));
142}
143break;
144}
145case ObImportTableJobStatus::RECONSTRUCT_REF_CONSTRAINT: {
146if (OB_FAIL(reconstruct_ref_constraint_(job))) {
147LOG_WARN("failed to deal with reconstrcut ref constraint", K(ret));
148}
149break;
150}
151case ObImportTableJobStatus::CANCELING: {
152if (OB_FAIL(canceling_(job))) {
153LOG_WARN("failed to cancel", K(ret), K(job));
154}
155break;
156}
157case ObImportTableJobStatus::IMPORT_FINISH: {
158if (OB_FAIL(finish_(job))) {
159LOG_WARN("failed to cancel", K(ret), K(job));
160}
161break;
162}
163default: {
164ret = OB_ERR_SYS;
165LOG_WARN("invalid import job status", K(ret));
166break;
167}
168}
169}
170return ret;
171}
172
173int ObImportTableJobScheduler::wait_src_tenant_schema_refreshed_(const uint64_t tenant_id)
174{
175// Only if the aux tenant schema refreshed to newest, then we can confirm src table exist or not.
176int ret = OB_SUCCESS;
177int64_t max_schema_version = OB_INVALID_VERSION;
178ObSchemaService *sql_schema_service = nullptr;
179ObRefreshSchemaStatus status;
180status.tenant_id_ = tenant_id;
181MTL_SWITCH (OB_SYS_TENANT_ID) {
182if (OB_ISNULL(schema_service_)) {
183ret = OB_ERR_UNEXPECTED;
184LOG_WARN("schema_service_ is null", K(ret));
185} else if (OB_ISNULL(sql_schema_service = schema_service_->get_schema_service())) {
186ret = OB_ERR_UNEXPECTED;
187LOG_WARN("sql_schema_service is null", K(ret));
188} else if (OB_FAIL(sql_schema_service->fetch_schema_version(status, *sql_proxy_, max_schema_version))) {
189LOG_WARN("fail to fetch max schema version", K(ret), K(tenant_id), K(status));
190} else {
191int64_t refreshed_schema_version = 0;
192if (OB_FAIL(schema_service_->get_tenant_refreshed_schema_version(
193tenant_id, refreshed_schema_version))) {
194LOG_WARN("get refreshed schema version failed", K(ret), K(tenant_id));
195} else if (!ObSchemaService::is_formal_version(refreshed_schema_version) || refreshed_schema_version < max_schema_version) {
196ret = OB_SCHEMA_EAGAIN;
197if (REACH_TIME_INTERVAL(1000L * 1000L)) {
198LOG_WARN("tenant schema not refreshed to the newest version", K(ret), K(tenant_id), K(max_schema_version), K(refreshed_schema_version));
199}
200}
201}
202}
203
204return ret;
205}
206
207int ObImportTableJobScheduler::reconstruct_ref_constraint_(share::ObImportTableJob &job)
208{
209int ret = OB_SUCCESS;
210ObArray<share::ObImportTableTask> import_tasks;
211ObImportTableJobStatus next_status = ObImportTableJobStatus::get_next_status(job.get_status());
212LOG_INFO("[IMPORT_TABLE]start reconstruct ref constraint", K(job));
213if (OB_FAIL(get_import_table_tasks_(job, import_tasks))) {
214LOG_WARN("failed to get import table task", K(ret));
215} else if (OB_FALSE_IT(job.set_end_ts(ObTimeUtility::current_time()))) {
216} else if (OB_FAIL(advance_status_(*sql_proxy_, job, next_status))) {
217LOG_WARN("failed to advance status", K(ret), K(job), K(next_status));
218} else {
219LOG_INFO("[IMPORT_TABLE]finish reconstruct ref constraint", K(job));
220ROOTSERVICE_EVENT_ADD("import_table", "reconstruct_ref_constraint",
221"tenant_id", job.get_tenant_id(),
222"job_id", job.get_job_id());
223}
224return ret;
225}
226
227int ObImportTableJobScheduler::finish_(const share::ObImportTableJob &job)
228{
229int ret = OB_SUCCESS;
230const uint64_t tenant_id = job.get_tenant_id();
231const int64_t job_id = job.get_job_id();
232if (OB_FAIL(task_helper_.move_import_task_to_history(*sql_proxy_, tenant_id, job_id))) {
233LOG_WARN("failed to move import task to history", K(ret), K(tenant_id), K(job_id));
234} else if (OB_FAIL(job_helper_.move_import_job_to_history(*sql_proxy_, tenant_id, job_id))) {
235LOG_WARN("failed to move import job to history", K(ret), K(tenant_id), K(job_id));
236} else {
237LOG_INFO("[IMPORT_TABLE]import table job finish", K(job));
238ROOTSERVICE_EVENT_ADD("import_table", "import table finish",
239"tenant_id", job.get_tenant_id(),
240"job_id", job.get_job_id());
241}
242return ret;
243}
244
245int ObImportTableJobScheduler::gen_import_table_task_(share::ObImportTableJob &job)
246{
247int ret = OB_SUCCESS;
248ObImportTableTaskGenerator generator;
249ObArray<oceanbase::share::ObImportTableTask> import_tasks;
250ObMySQLTransaction trans;
251uint64_t meta_tenant_id = gen_meta_tenant_id(job.get_tenant_id());
252DEBUG_SYNC(BEFORE_GENERATE_IMPORT_TABLE_TASK);
253if (OB_FAIL(generator.init(*schema_service_, *sql_proxy_))) {
254LOG_WARN("failed to init import task generator", K(ret));
255} else if (OB_FAIL(generator.gen_import_task(job, import_tasks))) {
256LOG_WARN("failed to gen import table task", K(ret), K(job));
257if (!ObImportTableUtil::can_retrieable_err(ret)) {
258int tmp_ret = OB_SUCCESS;
259ObImportTableJobStatus next_status(ObImportTableJobStatus::IMPORT_FINISH);
260job.set_end_ts(ObTimeUtility::current_time());
261
262if (!job.get_result().is_comment_setted()) {
263share::ObTaskId trace_id(*ObCurTraceId::get_trace_id());
264ObImportResult result;
265if (OB_TMP_FAIL(result.set_result(ret, trace_id, GCONF.self_addr_))) {
266LOG_WARN("failed to set result", K(ret));
267} else {
268job.set_result(result);
269}
270}
271
272if (OB_TMP_FAIL(advance_status_(*sql_proxy_, job, next_status))) {
273LOG_WARN("failed to advance status", K(ret));
274}
275}
276} else if (OB_FAIL(trans.start(sql_proxy_, meta_tenant_id))) {
277LOG_WARN("failed to start trans", K(ret), K(meta_tenant_id));
278} else {
279ARRAY_FOREACH(import_tasks, i) {
280const ObImportTableTask &task = import_tasks.at(i);
281if (OB_FAIL(persist_import_table_task_(trans, task))) {
282LOG_WARN("failed to persist import table task", K(ret), K(task));
283} else {
284job.set_total_bytes(job.get_total_bytes() + task.get_total_bytes());
285job.set_total_table_count(job.get_total_table_count() + 1);
286}
287}
288
289ObImportTableJobStatus next_status = ObImportTableJobStatus::get_next_status(job.get_status());
290if (OB_FAIL(ret)) {
291} else if (OB_FAIL(job_helper_.report_import_job_statistics(*sql_proxy_, job))) {
292LOG_WARN("failed to report import job statistics", K(ret));
293} else if (!next_status.is_valid()) {
294ret = OB_ERR_UNEXPECTED;
295LOG_WARN("error import table job status is unexpected", K(ret), K(next_status));
296} else if (OB_FAIL(advance_status_(trans, job, next_status))) {
297LOG_WARN("failed to advance to next status", K(ret));
298}
299
300if (OB_SUCC(ret)) {
301if (OB_FAIL(trans.end(true))) {
302LOG_WARN("failed to commit", K(ret));
303} else {
304LOG_INFO("[IMPORT_TABLE] succeed generate import table task", K(import_tasks), K(next_status));
305ROOTSERVICE_EVENT_ADD("import_table", "generate import table task",
306"tenant_id", job.get_tenant_id(),
307"job_id", job.get_job_id(),
308"task_count", import_tasks.count());
309}
310} else {
311int tmp_ret = OB_SUCCESS;
312if (OB_SUCCESS != (tmp_ret = trans.end(false))) {
313LOG_WARN("failed to roll back", K(ret), K(tmp_ret));
314}
315}
316}
317return ret;
318}
319
320int ObImportTableJobScheduler::deal_with_import_table_task_(share::ObImportTableJob &job)
321{
322int ret = OB_SUCCESS;
323ObImportTableTask task;
324bool all_finish = false;
325if (OB_FAIL(task_helper_.get_one_not_finish_task_by_initiator(*sql_proxy_, job, all_finish, task))) {
326LOG_WARN("failed to get import table task", K(ret), K(job));
327} else if (!all_finish) {
328if (OB_FAIL(process_import_table_task_(task))) {
329LOG_WARN("failed to process import table task", K(ret), K(task));
330}
331} else if (OB_FAIL(do_after_import_all_table_(job))) {
332LOG_WARN("failed to do after import all table", K(ret), K(job));
333}
334return ret;
335}
336
337int ObImportTableJobScheduler::process_import_table_task_(share::ObImportTableTask &task)
338{
339int ret = OB_SUCCESS;
340ObImportTableTaskScheduler task_mgr;
341if (OB_FAIL(task_mgr.init(*schema_service_, *sql_proxy_, task))) {
342LOG_WARN("failed to init task mgr", K(ret));
343} else {
344task_mgr.process();
345}
346return ret;
347}
348
349int ObImportTableJobScheduler::do_after_import_all_table_(share::ObImportTableJob &job)
350{
351int ret = OB_SUCCESS;
352common::ObArray<share::ObImportTableTask> import_tasks;
353ObImportTableJobStatus next_status = ObImportTableJobStatus::get_next_status(job.get_status());
354if (OB_FAIL(get_import_table_tasks_(job, import_tasks))) {
355LOG_WARN("failed to get import table tasks", K(ret), K(job));
356} else if (!next_status.is_valid()) {
357ret = OB_ERR_UNEXPECTED;
358LOG_WARN("invalid import job status", K(ret), K(next_status));
359} else if (OB_FAIL(update_statistic_(import_tasks, job))) {
360LOG_WARN("failed to update statistic", K(ret));
361} else if (OB_FAIL(advance_status_(*sql_proxy_, job, next_status))) {
362LOG_WARN("failed to advance to next status", K(ret));
363} else {
364LOG_INFO("[IMPORT_TABLE]importing table finished", K(import_tasks), K(next_status));
365ROOTSERVICE_EVENT_ADD("import_table", "import table task finish",
366"tenant_id", job.get_tenant_id(),
367"job_id", job.get_job_id(),
368"succeed_import_table_count", job.get_finished_table_count(),
369"failed_import_table_count", job.get_failed_table_count());
370}
371return ret;
372}
373
374int ObImportTableJobScheduler::update_statistic_(
375common::ObIArray<share::ObImportTableTask> &import_tasks, share::ObImportTableJob &job)
376{
377int ret = OB_SUCCESS;
378int64_t succeed_task_cnt = 0;
379int64_t failed_task_cnt = 0;
380ObImportResult::Comment comment;
381int64_t pos = 0;
382ARRAY_FOREACH(import_tasks, i) {
383const ObImportTableTask &task = import_tasks.at(i);
384if (task.get_result().is_succeed()) {
385succeed_task_cnt++;
386} else {
387failed_task_cnt++;
388}
389}
390ObImportResult result;
391
392if (OB_FAIL(databuff_printf(comment.ptr(), comment.capacity(), pos,
393"import succeed table count: %ld, failed table count: %ld", succeed_task_cnt, failed_task_cnt))) {
394if (OB_SIZE_OVERFLOW == ret) {
395ret = OB_SUCCESS;
396} else {
397LOG_WARN("failed to databuff_printf", K(ret));
398}
399}
400
401result.set_result(true, comment);
402job.set_result(result);
403job.set_finished_table_count(succeed_task_cnt);
404job.set_failed_table_count(failed_task_cnt);
405
406if (FAILEDx(job_helper_.report_statistics(*sql_proxy_, job))) {
407LOG_WARN("failed to report statistics", K(ret));
408}
409return ret;
410}
411
412int ObImportTableJobScheduler::canceling_(share::ObImportTableJob &job)
413{
414int ret = OB_SUCCESS;
415LOG_INFO("[IMPORT_TABLE]cancel import table job", K(job));
416ObArray<share::ObImportTableTask> import_tasks;
417if (OB_FAIL(get_import_table_tasks_(job, import_tasks))) {
418if (OB_ENTRY_NOT_EXIST == ret) {
419ret = OB_SUCCESS;
420} else {
421LOG_WARN("failed to get import table task", K(ret));
422}
423} else {
424ObImportTableTaskStatus next_status(ObImportTableTaskStatus::FINISH);
425ARRAY_FOREACH(import_tasks, i) {
426ObImportTableTask &task = import_tasks.at(i);
427obrpc::ObAbortRedefTableArg arg;
428arg.task_id_ = task.get_task_id();
429arg.tenant_id_ = task.get_tenant_id();
430bool is_exist = false;
431if (task.get_status().is_finish()) {
432} else if (OB_FAIL(check_import_ddl_task_exist_(task, is_exist))) {
433LOG_WARN("failed to check import ddl task", K(ret));
434} else if (is_exist && OB_FAIL(ObDDLServerClient::abort_redef_table(arg))) {
435LOG_WARN("failed to abort redef table", K(ret), K(arg));
436} else {
437LOG_INFO("[IMPORT_TABLE]cancel import table task", K(arg));
438share::ObTaskId trace_id(*ObCurTraceId::get_trace_id());
439ObImportResult result;
440if (OB_FAIL(result.set_result(OB_CANCELED, trace_id, GCONF.self_addr_))) {
441LOG_WARN("failed to set result", K(ret));
442} else if (OB_FALSE_IT(task.set_result(result))) {
443} else if (OB_FAIL(task_helper_.advance_status(*sql_proxy_, task, next_status))) {
444LOG_WARN("failed to cancel import task", K(ret), K(task));
445} else {
446LOG_INFO("[IMPORT_TABLE]succeed cancel import table task", K(arg));
447}
448}
449}
450}
451
452if (OB_SUCC(ret)) {
453share::ObTaskId trace_id(*ObCurTraceId::get_trace_id());
454ObImportResult result;
455ObImportTableJobStatus next_status(ObImportTableJobStatus::IMPORT_FINISH);
456job.set_end_ts(ObTimeUtility::current_time());
457if (OB_FAIL(result.set_result(OB_CANCELED, trace_id, GCONF.self_addr_))) {
458LOG_WARN("failed to set result", K(ret));
459} else if (OB_FALSE_IT(job.set_result(result))) {
460} else if (OB_FAIL(advance_status_(*sql_proxy_, job, next_status))) {
461LOG_WARN("failed to advance status", K(ret));
462} else {
463LOG_INFO("[IMPORT_TABLE]succeed to cancel import table job", K(job));
464ROOTSERVICE_EVENT_ADD("import_table", "cancel import table task",
465"tenant_id", job.get_tenant_id(),
466"job_id", job.get_job_id());
467}
468}
469return ret;
470}
471
472int ObImportTableJobScheduler::check_import_ddl_task_exist_(const share::ObImportTableTask &task, bool &is_exist)
473{
474int ret = OB_SUCCESS;
475is_exist = false;
476const uint64_t tenant_id = task.get_tenant_id();
477const int64_t task_id = task.get_task_id();
478int64_t unused_user_msg_len = 0;
479ObDDLErrorMessageTableOperator::ObBuildDDLErrorMessage error_message;
480if (OB_FAIL(ObDDLTaskRecordOperator::check_task_id_exist(*sql_proxy_, tenant_id, task_id, is_exist))) {
481LOG_WARN("failed to check task id exist", K(ret), K(tenant_id), K(task_id));
482} else if (is_exist) {
483} else if (OB_FAIL(ObDDLErrorMessageTableOperator::get_ddl_error_message(tenant_id,
484task_id,
485-1 /* target_object_id */,
486ObAddr()/*unused addr*/,
487false /* is_ddl_retry_task */,
488*sql_proxy_,
489error_message,
490unused_user_msg_len))) {
491if (OB_ENTRY_NOT_EXIST == ret) {
492ret = OB_SUCCESS;
493} else {
494LOG_WARN("failed to load ddl user error", K(ret), K(tenant_id), K(task_id));
495}
496} else {
497is_exist = true;
498}
499return ret;
500}
501
502int ObImportTableJobScheduler::persist_import_table_task_(
503common::ObMySQLTransaction &trans, const share::ObImportTableTask &task)
504{
505int ret = OB_SUCCESS;
506if (OB_FAIL(task_helper_.insert_import_table_task(trans, task))) {
507LOG_WARN("failed to get import table job", K(ret), K(task));
508} else {
509LOG_INFO("succeed to persist import table task", K(task));
510}
511return ret;
512}
513
514int ObImportTableJobScheduler::get_import_table_tasks_(
515const share::ObImportTableJob &job, common::ObIArray<share::ObImportTableTask> &import_tasks)
516{
517int ret = OB_SUCCESS;
518if (OB_FAIL(task_helper_.get_all_import_table_tasks_by_initiator(*sql_proxy_, job, import_tasks))) {
519LOG_WARN("failed to get import table task", K(ret), K(job));
520}
521return ret;
522}
523
524int ObImportTableJobScheduler::advance_status_(
525common::ObISQLClient &sql_proxy, const share::ObImportTableJob &job, const share::ObImportTableJobStatus &next_status)
526{
527int ret = OB_SUCCESS;
528if (OB_FAIL(job_helper_.advance_status(sql_proxy, job, next_status))) {
529LOG_WARN("failed to advance status", K(ret), K(job), K(next_status));
530} else {
531wakeup_();
532}
533return ret;
534}
535
536int ObImportTableTaskScheduler::init(share::schema::ObMultiVersionSchemaService &schema_service,
537common::ObMySQLProxy &sql_proxy, share::ObImportTableTask &task)
538{
539int ret = OB_SUCCESS;
540if (IS_INIT) {
541ret = OB_INIT_TWICE;
542LOG_WARN("ObImportTableTaskScheduler init twice", K(ret));
543} else if (OB_FAIL(helper_.init(task.get_tenant_id()))) {
544LOG_WARN("failed to init recover table persist helper", K(ret));
545} else {
546schema_service_ = &schema_service;
547sql_proxy_ = &sql_proxy;
548import_task_ = &task;
549is_inited_ = true;
550}
551return ret;
552}
553
554void ObImportTableTaskScheduler::wakeup_() {
555ObRestoreService *restore_service = nullptr;
556if (OB_ISNULL(restore_service = MTL(ObRestoreService *))) {
557LOG_ERROR_RET(OB_ERR_UNEXPECTED, "restore service must not be null");
558} else {
559restore_service->wakeup();
560}
561}
562
563void ObImportTableTaskScheduler::reset()
564{
565is_inited_ = false;
566schema_service_ = nullptr;
567sql_proxy_ = nullptr;
568import_task_ = nullptr;
569}
570
571int ObImportTableTaskScheduler::process()
572{
573int ret = OB_SUCCESS;
574LOG_INFO("ready process import table task", KPC_(import_task));
575if (IS_NOT_INIT) {
576ret = OB_NOT_INIT;
577LOG_WARN("ObIImportTableTaskMgr not inited", K(ret));
578} else {
579const ObImportTableTaskStatus &status = import_task_->get_status();
580switch(status) {
581case ObImportTableTaskStatus::INIT: {
582if (OB_FAIL(init_())) {
583LOG_WARN("failed to do init work", K(ret), KPC_(import_task));
584}
585break;
586}
587case ObImportTableTaskStatus::DOING: {
588if (OB_FAIL(doing_())) {
589LOG_WARN("failed to do doing work", K(ret), KPC_(import_task));
590}
591break;
592}
593case ObImportTableTaskStatus::FINISH: {
594break; // do nothing
595}
596default: {
597ret = OB_ERR_SYS;
598LOG_WARN("invalid recover task status", K(ret));
599}
600}
601}
602return ret;
603}
604
605int ObImportTableTaskScheduler::init_()
606{
607int ret = OB_SUCCESS;
608if (OB_FAIL(gen_import_ddl_task_())) {
609LOG_WARN("failed to generate import ddl task", K(ret), KPC_(import_task));
610}
611
612int tmp_ret = OB_SUCCESS;
613if (OB_TMP_FAIL(try_advance_status_(ret))) {
614LOG_WARN("failed to advance status", K(tmp_ret), K(ret));
615ret = OB_SUCC(ret) ? tmp_ret : ret;
616}
617return ret;
618}
619
620int ObImportTableTaskScheduler::doing_()
621{
622int ret = OB_SUCCESS;
623bool is_finish = false;
624if (OB_FAIL(wait_import_ddl_task_finish_(is_finish))) {
625LOG_WARN("failed to do doing work", K(ret), KPC_(import_task));
626} else if (!is_finish) {
627} else if (OB_FAIL(try_advance_status_(ret))) {
628LOG_WARN("failed to advance status", K(ret));
629}
630return ret;
631}
632
633int ObImportTableTaskScheduler::try_advance_status_(const int err_code)
634{
635int ret = OB_SUCCESS;
636if (OB_FAIL(err_code) && ObImportTableUtil::can_retrieable_err(err_code)) { // do nothing
637} else {
638
639share::ObImportTableTaskStatus next_status = import_task_->get_status().get_next_status(err_code);
640if (import_task_->get_result().is_succeed()) { // avoid to cover comment
641share::ObTaskId trace_id(*ObCurTraceId::get_trace_id());
642ObImportResult result;
643if (OB_FAIL(result.set_result(err_code, trace_id, GCONF.self_addr_))) {
644LOG_WARN("failed to set result", K(ret));
645} else if (OB_FALSE_IT(import_task_->set_result(result))) {
646}
647}
648if (FAILEDx(helper_.advance_status(*sql_proxy_, *import_task_, next_status))) {
649LOG_WARN("failed to advance status", K(ret), KPC_(import_task), K(next_status));
650} else {
651wakeup_();
652}
653}
654return ret;
655}
656
657int ObImportTableTaskScheduler::gen_import_ddl_task_()
658{
659int ret = OB_SUCCESS;
660obrpc::ObRecoverRestoreTableDDLArg arg;
661bool is_exist = false;
662LOG_INFO("[IMPORT_TABLE]start to create import table", KPC_(import_task));
663if (OB_FAIL(check_import_ddl_task_exist_(is_exist))) {
664LOG_WARN("failed to check import ddl task", K(ret));
665} else if (is_exist) {
666LOG_INFO("[IMPORT_TABLE]import ddl task exist, skip it", KPC_(import_task), K(arg));
667} else if (OB_FAIL(construct_import_table_arg_(arg))) {
668LOG_WARN("failed to construct import table arg", K(ret));
669} else if (OB_FAIL(ObDDLServerClient::execute_recover_restore_table(arg))) {
670if (OB_ENTRY_EXIST == ret) {
671// old and new leader both execute import ddl at the same time.
672ret = OB_EAGAIN;
673LOG_WARN("import ddl task exist, try again", K(ret), K(arg));
674} else {
675LOG_WARN("fail to start import table", K(ret), K(arg));
676}
677} else {
678LOG_INFO("[IMPORT_TABLE]succeed execute_recover_restore_table", KPC_(import_task), K(arg));
679}
680return ret;
681}
682
683int ObImportTableTaskScheduler::check_import_ddl_task_exist_(bool &is_exist)
684{
685int ret = OB_SUCCESS;
686is_exist = false;
687const uint64_t tenant_id = import_task_->get_tenant_id();
688const int64_t task_id = import_task_->get_task_id();
689int64_t unused_user_msg_len = 0;
690ObDDLErrorMessageTableOperator::ObBuildDDLErrorMessage error_message;
691if (OB_FAIL(ObDDLTaskRecordOperator::check_task_id_exist(*sql_proxy_, tenant_id, task_id, is_exist))) {
692LOG_WARN("failed to check task id exist", K(ret), K(tenant_id), K(task_id));
693} else if (is_exist) {
694} else if (OB_FAIL(ObDDLErrorMessageTableOperator::get_ddl_error_message(tenant_id,
695task_id,
696-1 /* target_object_id */,
697ObAddr()/*unused addr*/,
698false /* is_ddl_retry_task */,
699*sql_proxy_,
700error_message,
701unused_user_msg_len))) {
702if (OB_ENTRY_NOT_EXIST == ret) {
703ret = OB_SUCCESS;
704} else {
705LOG_WARN("failed to load ddl user error", K(ret), K(tenant_id), K(task_id));
706}
707} else {
708is_exist = true;
709}
710return ret;
711}
712
713int ObImportTableTaskScheduler::construct_import_table_arg_(obrpc::ObRecoverRestoreTableDDLArg &arg)
714{
715int ret = OB_SUCCESS;
716ObSchemaGetterGuard src_tenant_guard;
717const ObTableSchema *src_table_schema = nullptr;
718ObFixedLengthString<common::OB_MAX_TIMESTAMP_TZ_LENGTH> time_zone;
719MTL_SWITCH(OB_SYS_TENANT_ID) {
720if (OB_FAIL(schema_service_->get_tenant_schema_guard(import_task_->get_src_tenant_id(), src_tenant_guard))) {
721LOG_WARN("failed to get tenant schema guard", K(ret), KPC_(import_task));
722} else if (OB_FAIL(src_tenant_guard.get_table_schema(import_task_->get_src_tenant_id(),
723import_task_->get_src_database(),
724import_task_->get_src_table(),
725false,
726src_table_schema))) {
727LOG_WARN("failed to get table schema", K(ret), KPC_(import_task));
728} else if (OB_ISNULL(src_table_schema)) {
729ret = OB_TABLE_NOT_EXIST;
730LOG_WARN("src table not exist", K(ret), KPC_(import_task));
731}
732}
733if (FAILEDx(construct_import_table_schema_(*src_table_schema, arg.target_schema_))) {
734LOG_WARN("failed to construct import table schema", K(ret));
735} else {
736arg.src_tenant_id_ = src_table_schema->get_tenant_id();
737arg.src_table_id_ = src_table_schema->get_table_id();
738arg.ddl_task_id_ = import_task_->get_task_id();
739arg.exec_tenant_id_ = import_task_->get_tenant_id();
740const ObSysVarSchema *data_format_schema = nullptr;
741const ObSysVarSchema *nls_timestamp_format = nullptr;
742const ObSysVarSchema *nls_timestamp_tz_format = nullptr;
743if (OB_FAIL(share::ObBackupUtils::get_tenant_sys_time_zone_wrap(import_task_->get_tenant_id(),
744time_zone,
745arg.tz_info_wrap_))) {
746LOG_WARN("failed to get tenant sys timezoen wrap", K(ret), KPC_(import_task));
747} else if (OB_FAIL(src_tenant_guard.get_tenant_system_variable(import_task_->get_src_tenant_id(),
748share::SYS_VAR_NLS_DATE_FORMAT,
749data_format_schema))) {
750LOG_WARN("fail to get tenant system variable", K(ret));
751} else if (OB_FAIL(src_tenant_guard.get_tenant_system_variable(import_task_->get_src_tenant_id(),
752share::SYS_VAR_NLS_TIMESTAMP_FORMAT,
753nls_timestamp_format))) {
754LOG_WARN("fail to get tenant system variable", K(ret));
755} else if (OB_FAIL(src_tenant_guard.get_tenant_system_variable(import_task_->get_src_tenant_id(),
756share::SYS_VAR_NLS_TIMESTAMP_TZ_FORMAT,
757nls_timestamp_tz_format))) {
758LOG_WARN("fail to get tenant system variable", K(ret));
759} else if (OB_ISNULL(data_format_schema) || OB_ISNULL(nls_timestamp_format) || OB_ISNULL(nls_timestamp_tz_format)) {
760ret = OB_ERR_UNEXPECTED;
761LOG_WARN("var schema must not be null", K(ret), KP(data_format_schema), KP(nls_timestamp_format), KP(nls_timestamp_tz_format));
762} else if (OB_FAIL(ob_write_string(arg.allocator_, data_format_schema->get_value(), arg.nls_formats_[ObNLSFormatEnum::NLS_DATE]))) {
763LOG_WARN("deep copy failed", K(ret), K(data_format_schema->get_value()));
764} else if (OB_FAIL(ob_write_string(arg.allocator_, nls_timestamp_format->get_value(), arg.nls_formats_[ObNLSFormatEnum::NLS_TIMESTAMP]))) {
765LOG_WARN("deep copy failed", K(ret), K(nls_timestamp_format->get_value()));
766} else if (OB_FAIL(ob_write_string(arg.allocator_, nls_timestamp_tz_format->get_value(), arg.nls_formats_[ObNLSFormatEnum::NLS_TIMESTAMP_TZ]))) {
767LOG_WARN("deep copy failed", K(ret), K(nls_timestamp_tz_format->get_value()));
768} else {
769arg.tz_info_ = arg.tz_info_wrap_.get_tz_info_offset();
770}
771}
772return ret;
773}
774
775int ObImportTableTaskScheduler::construct_import_table_schema_(
776const share::schema::ObTableSchema &src_table_schema, share::schema::ObTableSchema &target_table_schema)
777{
778int ret = OB_SUCCESS;
779ObSchemaGetterGuard target_tenant_guard;
780if (OB_FAIL(schema_service_->get_tenant_schema_guard(import_task_->get_tenant_id(), target_tenant_guard))) {
781LOG_WARN("failed to get tenant schema guard", K(ret), KPC_(import_task));
782} else if (OB_FAIL(target_table_schema.assign(src_table_schema))) {
783LOG_WARN("failed to assign target table schema", K(ret));
784} else {
785target_table_schema.set_tenant_id(import_task_->get_tenant_id());
786target_table_schema.set_table_name(import_task_->get_target_table());
787target_table_schema.set_table_id(OB_INVALID_ID);
788
789target_table_schema.set_data_table_id(0);
790target_table_schema.clear_constraint();
791target_table_schema.clear_foreign_key_infos();
792target_table_schema.set_table_state_flag(ObTableStateFlag::TABLE_STATE_NORMAL);
793target_table_schema.set_mlog_tid(OB_INVALID_ID); // mlog (if exists) will be discarded
794
795uint64_t database_id = OB_INVALID_ID;
796if (OB_FAIL(target_tenant_guard.get_database_id(import_task_->get_tenant_id(),
797import_task_->get_target_database(),
798database_id))) {
799LOG_WARN("failed to get database id", K(ret), KPC_(import_task));
800} else if (OB_INVALID_ID == database_id) {
801ret = OB_ERR_BAD_DATABASE;
802LOG_WARN("invalid target database name", K(ret), K(database_id), KPC_(import_task));
803} else {
804target_table_schema.set_database_id(database_id);
805}
806
807uint64_t table_group_id = OB_INVALID_ID;
808if (import_task_->get_target_tablegroup().empty()) {
809} else if (FAILEDx(target_tenant_guard.get_tablegroup_id(import_task_->get_tenant_id(),
810import_task_->get_target_tablegroup(),
811table_group_id))) {
812LOG_WARN("failed to get table group id", K(ret), KPC_(import_task));
813} else if (OB_INVALID_ID == table_group_id) {
814ret = OB_TABLEGROUP_NOT_EXIST;
815LOG_WARN("invalid target tablegroup id", K(ret), K(table_group_id), KPC_(import_task));
816} else {
817target_table_schema.set_tablegroup_id(table_group_id);
818}
819
820const schema::ObTablespaceSchema *schema = nullptr;
821if (import_task_->get_target_tablespace().empty()) {
822} else if (FAILEDx(target_tenant_guard.get_tablespace_schema_with_name(import_task_->get_tenant_id(),
823import_task_->get_target_tablespace(),
824schema))) {
825LOG_WARN("failed to get tablespace schema", K(ret), KPC_(import_task));
826} else if (OB_ISNULL(schema)) {
827ret = OB_TABLESPACE_NOT_EXIST;
828LOG_WARN("tablespace must not be null", K(ret), KPC_(import_task));
829} else if (OB_FAIL(target_table_schema.set_encryption_str(schema->get_encryption_name()))) {
830LOG_WARN("failed to set encryption str", K(ret));
831} else if (OB_FAIL(target_table_schema.set_encrypt_key(schema->get_encrypt_key()))) {
832LOG_WARN("failed to set encrypt key", K(ret));
833} else {
834target_table_schema.set_master_key_id(schema->get_master_key_id());
835target_table_schema.set_tablespace_id(schema->get_tablespace_id());
836}
837}
838return ret;
839}
840
841int ObImportTableTaskScheduler::wait_import_ddl_task_finish_(bool &is_finish)
842{
843int ret = OB_SUCCESS;
844int64_t unused_user_msg_len = 0;
845ObDDLErrorMessageTableOperator::ObBuildDDLErrorMessage error_message;
846uint64_t tenant_id = import_task_->get_tenant_id();
847int64_t task_id = import_task_->get_task_id();
848is_finish = false;
849if (OB_FAIL(ObDDLErrorMessageTableOperator::get_ddl_error_message(tenant_id,
850task_id,
851-1 /* target_object_id */,
852ObAddr()/*unused addr*/,
853false /* is_ddl_retry_task */,
854*sql_proxy_,
855error_message,
856unused_user_msg_len))) {
857if (OB_ENTRY_NOT_EXIST == ret) {
858ret = OB_SUCCESS;
859if(REACH_TIME_INTERVAL(120 * 1000 * 1000)) {
860LOG_WARN("[IMPORT_TABLE]import ddl task does not finish, retry again", K(tenant_id), K(task_id));
861}
862} else {
863LOG_WARN("failed to load ddl user error", K(ret), K(tenant_id), K(task_id));
864}
865} else if (OB_SUCCESS != error_message.ret_code_) {
866ObImportResult result;
867if (OB_FAIL(result.set_result(false, error_message.user_message_))) {
868LOG_WARN("failed to set result", K(ret), K(error_message));
869} else {
870import_task_->set_result(result);
871is_finish = true;
872LOG_INFO("[IMPORT_TABLE]import table failed", KPC_(import_task), K(error_message));
873}
874} else if (OB_FAIL(statistics_import_results_())) {
875LOG_WARN("failed to statistics import result", K(ret));
876} else if (OB_FAIL(helper_.report_import_task_statistics(*sql_proxy_, *import_task_))) {
877LOG_WARN("failed to report import task statistics", K(ret), KPC_(import_task));
878} else {
879is_finish = true;
880LOG_INFO("[IMPORT_TABLE]import table succeed", KPC_(import_task), K(error_message));
881}
882return ret;
883}
884
885int ObImportTableTaskScheduler::statistics_import_results_()
886{
887int ret = OB_SUCCESS;
888int tmp_ret = OB_SUCCESS;
889ObSchemaGetterGuard guard;
890const ObTableSchema * table_schema = nullptr;
891const int64_t tenant_id = import_task_->get_tenant_id();
892const ObString &db_name = import_task_->get_target_database();
893const ObString &table_name = import_task_->get_target_table();
894if (OB_FAIL(schema_service_->get_tenant_schema_guard(tenant_id, guard))) {
895LOG_WARN("failed get tenant schema guard", K(ret), K(tenant_id));
896} else if (OB_FAIL(guard.get_table_schema(tenant_id, db_name, table_name, false/*no index*/, table_schema))) {
897LOG_WARN("failed to get table schema", K(ret), K(tenant_id), K(db_name), K(table_name));
898} else if (OB_ISNULL(table_schema)) {
899ret = OB_TABLE_NOT_EXIST;
900LOG_WARN("table is not exist", K(tenant_id), K(db_name), K(table_name));
901ObImportResult::Comment comment;
902if (OB_TMP_FAIL(databuff_printf(comment.ptr(), comment.capacity(),
903"table %.*s has been deleted by user", table_name.length(), table_name.ptr()))) {
904LOG_WARN("failed to databuff printf", K(ret), K(tmp_ret));
905} else {
906import_task_->get_result().set_result(true, comment);
907}
908} else {
909import_task_->set_completion_ts(ObTimeUtility::current_time());
910import_task_->set_imported_index_count(table_schema->get_simple_index_infos().count());
911import_task_->set_failed_index_count(import_task_->get_total_index_count() - import_task_->get_imported_index_count());
912import_task_->set_imported_constraint_count(table_schema->get_constraint_count());
913import_task_->set_failed_constraint_count(import_task_->get_total_constraint_count() - import_task_->get_imported_constraint_count());
914import_task_->set_imported_ref_constraint_count(table_schema->get_foreign_key_infos().count());
915import_task_->set_failed_ref_constraint_count(import_task_->get_total_ref_constraint_count() - import_task_->get_imported_ref_constraint_count());
916import_task_->set_imported_trigger_count(table_schema->get_trigger_list().count());
917import_task_->set_failed_trigger_count(import_task_->get_total_trigger_count() - import_task_->get_imported_trigger_count());
918}
919return ret;
920}
921