// oceanbase / ob_upgrade_executor.cpp — 1404 lines, 57.6 KB (forks: 0)
/**
2
 * Copyright (c) 2021 OceanBase
3
 * OceanBase CE is licensed under Mulan PubL v2.
4
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
5
 * You may obtain a copy of Mulan PubL v2 at:
6
 *          http://license.coscl.org.cn/MulanPubL-2.0
7
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
 * See the Mulan PubL v2 for more details.
11
 */
12

13
#define USING_LOG_PREFIX RS
14

15
#include "rootserver/ob_upgrade_executor.h"
16
#include "rootserver/ob_ls_service_helper.h"
17
#include "rootserver/tenant_snapshot/ob_tenant_snapshot_util.h" //ObTenantSnapshotUtil
18
#include "observer/ob_server_struct.h"
19
#include "share/ob_global_stat_proxy.h"
20
#include "share/ob_cluster_event_history_table_operator.h"//CLUSTER_EVENT_INSTANCE
21
#include "share/ob_primary_standby_service.h" // ObPrimaryStandbyService
22
#include "share/ob_tenant_info_proxy.h" //ObAllTenantInfoProxy
23
#include "observer/ob_service.h"
24

25
namespace oceanbase
26
{
27
using namespace common;
28
using namespace common::sqlclient;
29
using namespace share;
30
using namespace share::schema;
31

32
namespace rootserver
33
{
34

35
// Returns the number of bytes a buffer must provide so that deep_copy()
// can placement-construct a copy of this task in it.
int64_t ObUpgradeTask::get_deep_copy_size() const
{
  const int64_t task_size = static_cast<int64_t>(sizeof(*this));
  return task_size;
}
39

40
// Placement-constructs a copy of this task inside `buf` and copies arg_ into it.
//
// @param buf       destination buffer; must not be NULL.
// @param buf_size  size of `buf`; must be at least get_deep_copy_size().
// @return the copied task, or NULL when the arguments are invalid or the copy
//         could not be fully initialized.
//
// A copy whose init() failed is destroyed and NULL is returned instead, so the
// caller never receives a task carrying an incompletely assigned argument.
ObAsyncTask *ObUpgradeTask::deep_copy(char *buf, const int64_t buf_size) const
{
  ObAsyncTask *task = NULL;
  int ret = OB_SUCCESS;
  const int64_t need_size = get_deep_copy_size();
  if (OB_ISNULL(buf)) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("buf is null", KR(ret));
  } else if (buf_size < need_size) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("buf is not long enough", K(need_size), K(buf_size), KR(ret));
  } else if (OB_ISNULL(upgrade_executor_)) {
    // process() treats a null executor as an error; refuse to dereference it here too.
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("upgrade_executor_ is null", KR(ret));
  } else {
    ObUpgradeTask *copied = new(buf) ObUpgradeTask(*upgrade_executor_);
    if (OB_FAIL(copied->init(arg_))) {
      LOG_WARN("fail to init task", KR(ret), K_(arg));
      // Do not hand out a half-initialized task: tear it down and report failure.
      copied->~ObUpgradeTask();
      copied = NULL;
    } else {
      task = copied;
    }
  }
  return task;
}
59

60
// Copies the upgrade job argument into this task.
//
// @param arg  job argument to copy into arg_.
// @return OB_SUCCESS, or the error code reported by arg_.assign().
int ObUpgradeTask::init(const obrpc::ObUpgradeJobArg &arg)
{
  int ret = OB_SUCCESS;
  if (OB_FAIL(arg_.assign(arg))) {
    LOG_WARN("fail to assign arg", KR(ret));
  } else {
    // arg_ now holds a copy of `arg`
  }
  return ret;
}
68

69
int ObUpgradeTask::process()
70
{
71
  const int64_t start = ObTimeUtility::current_time();
72
  FLOG_INFO("[UPGRADE] start to do execute upgrade task", K(start), K_(arg));
73
  int ret = OB_SUCCESS;
74
  if (OB_ISNULL(upgrade_executor_)) {
75
    ret = OB_ERR_UNEXPECTED;
76
    LOG_WARN("upgrade_executor_ is null", KR(ret));
77
  } else if (OB_FAIL(upgrade_executor_->execute(arg_))) {
78
    LOG_WARN("fail to execute upgrade task", KR(ret), K_(arg));
79
  }
80
  FLOG_INFO("[UPGRADE] finish execute upgrade task",
81
            KR(ret), K_(arg), "cost_us", ObTimeUtility::current_time() - start);
82
  return ret;
83
}
84

85
// Builds an executor that is not yet usable: all service pointers are NULL and
// inited_ is false until init() succeeds.
ObUpgradeExecutor::ObUpgradeExecutor()
    : inited_(false),
      stopped_(false),
      execute_(false),
      rwlock_(ObLatchIds::DEFAULT_SPIN_RWLOCK),
      sql_proxy_(NULL),
      rpc_proxy_(NULL),
      common_rpc_proxy_(NULL),
      schema_service_(NULL),
      root_inspection_(NULL),
      upgrade_processors_()
{
}
91

92
int ObUpgradeExecutor::init(
93
    share::schema::ObMultiVersionSchemaService &schema_service,
94
    rootserver::ObRootInspection &root_inspection,
95
    common::ObMySQLProxy &sql_proxy,
96
    obrpc::ObSrvRpcProxy &rpc_proxy,
97
    obrpc::ObCommonRpcProxy &common_proxy)
98
{
99
  int ret = OB_SUCCESS;
100
  if (inited_) {
101
    ret = OB_INIT_TWICE;
102
    LOG_WARN("can't init twice", KR(ret));
103
  } else if (OB_FAIL(upgrade_processors_.init(
104
                     ObBaseUpgradeProcessor::UPGRADE_MODE_OB,
105
                     sql_proxy, rpc_proxy, common_proxy, schema_service, *this))) {
106
    LOG_WARN("fail to init upgrade processors", KR(ret));
107
  } else {
108
    schema_service_ = &schema_service;
109
    root_inspection_ = &root_inspection;
110
    sql_proxy_ = &sql_proxy;
111
    rpc_proxy_ = &rpc_proxy;
112
    common_rpc_proxy_ = &common_proxy;
113
    stopped_ = false;
114
    execute_ = false;
115
    inited_ = true;
116
  }
117
  return ret;
118
}
119

120
void ObUpgradeExecutor::start()
121
{
122
  SpinWLockGuard guard(rwlock_);
123
  stopped_ = false;
124
}
125

126
int ObUpgradeExecutor::stop()
127
{
128
  int ret = OB_SUCCESS;
129
  const uint64_t WAIT_US = 100 * 1000L; //100ms
130
  const uint64_t MAX_WAIT_US = 10 * 1000 * 1000L; //10s
131
  const int64_t start = ObTimeUtility::current_time();
132
  {
133
    SpinWLockGuard guard(rwlock_);
134
    stopped_ = true;
135
  }
136
  while (OB_SUCC(ret)) {
137
    if (ObTimeUtility::current_time() - start > MAX_WAIT_US) {
138
      ret = OB_TIMEOUT;
139
      LOG_WARN("use too much time", KR(ret), "cost_us", ObTimeUtility::current_time() - start);
140
    } else if (!check_execute()) {
141
      break;
142
    } else {
143
      ob_usleep(WAIT_US);
144
    }
145
  }
146
  return ret;
147
}
148

149
int ObUpgradeExecutor::check_stop() const
150
{
151
  int ret = OB_SUCCESS;
152
  SpinRLockGuard guard(rwlock_);
153
  if (OB_FAIL(check_inner_stat_())) {
154
    LOG_WARN("fail to check inner stat", KR(ret));
155
  } else if (stopped_) {
156
    ret = OB_CANCELED;
157
    LOG_WARN("executor should stopped", KR(ret));
158
  }
159
  return ret;
160
}
161

162
bool ObUpgradeExecutor::check_execute() const
163
{
164
  SpinRLockGuard guard(rwlock_);
165
  bool bret = execute_;
166
  return bret;
167
}
168

169
int ObUpgradeExecutor::set_execute_mark_()
170
{
171
  int ret = OB_SUCCESS;
172
  SpinWLockGuard guard(rwlock_);
173
  if (OB_FAIL(check_inner_stat_())) {
174
    LOG_WARN("fail to check inner stat", KR(ret));
175
  } else if (stopped_ || execute_) {
176
    ret = OB_OP_NOT_ALLOW;
177
    LOG_WARN("can't run job at the same time", KR(ret));
178
  } else {
179
    execute_ = true;
180
  }
181
  return ret;
182
}
183

184
int ObUpgradeExecutor::can_execute()
185
{
186
  int ret = OB_SUCCESS;
187
  SpinWLockGuard guard(rwlock_);
188
  if (OB_FAIL(check_inner_stat_())) {
189
    LOG_WARN("fail to check inner stat", KR(ret));
190
  } else if (stopped_ || execute_) {
191
    ret = OB_OP_NOT_ALLOW;
192
    LOG_WARN("status not matched", KR(ret),
193
             "stopped", stopped_ ? "true" : "false",
194
             "build", execute_ ? "true" : "false");
195
  }
196
  return ret;
197
}
198

199
int ObUpgradeExecutor::check_inner_stat_() const
200
{
201
  int ret = OB_SUCCESS;
202
  if (!inited_) {
203
    ret = OB_NOT_INIT;
204
    LOG_WARN("not inited", KR(ret));
205
  } else if (OB_ISNULL(schema_service_)
206
             || OB_ISNULL(root_inspection_)
207
             || OB_ISNULL(sql_proxy_)
208
             || OB_ISNULL(rpc_proxy_)
209
             || OB_ISNULL(common_rpc_proxy_)){
210
    ret = OB_ERR_UNEXPECTED;
211
    LOG_WARN("ptr is null", KR(ret), KP_(schema_service), KP_(root_inspection),
212
             KP_(sql_proxy), KP_(rpc_proxy), KP_(common_rpc_proxy));
213
  }
214
  return ret;
215
}
216

