oceanbase

Форк
0
/
ob_tenant_clone_util.cpp 
591 строка · 26.3 Кб
1
/**
2
 * Copyright (c) 2021 OceanBase
3
 * OceanBase CE is licensed under Mulan PubL v2.
4
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
5
 * You may obtain a copy of Mulan PubL v2 at:
6
 *          http://license.coscl.org.cn/MulanPubL-2.0
7
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
 * See the Mulan PubL v2 for more details.
11
 */
12

13
#define USING_LOG_PREFIX RS_RESTORE
14

15
#include "ob_tenant_clone_util.h"
16
#include "share/tenant_snapshot/ob_tenant_snapshot_table_operator.h"
17
#include "share/restore/ob_tenant_clone_table_operator.h"
18
#include "share/location_cache/ob_location_service.h"
19
#include "rootserver/tenant_snapshot/ob_tenant_snapshot_util.h"
20

21
using namespace oceanbase::rootserver;
22
using namespace oceanbase::share;
23

24
//a source tenant only has one clone_job at the same time
25
int ObTenantCloneUtil::check_source_tenant_has_clone_job(
26
    common::ObISQLClient &sql_client,
27
    const uint64_t source_tenant_id,
28
    bool &has_job)
29
{
30
  int ret = OB_SUCCESS;
31
  has_job = false;
32
  ObTenantCloneTableOperator clone_op;
33
  ObCloneJob clone_job;
34

35
  if (OB_FAIL(clone_op.init(OB_SYS_TENANT_ID, &sql_client))) {
36
    LOG_WARN("fail to init clone op", KR(ret));
37
  } else if (OB_FAIL(clone_op.get_clone_job_by_source_tenant_id(
38
                            source_tenant_id, clone_job))) {
39
    if (OB_ENTRY_NOT_EXIST == ret) {
40
      ret = OB_SUCCESS;
41
    } else {
42
      LOG_WARN("fail to get job", KR(ret), K(source_tenant_id));
43
    }
44
  } else {
45
    has_job = true;
46
  }
47

48
  return ret;
49
}
50

51
int ObTenantCloneUtil::check_clone_tenant_exist(common::ObISQLClient &sql_client,
52
                                                const ObString &clone_tenant_name,
53
                                                bool &is_exist)
54
{
55
  int ret = OB_SUCCESS;
56
  is_exist = false;
57
  ObTenantCloneTableOperator clone_op;
58
  ObCloneJob clone_job;
59

60
  if (OB_FAIL(clone_op.init(OB_SYS_TENANT_ID, &sql_client))) {
61
    LOG_WARN("fail to init clone op", KR(ret));
62
  } else if (OB_FAIL(clone_op.get_clone_job_by_clone_tenant_name(
63
                            clone_tenant_name, false/*need lock*/, clone_job))) {
64
    if (OB_ENTRY_NOT_EXIST == ret) {
65
      ret = OB_SUCCESS;
66
    } else {
67
      LOG_WARN("fail to get job", KR(ret), K(clone_tenant_name));
68
    }
69
  } else {
70
    is_exist = true;
71
    LOG_INFO("clone job exist", KR(ret), K(clone_tenant_name));
72
  }
73

74
  return ret;
75
}
76

77
int ObTenantCloneUtil::fill_clone_job(const int64_t job_id,
78
                                      const obrpc::ObCloneTenantArg &arg,
79
                                      const uint64_t source_tenant_id,
80
                                      const ObString &source_tenant_name,
81
                                      const ObTenantSnapItem &snapshot_item,
82
                                      ObCloneJob &clone_job)
