// oceanbase / Fork: 0 / ob_recover_table_job_scheduler.cpp — 892 lines · 35.4 KB
/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */

13
#define USING_LOG_PREFIX RS
14

15
#include "ob_recover_table_job_scheduler.h"
16
#include "rootserver/ob_rs_event_history_table_operator.h"
17
#include "rootserver/restore/ob_recover_table_initiator.h"
18
#include "rootserver/restore/ob_restore_service.h"
19
#include "share/backup/ob_backup_data_table_operator.h"
20
#include "share/ob_primary_standby_service.h"
21
#include "share/location_cache/ob_location_service.h"
22
#include "share/restore/ob_physical_restore_table_operator.h"
23
#include "share/restore/ob_import_util.h"
24
#include "storage/tablelock/ob_lock_inner_connection_util.h"
25
#include "observer/ob_inner_sql_connection.h"
26

27
using namespace oceanbase;
28
using namespace rootserver;
29
using namespace share;
30

31
void ObRecoverTableJobScheduler::reset()
32
{
33
  rs_rpc_proxy_ = nullptr;
34
  sql_proxy_ = nullptr;
35
  schema_service_ = nullptr;
36
  srv_rpc_proxy_ = nullptr;
37
  is_inited_ = false;
38
  tenant_id_ = OB_INVALID_TENANT_ID;
39
}
40

41
int ObRecoverTableJobScheduler::init(
42
    share::schema::ObMultiVersionSchemaService &schema_service,
43
    common::ObMySQLProxy &sql_proxy,
44
    obrpc::ObCommonRpcProxy &rs_rpc_proxy,
45
    obrpc::ObSrvRpcProxy &srv_rpc_proxy)
46
{
47
  int ret = OB_SUCCESS;
48
  const uint64_t tenant_id = gen_user_tenant_id(MTL_ID());
49
  if (IS_INIT) {
50
    ret = OB_INIT_TWICE;
51
    LOG_WARN("ObRecoverTableJobScheduler init twice", K(ret));
52
  } else if (OB_FAIL(helper_.init(tenant_id))) {
53
    LOG_WARN("failed to init table op", K(ret), K(tenant_id));
54
  } else {
55
    schema_service_ = &schema_service;
56
    sql_proxy_ = &sql_proxy;
57
    rs_rpc_proxy_ = &rs_rpc_proxy;
58
    srv_rpc_proxy_ = &srv_rpc_proxy;
59
    tenant_id_ = tenant_id;
60
    is_inited_ = true;
61
  }
62
  return ret;
63
}
64

65
void ObRecoverTableJobScheduler::wakeup_()
66
{
67
  ObRestoreService *restore_service = nullptr;
68
  if (OB_ISNULL(restore_service = MTL(ObRestoreService *))) {
69
    LOG_ERROR_RET(OB_ERR_UNEXPECTED, "restore service must not be null");
70
  } else {
71
    restore_service->wakeup();
72
  }
73
}
74

75
void ObRecoverTableJobScheduler::do_work()
76
{
77
  int ret = OB_SUCCESS;
78
  ObArray<share::ObRecoverTableJob> jobs;
79
  if (IS_NOT_INIT) {
80
    ret = OB_NOT_INIT;
81
    LOG_WARN("not init ObSysRecoverTableJobScheduler", K(ret));
82
  } else if (OB_FAIL(check_compatible_())) {
83
    LOG_WARN("check compatible failed", K(ret));
84
  } else if (OB_FAIL(helper_.get_all_recover_table_job(*sql_proxy_, jobs))) {
85
    LOG_WARN("failed to get recover all recover table job", K(ret));
86
  } else {
87
    ObCurTraceId::init(GCTX.self_addr());
88
    ARRAY_FOREACH(jobs, i) {
89
      ObRecoverTableJob &job = jobs.at(i);
90
      if (!job.is_valid()) {
91
        ret = OB_ERR_UNEXPECTED;
92
        LOG_WARN("recover table job is not valid", K(ret), K(job));
93
      } else if (is_sys_tenant(job.get_tenant_id())) {
94
        sys_process_(job);
95
      } else if (is_user_tenant(job.get_tenant_id())) {
96
        user_process_(job);
97
      } else {
98
        ret = OB_ERR_UNEXPECTED;
99
        LOG_WARN("invalid tenant", K(ret), K(job));
100
      }
101
    }
102
  }
103

104
}
105

106
int ObRecoverTableJobScheduler::check_compatible_() const
107
{
108
  int ret = OB_SUCCESS;
109
  uint64_t data_version = 0;
110
  if (OB_FAIL(GET_MIN_DATA_VERSION(tenant_id_, data_version))) {
111
    LOG_WARN("fail to get data version", K(ret), K_(tenant_id));
112
  } else if (data_version < DATA_VERSION_4_2_1_0) {
113
    ret = OB_OP_NOT_ALLOW;
114
    LOG_WARN("min data version is smaller than v4.2.1", K(ret), K_(tenant_id), K(data_version));
115
  } else if (is_sys_tenant(tenant_id_)) {
116
  } else if (OB_FAIL(GET_MIN_DATA_VERSION(gen_meta_tenant_id(tenant_id_), data_version))) {
117
    LOG_WARN("fail to get data version", K(ret), "tenant_id", gen_meta_tenant_id(tenant_id_));
118
  } else if (data_version < DATA_VERSION_4_2_1_0) {
119
    ret = OB_OP_NOT_ALLOW;
120
    LOG_WARN("min data version is smaller than v4.2.1", K(ret), K_(tenant_id), K(data_version));
121
  }
122

123
  return ret;
124
}
125