217
// wait schema sync in cluster
218
int ObUpgradeExecutor::check_schema_sync_(const uint64_t tenant_id)
219
{
220
  const int64_t start = ObTimeUtility::current_time();
221
  LOG_INFO("[UPGRADE] start to check schema sync", K(tenant_id), K(start));
222
  int ret = OB_SUCCESS;
223
  if (OB_FAIL(check_inner_stat_())) {
224
    LOG_WARN("fail to check inner stat", KR(ret));
225
  } else {
226
    const int64_t WAIT_US = 1000 * 1000L; // 1 second
227
    bool is_sync = false;
228
    while (OB_SUCC(ret)) {
229
      if (OB_FAIL(check_stop())) {
230
        LOG_WARN("executor is stop", KR(ret));
231
      } else if (OB_FAIL(ObUpgradeUtils::check_schema_sync(tenant_id, is_sync))) {
232
        LOG_WARN("fail to check schema sync", KR(ret), K(tenant_id));
233
      } else if (is_sync) {
234
        break;
235
      } else {
236
        LOG_INFO("schema not sync, should wait", KR(ret), K(tenant_id));
237
        ob_usleep(static_cast<useconds_t>((WAIT_US)));
238
      }
239
    }
240
  }
241
  LOG_INFO("[UPGRADE] check schema sync finish", KR(ret), K(tenant_id),
242
           "cost_us", ObTimeUtility::current_time() - start);
243
  return ret;
244
}
245

246
// Ensure primary cluster's schema_version is not greator than standby clusters'.
247
int ObUpgradeExecutor::check_schema_sync_(
248
    obrpc::ObTenantSchemaVersions &primary_schema_versions,
249
    obrpc::ObTenantSchemaVersions &standby_schema_versions,
250
    bool &schema_sync)
251
{
252
  int ret = OB_SUCCESS;
253
  int64_t primary_cnt = primary_schema_versions.tenant_schema_versions_.count();
254
  int64_t standby_cnt = standby_schema_versions.tenant_schema_versions_.count();
255
  if (primary_cnt <= 0 || standby_cnt <= 0) {
256
    ret = OB_INVALID_ARGUMENT;
257
    LOG_WARN("invalid cnt", KR(ret));
258
  } else if (OB_FAIL(check_stop())) {
259
    LOG_WARN("executor should stopped", KR(ret));
260
  } else {
261
    schema_sync = true;
262
    for (int64_t i = 0; schema_sync && OB_SUCC(ret) && i < primary_cnt; i++) {
263
      bool find = false;
264
      TenantIdAndSchemaVersion &primary = primary_schema_versions.tenant_schema_versions_.at(i);
265
      // check normal tenant only
266
      if (OB_SYS_TENANT_ID == primary.tenant_id_) {
267
        continue;
268
      } else {
269
        for (int64_t j = 0; !find && OB_SUCC(ret) && j < standby_cnt; j++) {
270
          TenantIdAndSchemaVersion &standby = standby_schema_versions.tenant_schema_versions_.at(j);
271
          if (OB_FAIL(check_stop())) {
272
            LOG_WARN("executor should stopped", KR(ret));
273
          } else if (primary.tenant_id_ == standby.tenant_id_) {
274
            find = true;
275
            schema_sync = (primary.schema_version_ <= standby.schema_version_);
276
            LOG_INFO("check if tenant schema is sync",
277
                     KR(ret), K(primary), K(standby), K(schema_sync));
278
          }
279
        }
280
        if (OB_SUCC(ret) && !find) {
281
          schema_sync = false;
282
        }
283
      }
284
    }
285
  }
286
  return ret;
287
}
288

289
//TODO:
290
//1. Run upgrade job by tenant.
291
//2. Check tenant role/tenant status before run upgrade job.
292
int ObUpgradeExecutor::execute(
293
    const obrpc::ObUpgradeJobArg &arg)
294
{
295
  ObCurTraceId::init(GCONF.self_addr_);
296
  int ret = OB_SUCCESS;
297
  ObArray<uint64_t> tenant_ids;
298
  obrpc::ObUpgradeJobArg::Action action = arg.action_;
299
  int64_t version = arg.version_;
300
  ObRsJobType job_type = convert_to_job_type_(arg.action_);
301
  if (OB_FAIL(check_inner_stat_())) {
302
    LOG_WARN("fail to check inner stat", KR(ret));
303
  } else if (JOB_TYPE_INVALID == job_type) {
304
    ret = OB_INVALID_ARGUMENT;
305
    LOG_WARN("invalid job type", KR(ret), K(arg));
306
  } else if (version > 0 && !ObUpgradeChecker::check_data_version_exist(version)) {
307
    ret = OB_NOT_SUPPORTED;
308
    LOG_WARN("unsupported version to run upgrade job", KR(ret), K(arg));
309
  } else if (OB_FAIL(construct_tenant_ids_(arg.tenant_ids_, tenant_ids))) {
310
    LOG_WARN("fail to construct tenant_ids", KR(ret), K(arg));
311
  } else if (OB_FAIL(set_execute_mark_())) {
312
    LOG_WARN("fail to set execute mark", KR(ret));
313
    // NOTICE: don't add any `else if` after set_execute_mark_().
314
  } else {
315
    const uint64_t tenant_id = (1 == tenant_ids.count()) ?  tenant_ids.at(0) : 0;
316
    const int64_t BUF_LEN = common::MAX_ROOTSERVICE_EVENT_EXTRA_INFO_LENGTH;
317
    char extra_buf[BUF_LEN] = {'\0'};
318
    int64_t job_id = OB_INVALID_ID;
319
    uint64_t current_data_version = 0;
320
    if (0 != tenant_id && OB_FAIL(GET_MIN_DATA_VERSION(tenant_id, current_data_version))) {
321
      LOG_WARN("fail to get min data version", KR(ret), K(tenant_id));
322
    } else if (OB_FAIL(fill_extra_info_(tenant_id, version,
323
               current_data_version, BUF_LEN, extra_buf))) {
324
      LOG_WARN("fail to fill extra info", KR(ret),
325
               K(tenant_id), K(version), K(current_data_version));
326
    } else if (OB_FAIL(RS_JOB_CREATE_WITH_RET(
327
               job_id, job_type, *sql_proxy_, "tenant_id", tenant_id,
328
               "extra_info", ObHexEscapeSqlStr(ObString(strlen(extra_buf), extra_buf))))) {
329
      LOG_WARN("fail to create rs job", KR(ret));
330
    } else if (job_id <= 0) {
331
      ret = OB_ERR_UNEXPECTED;
332
      LOG_WARN("job_id is invalid", KR(ret), K(job_id));
333
    } else {
334
      switch (action) {
335
        case obrpc::ObUpgradeJobArg::UPGRADE_POST_ACTION: {
336
          if (OB_FAIL(run_upgrade_post_job_(tenant_ids, version))) {
337
            LOG_WARN("fail to run upgrade post job", KR(ret), K(version));
338
          }
339
          break;
340
        }
341
        case obrpc::ObUpgradeJobArg::UPGRADE_BEGIN: {
342
          if (OB_FAIL(run_upgrade_begin_action_(tenant_ids))) {
343
            LOG_WARN("fail to run upgrade begin job", KR(ret));
344
          }
345
          break;
346
        }
347
        case obrpc::ObUpgradeJobArg::UPGRADE_SYSTEM_VARIABLE: {
348
          if (OB_FAIL(run_upgrade_system_variable_job_(tenant_ids))) {
349
            LOG_WARN("fail to run upgrade system variable job", KR(ret));
350
          }
351
          break;
352
        }
353
        case obrpc::ObUpgradeJobArg::UPGRADE_SYSTEM_TABLE: {
354
          if (OB_FAIL(run_upgrade_system_table_job_(tenant_ids))) {
355
            LOG_WARN("fail to run upgrade system table job", KR(ret));
356
          }
357
          break;
358
        }
359
        case obrpc::ObUpgradeJobArg::UPGRADE_VIRTUAL_SCHEMA: {
360
          if (OB_FAIL(run_upgrade_virtual_schema_job_(tenant_ids))) {
361
            LOG_WARN("fail to run upgrade virtual schema job", KR(ret));
362
          }
363
          break;
364
        }
365
        case obrpc::ObUpgradeJobArg::UPGRADE_SYSTEM_PACKAGE: {
366
          if (OB_FAIL(run_upgrade_system_package_job_())) {
367
            LOG_WARN("fail to run upgrade system package job", KR(ret));
368
          }
369
          break;
370
        }
371
        case obrpc::ObUpgradeJobArg::UPGRADE_ALL_POST_ACTION: {
372
          if (OB_FAIL(run_upgrade_all_post_action_(tenant_ids))) {
373
            LOG_WARN("fail to run upgrade all post action", KR(ret));
374
          }
375
          break;
376
        }
377
        case obrpc::ObUpgradeJobArg::UPGRADE_INSPECTION: {
378
          if (OB_FAIL(run_upgrade_inspection_job_(tenant_ids))) {
379
            LOG_WARN("fail to run upgrade inspection job", KR(ret));
380
          }
381
          break;
382
        }
383
        case obrpc::ObUpgradeJobArg::UPGRADE_END: {
384
          if (OB_FAIL(run_upgrade_end_action_(tenant_ids))) {
385
            LOG_WARN("fail to run upgrade end job", KR(ret));
386
          }
387
          break;
388
        }
389
        case obrpc::ObUpgradeJobArg::UPGRADE_ALL: {
390
          if (OB_FAIL(run_upgrade_all_(tenant_ids))) {
391
            LOG_WARN("fail to run upgrade all action", KR(ret));
392
          }
393
          break;
394
        }
395
        default: {
396
          ret = OB_NOT_SUPPORTED;
397
          LOG_WARN("not support upgrade job type", KR(ret), K(action));
398
          break;
399
        }
400
      }
401
    }
402

403
    if (OB_SUCC(ret)) {
404
      const int64_t BUF_LEN = OB_SERVER_VERSION_LENGTH;
405
      char min_cluster_version_str[BUF_LEN] = {'\0'};
406
      const uint64_t min_cluster_version = GET_MIN_CLUSTER_VERSION();
407
      char targe_data_version_str[BUF_LEN] = {'\0'};
408
      const uint64_t target_data_version = DATA_CURRENT_VERSION;
409
      share::ObServerInfoInTable::ObBuildVersion build_version;
410
      if (OB_INVALID_INDEX == ObClusterVersion::print_version_str(
411
          min_cluster_version_str, BUF_LEN, min_cluster_version)) {
412
         ret = OB_SIZE_OVERFLOW;
413
         LOG_WARN("fail to print version str", KR(ret), K(min_cluster_version));
414
      } else if (OB_INVALID_INDEX == ObClusterVersion::print_version_str(
415
                 targe_data_version_str, BUF_LEN, target_data_version)) {
416
         ret = OB_SIZE_OVERFLOW;
417
         LOG_WARN("fail to print version str", KR(ret), K(target_data_version));
418
      } else if (OB_FAIL(observer::ObService::get_build_version(build_version))) {
419
        LOG_WARN("fail to get build version", KR(ret));
420
      } else if (0 != tenant_id) {
421
        char current_data_version_str[BUF_LEN] = {'\0'};
422
        if (OB_INVALID_INDEX == ObClusterVersion::print_version_str(
423
            current_data_version_str, BUF_LEN, current_data_version)) {
424
           ret = OB_SIZE_OVERFLOW;
425
           LOG_WARN("fail to print version str", KR(ret), K(current_data_version));
426
        }
427
        CLUSTER_EVENT_SYNC_ADD("UPGRADE",
428
                               ObRsJobTableOperator::get_job_type_str(job_type),
429
                               "cluster_version", min_cluster_version_str,
430
                               "build_version", build_version.ptr(),
431
                               "target_data_version", targe_data_version_str,
432
                               "current_data_version", current_data_version_str,
433
                               "tenant_id", tenant_id)
434
      } else {
435
        CLUSTER_EVENT_SYNC_ADD("UPGRADE",
436
                               ObRsJobTableOperator::get_job_type_str(job_type),
437
                               "cluster_version", min_cluster_version_str,
438
                               "build_version", build_version.ptr(),
439
                               "target_data_version", targe_data_version_str);
440
      }
441
    }
442

443
    if (job_id > 0) {
444
      int tmp_ret = OB_SUCCESS;
445
      if (OB_SUCCESS != (tmp_ret = RS_JOB_COMPLETE(job_id, ret, *sql_proxy_))) {
446
        LOG_ERROR("fail to complete job", K(tmp_ret), KR(ret), K(job_id));
447
        ret = OB_FAIL(ret) ? ret : tmp_ret;
448
      }
449
    }
450
    execute_ = false;
451
  }
452
  return ret;
453
}
454