83
{
84
  int ret = OB_SUCCESS;
85
  clone_job.reset();
86
  ObTenantCloneJobType job_type = ObTenantCloneJobType::CLONE_JOB_MAX_TYPE;
87
  common::ObCurTraceId::TraceId trace_id;
88

89
  if (OB_UNLIKELY(job_id < 0
90
                  || !arg.is_valid()
91
                  || !is_user_tenant(source_tenant_id)
92
                  || source_tenant_name.empty())) {
93
    ret = OB_INVALID_ARGUMENT;
94
    LOG_WARN("invalid argument", KR(ret), K(job_id), K(arg), K(source_tenant_id),
95
                                 K(source_tenant_name), K(snapshot_item));
96
  } else if (FALSE_IT(job_type = snapshot_item.is_valid() ?
97
                                 ObTenantCloneJobType::RESTORE :
98
                                 ObTenantCloneJobType::FORK)) {
99
  } else {
100
    ObCurTraceId::TraceId *cur_trace_id = ObCurTraceId::get_trace_id();
101
    if (nullptr != cur_trace_id) {
102
      trace_id = *cur_trace_id;
103
    } else {
104
      trace_id.init(GCONF.self_addr_);
105
    }
106
    const ObCloneJob::ObCloneJobInitArg init_arg = {
107
          .trace_id_                   = trace_id,
108
          .tenant_id_                  = OB_SYS_TENANT_ID,
109
          .job_id_                     = job_id,
110
          .source_tenant_id_           = source_tenant_id,
111
          .source_tenant_name_         = source_tenant_name,
112
          .clone_tenant_id_            = OB_INVALID_TENANT_ID,
113
          .clone_tenant_name_          = arg.get_new_tenant_name(),
114
          .tenant_snapshot_id_         = snapshot_item.get_tenant_snapshot_id(),
115
          .tenant_snapshot_name_       = snapshot_item.get_snapshot_name(),
116
          .resource_pool_id_           = OB_INVALID_ID,
117
          .resource_pool_name_         = arg.get_resource_pool_name(),
118
          .unit_config_name_           = arg.get_unit_config_name(),
119
          .restore_scn_                = snapshot_item.get_snapshot_scn(),
120
          .status_                     = ObTenantCloneStatus(ObTenantCloneStatus::Status::CLONE_SYS_LOCK),
121
          .job_type_                   = job_type,
122
          .ret_code_                   = OB_SUCCESS,
123
    };
124
    if (OB_FAIL(clone_job.init(init_arg))) {
125
      LOG_WARN("fail to init clone job", KR(ret), K(init_arg));
126
    }
127
  }
128

129
  return ret;
130
}
131

132
int ObTenantCloneUtil::record_clone_job(common::ObISQLClient &sql_client,
133
                                        const share::ObCloneJob &clone_job)
134
{
135
  int ret = OB_SUCCESS;
136
  ObTenantCloneTableOperator clone_op;
137
  uint64_t source_tenant_id = clone_job.get_source_tenant_id();
138
  ObString clone_tenant_name = clone_job.get_clone_tenant_name();
139
  bool has_job = false;
140
  bool tenant_exist_in_clone_job = false;
141

142
  if (OB_UNLIKELY(!clone_job.is_valid())) {
143
    ret = OB_INVALID_ARGUMENT;
144
    LOG_WARN("invalid argument", KR(ret), K(clone_job));
145
  } else if (OB_FAIL(check_source_tenant_has_clone_job(sql_client, source_tenant_id, has_job))) {
146
    LOG_WARN("fail to check source tenant has clone job", KR(ret), K(source_tenant_id));
147
  } else if (has_job) {
148
    ret = OB_OP_NOT_ALLOW;
149
    LOG_WARN("source tenant already has clone job", KR(ret), K(source_tenant_id));
150
    LOG_USER_ERROR(OB_OP_NOT_ALLOW, "source tenant has a running clone job, clone tenant now");
151
  } else if (OB_FAIL(ObTenantCloneUtil::check_clone_tenant_exist(sql_client,
152
                              clone_tenant_name, tenant_exist_in_clone_job))) {
153
    LOG_WARN("failed to check clone tenant exist in clone job", KR(ret), K(clone_tenant_name));
154
  } else if (tenant_exist_in_clone_job) {
155
    ret = OB_OP_NOT_ALLOW;
156
    LOG_WARN("duplicate clone tenant name in clone job", KR(ret), K(clone_tenant_name));
157
    LOG_USER_ERROR(OB_OP_NOT_ALLOW, "clone tenant name exists in a running clone job, clone tenant now");
158
  } else if (OB_FAIL(clone_op.init(OB_SYS_TENANT_ID, &sql_client))) {
159
    LOG_WARN("fail to init clone op", KR(ret));
160
  } else if (OB_FAIL(clone_op.insert_clone_job(clone_job))) {
161
    LOG_WARN("fail to insert clone job", KR(ret), K(clone_job));
162
  }
163

164
  return ret;
165
}
166