126
// Moves `job` to its next status according to `err_code` and persists it.
//
// - err_code == OB_SUCCESS: the next status comes from the sys or the user
//   state machine, depending on which tenant owns the job.
// - err_code retriable (ObImportTableUtil::can_retrieable_err): the status is
//   left unchanged so the current step runs again. The restore service is
//   still woken up.
// - any other err_code: the job moves to FAILED. Unless a comment is already
//   recorded, the error code, trace id and this server's address are stored in
//   the job result.
//
// If the next status is a finish status, the job's end timestamp is set.
// Returns the error from recording the result or from persisting the status.
int ObRecoverTableJobScheduler::try_advance_status_(share::ObRecoverTableJob &job, const int err_code)
{
  int ret = OB_SUCCESS;
  share::ObRecoverTableStatus next_status;
  const uint64_t tenant_id = job.get_tenant_id();
  const int64_t job_id = job.get_job_id();
  bool need_advance_status = true;
  if (err_code != OB_SUCCESS) {
    if (ObImportTableUtil::can_retrieable_err(err_code)) {
      need_advance_status = false;
    } else {
      share::ObTaskId trace_id(*ObCurTraceId::get_trace_id());
      next_status = ObRecoverTableStatus::FAILED;
      if (job.get_result().is_comment_setted()) {
        // Keep the more specific comment a previous step already recorded.
      } else if (OB_FAIL(job.get_result().set_result(
          err_code, trace_id, GCONF.self_addr_))) {
        LOG_WARN("failed to set result", K(ret));
      }
      LOG_WARN("[RECOVER_TABLE]recover table job failed", K(err_code), K(job));
      ROOTSERVICE_EVENT_ADD("recover_table", "recover_table_failed", K(tenant_id), K(job_id), K(err_code), K(trace_id));
    }
  } else if (job.get_tenant_id() == OB_SYS_TENANT_ID) {
    next_status = ObRecoverTableStatus::get_sys_next_status(job.get_status());
  } else {
    next_status = ObRecoverTableStatus::get_user_next_status(job.get_status());
  }
  if (next_status.is_finish()) {
    job.set_end_ts(ObTimeUtility::current_time());
  }
  if (OB_FAIL(ret)) {
  } else if (!need_advance_status) {
    // Retriable error: the status is unchanged, and next_status was never
    // computed. Wake the service as before, but do not record an
    // "advance_status" event carrying a meaningless default status.
    wakeup_();
  } else if (OB_FAIL(helper_.advance_status(*sql_proxy_, job, next_status))) {
    LOG_WARN("failed to advance statsu", K(ret), K(job), K(next_status));
  } else {
    wakeup_();
    ROOTSERVICE_EVENT_ADD("recover_table", "advance_status", K(tenant_id), K(job_id), K(next_status));
  }
  return ret;
}
164

165
// Dispatches a sys-tenant recover-table job to the handler for its current
// status:
//   PREPARE              -> sys_prepare_
//   RECOVERING           -> recovering_
//   COMPLETED / FAILED   -> sys_finish_
// Any other status is unexpected (OB_ERR_SYS). Errors are only logged, because
// this function returns nothing.
void ObRecoverTableJobScheduler::sys_process_(share::ObRecoverTableJob &job)
{
  int ret = OB_SUCCESS;
  LOG_INFO("ready to schedule sys recover table job", K(job));
  switch (job.get_status()) {
    case ObRecoverTableStatus::Status::PREPARE:
      if (OB_FAIL(sys_prepare_(job))) {
        LOG_WARN("failed to do sys prepare work", K(ret), K(job));
      }
      break;
    case ObRecoverTableStatus::Status::RECOVERING:
      if (OB_FAIL(recovering_(job))) {
        LOG_WARN("failed to do sys recovering work", K(ret), K(job));
      }
      break;
    case ObRecoverTableStatus::Status::COMPLETED:
    case ObRecoverTableStatus::Status::FAILED:
      // Both terminal states share the same cleanup.
      if (OB_FAIL(sys_finish_(job))) {
        LOG_WARN("failed to do sys finish work", K(ret), K(job));
      }
      break;
    default:
      ret = OB_ERR_SYS;
      LOG_WARN("invalid sys recover job status", K(ret), K(job));
      break;
  }
}
196

197
// Verifies that both the job's target tenant and its meta tenant have a min
// data version of at least 4.2.1.
//
// On any failure, the target tenant's schema guard is probed. If that probe
// reports OB_TENANT_NOT_EXIST, that code is returned in place of the original
// error.
int ObRecoverTableJobScheduler::check_target_tenant_version_(share::ObRecoverTableJob &job)
{
  int ret = OB_SUCCESS;
  uint64_t data_version = 0;
  const uint64_t target_tenant_id = job.get_target_tenant_id();
  const uint64_t target_meta_tenant_id = gen_meta_tenant_id(target_tenant_id);

  if (OB_FAIL(GET_MIN_DATA_VERSION(target_tenant_id, data_version))) {
    LOG_WARN("fail to get data version", K(ret), K(target_tenant_id));
  } else if (data_version < DATA_VERSION_4_2_1_0) {
    ret = OB_OP_NOT_ALLOW;
    LOG_WARN("min data version is smaller than v4.2.1", K(ret), K(target_tenant_id), K(data_version));
  } else if (OB_FAIL(GET_MIN_DATA_VERSION(target_meta_tenant_id, data_version))) {
    LOG_WARN("fail to get data version", K(ret), "target_tenant_id", target_meta_tenant_id);
  } else if (data_version < DATA_VERSION_4_2_1_0) {
    ret = OB_OP_NOT_ALLOW;
    LOG_WARN("min data version is smaller than v4.2.1", K(ret), K(target_tenant_id), K(data_version));
  }

  if (OB_FAIL(ret)) {
    // Distinguish "tenant dropped" from other failures so callers can react.
    int tmp_ret = OB_SUCCESS;
    schema::ObSchemaGetterGuard guard;
    if (OB_TMP_FAIL(ObImportTableUtil::get_tenant_schema_guard(*schema_service_, target_tenant_id, guard))) {
      if (OB_TENANT_NOT_EXIST == tmp_ret) {
        ret = tmp_ret;
      }
      LOG_WARN("failed to get tenant schema guard", K(tmp_ret));
    }
  }
  return ret;
}
227