455
int ObUpgradeExecutor::fill_extra_info_(
456
    const uint64_t tenant_id,
457
    const int64_t specified_version,
458
    const uint64_t current_data_version,
459
    const int64_t buf_len,
460
    char *buf)
461
{
462
  int ret = OB_SUCCESS;
463
  int64_t len = 0;
464
  const int64_t VERSION_LEN = common::OB_CLUSTER_VERSION_LENGTH;
465
  char version_buf[VERSION_LEN] = {'\0'};
466
  int64_t version_len = 0;
467
  if (specified_version > 0) {
468
    if (OB_INVALID_INDEX == (version_len = ObClusterVersion::print_version_str(
469
        version_buf, VERSION_LEN, static_cast<uint64_t>(specified_version)))) {
470
      ret = OB_SIZE_OVERFLOW;
471
      LOG_WARN("fail to print version", KR(ret), K(specified_version));
472
    } else if (OB_FAIL(databuff_printf(buf, buf_len, len,
473
               "SPECIFIED_DATA_VERSION: '%s'", version_buf))) {
474
      LOG_WARN("fail to print string", KR(ret), K(len));
475
    }
476
  } else {
477
    if (OB_SUCC(ret)) {
478
      uint64_t target_data_version = DATA_CURRENT_VERSION;
479
      if (OB_INVALID_INDEX == (version_len = ObClusterVersion::print_version_str(
480
          version_buf, VERSION_LEN, target_data_version))) {
481
        ret = OB_SIZE_OVERFLOW;
482
        LOG_WARN("fail to print version", KR(ret), K(target_data_version));
483
      } else if (OB_FAIL(databuff_printf(buf, buf_len, len,
484
                 "TARGET_DATA_VERSION: '%s'", version_buf))) {
485
        LOG_WARN("fail to print string", KR(ret), K(len));
486
      }
487
    }
488
    if (OB_FAIL(ret)) {
489
    } else if (0 != tenant_id) {
490
      // record current data version when upgrade single tenant
491
      if (OB_UNLIKELY(len < 1)) {
492
        ret = OB_ERR_UNEXPECTED;
493
        LOG_WARN("str should not be empty", KR(ret), K(len));
494
      } else if (OB_INVALID_INDEX == (version_len = ObClusterVersion::print_version_str(
495
          version_buf, VERSION_LEN, current_data_version))) {
496
        ret = OB_SIZE_OVERFLOW;
497
        LOG_WARN("fail to print version", KR(ret), K(current_data_version));
498
      } else if (OB_FAIL(databuff_printf(buf, buf_len, len,
499
                 ", CURRENT_DATA_VERSION: '%s'", version_buf))) {
500
        LOG_WARN("fail to print string", KR(ret), K(len));
501
      }
502
    }
503
  }
504
  return ret;
505
}
506

507
// Python upgrade script may set enable_ddl = false before it run upgrade job.
508
// this function won't raise current_data_version
509
int ObUpgradeExecutor::run_upgrade_post_job_(
510
    const common::ObIArray<uint64_t> &tenant_ids,
511
    const int64_t version)
512
{
513
  int ret = OB_SUCCESS;
514
  if (OB_FAIL(check_inner_stat_())) {
515
    LOG_WARN("fail to check inner stat", KR(ret));
516
  } else if (OB_FAIL(check_stop())) {
517
    LOG_WARN("executor should stopped", KR(ret));
518
  } else if (!ObUpgradeChecker::check_data_version_exist(version)) {
519
    ret = OB_NOT_SUPPORTED;
520
    LOG_WARN("unsupported version to run upgrade job", KR(ret), K(version));
521
  } else {
522
    ObBaseUpgradeProcessor *processor = NULL;
523
    int64_t backup_ret = OB_SUCCESS;
524
    int tmp_ret = OB_SUCCESS;
525
    if (OB_FAIL(upgrade_processors_.get_processor_by_version(
526
                       version, processor))) {
527
      LOG_WARN("fail to get processor by version", KR(ret), K(version));
528
    } else {
529
      for (int64_t i = tenant_ids.count() - 1; OB_SUCC(ret) && i >= 0; i--) {
530
        const uint64_t tenant_id = tenant_ids.at(i);
531
        int64_t start_ts = ObTimeUtility::current_time();
532
        int64_t current_version = processor->get_version();
533
        processor->set_tenant_id(tenant_id);
534
        FLOG_INFO("[UPGRADE] start to run post upgrade job by version",
535
                  K(tenant_id), K(current_version));
536
        if (OB_FAIL(check_stop())) {
537
          LOG_WARN("executor should stopped", KR(ret));
538
        } else if (OB_TMP_FAIL(processor->post_upgrade())) {
539
          LOG_WARN("run post upgrade by version failed",
540
                   KR(tmp_ret), K(tenant_id), K(current_version));
541
          backup_ret = OB_SUCCESS == backup_ret ? tmp_ret : backup_ret;
542
        }
543
        FLOG_INFO("[UPGRADE] finish post upgrade job by version",
544
                  KR(tmp_ret), K(tenant_id), K(current_version),
545
                  "cost", ObTimeUtility::current_time() - start_ts);
546
      } // end for
547
    }
548
    ret = OB_SUCC(ret) ? backup_ret : ret;
549
  }
550
  return ret;
551
}
552

553
int ObUpgradeExecutor::run_upgrade_begin_action_(
554
    const common::ObIArray<uint64_t> &tenant_ids)
555
{
556
  int ret = OB_SUCCESS;
557
  common::hash::ObHashMap<uint64_t, share::SCN> tenants_sys_ls_target_scn;
558
  lib::ObMemAttr attr(OB_SYS_TENANT_ID, "UPGRADE");
559
  const int BUCKET_NUM  = hash::cal_next_prime(tenant_ids.count());
560
  if (OB_FAIL(check_inner_stat_())) {
561
    LOG_WARN("fail to check inner stat", KR(ret));
562
  } else if (OB_FAIL(check_stop())) {
563
    LOG_WARN("executor should stopped", KR(ret));
564
  } else if (OB_FAIL(tenants_sys_ls_target_scn.create(BUCKET_NUM, attr))) {
565
    LOG_WARN("fail to create tenants_sys_ls_target_scn", KR(ret));
566
  } else {
567
    int64_t backup_ret = OB_SUCCESS;
568
    int tmp_ret = OB_SUCCESS;
569
    tenants_sys_ls_target_scn.clear();
570
    for (int64_t i = tenant_ids.count() - 1; OB_SUCC(ret) && i >= 0; i--) {
571
      const uint64_t tenant_id = tenant_ids.at(i);
572
      int64_t start_ts = ObTimeUtility::current_time();
573
      FLOG_INFO("[UPGRADE] start to run upgrade begin action", K(tenant_id));
574
      if (OB_FAIL(check_stop())) {
575
        LOG_WARN("executor should stopped", KR(ret));
576
      } else if (OB_TMP_FAIL(run_upgrade_begin_action_(tenant_id, tenants_sys_ls_target_scn))) {
577
        LOG_WARN("fail to upgrade begin action", KR(ret), K(tenant_id));
578
        backup_ret = OB_SUCCESS == backup_ret ? tmp_ret : backup_ret;
579
      }
580
      FLOG_INFO("[UPGRADE] finish run upgrade begin action step 1/2, write upgrade barrier log",
581
                KR(ret), K(tenant_id), "cost", ObTimeUtility::current_time() - start_ts);
582
    } // end for
583
    ret = OB_SUCC(ret) ? backup_ret : ret;
584
    if (OB_SUCC(ret)) {
585
      int64_t start_ts_step2 = ObTimeUtility::current_time();
586
      ret = ObLSServiceHelper::wait_all_tenants_user_ls_sync_scn(tenants_sys_ls_target_scn);
587
      FLOG_INFO("[UPGRADE] finish run upgrade begin action step 2/2, wait all tenants' sync_scn",
588
          KR(ret), "cost", ObTimeUtility::current_time() - start_ts_step2);
589
    }
590
  }
591
  return ret;
592
}
593