167
int ObTenantCloneUtil::update_resource_pool_id_of_clone_job(common::ObISQLClient &sql_client,
168
                                                            const int64_t job_id,
169
                                                            const uint64_t resource_pool_id)
170
{
171
  int ret = OB_SUCCESS;
172
  ObTenantCloneTableOperator clone_op;
173

174
  if (OB_UNLIKELY(job_id < 0 || OB_INVALID_ID == resource_pool_id)) {
175
    ret = OB_INVALID_ARGUMENT;
176
    LOG_WARN("invalid argument", KR(ret), K(job_id), K(resource_pool_id));
177
  } else if (OB_FAIL(clone_op.init(OB_SYS_TENANT_ID, &sql_client))) {
178
    LOG_WARN("fail to init clone op", KR(ret));
179
  } else if (OB_FAIL(clone_op.update_job_resource_pool_id(job_id, resource_pool_id))) {
180
    LOG_WARN("fail to update clone job resource pool id", KR(ret), K(job_id), K(resource_pool_id));
181
  }
182

183
  return ret;
184
}
185

186
int ObTenantCloneUtil::update_snapshot_info_for_fork_job(common::ObISQLClient &sql_client,
187
                                                         const int64_t job_id,
188
                                                         const ObTenantSnapshotID tenant_snapshot_id,
189
                                                         const ObString &tenant_snapshot_name)
190
{
191
  int ret = OB_SUCCESS;
192
  ObTenantCloneTableOperator clone_op;
193

194
  if (OB_UNLIKELY(job_id < 0 || !tenant_snapshot_id.is_valid() ||
195
                  tenant_snapshot_name.empty())) {
196
    ret = OB_INVALID_ARGUMENT;
197
    LOG_WARN("invalid argument", KR(ret), K(job_id), K(tenant_snapshot_id), K(tenant_snapshot_name));
198
  } else if (OB_FAIL(clone_op.init(OB_SYS_TENANT_ID, &sql_client))) {
199
    LOG_WARN("fail to init clone op", KR(ret));
200
  } else if (OB_FAIL(clone_op.update_job_snapshot_info(job_id,
201
                                                       tenant_snapshot_id,
202
                                                       tenant_snapshot_name))) {
203
    LOG_WARN("fail to update clone job id and name", KR(ret), K(job_id), K(tenant_snapshot_id), K(tenant_snapshot_name));
204
  }
205

206
  return ret;
207
}
208

209
int ObTenantCloneUtil::update_restore_scn_for_fork_job(common::ObISQLClient &sql_client,
210
                                                       const int64_t job_id,
211
                                                       const SCN &restore_scn)
212
{
213
  int ret = OB_SUCCESS;
214
  ObTenantCloneTableOperator clone_op;
215

216
  if (OB_UNLIKELY(OB_INVALID_ID == job_id || !restore_scn.is_valid())) {
217
    ret = OB_INVALID_ARGUMENT;
218
    LOG_WARN("invalid argument", KR(ret), K(job_id), K(restore_scn));
219
  } else if (OB_FAIL(clone_op.init(OB_SYS_TENANT_ID, &sql_client))) {
220
    LOG_WARN("fail to init clone op", KR(ret));
221
  } else if (OB_FAIL(clone_op.update_job_snapshot_scn(job_id,
222
                                                      restore_scn))) {
223
    LOG_WARN("fail to update clone job snapshot scn", KR(ret), K(job_id), K(restore_scn));
224
  }
225

226
  return ret;
227
}
228

229
int ObTenantCloneUtil::insert_user_tenant_clone_job(common::ObISQLClient &sql_client,
230
                                                    const ObString &clone_tenant_name,
231
                                                    const uint64_t user_tenant_id)