228
// Sys-side PREPARE step: makes sure a user-tenant copy of this sys job exists
// in the target tenant, then advances the sys job.
//
// The work runs in a transaction on the target tenant's meta tenant, holding an
// exclusive try-lock on the recover-table job table (see lock_recover_table_).
// The user job is looked up by initiator, first among live jobs and then in
// history. A new user job is inserted only if neither lookup finds one, so a
// retried step does not insert a duplicate.
//
// The final ret, including version-check or lock failures, is passed to
// try_advance_status_. That function decides whether to retry, advance or fail
// the job.
int ObRecoverTableJobScheduler::sys_prepare_(share::ObRecoverTableJob &job)
{
  int ret = OB_SUCCESS;
  ObRecoverTableJob target_job;
  share::ObRecoverTablePersistHelper helper;
  DEBUG_SYNC(BEFORE_INSERT_UERR_RECOVER_TABLE_JOB);
  if (OB_FAIL(check_target_tenant_version_(job))) {
    LOG_WARN("failed to check target tenant version", K(ret));
  } else if (OB_FAIL(helper.init(job.get_target_tenant_id()))) {
    LOG_WARN("failed to init recover table persist helper", K(ret));
  }

  ObMySQLTransaction trans;
  const uint64_t meta_tenant_id = gen_meta_tenant_id(job.get_target_tenant_id());
  if (FAILEDx(trans.start(sql_proxy_, meta_tenant_id))) {
    LOG_WARN("failed to start trans", K(ret));
  } else if (OB_FAIL(lock_recover_table_(meta_tenant_id, trans))) {
    LOG_WARN("failed to lock recover table", K(ret));
  } else if (OB_FAIL(helper.get_recover_table_job_by_initiator(trans, job, target_job))) {
    if (OB_ENTRY_NOT_EXIST == ret) {
      if (OB_FAIL(helper.get_recover_table_job_history_by_initiator(trans, job, target_job))) {
        if (OB_ENTRY_NOT_EXIST == ret) {
          // Neither a live nor a finished user job exists yet: create it.
          if (OB_FAIL(insert_user_job_(job, trans, helper))) {
            LOG_WARN("failed to insert user job", K(ret), K(job));
          } else {
            ROOTSERVICE_EVENT_ADD("recover_table", "insert_user_job",
                                  "tenant_id", job.get_tenant_id(),
                                  "job_id", job.get_job_id());
          }
        } else {
          LOG_WARN("failed to get target tenant recover table job history", K(ret), K(job));
        }
      }
    } else {
      LOG_WARN("failed to get target tenant recover table job", K(ret), K(job));
    }
  }

  if (trans.is_started()) {
    int tmp_ret = OB_SUCCESS;
    if (OB_TMP_FAIL(trans.end(OB_SUCC(ret)))) {
      ret = OB_SUCC(ret) ? tmp_ret : ret;
      LOG_WARN("failed to end trans", K(ret));
    }
  }

#ifdef ERRSIM
  // Inject only on top of a successful run. Assigning unconditionally would
  // replace a genuine failure with OB_SUCCESS when the errsim point is not
  // armed, and the job would then be advanced as if it had succeeded.
  if (OB_SUCC(ret)) {
    ret = OB_E(EventTable::EN_INSERT_USER_RECOVER_JOB_FAILED) OB_SUCCESS;
    if (OB_FAIL(ret)) {
      ROOTSERVICE_EVENT_ADD("recover_table_errsim", "insert_user_job_failed");
    }
  }
#endif
  int tmp_ret = OB_SUCCESS;
  if (OB_SUCCESS != (tmp_ret = try_advance_status_(job, ret))) {
    LOG_INFO("failed to advance status", K(tmp_ret), K(ret), K(job));
  }
  return ret;
}
286

287
int ObRecoverTableJobScheduler::lock_recover_table_(
288
    const uint64_t tenant_id, ObMySQLTransaction &trans)
289
{
290
  int ret = OB_SUCCESS;
291
  ObLockTableRequest recover_job_arg;
292
  recover_job_arg.table_id_ = OB_ALL_RECOVER_TABLE_JOB_TID;
293
  recover_job_arg.lock_mode_ = EXCLUSIVE;
294
  recover_job_arg.timeout_us_ = 0; // try lock
295
  recover_job_arg.op_type_ = IN_TRANS_COMMON_LOCK; // unlock when trans end
296
  observer::ObInnerSQLConnection *conn = nullptr;
297
  if (OB_ISNULL(conn = dynamic_cast<observer::ObInnerSQLConnection *>(trans.get_connection()))) {
298
    ret = OB_ERR_UNEXPECTED;
299
    LOG_WARN("conn is NULL", KR(ret));
300
  } else if (OB_FAIL(transaction::tablelock::ObInnerConnectionLockUtil::lock_table(tenant_id, recover_job_arg, conn))) {
301
    LOG_WARN("failed to lock table", K(ret));
302
  }
303
  return ret;
304
}
305

306
int ObRecoverTableJobScheduler::insert_user_job_(
307
    const share::ObRecoverTableJob &job,
308
    ObMySQLTransaction &trans,
309
    share::ObRecoverTablePersistHelper &helper)
310
{
311
  int ret = OB_SUCCESS;
312
  ObRecoverTableJob target_job;
313

314
  if (OB_FAIL(target_job.assign(job))) {
315
    LOG_WARN("failed to assign target job", K(ret));
316
  } else {
317
    target_job.set_tenant_id(job.get_target_tenant_id());
318
    target_job.set_initiator_tenant_id(job.get_tenant_id());
319
    target_job.set_initiator_job_id(job.get_job_id());
320
    target_job.set_target_tenant_id(target_job.get_tenant_id());
321
  }
322
  int64_t job_id = 0;
323
  if (FAILEDx(ObLSBackupInfoOperator::get_next_job_id(trans, job.get_target_tenant_id(), job_id))) {
324
    LOG_WARN("failed to get next job id", K(ret), "tenant_id", job.get_target_tenant_id());
325
  } else if (OB_FALSE_IT(target_job.set_job_id(job_id))) {
326
  } else if (OB_FAIL(helper.insert_recover_table_job(trans, target_job))) {
327
    LOG_WARN("failed to insert initial recover table job", K(ret));
328
  }
329

330
  return ret;
331
}
332

333
// Sys-side RECOVERING step: waits for the user job created by sys_prepare_ to
// appear in the target tenant's job history.
//
// - Once the user job is in history, its result is copied onto `job` and the
//   job is advanced. OB_LS_RESTORE_FAILED is reported if the user job did not
//   succeed.
// - If the target tenant no longer exists, the job is failed with
//   OB_TENANT_NOT_EXIST.
// - Otherwise nothing changes and the step runs again next round.
int ObRecoverTableJobScheduler::recovering_(share::ObRecoverTableJob &job)
{
  int ret = OB_SUCCESS;
  share::ObRecoverTablePersistHelper helper;
  ObRecoverTableJob target_job;
  bool user_job_finish = true;
  bool user_tenant_not_exist = false;
  int tmp_ret = OB_SUCCESS;
  DEBUG_SYNC(BEFORE_RECOVER_UESR_RECOVER_TABLE_JOB);
  if (OB_FAIL(helper.init(job.get_target_tenant_id()))) {
    LOG_WARN("failed to init recover table persist helper", K(ret));
  } else if (OB_FAIL(helper.get_recover_table_job_history_by_initiator(*sql_proxy_, job, target_job))) {
    if (OB_ENTRY_NOT_EXIST == ret) {
      // User job still running: wait.
      user_job_finish = false;
      ret = OB_SUCCESS;
    } else {
      LOG_WARN("failed to get target tenant recover table job history", K(ret), K(job));
    }
  } else {
    ROOTSERVICE_EVENT_ADD("recover_table", "sys_wait_user_recover_finish",
                          "tenant_id", job.get_tenant_id(),
                          "job_id", job.get_job_id());
    job.set_result(target_job.get_result());
  }
  if (OB_FAIL(ret)) {
    schema::ObSchemaGetterGuard guard;
    if (OB_TMP_FAIL(ObImportTableUtil::get_tenant_schema_guard(*schema_service_, job.get_target_tenant_id(), guard))) {
      if (OB_TENANT_NOT_EXIST == tmp_ret) {
        user_tenant_not_exist = true;
        // Report the real cause. The original lookup error might be classified
        // as retriable, in which case try_advance_status_ would leave the job in
        // RECOVERING forever even though the user tenant is gone. This matches
        // check_target_tenant_version_ and active_aux_tenant_.
        ret = tmp_ret;
      }
      LOG_WARN("failed to get tenant schema guard", K(tmp_ret));
    }
  }

  if ((OB_FAIL(ret) && user_tenant_not_exist) || (OB_SUCC(ret) && user_job_finish)) {
    if (OB_SUCC(ret) && !job.get_result().is_succeed()) {
      ret = OB_LS_RESTORE_FAILED;
    }
    job.set_end_ts(ObTimeUtility::current_time());
    if (OB_SUCCESS != (tmp_ret = try_advance_status_(job, ret))) {
      LOG_INFO("failed to advance status", K(tmp_ret), K(ret), K(job));
    }
  }
  return ret;
}
378