594
int ObUpgradeExecutor::run_upgrade_begin_action_(
595
    const uint64_t tenant_id,
596
    common::hash::ObHashMap<uint64_t, share::SCN> &tenants_sys_ls_target_scn)
597
{
598
  int ret = OB_SUCCESS;
599
  ObMySQLTransaction trans;
600
  share::SCN sys_ls_target_scn = SCN::invalid_scn();
601
  ObConflictCaseWithClone case_to_check(ObConflictCaseWithClone::UPGRADE);
602
  if (OB_FAIL(check_inner_stat_())) {
603
    LOG_WARN("fail to check inner stat", KR(ret));
604
  } else if (OB_FAIL(check_stop())) {
605
    LOG_WARN("executor should stopped", KR(ret));
606
  } else if (OB_FAIL(trans.start(sql_proxy_, tenant_id))) {
607
    LOG_WARN("fail to start trans", KR(ret), K(tenant_id));
608
  } else {
609
    ObGlobalStatProxy proxy(trans, tenant_id);
610
    // get target_data_version
611
    uint64_t target_data_version = 0;
612
    const uint64_t DEFAULT_DATA_VERSION = DATA_VERSION_4_0_0_0;
613
    bool for_update = true;
614
    if (OB_FAIL(proxy.get_target_data_version(for_update, target_data_version))) {
615
      if (OB_ERR_NULL_VALUE == ret
616
          && GET_MIN_CLUSTER_VERSION() <= CLUSTER_VERSION_4_1_0_0) {
617
        // 4.0 -> 4.1
618
        uint64_t current_data_version = 0;
619
        ret = proxy.get_current_data_version(current_data_version);
620
        if (OB_ERR_NULL_VALUE != ret) {
621
          ret = OB_SUCC(ret) ? OB_ERR_UNEXPECTED : ret;
622
          LOG_WARN("current data version should be not exist",
623
                   KR(ret), K(tenant_id), K(current_data_version));
624
        } else if (OB_FAIL(proxy.update_current_data_version(DEFAULT_DATA_VERSION))) {
625
          // overwrite ret
626
          LOG_WARN("fail to init current data version",
627
                   KR(ret), K(tenant_id), K(DEFAULT_DATA_VERSION));
628
        } else {
629
          target_data_version = DEFAULT_DATA_VERSION;
630
          LOG_INFO("[UPGRADE] init missing current data version",
631
                   KR(ret), K(tenant_id), K(DEFAULT_DATA_VERSION));
632
        }
633
      } else {
634
        LOG_WARN("fail to get target data version", KR(ret), K(tenant_id));
635
      }
636
    }
637
    // check tenant not in cloning procedure in trans
638
    if (OB_FAIL(ret)) {
639
    } else if (OB_FAIL(ObTenantSnapshotUtil::check_tenant_not_in_cloning_procedure(tenant_id, case_to_check))) {
640
      LOG_WARN("fail to check whether tenant is in cloning produre", KR(ret), K(tenant_id));
641
    }
642
    // try update target_data_version
643
    if (OB_FAIL(ret)) {
644
    } else if (target_data_version >= DATA_CURRENT_VERSION) {
645
      LOG_INFO("[UPGRADE] target data version is new enough, just skip",
646
               KR(ret), K(tenant_id), K(target_data_version));
647
    } else if (OB_FAIL(proxy.update_target_data_version(DATA_CURRENT_VERSION))) {
648
      LOG_WARN("fail to update target data version",
649
               KR(ret), K(tenant_id), "version", DATA_CURRENT_VERSION);
650
    } else if (is_user_tenant(tenant_id)
651
               && OB_FAIL(OB_PRIMARY_STANDBY_SERVICE.write_upgrade_barrier_log(
652
                                                     trans, tenant_id, DATA_CURRENT_VERSION))) {
653
      LOG_WARN("fail to write_upgrade_barrier_log",
654
               KR(ret), K(tenant_id), "version", DATA_CURRENT_VERSION);
655
    } else {
656
      LOG_INFO("[UPGRADE] update target data version",
657
               KR(ret), K(tenant_id), "version", DATA_CURRENT_VERSION);
658
    }
659
  }
660
  if (trans.is_started()) {
661
    int tmp_ret = OB_SUCCESS;
662
    if (OB_TMP_FAIL(trans.end(OB_SUCC(ret)))) {
663
      LOG_WARN("trans end failed", KR(tmp_ret), K(ret));
664
      ret = OB_SUCC(ret) ? tmp_ret : ret;
665
    }
666
  }
667
  if (OB_FAIL(ret)) {
668
  } else if (!is_user_tenant(tenant_id)) {
669
    // skip
670
  } else if (OB_FAIL(ObGlobalStatProxy::get_target_data_version_ora_rowscn(tenant_id, sys_ls_target_scn))) {
671
    LOG_WARN("fail to get sys_ls_target_scn", KR(ret), K(tenant_id));
672
  } else if (OB_FAIL(tenants_sys_ls_target_scn.set_refactored(
673
      tenant_id,
674
      sys_ls_target_scn,
675
      0 /* flag:  0 shows that not cover existing object. */))) {
676
    LOG_WARN("fail to push an element into tenants_sys_ls_target_scn", KR(ret), K(tenant_id),
677
        K(sys_ls_target_scn));
678
  }
679
  return ret;
680
}
681

682
int ObUpgradeExecutor::run_upgrade_system_variable_job_(
683
    const common::ObIArray<uint64_t> &tenant_ids)
684
{
685
  int ret = OB_SUCCESS;
686
  if (OB_FAIL(check_inner_stat_())) {
687
    LOG_WARN("fail to check inner stat", KR(ret));
688
  } else if (OB_FAIL(check_stop())) {
689
    LOG_WARN("executor should stopped", KR(ret));
690
  } else {
691
    int backup_ret = OB_SUCCESS;
692
    int tmp_ret = OB_SUCCESS;
693
    for (int64_t i = tenant_ids.count() - 1; OB_SUCC(ret) && i >= 0; i--) {
694
      const uint64_t tenant_id = tenant_ids.at(i);
695
      int64_t start_ts = ObTimeUtility::current_time();
696
      FLOG_INFO("[UPGRADE] start to run upgrade system variable job", K(tenant_id));
697
      if (OB_FAIL(check_stop())) {
698
        LOG_WARN("executor should stopped", KR(ret));
699
      } else if (OB_FAIL(check_schema_sync_(tenant_id))) {
700
        LOG_WARN("fail to check schema sync", KR(ret), K(tenant_id));
701
      } else if (OB_TMP_FAIL(ObUpgradeUtils::upgrade_sys_variable(*common_rpc_proxy_, *sql_proxy_, tenant_id))) {
702
        LOG_WARN("fail to upgrade sys variable", KR(tmp_ret), K(tenant_id));
703
        backup_ret = OB_SUCCESS == backup_ret ? tmp_ret : backup_ret;
704
      }
705
      FLOG_INFO("[UPGRADE] finish run upgrade system variable job",
706
                KR(tmp_ret), K(tenant_id), "cost", ObTimeUtility::current_time() - start_ts);
707
    } // end for
708
    ret = OB_SUCC(ret) ? backup_ret : ret;
709
  }
710
  return ret;
711
}
712

713
// NOTICE: enable_sys_table_ddl should be true before run this job.
714
int ObUpgradeExecutor::run_upgrade_system_table_job_(
715
    const common::ObIArray<uint64_t> &tenant_ids)
716
{
717
  int ret = OB_SUCCESS;
718
  if (OB_FAIL(check_inner_stat_())) {
719
    LOG_WARN("fail to check inner stat", KR(ret));
720
  } else if (OB_FAIL(check_stop())) {
721
    LOG_WARN("executor should stopped", KR(ret));
722
  } else {
723
    int backup_ret = OB_SUCCESS;
724
    int tmp_ret = OB_SUCCESS;
725
    for (int64_t i = tenant_ids.count() - 1; OB_SUCC(ret) && i >= 0; i--) {
726
      const uint64_t tenant_id = tenant_ids.at(i);
727
      int64_t start_ts = ObTimeUtility::current_time();
728
      FLOG_INFO("[UPGRADE] start to run upgrade system table job", K(tenant_id));
729
      if (OB_FAIL(check_stop())) {
730
        LOG_WARN("executor should stopped", KR(ret));
731
      } else if (OB_TMP_FAIL(upgrade_system_table_(tenant_id))) {
732
        LOG_WARN("fail to upgrade system table", KR(tmp_ret), K(tenant_id));
733
        backup_ret = OB_SUCCESS == backup_ret ? tmp_ret : backup_ret;
734
      }
735
      FLOG_INFO("[UPGRADE] finish run upgrade system table job",
736
                KR(tmp_ret), K(tenant_id), "cost", ObTimeUtility::current_time() - start_ts);
737
    } // end for
738
    ret = OB_SUCC(ret) ? backup_ret : ret;
739
  }
740
  return ret;
741
}
742