232
{
233
  int ret = OB_SUCCESS;
234
  if (OB_UNLIKELY(clone_tenant_name.empty() || !is_user_tenant(user_tenant_id))) {
235
    ret = OB_INVALID_ARGUMENT;
236
    LOG_WARN("invalid argument", KR(ret), K(clone_tenant_name), K(user_tenant_id));
237
  } else {
238
    ObTenantCloneTableOperator clone_op;
239
    ObCloneJob clone_job;
240
    ObCloneJob user_clone_job;
241
    if (OB_FAIL(clone_op.init(OB_SYS_TENANT_ID, &sql_client))) {
242
      LOG_WARN("fail init clone op", KR(ret));
243
    } else if (OB_FAIL(clone_op.get_clone_job_by_clone_tenant_name(
244
                              clone_tenant_name, false/*need_lock*/, clone_job))) {
245
      LOG_WARN("fail to get clone job", KR(ret), K(clone_tenant_name));
246
    } else if (OB_FAIL(user_clone_job.assign(clone_job))) {
247
      LOG_WARN("fail to assign clone job", KR(ret), K(clone_job));
248
    } else {
249
      user_clone_job.set_tenant_id(user_tenant_id);
250
      user_clone_job.set_clone_tenant_id(user_tenant_id);
251
      user_clone_job.set_status(ObTenantCloneStatus::Status::CLONE_USER_PREPARE);
252
      ObTenantCloneTableOperator user_clone_op;
253
      if (OB_FAIL(user_clone_op.init(user_tenant_id, &sql_client))) {
254
        LOG_WARN("fail init clone op", KR(ret), K(user_tenant_id));
255
      } else if (OB_FAIL(user_clone_op.insert_clone_job(user_clone_job))) {
256
        LOG_WARN("fail to insert clone job", KR(ret), K(user_clone_job));
257
      }
258
    }
259
  }
260
  return ret;
261
}
262

263
int ObTenantCloneUtil::recycle_clone_job(common::ObISQLClient &sql_client,
264
                                         const ObCloneJob &job)
265
{
266
  int ret = OB_SUCCESS;
267
  ObTenantCloneTableOperator clone_op;
268
  ObMySQLTransaction trans;
269
  const uint64_t tenant_id = job.get_tenant_id();
270

271
  if (OB_UNLIKELY(!job.is_valid())) {
272
    ret = OB_INVALID_ARGUMENT;
273
    LOG_WARN("invalid argument", KR(ret), K(job));
274
  } else if (OB_FAIL(trans.start(&sql_client, gen_meta_tenant_id(tenant_id)))) {
275
    LOG_WARN("fail to start trans", KR(ret), K(gen_meta_tenant_id(tenant_id)));
276
  } else if (OB_FAIL(clone_op.init(tenant_id, &trans))) {
277
    LOG_WARN("fail to init", KR(ret), K(tenant_id));
278
  } else if (OB_FAIL(clone_op.insert_clone_job_history(job))) {
279
    LOG_WARN("fail to insert clone job history", KR(ret), K(job));
280
  } else if (OB_FAIL(clone_op.remove_clone_job(job))) {
281
    LOG_WARN("fail to remove clone job", KR(ret), K(job));
282
  }
283
  if (trans.is_started()) {
284
    int tmp_ret = OB_SUCCESS;
285
    if (OB_TMP_FAIL(trans.end(OB_SUCC(ret)))) {
286
      LOG_WARN("trans end failed", "is_commit", OB_SUCCESS == ret, KR(tmp_ret), KR(ret));
287
      ret = (OB_SUCC(ret)) ? tmp_ret : ret;
288
    }
289
  }
290

291
  return ret;
292
}
293

294
int ObTenantCloneUtil::notify_clone_scheduler(const uint64_t tenant_id)
295
{
296
  int ret = OB_SUCCESS;
297
  common::ObAddr leader_addr;
298
  obrpc::ObNotifyCloneSchedulerArg arg;
299
  arg.set_tenant_id(tenant_id);
300
  obrpc::ObNotifyCloneSchedulerResult res;
301

302
  if (OB_UNLIKELY(!is_sys_tenant(tenant_id))) {
303
    ret = OB_INVALID_ARGUMENT;
304
    LOG_WARN("invalid argument", KR(ret), K(tenant_id));
305
  } else if (OB_ISNULL(GCTX.srv_rpc_proxy_) || OB_ISNULL(GCTX.location_service_)) {
306
    ret = OB_ERR_UNEXPECTED;
307
    LOG_WARN("pointer is null", KR(ret), KP(GCTX.srv_rpc_proxy_), KP(GCTX.location_service_));
308
  } else if (OB_FAIL(GCTX.location_service_->get_leader_with_retry_until_timeout(
309
                  GCONF.cluster_id, tenant_id, ObLSID(ObLSID::SYS_LS_ID), leader_addr))) {
310
    LOG_WARN("failed to get leader address", KR(ret), K(tenant_id));
311
  } else if (OB_FAIL(GCTX.srv_rpc_proxy_->to(leader_addr).by(tenant_id).notify_clone_scheduler(arg, res))) {
312
    LOG_WARN("failed to notify clone scheduler", KR(ret), K(leader_addr), K(arg));
313
  } else {
314
    int res_ret = res.get_result();
315
    if (OB_SUCCESS != res_ret) {
316
      ret = res_ret;
317
      LOG_WARN("the result of notify clone scheduler failed", KR(res_ret), K(leader_addr), K(arg));
318
    }
319
  }
320
  return ret;
321
}
322