379
// Terminal step for a sys job in COMPLETED or FAILED.
//
// If GCONF._auto_drop_recovering_auxiliary_tenant is set, the auxiliary tenant
// is dropped first. Then, in one sys-tenant transaction, the job is copied into
// history and removed from the live job table. The transaction commits only if
// both statements succeed; otherwise it is rolled back.
int ObRecoverTableJobScheduler::sys_finish_(const share::ObRecoverTableJob &job)
{
  int ret = OB_SUCCESS;
  const bool drop_aux_tenant = GCONF._auto_drop_recovering_auxiliary_tenant;
  bool trans_started = false;
  ObMySQLTransaction trans;

  if (drop_aux_tenant && OB_FAIL(drop_aux_tenant_(job))) {
    LOG_WARN("failed ot drop aux tenant", K(ret));
  } else if (OB_FAIL(trans.start(sql_proxy_, OB_SYS_TENANT_ID))) {
    LOG_WARN("failed to start trans", K(ret));
  } else {
    trans_started = true;
    if (OB_FAIL(helper_.insert_recover_table_job_history(trans, job))) {
      LOG_WARN("failed to insert recover table job history", K(ret), K(job));
    } else if (OB_FAIL(helper_.delete_recover_table_job(trans, job))) {
      LOG_WARN("failed to delete recover table job", K(ret), K(job));
    }
  }

  if (!trans_started) {
    // Nothing to commit or roll back.
  } else if (OB_FAIL(ret)) {
    int tmp_ret = OB_SUCCESS;
    if (OB_SUCCESS != (tmp_ret = trans.end(false))) {
      LOG_WARN("failed to roll back", K(tmp_ret), K(ret));
    }
  } else if (OB_FAIL(trans.end(true))) {
    LOG_WARN("failed to commit", K(ret));
  } else {
    ROOTSERVICE_EVENT_ADD("recover_table", "sys_recover_finish",
                  "tenant_id", job.get_tenant_id(),
                  "job_id", job.get_job_id());
  }
  return ret;
}
412

413
// Drops the job's auxiliary tenant by name through a drop_tenant RPC to the
// master root server.
//
// The drop is forced, bypasses the recycle bin and is not delayed. A tenant
// that is already gone (OB_TENANT_NOT_EXIST) counts as success.
int ObRecoverTableJobScheduler::drop_aux_tenant_(const share::ObRecoverTableJob &job)
{
  int ret = OB_SUCCESS;
  common::ObAddr rs_addr;
  obrpc::ObDropTenantArg drop_tenant_arg;
  drop_tenant_arg.tenant_name_ = job.get_aux_tenant_name();
  drop_tenant_arg.exec_tenant_id_ = OB_SYS_TENANT_ID;
  drop_tenant_arg.force_drop_ = true;
  drop_tenant_arg.if_exist_ = false;
  drop_tenant_arg.delay_to_drop_ = false;
  drop_tenant_arg.open_recyclebin_ = false;
  drop_tenant_arg.drop_only_in_restore_ = false;

  if (OB_ISNULL(GCTX.rs_rpc_proxy_) || OB_ISNULL(GCTX.rs_mgr_)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("rootserver rpc proxy or rs mgr must not be NULL", K(ret), K(GCTX));
  } else if (OB_FAIL(GCTX.rs_mgr_->get_master_root_server(rs_addr))) {
    LOG_WARN("failed to get rootservice address", K(ret));
  } else if (OB_FAIL(GCTX.rs_rpc_proxy_->to(rs_addr).drop_tenant(drop_tenant_arg))) {
    if (OB_TENANT_NOT_EXIST != ret) {
      LOG_WARN("failed to drop tenant", K(ret), K(drop_tenant_arg));
    } else {
      // Already dropped: nothing left to do.
      ret = OB_SUCCESS;
    }
  } else {
    LOG_INFO("[RECOVER_TABLE]drop aux tenant succeed", K(job));
    ROOTSERVICE_EVENT_ADD("recover_table", "drop_aux_tenant",
                          "tenant_id", job.get_tenant_id(),
                          "job_id", job.get_job_id(),
                          "aux_tenant_name", job.get_aux_tenant_name());
  }
  return ret;
}
445

446
// Dispatches a user-tenant recover-table job to the handler for its current
// status:
//   PREPARE             -> user_prepare_
//   RESTORE_AUX_TENANT  -> restore_aux_tenant_
//   ACTIVE_AUX_TENANT   -> active_aux_tenant_
//   GEN_IMPORT_JOB      -> gen_import_job_
//   CANCELING           -> canceling_
//   IMPORTING           -> importing_
//   COMPLETED / FAILED  -> user_finish_
// Any other status is unexpected (OB_ERR_SYS). Errors are only logged, because
// this function returns nothing.
void ObRecoverTableJobScheduler::user_process_(share::ObRecoverTableJob &job)
{
  int ret = OB_SUCCESS;
  LOG_INFO("ready to schedule user recover table job", K(job));
  switch(job.get_status()) {
    case ObRecoverTableStatus::Status::PREPARE: {
      if (OB_FAIL(user_prepare_(job))) {
        LOG_WARN("failed to do user prepare work", K(ret), K(job));
      }
      break;
    }
    case ObRecoverTableStatus::Status::RESTORE_AUX_TENANT: {
      if (OB_FAIL(restore_aux_tenant_(job))) {
        LOG_WARN("failed to do user restore aux tenant work", K(ret), K(job));
      }
      break;
    }
    case ObRecoverTableStatus::Status::ACTIVE_AUX_TENANT: {
      if (OB_FAIL(active_aux_tenant_(job))) {
        LOG_WARN("failed to do user active aux tenant work", K(ret), K(job));
      }
      break;
    }
    case ObRecoverTableStatus::Status::GEN_IMPORT_JOB: {
      if (OB_FAIL(gen_import_job_(job))) {
        LOG_WARN("failed to do user import work", K(ret), K(job));
      }
      break;
    }
    case ObRecoverTableStatus::Status::CANCELING: {
      if (OB_FAIL(canceling_(job))) {
        LOG_WARN("failed to do user canceling", K(ret), K(job));
      }
      break;
    }
    case ObRecoverTableStatus::Status::IMPORTING: {
      if (OB_FAIL(importing_(job))) {
        LOG_WARN("failed to do user importing work", K(ret), K(job));
      }
      break;
    }
    case ObRecoverTableStatus::Status::COMPLETED:
    case ObRecoverTableStatus::Status::FAILED: {
      if (OB_FAIL(user_finish_(job))) {
        LOG_WARN("failed to do user finish work", K(ret), K(job));
      }
      break;
    }
    default: {
      ret = OB_ERR_SYS;
      // This is the user-job dispatcher; the message previously said "sys".
      LOG_WARN("invalid user recover job status", K(ret), K(job));
      break;
    }
  }
}
501