743
int ObUpgradeExecutor::upgrade_system_table_(const uint64_t tenant_id)
744
{
745
  int ret = OB_SUCCESS;
746
  if (OB_FAIL(check_inner_stat_())) {
747
    LOG_WARN("fail to check inner stat", KR(ret));
748
  } else if (OB_FAIL(check_stop())) {
749
    LOG_WARN("executor should stopped", KR(ret));
750
  } else {
751
    ObArray<uint64_t> upgrade_table_ids; // miss or mismatch
752
    // Only core/system tables can be upgraded here.
753
    // 1. __all_core_table can't be altered.
754
    // 2. sys index table and sys lob table will be added with sys data table, and can't be altered.
755
    const schema_create_func *creator_ptr_array[] = {
756
      share::core_table_schema_creators,
757
      share::sys_table_schema_creators, NULL };
758

759
    // check system table
760
    ObTableSchema table_schema;
761
    bool exist = false;
762
    for (const schema_create_func **creator_ptr_ptr = creator_ptr_array;
763
         OB_SUCC(ret) && OB_NOT_NULL(*creator_ptr_ptr); ++creator_ptr_ptr) {
764
      for (const schema_create_func *creator_ptr = *creator_ptr_ptr;
765
           OB_SUCC(ret) && OB_NOT_NULL(*creator_ptr); ++creator_ptr) {
766
        table_schema.reset();
767
        if (OB_FAIL(check_stop())) {
768
          LOG_WARN("check_cancel failed", KR(ret));
769
        } else if (OB_FAIL((*creator_ptr)(table_schema))) {
770
          LOG_WARN("create table schema failed", KR(ret));
771
        } else if (!is_sys_tenant(tenant_id)
772
                   && OB_FAIL(ObSchemaUtils::construct_tenant_space_full_table(
773
                                  tenant_id, table_schema))) {
774
          LOG_WARN("fail to construct tenant space table", KR(ret), K(tenant_id));
775
        } else if (OB_FAIL(ObSysTableChecker::is_inner_table_exist(
776
                   tenant_id, table_schema, exist))) {
777
          LOG_WARN("fail to check inner table exist",
778
                   KR(ret), K(tenant_id), K(table_schema));
779
        } else if (!exist) {
780
          // skip
781
        } else if (OB_FAIL(check_table_schema_(tenant_id, table_schema))) {
782
          const uint64_t table_id = table_schema.get_table_id();
783
          if (OB_SCHEMA_ERROR != ret) {
784
            LOG_WARN("check_table_schema failed", KR(ret), K(tenant_id), K(table_id));
785
          } else {
786
            FLOG_INFO("[UPGRADE] table need upgrade", K(tenant_id), K(table_id),
787
                      "table_name", table_schema.get_table_name());
788
            if (OB_FAIL(upgrade_table_ids.push_back(table_id))) { // overwrite ret
789
              LOG_WARN("fail to push back upgrade table ids", KR(ret), K(tenant_id), K(table_id));
790
            }
791
          }
792
        }
793
      } // end for
794
    } // end for
795

796
    int tmp_ret = OB_SUCCESS;
797
    int backup_ret = OB_SUCCESS;
798
    // upgrade system table(create or alter)
799
    obrpc::ObUpgradeTableSchemaArg arg;
800
    bool upgrade_virtual_schema = false;
801
    const int64_t timeout = GCONF._ob_ddl_timeout;
802
    for (int64_t i = 0; OB_SUCC(ret) && i < upgrade_table_ids.count(); i++) {
803
      const uint64_t table_id = upgrade_table_ids.at(i);
804
      int64_t start_ts = ObTimeUtility::current_time();
805
      FLOG_INFO("[UPGRADE] start upgrade system table", K(tenant_id), K(table_id));
806
      if (OB_FAIL(check_stop())) {
807
        LOG_WARN("check_cancel failed", KR(ret));
808
      } else if (OB_FAIL(arg.init(tenant_id, table_id, upgrade_virtual_schema))) {
809
        LOG_WARN("fail to init arg", KR(ret), K(tenant_id), K(table_id));
810
      } else if (OB_FAIL(check_schema_sync_(tenant_id))) {
811
        LOG_WARN("fail to check schema sync", KR(ret), K(tenant_id));
812
      } else if (OB_TMP_FAIL(common_rpc_proxy_->timeout(timeout).upgrade_table_schema(arg))) {
813
        LOG_WARN("fail to uggrade table schema", KR(tmp_ret), K(timeout), K(arg));
814
        backup_ret = OB_SUCCESS == backup_ret ? tmp_ret : backup_ret;
815
      }
816
      FLOG_INFO("[UPGRADE] finish upgrade system table",
817
                KR(tmp_ret), K(tenant_id), K(table_id), "cost", ObTimeUtility::current_time() - start_ts);
818
    } // end for
819
    ret = OB_SUCC(ret) ? backup_ret : ret;
820
  }
821
  return ret;
822
}
823

824
int ObUpgradeExecutor::check_table_schema_(const uint64_t tenant_id, const ObTableSchema &hard_code_table)
825
{
826
  int ret = OB_SUCCESS;
827
  const ObTableSchema *table = NULL;
828
  ObSchemaGetterGuard schema_guard;
829
  if (OB_FAIL(check_inner_stat_())) {
830
    LOG_WARN("fail to check inner stat", KR(ret));
831
  } else if (OB_FAIL(schema_service_->get_tenant_schema_guard(tenant_id, schema_guard))) {
832
    LOG_WARN("failed to get schema guard", KR(ret), K(tenant_id));
833
  } else if (OB_FAIL(schema_guard.get_table_schema(
834
             tenant_id, hard_code_table.get_table_id(), table))) {
835
    LOG_WARN("get_table_schema failed", KR(ret), K(tenant_id),
836
             "table_id", hard_code_table.get_table_id(),
837
             "table_name", hard_code_table.get_table_name());
838
  } else if (OB_ISNULL(table)) {
839
    ret = OB_SCHEMA_ERROR;
840
    LOG_WARN("table should not be null", KR(ret), K(tenant_id),
841
             "table_id", hard_code_table.get_table_id(),
842
             "table_name", hard_code_table.get_table_name());
843
  } else if (OB_FAIL(ObRootInspection::check_table_schema(hard_code_table, *table))) {
844
    LOG_WARN("fail to check table schema", KR(ret), K(tenant_id), K(hard_code_table), KPC(table));
845
  }
846
  return ret;
847
}
848

849

850
int ObUpgradeExecutor::run_upgrade_virtual_schema_job_(
851
    const common::ObIArray<uint64_t> &tenant_ids)
852
{
853
  int ret = OB_SUCCESS;
854
  if (OB_FAIL(check_inner_stat_())) {
855
    LOG_WARN("fail to check inner stat", KR(ret));
856
  } else if (OB_FAIL(check_stop())) {
857
    LOG_WARN("executor should stopped", KR(ret));
858
  } else {
859
    int backup_ret = OB_SUCCESS;
860
    int tmp_ret = OB_SUCCESS;
861
    obrpc::ObUpgradeTableSchemaArg arg;
862
    uint64_t invalid_table_id = OB_INVALID_ID;
863
    bool upgrade_virtual_schema = true;
864
    // TODO:(yanmu.ztl) upgrade single virtual table/sys view
865
    int64_t timeout = GCONF._ob_ddl_timeout;
866
    for (int64_t i = tenant_ids.count() - 1; OB_SUCC(ret) && i >= 0; i--) {
867
      const uint64_t tenant_id = tenant_ids.at(i);
868
      int64_t start_ts = ObTimeUtility::current_time();
869
      FLOG_INFO("[UPGRADE] start to run upgrade virtual schema job", K(tenant_id));
870
      if (OB_FAIL(check_stop())) {
871
        LOG_WARN("executor should stopped", KR(ret));
872
      } else if (OB_FAIL(arg.init(tenant_id, invalid_table_id, upgrade_virtual_schema))) {
873
        LOG_WARN("fail to init arg", KR(ret), K(tenant_id));
874
      } else if (OB_FAIL(check_schema_sync_(tenant_id))) {
875
        LOG_WARN("fail to check schema sync", KR(ret), K(tenant_id));
876
      } else if (OB_TMP_FAIL(common_rpc_proxy_->timeout(timeout).upgrade_table_schema(arg))) {
877
        LOG_WARN("fail to upgrade virtual schema", KR(tmp_ret), K(arg));
878
        backup_ret = OB_SUCCESS == backup_ret ? tmp_ret : backup_ret;
879
      }
880
      FLOG_INFO("[UPGRADE] finish run upgrade virtual schema job",
881
                KR(tmp_ret), K(tenant_id), "cost", ObTimeUtility::current_time() - start_ts);
882
    } // end for
883
    ret = OB_SUCC(ret) ? backup_ret : ret;
884
  }
885
  return ret;
886
}
887

888
int ObUpgradeExecutor::run_upgrade_system_package_job_()
889
{
890
  int ret = OB_SUCCESS;
891
  const uint64_t tenant_id = OB_SYS_TENANT_ID;
892
  if (OB_FAIL(check_inner_stat_())) {
893
    LOG_WARN("fail to check inner stat", KR(ret));
894
  } else if (OB_FAIL(check_schema_sync_(tenant_id))) {
895
    LOG_WARN("fail to check schema sync", KR(ret), K(tenant_id));
896
  } else if (OB_FAIL(upgrade_mysql_system_package_job_())) {
897
    LOG_WARN("fail to upgrade mysql system package", KR(ret));
898
#ifdef OB_BUILD_ORACLE_PL
899
  } else if (OB_FAIL(check_schema_sync_(tenant_id))) {
900
    LOG_WARN("fail to check schema sync", KR(ret), K(tenant_id));
901
  } else if (OB_FAIL(upgrade_oracle_system_package_job_())) {
902
    LOG_WARN("fail to upgrade mysql system package", KR(ret));
903
#endif
904
  }
905
  return ret;
906
}
907