323
int ObTenantCloneUtil::release_clone_tenant_resource_of_clone_job(const ObCloneJob &clone_job)
324
{
325
  int ret = OB_SUCCESS;
326
  const int64_t timeout = GCONF._ob_ddl_timeout;
327
  const uint64_t resource_pool_id = clone_job.get_resource_pool_id();
328
  const uint64_t clone_tenant_id = clone_job.get_clone_tenant_id();
329
  const ObString &clone_tenant_name = clone_job.get_clone_tenant_name();
330

331
  if (!clone_job.get_status().is_sys_failed_status()) {
332
    ret = OB_ERR_UNEXPECTED;
333
    LOG_WARN("try to release resource of a processing or success job", KR(ret), K(clone_job));
334
  } else if (OB_ISNULL(GCTX.rs_rpc_proxy_)) {
335
    ret = OB_ERR_UNEXPECTED;
336
    LOG_WARN("unexpected null", KR(ret));
337
  } else if (OB_INVALID_ID == resource_pool_id) {
338
    // clone tenant and resource pool have not been created
339
    // it's no need to release resource
340
  } else {
341
    if (OB_INVALID_TENANT_ID != clone_tenant_id) {
342
      obrpc::ObDropTenantArg arg;
343
      ObArenaAllocator allocator;
344
      arg.exec_tenant_id_ = clone_job.get_tenant_id();
345
      arg.if_exist_ = false;
346
      arg.delay_to_drop_ = false;
347
      arg.force_drop_ = true;
348
      arg.drop_only_in_restore_ = true;
349
      arg.tenant_id_ = clone_tenant_id;
350

351
      if (OB_FAIL(deep_copy_ob_string(allocator, clone_tenant_name, arg.tenant_name_))) {
352
        LOG_WARN("fail to assign", KR(ret), K(clone_job));
353
      } else if (OB_FAIL(GCTX.rs_rpc_proxy_->timeout(timeout).drop_tenant(arg))) {
354
        if (ret == OB_TENANT_NOT_EXIST) {
355
          ret = OB_SUCCESS;
356
        } else {
357
          LOG_WARN("fail to drop tenant", KR(ret), K(clone_job), K(arg));
358
        }
359
      }
360
      LOG_INFO("recycle clone tenant", KR(ret), K(clone_job));
361
    }
362
    if (OB_SUCC(ret)) {
363
      obrpc::ObDropResourcePoolArg arg;
364
      arg.exec_tenant_id_ = clone_job.get_tenant_id();
365
      arg.pool_id_ = resource_pool_id;
366
      arg.if_exist_ = true;
367
      if (OB_FAIL(GCTX.rs_rpc_proxy_->timeout(timeout).drop_resource_pool(arg))) {
368
        LOG_WARN("drop_resource_pool failed", KR(ret), K(clone_job));
369
      }
370
      LOG_INFO("recycle clone resource pool", KR(ret), K(clone_job));
371
    }
372
  }
373

374
  return ret;
375
}
376

377
int ObTenantCloneUtil::release_source_tenant_resource_of_clone_job(common::ObISQLClient &sql_client,
378
                                                                   const ObCloneJob &clone_job)