502
// User-side CANCELING step.
//
// While an import job initiated by this recover job still exists (looked up by
// initiator tenant id and job id), nothing happens. Once it is gone, the job
// result is set to OB_CANCELED and the job is advanced with OB_CANCELED.
int ObRecoverTableJobScheduler::canceling_(share::ObRecoverTableJob &job)
{
  int ret = OB_SUCCESS;
  share::ObImportTableJobPersistHelper helper;
  share::ObImportTableJob import_job;
  bool cancel_import_job_finish = false;

  if (OB_FAIL(helper.init(job.get_tenant_id()))) {
    LOG_WARN("failed to init helper", K(ret));
  } else if (OB_SUCC(helper.get_import_table_job_by_initiator(
      *sql_proxy_, job.get_tenant_id(), job.get_job_id(), import_job))) {
    // The import job is still around; wait for it to be cleaned up.
  } else if (OB_ENTRY_NOT_EXIST == ret) {
    cancel_import_job_finish = true;
    ret = OB_SUCCESS;
  } else {
    LOG_WARN("failed to get import table job by initiator", K(ret));
  }

  if (OB_SUCC(ret) && cancel_import_job_finish) {
    share::ObTaskId trace_id(*ObCurTraceId::get_trace_id());
    // The return value is ignored here. If no comment was recorded,
    // try_advance_status_ sets the result itself.
    job.get_result().set_result(OB_CANCELED, trace_id, GCONF.self_addr_);
    if (OB_FAIL(try_advance_status_(job, OB_CANCELED))) {
      LOG_WARN("failed to advance status", K(ret));
    } else {
      LOG_INFO("[RECOVER_TABLE]cancel recover table job finish", K(job), K(import_job));
      ROOTSERVICE_EVENT_ADD("recover_table", "cancel recover job finish",
                            "tenant_id", job.get_tenant_id(),
                            "recover_job_id", job.get_job_id());
    }
  }
  return ret;
}
534

535
// User-side PREPARE step. There is no work to do here, so the job is advanced
// straight away as a success.
int ObRecoverTableJobScheduler::user_prepare_(share::ObRecoverTableJob &job)
{
  int ret = try_advance_status_(job, OB_SUCCESS);
  if (OB_FAIL(ret)) {
    LOG_WARN("failed to advance status", K(ret));
  }
  return ret;
}
543

544
// User-side RESTORE_AUX_TENANT step.
//
// Waits until the physical-restore history in the sys tenant has an entry for
// this job's initiator (initiator job id / initiator tenant id).
// - While there is no entry, nothing happens. A progress line is logged at
//   most once per minute.
// - Once the entry exists:
//   - a failed restore fails the job with OB_LS_RESTORE_FAILED and the
//     restore comment;
//   - a successful restore leads to check_aux_tenant_.
//   In both cases the outcome is passed to try_advance_status_.
int ObRecoverTableJobScheduler::restore_aux_tenant_(share::ObRecoverTableJob &job)
{
  int ret = OB_SUCCESS;
  ObRestorePersistHelper restore_helper;
  ObHisRestoreJobPersistInfo restore_history_info;
  DEBUG_SYNC(BEFORE_RESTORE_AUX_TENANT);
  if (OB_FAIL(restore_helper.init(OB_SYS_TENANT_ID, share::OBCG_STORAGE /*group_id*/))) {
    LOG_WARN("failed to init retore helper", K(ret));
  } else if (OB_FAIL(restore_helper.get_restore_job_history(
      *sql_proxy_, job.get_initiator_job_id(), job.get_initiator_tenant_id(), restore_history_info))) {
    if (OB_ENTRY_NOT_EXIST == ret) {
      // Restore of the aux tenant is still in progress.
      ret = OB_SUCCESS;
      if (REACH_TIME_INTERVAL(60 * 1000 * 1000)) {
        LOG_INFO("[RECOVER_TABLE]aux tenant restore not finish, wait later", K(job));
      }
    } else {
      // Log the ids actually used in the lookup (previously the job's own ids
      // were printed under the "initiator_*" labels).
      LOG_WARN("failed to get restore job history", K(ret),
        "initiator_job_id", job.get_initiator_job_id(),
        "initiator_tenant_id", job.get_initiator_tenant_id());
    }
  } else {
    LOG_INFO("[RECOVER_TABLE]aux tenant restore finish", K(restore_history_info), K(job));
    ROOTSERVICE_EVENT_ADD("recover_table", "restore_aux_tenant_finish",
                          "tenant_id", job.get_tenant_id(),
                          "job_id", job.get_job_id(),
                          "aux_tenant_name", job.get_aux_tenant_name());
    const uint64_t aux_tenant_id = restore_history_info.restore_tenant_id_;
    if (!restore_history_info.is_restore_success()) {
      ret = OB_LS_RESTORE_FAILED;  // TODO(zeyong) adjust error code to restore tenant failed later.
      LOG_WARN("[RECOVER_TABLE]restore aux tenant failed", K(ret), K(restore_history_info), K(job));
      job.get_result().set_result(false, restore_history_info.comment_);
    } else if (OB_FAIL(check_aux_tenant_(job, aux_tenant_id))) {
      LOG_WARN("failed to check aux tenant", K(ret), K(aux_tenant_id));
    }

    int tmp_ret = OB_SUCCESS;
    if (OB_SUCCESS != (tmp_ret = try_advance_status_(job, ret))) {
      LOG_WARN("failed to advance status", K(tmp_ret), K(ret));
    }
  }
  return ret;
}
590