908
int ObUpgradeExecutor::upgrade_mysql_system_package_job_()
909
{
910
  int ret = OB_SUCCESS;
911
  int64_t start_ts = ObTimeUtility::current_time();
912
  FLOG_INFO("[UPGRADE] start to run upgrade mysql system package job");
913
  int64_t timeout = GCONF._ob_ddl_timeout;
914
  const char *create_package_sql =
915
        "CREATE OR REPLACE PACKAGE __DBMS_UPGRADE \
916
           PROCEDURE UPGRADE(package_name VARCHAR(1024)); \
917
           PROCEDURE UPGRADE_ALL(); \
918
         END;";
919
  const char *create_package_body_sql =
920
        "CREATE OR REPLACE PACKAGE BODY __DBMS_UPGRADE \
921
           PROCEDURE UPGRADE(package_name VARCHAR(1024)); \
922
             PRAGMA INTERFACE(c, UPGRADE_SINGLE); \
923
           PROCEDURE UPGRADE_ALL(); \
924
             PRAGMA INTERFACE(c, UPGRADE_ALL); \
925
         END;";
926
  const char *upgrade_sql = "CALL __DBMS_UPGRADE.UPGRADE_ALL();";
927
  ObTimeoutCtx ctx;
928
  int64_t affected_rows = 0;
929
  if (OB_FAIL(check_inner_stat_())) {
930
    LOG_WARN("fail to check inner stat", KR(ret));
931
  } else if (OB_FAIL(ctx.set_timeout(timeout))) {
932
    LOG_WARN("fail to set timeout", KR(ret));
933
  } else if (OB_FAIL(sql_proxy_->write(
934
             OB_SYS_TENANT_ID, create_package_sql, affected_rows))) {
935
    LOG_WARN("fail to execute sql", KR(ret), "sql", create_package_sql);
936
  } else if (0 != affected_rows) {
937
    ret = OB_ERR_UNEXPECTED;
938
    LOG_WARN("affected_rows expected to be zero", KR(ret), K(affected_rows));
939
  } else if (OB_FAIL(check_stop())) {
940
    LOG_WARN("executor is stop", KR(ret));
941
  } else if (OB_FAIL(ctx.set_timeout(timeout))) {
942
    LOG_WARN("fail to set timeout", KR(ret));
943
  } else if (OB_FAIL(sql_proxy_->write(
944
             OB_SYS_TENANT_ID, create_package_body_sql, affected_rows))) {
945
    LOG_WARN("fail to execute sql", KR(ret), "sql", create_package_body_sql);
946
  } else if (0 != affected_rows) {
947
    ret = OB_ERR_UNEXPECTED;
948
    LOG_WARN("affected_rows expected to be zero", KR(ret), K(affected_rows));
949
  } else if (OB_FAIL(check_stop())) {
950
    LOG_WARN("executor is stop", KR(ret));
951
  } else if (OB_FAIL(ctx.set_timeout(timeout))) {
952
    LOG_WARN("fail to set timeout", KR(ret));
953
  } else if (OB_FAIL(sql_proxy_->write(
954
             OB_SYS_TENANT_ID, upgrade_sql, affected_rows))) {
955
    LOG_WARN("fail to execute sql", KR(ret), "sql", upgrade_sql);
956
  } else if (0 != affected_rows) {
957
    ret = OB_ERR_UNEXPECTED;
958
    LOG_WARN("affected_rows expected to be zero", KR(ret), K(affected_rows));
959
  }
960
  FLOG_INFO("[UPGRADE] finish run upgrade mysql system package job",
961
            KR(ret), "cost", ObTimeUtility::current_time() - start_ts);
962
  return ret;
963
}
964

965
#ifdef OB_BUILD_ORACLE_PL
// Oracle-mode counterpart of the mysql package job: creates the
// "__DBMS_UPGRADE" package and its body, then calls UPGRADE_ALL(), all under
// the sys tenant with ORACLE_MODE passed to the SQL proxy.
// The timeout context is re-armed before each statement, check_stop() is
// consulted between statements, and each statement must report zero affected
// rows (otherwise OB_ERR_UNEXPECTED).
int ObUpgradeExecutor::upgrade_oracle_system_package_job_()
{
  int ret = OB_SUCCESS;
  const int64_t begin_ts = ObTimeUtility::current_time();
  FLOG_INFO("[UPGRADE] start to run upgrade oracle system package job");
  const ObCompatibilityMode mode = ObCompatibilityMode::ORACLE_MODE;
  const int64_t ddl_timeout = GCONF._ob_ddl_timeout;
  const char *package_spec_sql =
        "CREATE OR REPLACE PACKAGE \"__DBMS_UPGRADE\" IS \
           PROCEDURE UPGRADE(package_name VARCHAR2); \
           PROCEDURE UPGRADE_ALL; \
         END;";
  const char *package_body_sql =
        "CREATE OR REPLACE PACKAGE BODY \"__DBMS_UPGRADE\" IS \
           PROCEDURE UPGRADE(package_name VARCHAR2); \
             PRAGMA INTERFACE(c, UPGRADE_SINGLE); \
           PROCEDURE UPGRADE_ALL; \
             PRAGMA INTERFACE(c, UPGRADE_ALL); \
         END;";
  const char *invoke_sql = "CALL \"__DBMS_UPGRADE\".UPGRADE_ALL();";
  // Statements in execution order.
  const char *const statements[] = { package_spec_sql, package_body_sql, invoke_sql };
  const int64_t statement_cnt =
      static_cast<int64_t>(sizeof(statements) / sizeof(statements[0]));
  ObTimeoutCtx ctx;
  int64_t affected_rows = 0;
  if (OB_FAIL(check_inner_stat_())) {
    LOG_WARN("fail to check inner stat", KR(ret));
  }
  for (int64_t step = 0; OB_SUCC(ret) && step < statement_cnt; ++step) {
    const char *current_sql = statements[step];
    // The stop flag is only polled between statements, not before the first one.
    if (step > 0 && OB_FAIL(check_stop())) {
      LOG_WARN("executor is stop", KR(ret));
    } else if (OB_FAIL(ctx.set_timeout(ddl_timeout))) {
      LOG_WARN("fail to set timeout", KR(ret));
    } else if (OB_FAIL(sql_proxy_->write(
               OB_SYS_TENANT_ID, current_sql,
               affected_rows, static_cast<int64_t>(mode)))) {
      LOG_WARN("fail to execute sql", KR(ret), "sql", current_sql);
    } else if (0 != affected_rows) {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("affected_rows expected to be zero", KR(ret), K(affected_rows));
    }
  }
  FLOG_INFO("[UPGRADE] finish run upgrade oracle system package job",
            KR(ret), "cost", ObTimeUtility::current_time() - begin_ts);
  return ret;
}
#endif
1027

1028
int ObUpgradeExecutor::run_upgrade_all_post_action_(
1029
    const common::ObIArray<uint64_t> &tenant_ids)
1030
{
1031
  int ret = OB_SUCCESS;
1032
  if (OB_FAIL(check_inner_stat_())) {
1033
    LOG_WARN("fail to check inner stat", KR(ret));
1034
  } else if (OB_FAIL(check_stop())) {
1035
    LOG_WARN("executor should stopped", KR(ret));
1036
  } else {
1037
    int64_t backup_ret = OB_SUCCESS;
1038
    int tmp_ret = OB_SUCCESS;
1039
    for (int64_t i = tenant_ids.count() - 1; OB_SUCC(ret) && i >= 0; i--) {
1040
      const uint64_t tenant_id = tenant_ids.at(i);
1041
      int64_t start_ts = ObTimeUtility::current_time();
1042
      FLOG_INFO("[UPGRADE] start to run upgrade all post action", K(tenant_id));
1043
      if (OB_FAIL(check_stop())) {
1044
        LOG_WARN("executor should stopped", KR(ret));
1045
      } else if (OB_TMP_FAIL(run_upgrade_all_post_action_(tenant_id))) {
1046
        LOG_WARN("fail to upgrade all post action", KR(ret), K(tenant_id));
1047
        backup_ret = OB_SUCCESS == backup_ret ? tmp_ret : backup_ret;
1048
      }
1049
      FLOG_INFO("[UPGRADE] finish run upgrade all post action",
1050
                KR(ret), K(tenant_id), "cost", ObTimeUtility::current_time() - start_ts);
1051
    } // end for
1052
    ret = OB_SUCC(ret) ? backup_ret : ret;
1053
  }
1054
  return ret;
1055
}
1056

1057
int ObUpgradeExecutor::run_upgrade_all_post_action_(
1058
    const uint64_t tenant_id)
1059
{
1060
  int ret = OB_SUCCESS;
1061
  if (OB_FAIL(check_inner_stat_())) {
1062
    LOG_WARN("fail to check inner stat", KR(ret));
1063
  } else if (OB_FAIL(check_stop())) {
1064
    LOG_WARN("executor should stopped", KR(ret));
1065
  } else {
1066
    uint64_t current_data_version = 0;
1067
    int64_t start_idx = OB_INVALID_INDEX;
1068
    int64_t end_idx = OB_INVALID_INDEX;
1069
    ObGlobalStatProxy proxy(*sql_proxy_, tenant_id);
1070
    if (OB_FAIL(proxy.get_current_data_version(current_data_version))) {
1071
      LOG_WARN("fail to get current data version",
1072
               KR(ret), K(tenant_id), K(current_data_version));
1073
    } else if (OB_FAIL(upgrade_processors_.get_processor_idx_by_range(
1074
                       current_data_version, DATA_CURRENT_VERSION,
1075
                       start_idx, end_idx))) {
1076
      LOG_WARN("fail to get processor by version", KR(ret), K(current_data_version));
1077
    }
1078
    for (int64_t i = start_idx + 1; OB_SUCC(ret) && i <= end_idx; i++) {
1079
      ObBaseUpgradeProcessor *processor  = NULL;
1080
      int64_t version = OB_INVALID_VERSION;
1081
      if (OB_FAIL(check_stop())) {
1082
        LOG_WARN("executor should stopped", KR(ret));
1083
      } else if (OB_FAIL(upgrade_processors_.get_processor_by_idx(i, processor))) {
1084
        LOG_WARN("fail to get processor", KR(ret), K(current_data_version), K(i));
1085
      } else if (FALSE_IT(version = processor->get_version())) {
1086
      } else if (FALSE_IT(processor->set_tenant_id(tenant_id))) {
1087
      } else if (OB_FAIL(check_schema_sync_(tenant_id))) {
1088
        LOG_WARN("fail to check schema sync", KR(ret), K(tenant_id));
1089
      } else if (OB_FAIL(processor->post_upgrade())) {
1090
        LOG_WARN("run post upgrade by version failed", KR(ret), K(tenant_id), K(version));
1091
      } else if (OB_FAIL(check_schema_sync_(tenant_id))) {
1092
        LOG_WARN("fail to check schema sync", KR(ret), K(tenant_id));
1093
      } else if (OB_FAIL(proxy.update_current_data_version(version))) {
1094
        LOG_WARN("fail to update current data version", KR(ret), K(tenant_id), K(version));
1095
      }
1096
    } // end for
1097
  }
1098
  return ret;
1099
}
1100