379
{
380
  int ret = OB_SUCCESS;
381
  int tmp_ret = OB_SUCCESS;
382
  ObMySQLTransaction trans;
383
  ObTenantSnapshotTableOperator table_op;
384
  const int64_t job_id = clone_job.get_job_id();
385
  const uint64_t source_tenant_id = clone_job.get_source_tenant_id();
386
  const ObTenantSnapshotID tenant_snapshot_id = clone_job.get_tenant_snapshot_id();
387
  const ObTenantCloneJobType job_type = clone_job.get_job_type();
388
  const ObTenantCloneStatus status = clone_job.get_status();
389
  bool is_already_unlocked = false;
390
  bool is_source_tenant_exist = true;
391
  bool need_notify_tenant_snapshot_scheduler = false;
392

393
  share::schema::ObSchemaGetterGuard schema_guard;
394

395
  if (!clone_job.get_status().is_sys_release_resource_status()) {
396
    ret = OB_ERR_UNEXPECTED;
397
    LOG_WARN("try to release resource of a processing or success job", KR(ret), K(clone_job));
398
  } else if (OB_FAIL(GCTX.schema_service_->get_tenant_schema_guard(OB_SYS_TENANT_ID, schema_guard))) {
399
    LOG_WARN("fail to get schema guard", KR(ret), K(source_tenant_id));
400
  } else if (OB_FAIL(schema_guard.check_tenant_exist(source_tenant_id, is_source_tenant_exist))) {
401
    LOG_WARN("get tenant ids failed", K(ret));
402
  } else if (OB_UNLIKELY(!is_source_tenant_exist)) {
403
    LOG_INFO("source tenant doesn't exist while release source tenant resource", KR(ret), K(source_tenant_id));
404
  } else if (OB_FAIL(schema_guard.reset())) {
405
    LOG_WARN("fail to reset schema guard", KR(ret));
406
  } else if (OB_FAIL(trans.start(&sql_client, gen_meta_tenant_id(source_tenant_id)))) {
407
    LOG_WARN("trans start failed", KR(ret), K(clone_job));
408
  } else if (OB_FAIL(table_op.init(source_tenant_id, &trans))) {
409
    LOG_WARN("failed to init table op", KR(ret), K(clone_job));
410
  } else { // is_source_tenant_exist == true
411
    // release global lock
412
    ObTenantSnapItem global_lock;
413
    if (OB_FAIL(table_op.get_tenant_snap_item(
414
                     ObTenantSnapshotID(ObTenantSnapshotTableOperator::GLOBAL_STATE_ID),
415
                     true /*for update*/,
416
                     global_lock))) {
417
      if (OB_TENANT_SNAPSHOT_NOT_EXIST == ret) {
418
        ret = OB_SUCCESS;
419
        LOG_INFO("global lock has not been created", KR(ret), K(clone_job));
420
        is_already_unlocked = true;
421
      } else {
422
        LOG_WARN("fail to get global_lock", KR(ret), K(clone_job));
423
      }
424
    } else if (ObTenantSnapStatus::CLONING != global_lock.get_status()) {
425
      is_already_unlocked = true;
426
      LOG_INFO("global lock has been released", KR(ret), K(clone_job));
427
    } else if (OB_FAIL(ObTenantSnapshotUtil::unlock_tenant_snapshot_simulated_mutex_from_clone_release_task(
428
                                                trans,
429
                                                source_tenant_id,
430
                                                job_id,
431
                                                ObTenantSnapStatus::CLONING,
432
                                                is_already_unlocked))) {
433
      LOG_WARN("fail to unlock", KR(ret), K(clone_job), K(global_lock));
434
    }
435

436
    // release snapshot
437
    // "is_already_unlocked == true" means the release trans of this clone job has been committed,
438
    // or clone job is failed at the first status (CLONE_SYS_LOCK)
439
    // thus, no need to handle with the snapshot
440
    if (OB_SUCC(ret) && !is_already_unlocked) {
441
      ObTenantSnapItem tenant_snapshot_item;
442
      ObTenantSnapStatus next_snap_status = ObTenantCloneJobType::RESTORE == job_type ?
443
                                            ObTenantSnapStatus::NORMAL :
444
                                            ObTenantSnapStatus::DELETING;
445
      if (!tenant_snapshot_id.is_valid()) {
446
        if (ObTenantCloneJobType::FORK == job_type &&
447
            !status.is_sys_valid_snapshot_status_for_fork()) {
448
          LOG_INFO("fork tenant snapshot has not been created", K(clone_job));
449
        } else {
450
          ret = OB_ERR_UNEXPECTED;
451
          LOG_WARN("tenant snapshot is invalid", KR(ret), K(clone_job));
452
        }
453
      } else if (OB_FAIL(ObTenantSnapshotUtil::get_tenant_snapshot_info(trans,
454
                                                                        source_tenant_id,
455
                                                                        tenant_snapshot_id,
456
                                                                        tenant_snapshot_item))) {
457
        if (OB_TENANT_SNAPSHOT_NOT_EXIST == ret && ObTenantCloneJobType::FORK == job_type) {
458
          // fork clone job will generate tenant_snapshot_id at first, and then create snapshot.
459
          // thus, it is possible that job has valid tenant_snapshot_id, but the snapshot doesn't exist
460
          ret = OB_SUCCESS;
461
          LOG_INFO("tenant snapshot has not been created", K(clone_job));
462
        } else {
463
          LOG_WARN("fail to get tenant snapshot", KR(ret), K(clone_job));
464
        }
465
      } else if (tenant_snapshot_item.get_status() == next_snap_status) {
466
        LOG_INFO("tenant snapshot item already in next status",
467
            K(clone_job), K(tenant_snapshot_item), K(next_snap_status));
468
      } else if (OB_FAIL(table_op.update_tenant_snap_item(tenant_snapshot_id,
469
                                                          tenant_snapshot_item.get_status(),
470
                                                          next_snap_status))) {
471
        LOG_WARN("failed to update snapshot status", KR(ret), K(clone_job));
472
      } else if (ObTenantSnapStatus::DELETING == next_snap_status) {
473
        ObTenantSnapJobItem job_item(source_tenant_id,
474
                                     tenant_snapshot_id,
475
                                     ObTenantSnapOperation::DELETE,
476
                                     clone_job.get_trace_id());
477
        if (OB_FAIL(table_op.insert_tenant_snap_job_item(job_item))) {
478
          LOG_WARN("fail to insert tenant snapshot job", KR(ret), K(job_item));
479
        } else if (OB_FAIL(ObTenantSnapshotUtil::recycle_tenant_snapshot_ls_replicas(trans, source_tenant_id,
480
                                                                              clone_job.get_tenant_snapshot_name()))) {
481
          LOG_WARN("fail to recycle tenant snapshot ls replicas", KR(ret), K(clone_job));
482
        } else {
483
          need_notify_tenant_snapshot_scheduler = true;
484
          LOG_INFO("release source tenant resource", KR(ret), K(clone_job));
485
        }
486
      }
487
    }
488
  }
489

490
  if (trans.is_started()) {
491
    if (OB_TMP_FAIL(trans.end(OB_SUCC(ret)))) {
492
      LOG_WARN("trans end failed", "is_commit", OB_SUCCESS == ret, K(tmp_ret));
493
      ret = (OB_SUCC(ret)) ? tmp_ret : ret;
494
    }
495
  }
496

497
  if (OB_SUCC(ret) && need_notify_tenant_snapshot_scheduler) {
498
    if (OB_TMP_FAIL(ObTenantSnapshotUtil::notify_scheduler(source_tenant_id))) {
499
      LOG_WARN("notify tenant snapshot scheduler failed", KR(tmp_ret), K(clone_job));
500
    }
501
  }
502

503
  return ret;
504
}
505