591
// User-side ACTIVE_AUX_TENANT step.
//
// Reads the aux tenant id from the restore history, raises the aux tenant's
// undo retention (ban_multi_version_recycling_), then fails the aux tenant over
// to primary.
//
// On failure, and only if the aux tenant id was actually read, tenant existence
// is probed. A dropped aux tenant turns the error into OB_TENANT_NOT_EXIST.
// The job is advanced on success or on OB_TENANT_NOT_EXIST; any other error
// leaves it for the next round.
int ObRecoverTableJobScheduler::active_aux_tenant_(share::ObRecoverTableJob &job)
{
  int ret = OB_SUCCESS;
  int tmp_ret = OB_SUCCESS;
  ObRestorePersistHelper restore_helper;
  ObHisRestoreJobPersistInfo restore_history_info;
  bool history_loaded = false;
  if (OB_FAIL(restore_helper.init(OB_SYS_TENANT_ID, share::OBCG_STORAGE /*group_id*/))) {
    LOG_WARN("failed to init retore helper", K(ret));
  } else if (OB_FAIL(restore_helper.get_restore_job_history(
      *sql_proxy_, job.get_initiator_job_id(), job.get_initiator_tenant_id(), restore_history_info))) {
    LOG_WARN("failed to get restore job history", K(ret),
      "initiator_job_id", job.get_initiator_job_id(),
      "initiator_tenant_id", job.get_initiator_tenant_id());
  } else {
    history_loaded = true;
    if (OB_FAIL(ban_multi_version_recycling_(job, restore_history_info.restore_tenant_id_))) {
      LOG_WARN("failed to ban multi version cecycling", K(ret));
    } else if (OB_FAIL(failover_to_primary_(job, restore_history_info.restore_tenant_id_))) {
      LOG_WARN("failed to failover to primary", K(ret), K(restore_history_info));
    }
  }

  // Only probe the aux tenant when its id is known. Without the history row,
  // restore_tenant_id_ was never filled in, and probing it could wrongly fail a
  // job whose history read merely hit a transient error.
  if (OB_FAIL(ret) && history_loaded) {
    schema::ObSchemaGetterGuard guard;
    if (OB_TMP_FAIL(ObImportTableUtil::get_tenant_schema_guard(*schema_service_,
                                                               restore_history_info.restore_tenant_id_,
                                                               guard))) {
      if (OB_TENANT_NOT_EXIST == tmp_ret) {
        ret = tmp_ret;
      }
      LOG_WARN("failed to get tenant schema guard", K(tmp_ret));
    }
  }

  if (OB_SUCC(ret) || OB_TENANT_NOT_EXIST == ret) {
    if (OB_SUCCESS != (tmp_ret = try_advance_status_(job, ret))) {
      LOG_WARN("failed to advance status", K(tmp_ret), K(ret));
    }
  }
  return ret;
}
628

629
// Sets undo_retention to MAX_UNDO_RETENTION on the auxiliary tenant by running
// "alter system set undo_retention = ..." as that tenant (under the sys
// tenant's MTL context).
//
// Per the function name, the goal is to keep the aux tenant's multi-version
// data from being recycled while tables are recovered from it. `job` is
// currently unused.
int ObRecoverTableJobScheduler::ban_multi_version_recycling_(share::ObRecoverTableJob &job, const uint64_t aux_tenant_id)
{
  int ret = OB_SUCCESS;
  // 31536000 = 365 * 24 * 3600, i.e. one year assuming the unit is seconds.
  const int64_t MAX_UNDO_RETENTION = 31536000;
  int64_t affected_row = 0;
  ObSqlString sql;
  MTL_SWITCH(OB_SYS_TENANT_ID) {
    if (OB_FAIL(sql.assign_fmt("alter system set undo_retention = %ld", MAX_UNDO_RETENTION))) {
      LOG_WARN("failed to assign fmt", K(ret));
    } else if (OB_FAIL(sql_proxy_->write(aux_tenant_id, sql.ptr(), affected_row))) {
      // Tenant ids are uint64_t end to end; no signed round-trip.
      LOG_WARN("failed to set undo retention", K(ret), K(aux_tenant_id));
    }
  }
  return ret;
}
645

646
int ObRecoverTableJobScheduler::failover_to_primary_(
647
    share::ObRecoverTableJob &job, const uint64_t aux_tenant_id)
648
{
649
  int ret = OB_SUCCESS;
650
  common::ObAddr leader;
651
  obrpc::ObSwitchTenantArg switch_tenant_arg;
652
  MTL_SWITCH(OB_SYS_TENANT_ID) {
653
    if (OB_FAIL(switch_tenant_arg.init(aux_tenant_id, obrpc::ObSwitchTenantArg::OpType::FAILOVER_TO_PRIMARY, "", false))) {
654
      LOG_WARN("failed to init switch tenant arg", K(ret), K(aux_tenant_id));
655
    } else if (OB_FAIL(OB_PRIMARY_STANDBY_SERVICE.switch_tenant(switch_tenant_arg))) {
656
      LOG_WARN("failed to switch_tenant", KR(ret), K(switch_tenant_arg));
657
    } else {
658
      LOG_INFO("[RECOVER_TABLE]succeed to switch aux tenant role to primary", K(aux_tenant_id), K(job));
659
    }
660
  }
661
  return ret;
662
}
663

664
// Validates a freshly restored auxiliary tenant before tables are recovered
// from it. The aux tenant must:
//   - still exist; if it was dropped, the job comment says so;
//   - be in TENANT_STATUS_NORMAL, else OB_STATE_NOT_MATCH;
//   - share the recovering tenant's compat mode, else OB_NOT_SUPPORTED;
//   - share its name case mode, else OB_NOT_SUPPORTED.
// On an incompatibility, a human-readable comment is also stored in the job
// result.
int ObRecoverTableJobScheduler::check_aux_tenant_(share::ObRecoverTableJob &job, const uint64_t aux_tenant_id)
{
  int ret = OB_SUCCESS;
  int tmp_ret = OB_SUCCESS;
  schema::ObTenantStatus status;
  schema::ObSchemaGetterGuard aux_tenant_guard;
  schema::ObSchemaGetterGuard recover_tenant_guard;
  bool is_compatible = true;
  if (OB_FAIL(schema_service_->get_tenant_schema_guard(aux_tenant_id, aux_tenant_guard))) {
    if (OB_TENANT_NOT_EXIST == ret) {
      ObImportResult::Comment comment;
      if (OB_TMP_FAIL(databuff_printf(comment.ptr(), comment.capacity(),
          "aux tenant %.*s has been dropped", job.get_aux_tenant_name().length(), job.get_aux_tenant_name().ptr()))) {
        // tmp_ret carries the formatting error; ret is still OB_TENANT_NOT_EXIST.
        LOG_WARN("failed to databuff printf", K(ret), K(tmp_ret));
      } else {
        job.get_result().set_result(false, comment);
      }
    }
    LOG_WARN("failed to get tenant schema guard", K(ret), K(aux_tenant_id));
  } else if (OB_FAIL(aux_tenant_guard.get_tenant_status(aux_tenant_id, status))) {
    LOG_WARN("failed to get tenant status", K(ret), K(aux_tenant_id));
  } else if (schema::ObTenantStatus::TENANT_STATUS_NORMAL != status) {
    ret = OB_STATE_NOT_MATCH;
    LOG_WARN("aux tenant status is not normal", K(ret), K(aux_tenant_id), K(status));
  } else if (OB_FAIL(schema_service_->get_tenant_schema_guard(job.get_tenant_id(), recover_tenant_guard))) {
    LOG_WARN("failed to get tenant schema guard", K(ret), "tenant_id", job.get_tenant_id());
  } else if (OB_FAIL(check_tenant_compatibility(aux_tenant_guard, recover_tenant_guard, is_compatible))) {
    LOG_WARN("failed to get check tenant compatibility", K(ret));
  } else if (!is_compatible) {
    ret = OB_NOT_SUPPORTED;
    LOG_WARN("recover from different compatibility tenant is not supported", K(ret));
    if (OB_TMP_FAIL(job.get_result().set_result(false, "recover from different compatibility tenant is not supported"))) {
      LOG_WARN("failed to set result", K(ret), K(tmp_ret));
    }
  } else if (OB_FAIL(check_case_sensitive_compatibility(aux_tenant_guard, recover_tenant_guard, is_compatible))) {
    LOG_WARN("failed to check case sensitive compatibility", K(ret));
  } else if (!is_compatible) {
    ret = OB_NOT_SUPPORTED;
    LOG_WARN("recover from different case sensitive compatibility tenant is not supported", K(ret));
    if (OB_TMP_FAIL(job.get_result().set_result(false, "recover from different case sensitive compatibility tenant is not supported"))) {
      LOG_WARN("failed to set result", K(ret), K(tmp_ret));
    }
  }
  return ret;
}
709