1101
int ObUpgradeExecutor::run_upgrade_inspection_job_(
1102
    const common::ObIArray<uint64_t> &tenant_ids)
1103
{
1104
  int ret = OB_SUCCESS;
1105
  if (OB_FAIL(check_inner_stat_())) {
1106
    LOG_WARN("fail to check inner stat", KR(ret));
1107
  } else if (OB_FAIL(check_stop())) {
1108
    LOG_WARN("executor should stopped", KR(ret));
1109
  } else {
1110
    int backup_ret = OB_SUCCESS;
1111
    int tmp_ret = OB_SUCCESS;
1112
    for (int64_t i = tenant_ids.count() - 1; OB_SUCC(ret) && i >= 0; i--) {
1113
      const uint64_t tenant_id = tenant_ids.at(i);
1114
      int64_t start_ts = ObTimeUtility::current_time();
1115
      FLOG_INFO("[UPGRADE] start to run upgrade inspection job", K(tenant_id));
1116
      if (OB_FAIL(check_stop())) {
1117
        LOG_WARN("executor should stopped", KR(ret));
1118
      } else if (OB_FAIL(check_schema_sync_(tenant_id))) {
1119
        LOG_WARN("fail to check schema sync", KR(ret), K(tenant_id));
1120
      } else if (OB_TMP_FAIL(root_inspection_->check_tenant(tenant_id))) {
1121
        LOG_WARN("fail to do upgrade inspection", KR(tmp_ret), K(tenant_id));
1122
        backup_ret = OB_SUCCESS == backup_ret ? tmp_ret : backup_ret;
1123
      }
1124
      FLOG_INFO("[UPGRADE] finish run upgrade inspection job",
1125
                KR(tmp_ret), K(tenant_id), "cost", ObTimeUtility::current_time() - start_ts);
1126
    } // end for
1127
    ret = OB_SUCC(ret) ? backup_ret : ret;
1128
  }
1129
  return ret;
1130
}
1131

1132
int ObUpgradeExecutor::run_upgrade_end_action_(
1133
    const common::ObIArray<uint64_t> &tenant_ids)
1134
{
1135
  int ret = OB_SUCCESS;
1136
  if (OB_FAIL(check_inner_stat_())) {
1137
    LOG_WARN("fail to check inner stat", KR(ret));
1138
  } else if (OB_FAIL(check_stop())) {
1139
    LOG_WARN("executor should stopped", KR(ret));
1140
  } else {
1141
    int64_t backup_ret = OB_SUCCESS;
1142
    int tmp_ret = OB_SUCCESS;
1143
    for (int64_t i = tenant_ids.count() - 1; OB_SUCC(ret) && i >= 0; i--) {
1144
      const uint64_t tenant_id = tenant_ids.at(i);
1145
      int64_t start_ts = ObTimeUtility::current_time();
1146
      FLOG_INFO("[UPGRADE] start to run upgrade end action", K(tenant_id));
1147
      if (OB_FAIL(check_stop())) {
1148
        LOG_WARN("executor should stopped", KR(ret));
1149
      } else if (OB_FAIL(check_schema_sync_(tenant_id))) {
1150
        LOG_WARN("fail to check schema sync", KR(ret), K(tenant_id));
1151
      } else if (OB_TMP_FAIL(run_upgrade_end_action_(tenant_id))) {
1152
        LOG_WARN("fail to upgrade end action", KR(ret), K(tenant_id));
1153
        backup_ret = OB_SUCCESS == backup_ret ? tmp_ret : backup_ret;
1154
      }
1155
      FLOG_INFO("[UPGRADE] finish run upgrade end action",
1156
                KR(ret), K(tenant_id), "cost", ObTimeUtility::current_time() - start_ts);
1157
    } // end for
1158
    ret = OB_SUCC(ret) ? backup_ret : ret;
1159
  }
1160
  return ret;
1161
}
1162

1163
int ObUpgradeExecutor::run_upgrade_end_action_(
1164
    const uint64_t tenant_id)
1165
{
1166
  int ret = OB_SUCCESS;
1167
  if (OB_FAIL(check_inner_stat_())) {
1168
    LOG_WARN("fail to check inner stat", KR(ret));
1169
  } else if (OB_FAIL(check_stop())) {
1170
    LOG_WARN("executor should stopped", KR(ret));
1171
  } else {
1172
    ObGlobalStatProxy proxy(*sql_proxy_, tenant_id);
1173
    uint64_t target_data_version = 0;
1174
    uint64_t current_data_version = 0;
1175
    bool for_update = false;
1176
    uint64_t data_version = 0;
1177
    if (OB_FAIL(proxy.get_target_data_version(for_update, target_data_version))) {
1178
      LOG_WARN("fail to get target data version", KR(ret), K(tenant_id));
1179
    } else if (OB_FAIL(proxy.get_current_data_version(current_data_version))) {
1180
      LOG_WARN("fail to get current data version", KR(ret), K(tenant_id));
1181
    } else if (target_data_version != current_data_version
1182
               || target_data_version != DATA_CURRENT_VERSION) {
1183
      ret = OB_STATE_NOT_MATCH;
1184
      LOG_WARN("data_version not match, upgrade process should be run",
1185
               KR(ret), K(tenant_id), K(target_data_version), K(current_data_version));
1186
    } else {
1187
      // target_data_version == current_data_version == DATA_CURRENT_VERSION
1188
      if (OB_FAIL(GET_MIN_DATA_VERSION(tenant_id, data_version))) {
1189
        LOG_WARN("fail to get min data version", KR(ret), K(tenant_id));
1190
      } else if (data_version >= current_data_version) {
1191
        LOG_INFO("[UPGRADE] data version is not less than current data version, just skip",
1192
                 K(tenant_id), K(data_version), K(current_data_version));
1193
      } else {
1194
        HEAP_VAR(obrpc::ObAdminSetConfigItem, item) {
1195
        ObSchemaGetterGuard guard;
1196
        const ObSimpleTenantSchema *tenant = NULL;
1197
        obrpc::ObAdminSetConfigArg arg;
1198
        item.exec_tenant_id_ = OB_SYS_TENANT_ID;
1199
        const int64_t timeout = GCONF.internal_sql_execute_timeout;
1200
        int64_t pos = ObClusterVersion::print_version_str(
1201
                      item.value_.ptr(), item.value_.capacity(),
1202
                      current_data_version);
1203
        if (pos <= 0) {
1204
          ret = OB_INVALID_ARGUMENT;
1205
          LOG_WARN("current_data_version is invalid",
1206
                   KR(ret), K(tenant_id), K(current_data_version));
1207
        } else if (OB_FAIL(GSCHEMASERVICE.get_tenant_schema_guard(OB_SYS_TENANT_ID, guard))) {
1208
          LOG_WARN("fail to get schema guard", KR(ret));
1209
        } else if (OB_FAIL(guard.get_tenant_info(tenant_id, tenant))) {
1210
          LOG_WARN("fail to get tenant info", KR(ret), K(tenant_id));
1211
        } else if (OB_ISNULL(tenant)) {
1212
          ret = OB_TENANT_NOT_EXIST;
1213
          LOG_WARN("tenant not exist", KR(ret), K(tenant_id));
1214
        } else if (OB_FAIL(item.tenant_name_.assign(tenant->get_tenant_name()))) {
1215
          LOG_WARN("fail to assign tenant name", KR(ret), K(tenant_id));
1216
        } else if (OB_FAIL(item.name_.assign("compatible"))) {
1217
          LOG_WARN("fail to assign config name", KR(ret), K(tenant_id));
1218
        } else if (OB_FAIL(arg.items_.push_back(item))) {
1219
          LOG_WARN("fail to push back item", KR(ret), K(item));
1220
        } else if (OB_FAIL(common_rpc_proxy_->timeout(timeout).admin_set_config(arg))) {
1221
          LOG_WARN("fail to set config", KR(ret), K(arg), K(timeout));
1222
        } else {
1223
          int64_t start_ts = ObTimeUtility::current_time();
1224
          while (OB_SUCC(ret)) {
1225
            if (OB_FAIL(check_stop())) {
1226
              LOG_WARN("executor should stopped", KR(ret));
1227
            } else if (ObTimeUtility::current_time() - start_ts >= timeout) {
1228
              ret = OB_TIMEOUT;
1229
              LOG_WARN("wait config taking effective failed",
1230
                       KR(ret), K(tenant_id), K(timeout));
1231
            } else if (OB_FAIL(GET_MIN_DATA_VERSION(tenant_id, data_version))) {
1232
              LOG_WARN("fail to get min data version", KR(ret), K(tenant_id));
1233
            } else if (data_version >= current_data_version) {
1234
              LOG_INFO("[UPGRADE] config take effective", K(tenant_id),
1235
                       "cost", ObTimeUtility::current_time() - start_ts);
1236
              break;
1237
            } else {
1238
              LOG_INFO("[UPGRADE] config doesn't take effective", K(tenant_id));
1239
              usleep(1 * 1000 * 1000L); // 1s
1240
            }
1241
          }
1242
        }
1243
        } // end HEAP_VAR
1244
      }
1245
    }
1246
  }
1247
  return ret;
1248
}
1249