506
int ObTenantCloneUtil::get_clone_job_failed_message(common::ObISQLClient &sql_client,
507
                                                    const int64_t job_id,
508
                                                    const uint64_t tenant_id,
509
                                                    ObIAllocator &allocator,
510
                                                    ObString &err_msg)
511
{
512
  int ret = OB_SUCCESS;
513
  ObTenantCloneTableOperator clone_op;
514

515
  if (OB_UNLIKELY(job_id < 0 || !is_sys_tenant(tenant_id))) {
516
    ret = OB_INVALID_ARGUMENT;
517
    LOG_WARN("invalid argument", KR(ret), K(job_id), K(tenant_id));
518
  } else if (OB_FAIL(clone_op.init(tenant_id, &sql_client))) {
519
    LOG_WARN("fail to init", KR(ret), K(tenant_id));
520
  } else if (OB_FAIL(clone_op.get_job_failed_message(job_id, allocator, err_msg))) {
521
    LOG_WARN("fail to get clone job failed message", KR(ret), K(job_id), K(tenant_id));
522
  }
523
  return ret;
524
}
525

526
//This function is called by the user executing "cancel clone" sql.
527
int ObTenantCloneUtil::cancel_clone_job(common::ObISQLClient &sql_client,
528
                                        const ObString &clone_tenant_name,
529
                                        bool &clone_already_finish)