710
int ObRecoverTableJobScheduler::check_tenant_compatibility(
711
    share::schema::ObSchemaGetterGuard &aux_tenant_guard,
712
    share::schema::ObSchemaGetterGuard &recover_tenant_guard,
713
    bool &is_compatible)
714
{
715
  int ret = OB_SUCCESS;
716
  lib::Worker::CompatMode aux_compat_mode;
717
  lib::Worker::CompatMode recover_compat_mode;
718
  is_compatible = false;
719
  const uint64_t aux_tenant_id = aux_tenant_guard.get_tenant_id();
720
  const uint64_t recover_tenant_id = recover_tenant_guard.get_tenant_id();
721
  if (OB_FAIL(aux_tenant_guard.get_tenant_compat_mode(aux_tenant_id, aux_compat_mode))) {
722
    LOG_WARN("failed to get tenant compat mode", K(ret), K(aux_tenant_id));
723
  } else if (OB_FAIL(recover_tenant_guard.get_tenant_compat_mode(recover_tenant_id, recover_compat_mode))) {
724
    LOG_WARN("failed to get tenant compat mode", K(ret), K(recover_tenant_id));
725
  } else {
726
    is_compatible = aux_compat_mode == recover_compat_mode;
727
    if (!is_compatible) {
728
      LOG_WARN("[RECOVER_TABLE]tenant compat mode is different", K(is_compatible),
729
                                                                 K(aux_tenant_id),
730
                                                                 K(aux_compat_mode),
731
                                                                 K(recover_tenant_id),
732
                                                                 K(recover_compat_mode));
733
    }
734
  }
735
  return ret;
736
}
737

738
int ObRecoverTableJobScheduler::check_case_sensitive_compatibility(
739
    share::schema::ObSchemaGetterGuard &aux_tenant_guard,
740
    share::schema::ObSchemaGetterGuard &recover_tenant_guard,
741
    bool &is_compatible)
742
{
743
  int ret = OB_SUCCESS;
744
  common::ObNameCaseMode aux_mode;
745
  common::ObNameCaseMode recover_mode;
746
  is_compatible = false;
747
  const uint64_t aux_tenant_id = aux_tenant_guard.get_tenant_id();
748
  const uint64_t recover_tenant_id = recover_tenant_guard.get_tenant_id();
749
  if (OB_FAIL(aux_tenant_guard.get_tenant_name_case_mode(aux_tenant_id, aux_mode))) {
750
    LOG_WARN("failed to get tenant name case mode", K(ret), K(aux_tenant_id));
751
  } else if (OB_FAIL(recover_tenant_guard.get_tenant_name_case_mode(recover_tenant_id, recover_mode))) {
752
    LOG_WARN("failed to get tenant name case mode", K(ret), K(recover_tenant_id));
753
  } else {
754
    is_compatible = aux_mode == recover_mode;
755
    if (!is_compatible) {
756
      LOG_WARN("[RECOVER_TABLE]tenant name case mode is different", K(is_compatible),
757
                                                                    K(aux_tenant_id),
758
                                                                    K(aux_mode),
759
                                                                    K(recover_tenant_id),
760
                                                                    K(recover_mode));
761
    }
762
  }
763
  return ret;
764
}
765