1250
int ObUpgradeExecutor::run_upgrade_all_(
1251
    const common::ObIArray<uint64_t> &tenant_ids)
1252
{
1253
  int ret = OB_SUCCESS;
1254
  int64_t start_ts = ObTimeUtility::current_time();
1255
  FLOG_INFO("[UPGRADE] start to run upgrade all action");
1256
  if (OB_FAIL(check_inner_stat_())) {
1257
    LOG_WARN("fail to check inner stat", KR(ret));
1258
  } else if (OB_FAIL(check_stop())) {
1259
    LOG_WARN("executor should stopped", KR(ret));
1260
  } else if (OB_FAIL(run_upgrade_begin_action_(tenant_ids))) {
1261
    LOG_WARN("fail to run upgrade begin job", KR(ret));
1262
  } else if (OB_FAIL(run_upgrade_system_variable_job_(tenant_ids))) {
1263
    LOG_WARN("fail to run upgrade system variable job", KR(ret));
1264
  } else if (OB_FAIL(run_upgrade_system_table_job_(tenant_ids))) {
1265
    LOG_WARN("fail to run upgrade system table job", KR(ret));
1266
  } else if (OB_FAIL(run_upgrade_virtual_schema_job_(tenant_ids))) {
1267
    LOG_WARN("fail to run upgrade virtual schema job", KR(ret));
1268
  } else if (has_exist_in_array(tenant_ids, OB_SYS_TENANT_ID)
1269
             && OB_FAIL(run_upgrade_system_package_job_())) {
1270
    LOG_WARN("fail to run upgrade system package job", KR(ret));
1271
  } else if (OB_FAIL(run_upgrade_all_post_action_(tenant_ids))) {
1272
    LOG_WARN("fail to run upgrade all post action", KR(ret));
1273
  } else if (OB_FAIL(run_upgrade_inspection_job_(tenant_ids))) {
1274
    LOG_WARN("fail to run upgrade inspection job", KR(ret));
1275
  } else if (OB_FAIL(run_upgrade_end_action_(tenant_ids))) {
1276
    LOG_WARN("fail to run upgrade end job", KR(ret));
1277
  }
1278
  FLOG_INFO("[UPGRADE] finish run upgrade all action",
1279
            KR(ret), "cost", ObTimeUtility::current_time() - start_ts);
1280
  return ret;
1281
}
1282

1283
int ObUpgradeExecutor::construct_tenant_ids_(
1284
    const common::ObIArray<uint64_t> &src_tenant_ids,
1285
    common::ObIArray<uint64_t> &dst_tenant_ids)
1286
{
1287
  int ret = OB_SUCCESS;
1288
  ObArray<uint64_t> standby_tenants;
1289
  ObTenantRole tenant_role(share::ObTenantRole::INVALID_TENANT);
1290
  ObSchemaGetterGuard schema_guard;
1291
  if (OB_FAIL(check_inner_stat_())) {
1292
    LOG_WARN("fail to check inner stat", KR(ret));
1293
  } else if (OB_FAIL(schema_service_->get_tenant_schema_guard(OB_SYS_TENANT_ID, schema_guard))) {
1294
    LOG_WARN("fail to get sys tenant schema guard", KR(ret));
1295
  } else if (src_tenant_ids.count() > 0) {
1296
    for (int64_t i = 0; OB_SUCC(ret) && i < src_tenant_ids.count(); i++) {
1297
      const uint64_t tenant_id = src_tenant_ids.at(i);
1298
      const ObSimpleTenantSchema *tenant_schema = nullptr;
1299
      if (OB_FAIL(schema_guard.get_tenant_info(tenant_id, tenant_schema))) {
1300
        LOG_WARN("fail to get tenant info", KR(ret), K(tenant_id));
1301
      } else if (OB_ISNULL(tenant_schema)) {
1302
        ret = OB_ERR_UNEXPECTED;
1303
        LOG_WARN("tenant schema is null", KR(ret), KP(tenant_schema));
1304
      } else if (!tenant_schema->is_normal()) {
1305
        ret = OB_NOT_SUPPORTED;
1306
        LOG_WARN("tenant is not normal, can not do upgrade", KR(ret), K(tenant_id), KPC(tenant_schema));
1307
      } else if (OB_FAIL(ObAllTenantInfoProxy::get_tenant_role(sql_proxy_, tenant_id, tenant_role))) {
1308
        LOG_WARN("fail to get tenant role", KR(ret), K(tenant_id), K(tenant_role));
1309
      } else if (!tenant_role.is_primary()) {
1310
        ret = OB_NOT_SUPPORTED;
1311
        LOG_WARN("not support to upgrade a non-primary tenant", KR(ret), K(tenant_id), K(tenant_role));
1312
      }
1313
    } // end for
1314
    // tenant_list is specified
1315
    if (FAILEDx(dst_tenant_ids.assign(src_tenant_ids))) {
1316
      LOG_WARN("fail to assign tenant_ids", KR(ret));
1317
    }
1318
  } else {
1319
    ObArray<uint64_t> tenant_ids;
1320
    if (OB_FAIL(schema_service_->get_tenant_ids(tenant_ids))) {
1321
      LOG_WARN("fail to get tenant_ids", KR(ret));
1322
    }
1323
    for (int64_t i = 0; OB_SUCC(ret) && i < tenant_ids.count(); i++) {
1324
      const uint64_t tenant_id = tenant_ids.at(i);
1325
      const ObSimpleTenantSchema *tenant_schema = nullptr;
1326
      if (OB_FAIL(schema_guard.get_tenant_info(tenant_id, tenant_schema))) {
1327
        LOG_WARN("fail to get tenant info", KR(ret), K(tenant_id));
1328
      } else if (OB_ISNULL(tenant_schema)) {
1329
        ret = OB_ERR_UNEXPECTED;
1330
        LOG_WARN("tenant schema is null", KR(ret), KP(tenant_schema));
1331
      } else if (!tenant_schema->is_normal()) {
1332
        ret = OB_NOT_SUPPORTED;
1333
        LOG_WARN("tenant is not normal, can not do upgrade", KR(ret), K(tenant_id), KPC(tenant_schema));
1334
      } else if (OB_FAIL(ObAllTenantInfoProxy::get_tenant_role(sql_proxy_, tenant_id, tenant_role))) {
1335
        LOG_WARN("fail to get tenant role", KR(ret), K(tenant_id), K(tenant_role));
1336
      } else if (tenant_role.is_standby()) {
1337
        // skip
1338
      } else if (!tenant_role.is_primary()) {
1339
        ret = OB_NOT_SUPPORTED;
1340
        LOG_WARN("not support do upgrade with tenant role is neither primary nor standby",
1341
                 KR(ret), K(tenant_id), K(tenant_role));
1342
      } else if (OB_FAIL(dst_tenant_ids.push_back(tenant_id))) {
1343
        LOG_WARN("fail to push back tenant_id", KR(ret), K(tenant_id));
1344
      }
1345
    } // end for
1346
  }
1347
  return ret;
1348
}
1349

1350
// Translate an upgrade RPC action into the matching rootserver job type.
//
// @param [in] action  action carried by obrpc::ObUpgradeJobArg
// @return the JOB_TYPE_UPGRADE_* value corresponding to the action, or
//         JOB_TYPE_INVALID for any action not listed below
ObRsJobType ObUpgradeExecutor::convert_to_job_type_(
  const obrpc::ObUpgradeJobArg::Action &action)
{
  // Start from the "unknown" answer; only recognised actions overwrite it.
  ObRsJobType mapped_type = JOB_TYPE_INVALID;
  switch (action) {
    case obrpc::ObUpgradeJobArg::UPGRADE_POST_ACTION:
      mapped_type = ObRsJobType::JOB_TYPE_UPGRADE_POST_ACTION;
      break;
    case obrpc::ObUpgradeJobArg::UPGRADE_BEGIN:
      mapped_type = ObRsJobType::JOB_TYPE_UPGRADE_BEGIN;
      break;
    case obrpc::ObUpgradeJobArg::UPGRADE_SYSTEM_VARIABLE:
      mapped_type = ObRsJobType::JOB_TYPE_UPGRADE_SYSTEM_VARIABLE;
      break;
    case obrpc::ObUpgradeJobArg::UPGRADE_SYSTEM_TABLE:
      mapped_type = ObRsJobType::JOB_TYPE_UPGRADE_SYSTEM_TABLE;
      break;
    case obrpc::ObUpgradeJobArg::UPGRADE_VIRTUAL_SCHEMA:
      mapped_type = ObRsJobType::JOB_TYPE_UPGRADE_VIRTUAL_SCHEMA;
      break;
    case obrpc::ObUpgradeJobArg::UPGRADE_SYSTEM_PACKAGE:
      mapped_type = ObRsJobType::JOB_TYPE_UPGRADE_SYSTEM_PACKAGE;
      break;
    case obrpc::ObUpgradeJobArg::UPGRADE_ALL_POST_ACTION:
      mapped_type = ObRsJobType::JOB_TYPE_UPGRADE_ALL_POST_ACTION;
      break;
    case obrpc::ObUpgradeJobArg::UPGRADE_INSPECTION:
      mapped_type = ObRsJobType::JOB_TYPE_UPGRADE_INSPECTION;
      break;
    case obrpc::ObUpgradeJobArg::UPGRADE_END:
      mapped_type = ObRsJobType::JOB_TYPE_UPGRADE_END;
      break;
    case obrpc::ObUpgradeJobArg::UPGRADE_ALL:
      mapped_type = ObRsJobType::JOB_TYPE_UPGRADE_ALL;
      break;
    default:
      // unrecognised action: keep JOB_TYPE_INVALID
      break;
  }
  return mapped_type;
}
1402

1403
}//end rootserver
1404
}//end oceanbase
1405

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.