530
{
531
  int ret = OB_SUCCESS;
532
  clone_already_finish = false;
533
  ObTenantCloneTableOperator clone_op;
534
  ObCloneJob clone_job;
535
  ObMySQLTransaction trans;
536
  ObSqlString err_msg;
537
  const ObTenantCloneStatus next_status(ObTenantCloneStatus::Status::CLONE_SYS_CANCELING);
538

539
  if (OB_UNLIKELY(clone_tenant_name.empty())) {
540
    ret = OB_INVALID_ARGUMENT;
541
    LOG_WARN("invalid argument", KR(ret), K(clone_tenant_name));
542
  } else if (OB_FAIL(trans.start(&sql_client, OB_SYS_TENANT_ID))) {
543
    LOG_WARN("failed to start trans", KR(ret));
544
  } else if (OB_FAIL(clone_op.init(OB_SYS_TENANT_ID, &trans))) {
545
    LOG_WARN("fail init clone op", KR(ret));
546
  } else if (OB_FAIL(clone_op.get_clone_job_by_clone_tenant_name(
547
                            clone_tenant_name, true/*need_lock*/, clone_job))) {
548
    if (OB_ENTRY_NOT_EXIST != ret) {
549
      LOG_WARN("fail to get clone job", KR(ret), K(clone_tenant_name));
550
    } else {
551
      ret = OB_SUCCESS;
552
      clone_already_finish = true;
553
    }
554
  } else if (clone_job.get_status().is_user_status()) {
555
    ret = OB_ERR_UNEXPECTED;
556
    LOG_WARN("unexpected sys clone job status", KR(ret), K(clone_job));
557
  } else if (ObTenantCloneStatus::Status::CLONE_SYS_RELEASE_RESOURCE == clone_job.get_status()
558
             || !clone_job.get_status().is_sys_processing_status()) {
559
    clone_already_finish = true;
560
  } else if (OB_FAIL(clone_op.update_job_status(clone_job.get_job_id(),
561
                                                clone_job.get_status(), /*old_status*/
562
                                                next_status))) {
563
    LOG_WARN("fail to update job status", KR(ret), K(clone_tenant_name), K(clone_job));
564
  } else if (OB_FAIL(err_msg.append_fmt("clone job has been canceled in %s status",
565
                                        ObTenantCloneStatus::get_clone_status_str(clone_job.get_status())))) {
566
  } else if (OB_FAIL(clone_op.update_job_failed_info(clone_job.get_job_id(), OB_CANCELED, err_msg.string()))) {
567
    LOG_WARN("fail to update job failed info", KR(ret), K(clone_job));
568
  }
569

570
  if (trans.is_started()) {
571
    int tmp_ret = OB_SUCCESS;
572
    if (OB_TMP_FAIL(trans.end(OB_SUCC(ret)))) {
573
      LOG_WARN("trans end failed", "is_commit", (OB_SUCCESS == ret), KR(tmp_ret));
574
      ret = (OB_SUCC(ret)) ? tmp_ret : ret;
575
    }
576
  }
577

578
  if (OB_SUCC(ret)) {
579
    LOG_INFO("[RESTORE] switch job status", KR(ret), K(clone_job), K(next_status));
580

581
    const char *prev_status_str = ObTenantCloneStatus::get_clone_status_str(clone_job.get_status());
582
    const char *cur_status_str = ObTenantCloneStatus::get_clone_status_str(next_status);
583

584
    ROOTSERVICE_EVENT_ADD("clone", "change_clone_status",
585
                          "job_id", clone_job.get_job_id(),
586
                          K(ret),
587
                          "prev_clone_status", prev_status_str,
588
                          "cur_clone_status", cur_status_str);
589
  }
590
  return ret;
591
}
592

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.