766
// Creates and persists the import table job that follows a recover table job.
//
// The import job targets job.get_tenant_id() and takes as its source the
// auxiliary tenant named in `job`. Its initiator fields point back at `job`.
// Its own job id is allocated with ObLSBackupInfoOperator::get_next_job_id()
// inside the same transaction (on the meta tenant) that inserts the import job.
//
// If the auxiliary tenant name can no longer be resolved
// (OB_ERR_INVALID_TENANT_NAME), the job result is set to failed with an
// "aux tenant ... has been dropped" comment.
//
// try_advance_status_() is always called with the final ret. If that call
// fails, the failure is only logged and does not change the returned code.
int ObRecoverTableJobScheduler::gen_import_job_(share::ObRecoverTableJob &job)
{
  int ret = OB_SUCCESS;
  LOG_INFO("[RECOVER_TABLE]generate import table job", K(job));
  share::ObImportTableJobPersistHelper import_helper;
  share::ObImportTableJob import_job;
  import_job.set_tenant_id(job.get_tenant_id());
  // No job id is set here: it is allocated inside the transaction below.
  // The initiator fields link the import job back to this recover job.
  import_job.set_initiator_job_id(job.get_job_id());
  import_job.set_initiator_tenant_id(job.get_tenant_id());
  import_job.set_start_ts(ObTimeUtility::current_time());
  import_job.set_status(ObImportTableJobStatus::INIT);
  int tmp_ret = OB_SUCCESS;
  schema::ObSchemaGetterGuard guard;
  uint64_t tenant_id = OB_INVALID_TENANT_ID;
  ObMySQLTransaction trans;
  int64_t job_id = 0;
  if (OB_FAIL(import_job.set_src_tenant_name(job.get_aux_tenant_name()))) {
    LOG_WARN("failed to set src tenant name", K(ret));
  } else if (OB_FAIL(schema_service_->get_tenant_schema_guard(OB_SYS_TENANT_ID, guard))) {
    LOG_WARN("failed to get schema guard", K(ret));
  } else if (OB_FAIL(guard.get_tenant_id(job.get_aux_tenant_name(), tenant_id))) {
    if (OB_ERR_INVALID_TENANT_NAME == ret) {
      // The aux tenant no longer resolves by name: record the reason in the result.
      ObImportResult::Comment comment;
      if (OB_TMP_FAIL(databuff_printf(comment.ptr(), comment.capacity(),
          "aux tenant %.*s has been dropped", job.get_aux_tenant_name().length(), job.get_aux_tenant_name().ptr()))) {
        // ret still holds OB_ERR_INVALID_TENANT_NAME; tmp_ret is the printf failure.
        LOG_WARN("failed to databuff printf", K(ret), K(tmp_ret));
      } else {
        job.get_result().set_result(false, comment);
      }
    }
    LOG_WARN("failed to get tenant id", K(ret), "aux_tenant_name", job.get_aux_tenant_name());
  } else if (OB_FALSE_IT(import_job.set_src_tenant_id(tenant_id))) {
  } else if (OB_FAIL(import_job.get_import_arg().assign(job.get_import_arg()))) {
    LOG_WARN("failed to assign import arg", K(ret));
  } else if (OB_FAIL(import_helper.init(import_job.get_tenant_id()))) {
    LOG_WARN("failed to init import job", K(ret));
  } else if (OB_FAIL(trans.start(sql_proxy_, gen_meta_tenant_id(job.get_tenant_id())))) {
    LOG_WARN("failed to start trans", K(ret));
  } else if (OB_FAIL(ObLSBackupInfoOperator::get_next_job_id(trans, job.get_tenant_id(), job_id))) {
    LOG_WARN("failed to get next job id", K(ret));
  } else if (OB_FALSE_IT(import_job.set_job_id(job_id))) {
  } else if (OB_FAIL(import_helper.insert_import_table_job(trans, import_job))) {
    LOG_WARN("failed to insert into improt table job", K(ret));
  }

  if (trans.is_started()) {
    // Commit when everything above succeeded, otherwise end without commit;
    // a failure to end the transaction only replaces a success code.
    if (OB_TMP_FAIL(trans.end(OB_SUCC(ret)))) {
      ret = OB_SUCC(ret) ? tmp_ret : ret;
      LOG_WARN("failed to commit trans", K(ret), K(tmp_ret));
    }
    if (OB_SUCC(ret)) {
      LOG_INFO("[RECOVER_TABLE]succeed generate import job", K(job), K(import_job));
      ROOTSERVICE_EVENT_ADD("recover_table", "generate_import_job",
                            "tenant_id", job.get_tenant_id(),
                            "job_id", job.get_job_id(),
                            "import_job_id", import_job.get_job_id());
    }
  }

  // Always let the state machine observe the outcome (success or failure).
  if (OB_TMP_FAIL(try_advance_status_(job, ret))) {
    LOG_WARN("failed to advance status", K(tmp_ret), K(ret));
  }
  return ret;
}
831

832
// Waits for the import table job initiated by `job` to reach the import job
// history, then finishes the recover table job with the import's result.
//
// - History row missing (OB_ENTRY_NOT_EXIST): the import is considered still
//   in progress. A wait message is logged, rate-limited via
//   REACH_TIME_INTERVAL(60 * 1000 * 1000) (presumably microseconds, i.e.
//   60s; confirm). OB_SUCCESS is returned so the caller can poll again.
// - History row found: the end timestamp and the import result are copied
//   into `job`. try_advance_status_() is then called with
//   OB_LS_RESTORE_FAILED if the import did not succeed, and with OB_SUCCESS
//   otherwise. Its return value becomes the result of this function.
int ObRecoverTableJobScheduler::importing_(share::ObRecoverTableJob &job)
{
  int ret = OB_SUCCESS;
  share::ObImportTableJobPersistHelper helper;
  share::ObImportTableJob import_job;
  if (OB_FAIL(helper.init(job.get_tenant_id()))) {
    LOG_WARN("failed to init helper", K(ret));
  } else if (OB_FAIL(helper.get_import_table_job_history_by_initiator(
      *sql_proxy_, job.get_tenant_id(), job.get_job_id(), import_job))) {
    if (OB_ENTRY_NOT_EXIST != ret) {
      LOG_WARN("failed to get recover table job histroy by initiator", K(ret));
    } else {
      // Import not finished yet; swallow the "not found" and retry later.
      if (REACH_TIME_INTERVAL(60 * 1000 * 1000)) {
        LOG_INFO("[RECOVER_TABLE]import table is not finish, wait later", K(job));
      }
      ret = OB_SUCCESS;
    }
  } else {
    job.set_end_ts(ObTimeUtility::current_time());
    job.set_result(import_job.get_result());
    // Hand the failure to the state machine through the status code.
    if (!job.get_result().is_succeed()) {
      ret = OB_LS_RESTORE_FAILED;
    }
    if (OB_FAIL(try_advance_status_(job, ret))) {
      LOG_WARN("failed to advance status", K(ret));
    } else {
      LOG_INFO("[RECOVER_TABLE]import table job finish", K(job), K(import_job));
      ROOTSERVICE_EVENT_ADD("recover_table", "import job finish",
                            "tenant_id", job.get_tenant_id(),
                            "job_id", job.get_job_id(),
                            "import_job_id", import_job.get_job_id());
    }
  }
  return ret;
}
863

864
// Archives a finished recover table job.
//
// In one transaction on the meta tenant of job.get_tenant_id(), the job is
// inserted into the recover table job history and then removed through
// helper_.delete_recover_table_job(). The transaction is ended with
// commit = OB_SUCC(ret). If ending it fails after an otherwise successful
// run, that failure becomes the return code. A finish event is recorded only
// when everything succeeds.
int ObRecoverTableJobScheduler::user_finish_(const share::ObRecoverTableJob &job)
{
  int ret = OB_SUCCESS;
  ObMySQLTransaction trans;
  const uint64_t tenant_id = job.get_tenant_id();
  if (OB_FAIL(trans.start(sql_proxy_, gen_meta_tenant_id(tenant_id)))) {
    LOG_WARN("failed to start trans", K(ret));
  } else if (OB_FAIL(helper_.insert_recover_table_job_history(trans, job))) {
    LOG_WARN("failed to insert recover table job history", K(ret), K(job));
  } else if (OB_FAIL(helper_.delete_recover_table_job(trans, job))) {
    LOG_WARN("failed to delete recover table job", K(ret), K(job));
  }

  if (trans.is_started()) {
    int tmp_ret = OB_SUCCESS;
    // Only let a trans.end() failure override a success code.
    if (OB_TMP_FAIL(trans.end(OB_SUCC(ret)))) {
      ret = OB_SUCC(ret) ? tmp_ret : ret;
      LOG_WARN("end trans failed", K(ret), K(tmp_ret));
    }
    if (OB_SUCC(ret)) {
      LOG_INFO("[RECOVER_TABLE] recover table finish", K(job));
      ROOTSERVICE_EVENT_ADD("recover_table", "recover table job finish",
                            "tenant_id", job.get_tenant_id(),
                            "job_id", job.get_job_id());
    }
  }
  return ret;
}
893

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.