oceanbase
1404 строки · 57.6 Кб
1/**
2* Copyright (c) 2021 OceanBase
3* OceanBase CE is licensed under Mulan PubL v2.
4* You can use this software according to the terms and conditions of the Mulan PubL v2.
5* You may obtain a copy of Mulan PubL v2 at:
6* http://license.coscl.org.cn/MulanPubL-2.0
7* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10* See the Mulan PubL v2 for more details.
11*/
12
13#define USING_LOG_PREFIX RS
14
15#include "rootserver/ob_upgrade_executor.h"
16#include "rootserver/ob_ls_service_helper.h"
17#include "rootserver/tenant_snapshot/ob_tenant_snapshot_util.h" //ObTenantSnapshotUtil
18#include "observer/ob_server_struct.h"
19#include "share/ob_global_stat_proxy.h"
20#include "share/ob_cluster_event_history_table_operator.h"//CLUSTER_EVENT_INSTANCE
21#include "share/ob_primary_standby_service.h" // ObPrimaryStandbyService
22#include "share/ob_tenant_info_proxy.h" //ObAllTenantInfoProxy
23#include "observer/ob_service.h"
24
25namespace oceanbase
26{
27using namespace common;
28using namespace common::sqlclient;
29using namespace share;
30using namespace share::schema;
31
32namespace rootserver
33{
34
35int64_t ObUpgradeTask::get_deep_copy_size() const
36{
37return sizeof(*this);
38}
39
40ObAsyncTask *ObUpgradeTask::deep_copy(char *buf, const int64_t buf_size) const
41{
42ObAsyncTask *task = NULL;
43int ret = OB_SUCCESS;
44const int64_t need_size = get_deep_copy_size();
45if (NULL == buf) {
46ret = OB_INVALID_ARGUMENT;
47LOG_WARN("buf is null", KR(ret));
48} else if (buf_size < need_size) {
49ret = OB_INVALID_ARGUMENT;
50LOG_WARN("buf is not long enough", K(need_size), K(buf_size), KR(ret));
51} else {
52task = new(buf) ObUpgradeTask(*upgrade_executor_);
53if (OB_FAIL(static_cast<ObUpgradeTask *>(task)->init(arg_))) {
54LOG_WARN("fail to init task", KR(ret), K_(arg));
55}
56}
57return task;
58}
59
60int ObUpgradeTask::init(const obrpc::ObUpgradeJobArg &arg)
61{
62int ret = OB_SUCCESS;
63if (OB_FAIL(arg_.assign(arg))) {
64LOG_WARN("fail to assign arg", KR(ret));
65}
66return ret;
67}
68
69int ObUpgradeTask::process()
70{
71const int64_t start = ObTimeUtility::current_time();
72FLOG_INFO("[UPGRADE] start to do execute upgrade task", K(start), K_(arg));
73int ret = OB_SUCCESS;
74if (OB_ISNULL(upgrade_executor_)) {
75ret = OB_ERR_UNEXPECTED;
76LOG_WARN("upgrade_executor_ is null", KR(ret));
77} else if (OB_FAIL(upgrade_executor_->execute(arg_))) {
78LOG_WARN("fail to execute upgrade task", KR(ret), K_(arg));
79}
80FLOG_INFO("[UPGRADE] finish execute upgrade task",
81KR(ret), K_(arg), "cost_us", ObTimeUtility::current_time() - start);
82return ret;
83}
84
85ObUpgradeExecutor::ObUpgradeExecutor()
86: inited_(false), stopped_(false), execute_(false), rwlock_(ObLatchIds::DEFAULT_SPIN_RWLOCK),
87sql_proxy_(NULL), rpc_proxy_(NULL), common_rpc_proxy_(NULL),
88schema_service_(NULL), root_inspection_(NULL),
89upgrade_processors_()
90{}
91
92int ObUpgradeExecutor::init(
93share::schema::ObMultiVersionSchemaService &schema_service,
94rootserver::ObRootInspection &root_inspection,
95common::ObMySQLProxy &sql_proxy,
96obrpc::ObSrvRpcProxy &rpc_proxy,
97obrpc::ObCommonRpcProxy &common_proxy)
98{
99int ret = OB_SUCCESS;
100if (inited_) {
101ret = OB_INIT_TWICE;
102LOG_WARN("can't init twice", KR(ret));
103} else if (OB_FAIL(upgrade_processors_.init(
104ObBaseUpgradeProcessor::UPGRADE_MODE_OB,
105sql_proxy, rpc_proxy, common_proxy, schema_service, *this))) {
106LOG_WARN("fail to init upgrade processors", KR(ret));
107} else {
108schema_service_ = &schema_service;
109root_inspection_ = &root_inspection;
110sql_proxy_ = &sql_proxy;
111rpc_proxy_ = &rpc_proxy;
112common_rpc_proxy_ = &common_proxy;
113stopped_ = false;
114execute_ = false;
115inited_ = true;
116}
117return ret;
118}
119
// Clear the stop flag while holding the write lock. After this call
// check_stop() no longer fails with OB_CANCELED and set_execute_mark_() no
// longer rejects a job because the executor is stopped.
void ObUpgradeExecutor::start()
{
  SpinWLockGuard guard(rwlock_);
  stopped_ = false;
}
125
126int ObUpgradeExecutor::stop()
127{
128int ret = OB_SUCCESS;
129const uint64_t WAIT_US = 100 * 1000L; //100ms
130const uint64_t MAX_WAIT_US = 10 * 1000 * 1000L; //10s
131const int64_t start = ObTimeUtility::current_time();
132{
133SpinWLockGuard guard(rwlock_);
134stopped_ = true;
135}
136while (OB_SUCC(ret)) {
137if (ObTimeUtility::current_time() - start > MAX_WAIT_US) {
138ret = OB_TIMEOUT;
139LOG_WARN("use too much time", KR(ret), "cost_us", ObTimeUtility::current_time() - start);
140} else if (!check_execute()) {
141break;
142} else {
143ob_usleep(WAIT_US);
144}
145}
146return ret;
147}
148
149int ObUpgradeExecutor::check_stop() const
150{
151int ret = OB_SUCCESS;
152SpinRLockGuard guard(rwlock_);
153if (OB_FAIL(check_inner_stat_())) {
154LOG_WARN("fail to check inner stat", KR(ret));
155} else if (stopped_) {
156ret = OB_CANCELED;
157LOG_WARN("executor should stopped", KR(ret));
158}
159return ret;
160}
161
162bool ObUpgradeExecutor::check_execute() const
163{
164SpinRLockGuard guard(rwlock_);
165bool bret = execute_;
166return bret;
167}
168
169int ObUpgradeExecutor::set_execute_mark_()
170{
171int ret = OB_SUCCESS;
172SpinWLockGuard guard(rwlock_);
173if (OB_FAIL(check_inner_stat_())) {
174LOG_WARN("fail to check inner stat", KR(ret));
175} else if (stopped_ || execute_) {
176ret = OB_OP_NOT_ALLOW;
177LOG_WARN("can't run job at the same time", KR(ret));
178} else {
179execute_ = true;
180}
181return ret;
182}
183
184int ObUpgradeExecutor::can_execute()
185{
186int ret = OB_SUCCESS;
187SpinWLockGuard guard(rwlock_);
188if (OB_FAIL(check_inner_stat_())) {
189LOG_WARN("fail to check inner stat", KR(ret));
190} else if (stopped_ || execute_) {
191ret = OB_OP_NOT_ALLOW;
192LOG_WARN("status not matched", KR(ret),
193"stopped", stopped_ ? "true" : "false",
194"build", execute_ ? "true" : "false");
195}
196return ret;
197}
198
199int ObUpgradeExecutor::check_inner_stat_() const
200{
201int ret = OB_SUCCESS;
202if (!inited_) {
203ret = OB_NOT_INIT;
204LOG_WARN("not inited", KR(ret));
205} else if (OB_ISNULL(schema_service_)
206|| OB_ISNULL(root_inspection_)
207|| OB_ISNULL(sql_proxy_)
208|| OB_ISNULL(rpc_proxy_)
209|| OB_ISNULL(common_rpc_proxy_)){
210ret = OB_ERR_UNEXPECTED;
211LOG_WARN("ptr is null", KR(ret), KP_(schema_service), KP_(root_inspection),
212KP_(sql_proxy), KP_(rpc_proxy), KP_(common_rpc_proxy));
213}
214return ret;
215}
216
217// wait schema sync in cluster
218int ObUpgradeExecutor::check_schema_sync_(const uint64_t tenant_id)
219{
220const int64_t start = ObTimeUtility::current_time();
221LOG_INFO("[UPGRADE] start to check schema sync", K(tenant_id), K(start));
222int ret = OB_SUCCESS;
223if (OB_FAIL(check_inner_stat_())) {
224LOG_WARN("fail to check inner stat", KR(ret));
225} else {
226const int64_t WAIT_US = 1000 * 1000L; // 1 second
227bool is_sync = false;
228while (OB_SUCC(ret)) {
229if (OB_FAIL(check_stop())) {
230LOG_WARN("executor is stop", KR(ret));
231} else if (OB_FAIL(ObUpgradeUtils::check_schema_sync(tenant_id, is_sync))) {
232LOG_WARN("fail to check schema sync", KR(ret), K(tenant_id));
233} else if (is_sync) {
234break;
235} else {
236LOG_INFO("schema not sync, should wait", KR(ret), K(tenant_id));
237ob_usleep(static_cast<useconds_t>((WAIT_US)));
238}
239}
240}
241LOG_INFO("[UPGRADE] check schema sync finish", KR(ret), K(tenant_id),
242"cost_us", ObTimeUtility::current_time() - start);
243return ret;
244}
245
246// Ensure primary cluster's schema_version is not greator than standby clusters'.
247int ObUpgradeExecutor::check_schema_sync_(
248obrpc::ObTenantSchemaVersions &primary_schema_versions,
249obrpc::ObTenantSchemaVersions &standby_schema_versions,
250bool &schema_sync)
251{
252int ret = OB_SUCCESS;
253int64_t primary_cnt = primary_schema_versions.tenant_schema_versions_.count();
254int64_t standby_cnt = standby_schema_versions.tenant_schema_versions_.count();
255if (primary_cnt <= 0 || standby_cnt <= 0) {
256ret = OB_INVALID_ARGUMENT;
257LOG_WARN("invalid cnt", KR(ret));
258} else if (OB_FAIL(check_stop())) {
259LOG_WARN("executor should stopped", KR(ret));
260} else {
261schema_sync = true;
262for (int64_t i = 0; schema_sync && OB_SUCC(ret) && i < primary_cnt; i++) {
263bool find = false;
264TenantIdAndSchemaVersion &primary = primary_schema_versions.tenant_schema_versions_.at(i);
265// check normal tenant only
266if (OB_SYS_TENANT_ID == primary.tenant_id_) {
267continue;
268} else {
269for (int64_t j = 0; !find && OB_SUCC(ret) && j < standby_cnt; j++) {
270TenantIdAndSchemaVersion &standby = standby_schema_versions.tenant_schema_versions_.at(j);
271if (OB_FAIL(check_stop())) {
272LOG_WARN("executor should stopped", KR(ret));
273} else if (primary.tenant_id_ == standby.tenant_id_) {
274find = true;
275schema_sync = (primary.schema_version_ <= standby.schema_version_);
276LOG_INFO("check if tenant schema is sync",
277KR(ret), K(primary), K(standby), K(schema_sync));
278}
279}
280if (OB_SUCC(ret) && !find) {
281schema_sync = false;
282}
283}
284}
285}
286return ret;
287}
288
289//TODO:
290//1. Run upgrade job by tenant.
291//2. Check tenant role/tenant status before run upgrade job.
292int ObUpgradeExecutor::execute(
293const obrpc::ObUpgradeJobArg &arg)
294{
295ObCurTraceId::init(GCONF.self_addr_);
296int ret = OB_SUCCESS;
297ObArray<uint64_t> tenant_ids;
298obrpc::ObUpgradeJobArg::Action action = arg.action_;
299int64_t version = arg.version_;
300ObRsJobType job_type = convert_to_job_type_(arg.action_);
301if (OB_FAIL(check_inner_stat_())) {
302LOG_WARN("fail to check inner stat", KR(ret));
303} else if (JOB_TYPE_INVALID == job_type) {
304ret = OB_INVALID_ARGUMENT;
305LOG_WARN("invalid job type", KR(ret), K(arg));
306} else if (version > 0 && !ObUpgradeChecker::check_data_version_exist(version)) {
307ret = OB_NOT_SUPPORTED;
308LOG_WARN("unsupported version to run upgrade job", KR(ret), K(arg));
309} else if (OB_FAIL(construct_tenant_ids_(arg.tenant_ids_, tenant_ids))) {
310LOG_WARN("fail to construct tenant_ids", KR(ret), K(arg));
311} else if (OB_FAIL(set_execute_mark_())) {
312LOG_WARN("fail to set execute mark", KR(ret));
313// NOTICE: don't add any `else if` after set_execute_mark_().
314} else {
315const uint64_t tenant_id = (1 == tenant_ids.count()) ? tenant_ids.at(0) : 0;
316const int64_t BUF_LEN = common::MAX_ROOTSERVICE_EVENT_EXTRA_INFO_LENGTH;
317char extra_buf[BUF_LEN] = {'\0'};
318int64_t job_id = OB_INVALID_ID;
319uint64_t current_data_version = 0;
320if (0 != tenant_id && OB_FAIL(GET_MIN_DATA_VERSION(tenant_id, current_data_version))) {
321LOG_WARN("fail to get min data version", KR(ret), K(tenant_id));
322} else if (OB_FAIL(fill_extra_info_(tenant_id, version,
323current_data_version, BUF_LEN, extra_buf))) {
324LOG_WARN("fail to fill extra info", KR(ret),
325K(tenant_id), K(version), K(current_data_version));
326} else if (OB_FAIL(RS_JOB_CREATE_WITH_RET(
327job_id, job_type, *sql_proxy_, "tenant_id", tenant_id,
328"extra_info", ObHexEscapeSqlStr(ObString(strlen(extra_buf), extra_buf))))) {
329LOG_WARN("fail to create rs job", KR(ret));
330} else if (job_id <= 0) {
331ret = OB_ERR_UNEXPECTED;
332LOG_WARN("job_id is invalid", KR(ret), K(job_id));
333} else {
334switch (action) {
335case obrpc::ObUpgradeJobArg::UPGRADE_POST_ACTION: {
336if (OB_FAIL(run_upgrade_post_job_(tenant_ids, version))) {
337LOG_WARN("fail to run upgrade post job", KR(ret), K(version));
338}
339break;
340}
341case obrpc::ObUpgradeJobArg::UPGRADE_BEGIN: {
342if (OB_FAIL(run_upgrade_begin_action_(tenant_ids))) {
343LOG_WARN("fail to run upgrade begin job", KR(ret));
344}
345break;
346}
347case obrpc::ObUpgradeJobArg::UPGRADE_SYSTEM_VARIABLE: {
348if (OB_FAIL(run_upgrade_system_variable_job_(tenant_ids))) {
349LOG_WARN("fail to run upgrade system variable job", KR(ret));
350}
351break;
352}
353case obrpc::ObUpgradeJobArg::UPGRADE_SYSTEM_TABLE: {
354if (OB_FAIL(run_upgrade_system_table_job_(tenant_ids))) {
355LOG_WARN("fail to run upgrade system table job", KR(ret));
356}
357break;
358}
359case obrpc::ObUpgradeJobArg::UPGRADE_VIRTUAL_SCHEMA: {
360if (OB_FAIL(run_upgrade_virtual_schema_job_(tenant_ids))) {
361LOG_WARN("fail to run upgrade virtual schema job", KR(ret));
362}
363break;
364}
365case obrpc::ObUpgradeJobArg::UPGRADE_SYSTEM_PACKAGE: {
366if (OB_FAIL(run_upgrade_system_package_job_())) {
367LOG_WARN("fail to run upgrade system package job", KR(ret));
368}
369break;
370}
371case obrpc::ObUpgradeJobArg::UPGRADE_ALL_POST_ACTION: {
372if (OB_FAIL(run_upgrade_all_post_action_(tenant_ids))) {
373LOG_WARN("fail to run upgrade all post action", KR(ret));
374}
375break;
376}
377case obrpc::ObUpgradeJobArg::UPGRADE_INSPECTION: {
378if (OB_FAIL(run_upgrade_inspection_job_(tenant_ids))) {
379LOG_WARN("fail to run upgrade inspection job", KR(ret));
380}
381break;
382}
383case obrpc::ObUpgradeJobArg::UPGRADE_END: {
384if (OB_FAIL(run_upgrade_end_action_(tenant_ids))) {
385LOG_WARN("fail to run upgrade end job", KR(ret));
386}
387break;
388}
389case obrpc::ObUpgradeJobArg::UPGRADE_ALL: {
390if (OB_FAIL(run_upgrade_all_(tenant_ids))) {
391LOG_WARN("fail to run upgrade all action", KR(ret));
392}
393break;
394}
395default: {
396ret = OB_NOT_SUPPORTED;
397LOG_WARN("not support upgrade job type", KR(ret), K(action));
398break;
399}
400}
401}
402
403if (OB_SUCC(ret)) {
404const int64_t BUF_LEN = OB_SERVER_VERSION_LENGTH;
405char min_cluster_version_str[BUF_LEN] = {'\0'};
406const uint64_t min_cluster_version = GET_MIN_CLUSTER_VERSION();
407char targe_data_version_str[BUF_LEN] = {'\0'};
408const uint64_t target_data_version = DATA_CURRENT_VERSION;
409share::ObServerInfoInTable::ObBuildVersion build_version;
410if (OB_INVALID_INDEX == ObClusterVersion::print_version_str(
411min_cluster_version_str, BUF_LEN, min_cluster_version)) {
412ret = OB_SIZE_OVERFLOW;
413LOG_WARN("fail to print version str", KR(ret), K(min_cluster_version));
414} else if (OB_INVALID_INDEX == ObClusterVersion::print_version_str(
415targe_data_version_str, BUF_LEN, target_data_version)) {
416ret = OB_SIZE_OVERFLOW;
417LOG_WARN("fail to print version str", KR(ret), K(target_data_version));
418} else if (OB_FAIL(observer::ObService::get_build_version(build_version))) {
419LOG_WARN("fail to get build version", KR(ret));
420} else if (0 != tenant_id) {
421char current_data_version_str[BUF_LEN] = {'\0'};
422if (OB_INVALID_INDEX == ObClusterVersion::print_version_str(
423current_data_version_str, BUF_LEN, current_data_version)) {
424ret = OB_SIZE_OVERFLOW;
425LOG_WARN("fail to print version str", KR(ret), K(current_data_version));
426}
427CLUSTER_EVENT_SYNC_ADD("UPGRADE",
428ObRsJobTableOperator::get_job_type_str(job_type),
429"cluster_version", min_cluster_version_str,
430"build_version", build_version.ptr(),
431"target_data_version", targe_data_version_str,
432"current_data_version", current_data_version_str,
433"tenant_id", tenant_id)
434} else {
435CLUSTER_EVENT_SYNC_ADD("UPGRADE",
436ObRsJobTableOperator::get_job_type_str(job_type),
437"cluster_version", min_cluster_version_str,
438"build_version", build_version.ptr(),
439"target_data_version", targe_data_version_str);
440}
441}
442
443if (job_id > 0) {
444int tmp_ret = OB_SUCCESS;
445if (OB_SUCCESS != (tmp_ret = RS_JOB_COMPLETE(job_id, ret, *sql_proxy_))) {
446LOG_ERROR("fail to complete job", K(tmp_ret), KR(ret), K(job_id));
447ret = OB_FAIL(ret) ? ret : tmp_ret;
448}
449}
450execute_ = false;
451}
452return ret;
453}
454
455int ObUpgradeExecutor::fill_extra_info_(
456const uint64_t tenant_id,
457const int64_t specified_version,
458const uint64_t current_data_version,
459const int64_t buf_len,
460char *buf)
461{
462int ret = OB_SUCCESS;
463int64_t len = 0;
464const int64_t VERSION_LEN = common::OB_CLUSTER_VERSION_LENGTH;
465char version_buf[VERSION_LEN] = {'\0'};
466int64_t version_len = 0;
467if (specified_version > 0) {
468if (OB_INVALID_INDEX == (version_len = ObClusterVersion::print_version_str(
469version_buf, VERSION_LEN, static_cast<uint64_t>(specified_version)))) {
470ret = OB_SIZE_OVERFLOW;
471LOG_WARN("fail to print version", KR(ret), K(specified_version));
472} else if (OB_FAIL(databuff_printf(buf, buf_len, len,
473"SPECIFIED_DATA_VERSION: '%s'", version_buf))) {
474LOG_WARN("fail to print string", KR(ret), K(len));
475}
476} else {
477if (OB_SUCC(ret)) {
478uint64_t target_data_version = DATA_CURRENT_VERSION;
479if (OB_INVALID_INDEX == (version_len = ObClusterVersion::print_version_str(
480version_buf, VERSION_LEN, target_data_version))) {
481ret = OB_SIZE_OVERFLOW;
482LOG_WARN("fail to print version", KR(ret), K(target_data_version));
483} else if (OB_FAIL(databuff_printf(buf, buf_len, len,
484"TARGET_DATA_VERSION: '%s'", version_buf))) {
485LOG_WARN("fail to print string", KR(ret), K(len));
486}
487}
488if (OB_FAIL(ret)) {
489} else if (0 != tenant_id) {
490// record current data version when upgrade single tenant
491if (OB_UNLIKELY(len < 1)) {
492ret = OB_ERR_UNEXPECTED;
493LOG_WARN("str should not be empty", KR(ret), K(len));
494} else if (OB_INVALID_INDEX == (version_len = ObClusterVersion::print_version_str(
495version_buf, VERSION_LEN, current_data_version))) {
496ret = OB_SIZE_OVERFLOW;
497LOG_WARN("fail to print version", KR(ret), K(current_data_version));
498} else if (OB_FAIL(databuff_printf(buf, buf_len, len,
499", CURRENT_DATA_VERSION: '%s'", version_buf))) {
500LOG_WARN("fail to print string", KR(ret), K(len));
501}
502}
503}
504return ret;
505}
506
507// Python upgrade script may set enable_ddl = false before it run upgrade job.
508// this function won't raise current_data_version
509int ObUpgradeExecutor::run_upgrade_post_job_(
510const common::ObIArray<uint64_t> &tenant_ids,
511const int64_t version)
512{
513int ret = OB_SUCCESS;
514if (OB_FAIL(check_inner_stat_())) {
515LOG_WARN("fail to check inner stat", KR(ret));
516} else if (OB_FAIL(check_stop())) {
517LOG_WARN("executor should stopped", KR(ret));
518} else if (!ObUpgradeChecker::check_data_version_exist(version)) {
519ret = OB_NOT_SUPPORTED;
520LOG_WARN("unsupported version to run upgrade job", KR(ret), K(version));
521} else {
522ObBaseUpgradeProcessor *processor = NULL;
523int64_t backup_ret = OB_SUCCESS;
524int tmp_ret = OB_SUCCESS;
525if (OB_FAIL(upgrade_processors_.get_processor_by_version(
526version, processor))) {
527LOG_WARN("fail to get processor by version", KR(ret), K(version));
528} else {
529for (int64_t i = tenant_ids.count() - 1; OB_SUCC(ret) && i >= 0; i--) {
530const uint64_t tenant_id = tenant_ids.at(i);
531int64_t start_ts = ObTimeUtility::current_time();
532int64_t current_version = processor->get_version();
533processor->set_tenant_id(tenant_id);
534FLOG_INFO("[UPGRADE] start to run post upgrade job by version",
535K(tenant_id), K(current_version));
536if (OB_FAIL(check_stop())) {
537LOG_WARN("executor should stopped", KR(ret));
538} else if (OB_TMP_FAIL(processor->post_upgrade())) {
539LOG_WARN("run post upgrade by version failed",
540KR(tmp_ret), K(tenant_id), K(current_version));
541backup_ret = OB_SUCCESS == backup_ret ? tmp_ret : backup_ret;
542}
543FLOG_INFO("[UPGRADE] finish post upgrade job by version",
544KR(tmp_ret), K(tenant_id), K(current_version),
545"cost", ObTimeUtility::current_time() - start_ts);
546} // end for
547}
548ret = OB_SUCC(ret) ? backup_ret : ret;
549}
550return ret;
551}
552
553int ObUpgradeExecutor::run_upgrade_begin_action_(
554const common::ObIArray<uint64_t> &tenant_ids)
555{
556int ret = OB_SUCCESS;
557common::hash::ObHashMap<uint64_t, share::SCN> tenants_sys_ls_target_scn;
558lib::ObMemAttr attr(OB_SYS_TENANT_ID, "UPGRADE");
559const int BUCKET_NUM = hash::cal_next_prime(tenant_ids.count());
560if (OB_FAIL(check_inner_stat_())) {
561LOG_WARN("fail to check inner stat", KR(ret));
562} else if (OB_FAIL(check_stop())) {
563LOG_WARN("executor should stopped", KR(ret));
564} else if (OB_FAIL(tenants_sys_ls_target_scn.create(BUCKET_NUM, attr))) {
565LOG_WARN("fail to create tenants_sys_ls_target_scn", KR(ret));
566} else {
567int64_t backup_ret = OB_SUCCESS;
568int tmp_ret = OB_SUCCESS;
569tenants_sys_ls_target_scn.clear();
570for (int64_t i = tenant_ids.count() - 1; OB_SUCC(ret) && i >= 0; i--) {
571const uint64_t tenant_id = tenant_ids.at(i);
572int64_t start_ts = ObTimeUtility::current_time();
573FLOG_INFO("[UPGRADE] start to run upgrade begin action", K(tenant_id));
574if (OB_FAIL(check_stop())) {
575LOG_WARN("executor should stopped", KR(ret));
576} else if (OB_TMP_FAIL(run_upgrade_begin_action_(tenant_id, tenants_sys_ls_target_scn))) {
577LOG_WARN("fail to upgrade begin action", KR(ret), K(tenant_id));
578backup_ret = OB_SUCCESS == backup_ret ? tmp_ret : backup_ret;
579}
580FLOG_INFO("[UPGRADE] finish run upgrade begin action step 1/2, write upgrade barrier log",
581KR(ret), K(tenant_id), "cost", ObTimeUtility::current_time() - start_ts);
582} // end for
583ret = OB_SUCC(ret) ? backup_ret : ret;
584if (OB_SUCC(ret)) {
585int64_t start_ts_step2 = ObTimeUtility::current_time();
586ret = ObLSServiceHelper::wait_all_tenants_user_ls_sync_scn(tenants_sys_ls_target_scn);
587FLOG_INFO("[UPGRADE] finish run upgrade begin action step 2/2, wait all tenants' sync_scn",
588KR(ret), "cost", ObTimeUtility::current_time() - start_ts_step2);
589}
590}
591return ret;
592}
593
// Run the "upgrade begin" action for one tenant inside a single transaction.
//
// Inside the transaction:
//  1. Read the tenant's target data version (the read is requested with
//     for_update = true).
//  2. If that value is missing (OB_ERR_NULL_VALUE) and the min cluster version
//     is <= 4.1.0.0, the current data version is expected to be missing too; it
//     is then initialized to DATA_VERSION_4_0_0_0, which is also used as the
//     target data version for the rest of this function.
//  3. Check that the tenant is not in a cloning procedure.
//  4. If the target data version is older than DATA_CURRENT_VERSION, raise it
//     and, for user tenants, write the upgrade barrier log.
// The transaction is ended with the success state of the steps above
// (presumably commit on success and rollback otherwise — confirm against
// ObMySQLTransaction::end()).
//
// After a successful transaction, for user tenants only, the SCN returned by
// get_target_data_version_ora_rowscn() is stored in tenants_sys_ls_target_scn.
//
// @param tenant_id                  tenant to process
// @param tenants_sys_ls_target_scn  [in,out] tenant_id -> SCN map; gets one new
//                                   entry for a user tenant
// @return OB_SUCCESS or the first error hit
int ObUpgradeExecutor::run_upgrade_begin_action_(
    const uint64_t tenant_id,
    common::hash::ObHashMap<uint64_t, share::SCN> &tenants_sys_ls_target_scn)
{
  int ret = OB_SUCCESS;
  ObMySQLTransaction trans;
  share::SCN sys_ls_target_scn = SCN::invalid_scn();
  ObConflictCaseWithClone case_to_check(ObConflictCaseWithClone::UPGRADE);
  if (OB_FAIL(check_inner_stat_())) {
    LOG_WARN("fail to check inner stat", KR(ret));
  } else if (OB_FAIL(check_stop())) {
    LOG_WARN("executor should stopped", KR(ret));
  } else if (OB_FAIL(trans.start(sql_proxy_, tenant_id))) {
    LOG_WARN("fail to start trans", KR(ret), K(tenant_id));
  } else {
    ObGlobalStatProxy proxy(trans, tenant_id);
    // get target_data_version
    uint64_t target_data_version = 0;
    const uint64_t DEFAULT_DATA_VERSION = DATA_VERSION_4_0_0_0;
    bool for_update = true;
    if (OB_FAIL(proxy.get_target_data_version(for_update, target_data_version))) {
      if (OB_ERR_NULL_VALUE == ret
          && GET_MIN_CLUSTER_VERSION() <= CLUSTER_VERSION_4_1_0_0) {
        // 4.0 -> 4.1
        // the target data version is missing; the current one must be missing as well
        uint64_t current_data_version = 0;
        ret = proxy.get_current_data_version(current_data_version);
        if (OB_ERR_NULL_VALUE != ret) {
          // a successful read here is unexpected; any other error is passed on unchanged
          ret = OB_SUCC(ret) ? OB_ERR_UNEXPECTED : ret;
          LOG_WARN("current data version should be not exist",
                   KR(ret), K(tenant_id), K(current_data_version));
        } else if (OB_FAIL(proxy.update_current_data_version(DEFAULT_DATA_VERSION))) {
          // overwrite ret
          LOG_WARN("fail to init current data version",
                   KR(ret), K(tenant_id), K(DEFAULT_DATA_VERSION));
        } else {
          // ret is OB_SUCCESS again here; continue as if the default version had been read
          target_data_version = DEFAULT_DATA_VERSION;
          LOG_INFO("[UPGRADE] init missing current data version",
                   KR(ret), K(tenant_id), K(DEFAULT_DATA_VERSION));
        }
      } else {
        LOG_WARN("fail to get target data version", KR(ret), K(tenant_id));
      }
    }
    // check tenant not in cloning procedure in trans
    if (OB_FAIL(ret)) {
    } else if (OB_FAIL(ObTenantSnapshotUtil::check_tenant_not_in_cloning_procedure(tenant_id, case_to_check))) {
      LOG_WARN("fail to check whether tenant is in cloning produre", KR(ret), K(tenant_id));
    }
    // try update target_data_version
    if (OB_FAIL(ret)) {
    } else if (target_data_version >= DATA_CURRENT_VERSION) {
      LOG_INFO("[UPGRADE] target data version is new enough, just skip",
               KR(ret), K(tenant_id), K(target_data_version));
    } else if (OB_FAIL(proxy.update_target_data_version(DATA_CURRENT_VERSION))) {
      LOG_WARN("fail to update target data version",
               KR(ret), K(tenant_id), "version", DATA_CURRENT_VERSION);
    } else if (is_user_tenant(tenant_id)
               && OB_FAIL(OB_PRIMARY_STANDBY_SERVICE.write_upgrade_barrier_log(
                          trans, tenant_id, DATA_CURRENT_VERSION))) {
      // the barrier log is written only for user tenants, in the same transaction
      LOG_WARN("fail to write_upgrade_barrier_log",
               KR(ret), K(tenant_id), "version", DATA_CURRENT_VERSION);
    } else {
      LOG_INFO("[UPGRADE] update target data version",
               KR(ret), K(tenant_id), "version", DATA_CURRENT_VERSION);
    }
  }
  // end the transaction whenever it was started; keep the first error
  if (trans.is_started()) {
    int tmp_ret = OB_SUCCESS;
    if (OB_TMP_FAIL(trans.end(OB_SUCC(ret)))) {
      LOG_WARN("trans end failed", KR(tmp_ret), K(ret));
      ret = OB_SUCC(ret) ? tmp_ret : ret;
    }
  }
  // after the transaction has ended: remember the SCN of the target data version row
  if (OB_FAIL(ret)) {
  } else if (!is_user_tenant(tenant_id)) {
    // skip
  } else if (OB_FAIL(ObGlobalStatProxy::get_target_data_version_ora_rowscn(tenant_id, sys_ls_target_scn))) {
    LOG_WARN("fail to get sys_ls_target_scn", KR(ret), K(tenant_id));
  } else if (OB_FAIL(tenants_sys_ls_target_scn.set_refactored(
                     tenant_id,
                     sys_ls_target_scn,
                     0 /* flag:  0 shows that not cover existing object. */))) {
    LOG_WARN("fail to push an element into tenants_sys_ls_target_scn", KR(ret), K(tenant_id),
             K(sys_ls_target_scn));
  }
  return ret;
}
681
682int ObUpgradeExecutor::run_upgrade_system_variable_job_(
683const common::ObIArray<uint64_t> &tenant_ids)
684{
685int ret = OB_SUCCESS;
686if (OB_FAIL(check_inner_stat_())) {
687LOG_WARN("fail to check inner stat", KR(ret));
688} else if (OB_FAIL(check_stop())) {
689LOG_WARN("executor should stopped", KR(ret));
690} else {
691int backup_ret = OB_SUCCESS;
692int tmp_ret = OB_SUCCESS;
693for (int64_t i = tenant_ids.count() - 1; OB_SUCC(ret) && i >= 0; i--) {
694const uint64_t tenant_id = tenant_ids.at(i);
695int64_t start_ts = ObTimeUtility::current_time();
696FLOG_INFO("[UPGRADE] start to run upgrade system variable job", K(tenant_id));
697if (OB_FAIL(check_stop())) {
698LOG_WARN("executor should stopped", KR(ret));
699} else if (OB_FAIL(check_schema_sync_(tenant_id))) {
700LOG_WARN("fail to check schema sync", KR(ret), K(tenant_id));
701} else if (OB_TMP_FAIL(ObUpgradeUtils::upgrade_sys_variable(*common_rpc_proxy_, *sql_proxy_, tenant_id))) {
702LOG_WARN("fail to upgrade sys variable", KR(tmp_ret), K(tenant_id));
703backup_ret = OB_SUCCESS == backup_ret ? tmp_ret : backup_ret;
704}
705FLOG_INFO("[UPGRADE] finish run upgrade system variable job",
706KR(tmp_ret), K(tenant_id), "cost", ObTimeUtility::current_time() - start_ts);
707} // end for
708ret = OB_SUCC(ret) ? backup_ret : ret;
709}
710return ret;
711}
712
713// NOTICE: enable_sys_table_ddl should be true before run this job.
714int ObUpgradeExecutor::run_upgrade_system_table_job_(
715const common::ObIArray<uint64_t> &tenant_ids)
716{
717int ret = OB_SUCCESS;
718if (OB_FAIL(check_inner_stat_())) {
719LOG_WARN("fail to check inner stat", KR(ret));
720} else if (OB_FAIL(check_stop())) {
721LOG_WARN("executor should stopped", KR(ret));
722} else {
723int backup_ret = OB_SUCCESS;
724int tmp_ret = OB_SUCCESS;
725for (int64_t i = tenant_ids.count() - 1; OB_SUCC(ret) && i >= 0; i--) {
726const uint64_t tenant_id = tenant_ids.at(i);
727int64_t start_ts = ObTimeUtility::current_time();
728FLOG_INFO("[UPGRADE] start to run upgrade system table job", K(tenant_id));
729if (OB_FAIL(check_stop())) {
730LOG_WARN("executor should stopped", KR(ret));
731} else if (OB_TMP_FAIL(upgrade_system_table_(tenant_id))) {
732LOG_WARN("fail to upgrade system table", KR(tmp_ret), K(tenant_id));
733backup_ret = OB_SUCCESS == backup_ret ? tmp_ret : backup_ret;
734}
735FLOG_INFO("[UPGRADE] finish run upgrade system table job",
736KR(tmp_ret), K(tenant_id), "cost", ObTimeUtility::current_time() - start_ts);
737} // end for
738ret = OB_SUCC(ret) ? backup_ret : ret;
739}
740return ret;
741}
742
// Bring the core/system tables of one tenant in line with their hard-coded
// definitions.
//
// Phase 1 (check): build every hard-coded core/sys table schema, and for each
// one that is_inner_table_exist() reports as existing for this tenant, compare
// it with the current schema via check_table_schema_(). Tables for which that
// check returns OB_SCHEMA_ERROR are collected as "need upgrade"; any other
// error aborts the function.
//
// Phase 2 (upgrade): for each collected table, wait for schema sync and send
// an upgrade_table_schema RPC. A failing RPC does not stop the loop: the first
// such error is kept and returned after all collected tables were tried.
//
// @param tenant_id  tenant whose system tables are upgraded
// @return OB_SUCCESS, the first aborting error, or the first RPC error
int ObUpgradeExecutor::upgrade_system_table_(const uint64_t tenant_id)
{
  int ret = OB_SUCCESS;
  if (OB_FAIL(check_inner_stat_())) {
    LOG_WARN("fail to check inner stat", KR(ret));
  } else if (OB_FAIL(check_stop())) {
    LOG_WARN("executor should stopped", KR(ret));
  } else {
    ObArray<uint64_t> upgrade_table_ids; // miss or mismatch
    // Only core/system tables can be upgraded here.
    // 1. __all_core_table can't be altered.
    // 2. sys index table and sys lob table will be added with sys data table, and can't be altered.
    // NULL-terminated list of NULL-terminated creator arrays, walked by the two loops below.
    const schema_create_func *creator_ptr_array[] = {
      share::core_table_schema_creators,
      share::sys_table_schema_creators, NULL };

    // check system table
    ObTableSchema table_schema;
    bool exist = false;
    for (const schema_create_func **creator_ptr_ptr = creator_ptr_array;
         OB_SUCC(ret) && OB_NOT_NULL(*creator_ptr_ptr); ++creator_ptr_ptr) {
      for (const schema_create_func *creator_ptr = *creator_ptr_ptr;
           OB_SUCC(ret) && OB_NOT_NULL(*creator_ptr); ++creator_ptr) {
        // table_schema is reused across iterations, so start from a clean object
        table_schema.reset();
        if (OB_FAIL(check_stop())) {
          LOG_WARN("check_cancel failed", KR(ret));
        } else if (OB_FAIL((*creator_ptr)(table_schema))) {
          LOG_WARN("create table schema failed", KR(ret));
        } else if (!is_sys_tenant(tenant_id)
                   && OB_FAIL(ObSchemaUtils::construct_tenant_space_full_table(
                              tenant_id, table_schema))) {
          LOG_WARN("fail to construct tenant space table", KR(ret), K(tenant_id));
        } else if (OB_FAIL(ObSysTableChecker::is_inner_table_exist(
                   tenant_id, table_schema, exist))) {
          LOG_WARN("fail to check inner table exist",
                   KR(ret), K(tenant_id), K(table_schema));
        } else if (!exist) {
          // skip
        } else if (OB_FAIL(check_table_schema_(tenant_id, table_schema))) {
          const uint64_t table_id = table_schema.get_table_id();
          if (OB_SCHEMA_ERROR != ret) {
            // a real error: leave ret set so that both loops stop
            LOG_WARN("check_table_schema failed", KR(ret), K(tenant_id), K(table_id));
          } else {
            // OB_SCHEMA_ERROR marks a table that has to be upgraded, not a failure
            FLOG_INFO("[UPGRADE] table need upgrade", K(tenant_id), K(table_id),
                      "table_name", table_schema.get_table_name());
            if (OB_FAIL(upgrade_table_ids.push_back(table_id))) { // overwrite ret
              LOG_WARN("fail to push back upgrade table ids", KR(ret), K(tenant_id), K(table_id));
            }
          }
        }
      } // end for
    } // end for

    int tmp_ret = OB_SUCCESS;
    int backup_ret = OB_SUCCESS;
    // upgrade system table(create or alter)
    obrpc::ObUpgradeTableSchemaArg arg;
    bool upgrade_virtual_schema = false;
    const int64_t timeout = GCONF._ob_ddl_timeout;
    for (int64_t i = 0; OB_SUCC(ret) && i < upgrade_table_ids.count(); i++) {
      const uint64_t table_id = upgrade_table_ids.at(i);
      int64_t start_ts = ObTimeUtility::current_time();
      FLOG_INFO("[UPGRADE] start upgrade system table", K(tenant_id), K(table_id));
      if (OB_FAIL(check_stop())) {
        LOG_WARN("check_cancel failed", KR(ret));
      } else if (OB_FAIL(arg.init(tenant_id, table_id, upgrade_virtual_schema))) {
        LOG_WARN("fail to init arg", KR(ret), K(tenant_id), K(table_id));
      } else if (OB_FAIL(check_schema_sync_(tenant_id))) {
        LOG_WARN("fail to check schema sync", KR(ret), K(tenant_id));
      } else if (OB_TMP_FAIL(common_rpc_proxy_->timeout(timeout).upgrade_table_schema(arg))) {
        LOG_WARN("fail to uggrade table schema", KR(tmp_ret), K(timeout), K(arg));
        // remember only the first RPC failure and keep going with the next table
        backup_ret = OB_SUCCESS == backup_ret ? tmp_ret : backup_ret;
      }
      FLOG_INFO("[UPGRADE] finish upgrade system table",
                KR(tmp_ret), K(tenant_id), K(table_id), "cost", ObTimeUtility::current_time() - start_ts);
    } // end for
    ret = OB_SUCC(ret) ? backup_ret : ret;
  }
  return ret;
}
823
824int ObUpgradeExecutor::check_table_schema_(const uint64_t tenant_id, const ObTableSchema &hard_code_table)
825{
826int ret = OB_SUCCESS;
827const ObTableSchema *table = NULL;
828ObSchemaGetterGuard schema_guard;
829if (OB_FAIL(check_inner_stat_())) {
830LOG_WARN("fail to check inner stat", KR(ret));
831} else if (OB_FAIL(schema_service_->get_tenant_schema_guard(tenant_id, schema_guard))) {
832LOG_WARN("failed to get schema guard", KR(ret), K(tenant_id));
833} else if (OB_FAIL(schema_guard.get_table_schema(
834tenant_id, hard_code_table.get_table_id(), table))) {
835LOG_WARN("get_table_schema failed", KR(ret), K(tenant_id),
836"table_id", hard_code_table.get_table_id(),
837"table_name", hard_code_table.get_table_name());
838} else if (OB_ISNULL(table)) {
839ret = OB_SCHEMA_ERROR;
840LOG_WARN("table should not be null", KR(ret), K(tenant_id),
841"table_id", hard_code_table.get_table_id(),
842"table_name", hard_code_table.get_table_name());
843} else if (OB_FAIL(ObRootInspection::check_table_schema(hard_code_table, *table))) {
844LOG_WARN("fail to check table schema", KR(ret), K(tenant_id), K(hard_code_table), KPC(table));
845}
846return ret;
847}
848
849
850int ObUpgradeExecutor::run_upgrade_virtual_schema_job_(
851const common::ObIArray<uint64_t> &tenant_ids)
852{
853int ret = OB_SUCCESS;
854if (OB_FAIL(check_inner_stat_())) {
855LOG_WARN("fail to check inner stat", KR(ret));
856} else if (OB_FAIL(check_stop())) {
857LOG_WARN("executor should stopped", KR(ret));
858} else {
859int backup_ret = OB_SUCCESS;
860int tmp_ret = OB_SUCCESS;
861obrpc::ObUpgradeTableSchemaArg arg;
862uint64_t invalid_table_id = OB_INVALID_ID;
863bool upgrade_virtual_schema = true;
864// TODO:(yanmu.ztl) upgrade single virtual table/sys view
865int64_t timeout = GCONF._ob_ddl_timeout;
866for (int64_t i = tenant_ids.count() - 1; OB_SUCC(ret) && i >= 0; i--) {
867const uint64_t tenant_id = tenant_ids.at(i);
868int64_t start_ts = ObTimeUtility::current_time();
869FLOG_INFO("[UPGRADE] start to run upgrade virtual schema job", K(tenant_id));
870if (OB_FAIL(check_stop())) {
871LOG_WARN("executor should stopped", KR(ret));
872} else if (OB_FAIL(arg.init(tenant_id, invalid_table_id, upgrade_virtual_schema))) {
873LOG_WARN("fail to init arg", KR(ret), K(tenant_id));
874} else if (OB_FAIL(check_schema_sync_(tenant_id))) {
875LOG_WARN("fail to check schema sync", KR(ret), K(tenant_id));
876} else if (OB_TMP_FAIL(common_rpc_proxy_->timeout(timeout).upgrade_table_schema(arg))) {
877LOG_WARN("fail to upgrade virtual schema", KR(tmp_ret), K(arg));
878backup_ret = OB_SUCCESS == backup_ret ? tmp_ret : backup_ret;
879}
880FLOG_INFO("[UPGRADE] finish run upgrade virtual schema job",
881KR(tmp_ret), K(tenant_id), "cost", ObTimeUtility::current_time() - start_ts);
882} // end for
883ret = OB_SUCC(ret) ? backup_ret : ret;
884}
885return ret;
886}
887
888int ObUpgradeExecutor::run_upgrade_system_package_job_()
889{
890int ret = OB_SUCCESS;
891const uint64_t tenant_id = OB_SYS_TENANT_ID;
892if (OB_FAIL(check_inner_stat_())) {
893LOG_WARN("fail to check inner stat", KR(ret));
894} else if (OB_FAIL(check_schema_sync_(tenant_id))) {
895LOG_WARN("fail to check schema sync", KR(ret), K(tenant_id));
896} else if (OB_FAIL(upgrade_mysql_system_package_job_())) {
897LOG_WARN("fail to upgrade mysql system package", KR(ret));
898#ifdef OB_BUILD_ORACLE_PL
899} else if (OB_FAIL(check_schema_sync_(tenant_id))) {
900LOG_WARN("fail to check schema sync", KR(ret), K(tenant_id));
901} else if (OB_FAIL(upgrade_oracle_system_package_job_())) {
902LOG_WARN("fail to upgrade mysql system package", KR(ret));
903#endif
904}
905return ret;
906}
907
908int ObUpgradeExecutor::upgrade_mysql_system_package_job_()
909{
910int ret = OB_SUCCESS;
911int64_t start_ts = ObTimeUtility::current_time();
912FLOG_INFO("[UPGRADE] start to run upgrade mysql system package job");
913int64_t timeout = GCONF._ob_ddl_timeout;
914const char *create_package_sql =
915"CREATE OR REPLACE PACKAGE __DBMS_UPGRADE \
916PROCEDURE UPGRADE(package_name VARCHAR(1024)); \
917PROCEDURE UPGRADE_ALL(); \
918END;";
919const char *create_package_body_sql =
920"CREATE OR REPLACE PACKAGE BODY __DBMS_UPGRADE \
921PROCEDURE UPGRADE(package_name VARCHAR(1024)); \
922PRAGMA INTERFACE(c, UPGRADE_SINGLE); \
923PROCEDURE UPGRADE_ALL(); \
924PRAGMA INTERFACE(c, UPGRADE_ALL); \
925END;";
926const char *upgrade_sql = "CALL __DBMS_UPGRADE.UPGRADE_ALL();";
927ObTimeoutCtx ctx;
928int64_t affected_rows = 0;
929if (OB_FAIL(check_inner_stat_())) {
930LOG_WARN("fail to check inner stat", KR(ret));
931} else if (OB_FAIL(ctx.set_timeout(timeout))) {
932LOG_WARN("fail to set timeout", KR(ret));
933} else if (OB_FAIL(sql_proxy_->write(
934OB_SYS_TENANT_ID, create_package_sql, affected_rows))) {
935LOG_WARN("fail to execute sql", KR(ret), "sql", create_package_sql);
936} else if (0 != affected_rows) {
937ret = OB_ERR_UNEXPECTED;
938LOG_WARN("affected_rows expected to be zero", KR(ret), K(affected_rows));
939} else if (OB_FAIL(check_stop())) {
940LOG_WARN("executor is stop", KR(ret));
941} else if (OB_FAIL(ctx.set_timeout(timeout))) {
942LOG_WARN("fail to set timeout", KR(ret));
943} else if (OB_FAIL(sql_proxy_->write(
944OB_SYS_TENANT_ID, create_package_body_sql, affected_rows))) {
945LOG_WARN("fail to execute sql", KR(ret), "sql", create_package_body_sql);
946} else if (0 != affected_rows) {
947ret = OB_ERR_UNEXPECTED;
948LOG_WARN("affected_rows expected to be zero", KR(ret), K(affected_rows));
949} else if (OB_FAIL(check_stop())) {
950LOG_WARN("executor is stop", KR(ret));
951} else if (OB_FAIL(ctx.set_timeout(timeout))) {
952LOG_WARN("fail to set timeout", KR(ret));
953} else if (OB_FAIL(sql_proxy_->write(
954OB_SYS_TENANT_ID, upgrade_sql, affected_rows))) {
955LOG_WARN("fail to execute sql", KR(ret), "sql", upgrade_sql);
956} else if (0 != affected_rows) {
957ret = OB_ERR_UNEXPECTED;
958LOG_WARN("affected_rows expected to be zero", KR(ret), K(affected_rows));
959}
960FLOG_INFO("[UPGRADE] finish run upgrade mysql system package job",
961KR(ret), "cost", ObTimeUtility::current_time() - start_ts);
962return ret;
963}
964
965#ifdef OB_BUILD_ORACLE_PL
966int ObUpgradeExecutor::upgrade_oracle_system_package_job_()
967{
968int ret = OB_SUCCESS;
969int64_t start_ts = ObTimeUtility::current_time();
970FLOG_INFO("[UPGRADE] start to run upgrade oracle system package job");
971ObCompatibilityMode mode = ObCompatibilityMode::ORACLE_MODE;
972int64_t timeout = GCONF._ob_ddl_timeout;
973const char *create_package_sql =
974"CREATE OR REPLACE PACKAGE \"__DBMS_UPGRADE\" IS \
975PROCEDURE UPGRADE(package_name VARCHAR2); \
976PROCEDURE UPGRADE_ALL; \
977END;";
978const char *create_package_body_sql =
979"CREATE OR REPLACE PACKAGE BODY \"__DBMS_UPGRADE\" IS \
980PROCEDURE UPGRADE(package_name VARCHAR2); \
981PRAGMA INTERFACE(c, UPGRADE_SINGLE); \
982PROCEDURE UPGRADE_ALL; \
983PRAGMA INTERFACE(c, UPGRADE_ALL); \
984END;";
985const char *upgrade_sql = "CALL \"__DBMS_UPGRADE\".UPGRADE_ALL();";
986ObTimeoutCtx ctx;
987int64_t affected_rows = 0;
988if (OB_FAIL(check_inner_stat_())) {
989LOG_WARN("fail to check inner stat", KR(ret));
990} else if (OB_FAIL(ctx.set_timeout(timeout))) {
991LOG_WARN("fail to set timeout", KR(ret));
992} else if (OB_FAIL(sql_proxy_->write(
993OB_SYS_TENANT_ID, create_package_sql,
994affected_rows, static_cast<int64_t>(mode)))) {
995LOG_WARN("fail to execute sql", KR(ret), "sql", create_package_sql);
996} else if (0 != affected_rows) {
997ret = OB_ERR_UNEXPECTED;
998LOG_WARN("affected_rows expected to be zero", KR(ret), K(affected_rows));
999} else if (OB_FAIL(check_stop())) {
1000LOG_WARN("executor is stop", KR(ret));
1001} else if (OB_FAIL(ctx.set_timeout(timeout))) {
1002LOG_WARN("fail to set timeout", KR(ret));
1003} else if (OB_FAIL(sql_proxy_->write(
1004OB_SYS_TENANT_ID, create_package_body_sql,
1005affected_rows, static_cast<int64_t>(mode)))) {
1006LOG_WARN("fail to execute sql", KR(ret), "sql", create_package_body_sql);
1007} else if (0 != affected_rows) {
1008ret = OB_ERR_UNEXPECTED;
1009LOG_WARN("affected_rows expected to be zero", KR(ret), K(affected_rows));
1010} else if (OB_FAIL(check_stop())) {
1011LOG_WARN("executor is stop", KR(ret));
1012} else if (OB_FAIL(ctx.set_timeout(timeout))) {
1013LOG_WARN("fail to set timeout", KR(ret));
1014} else if (OB_FAIL(sql_proxy_->write(
1015OB_SYS_TENANT_ID, upgrade_sql,
1016affected_rows, static_cast<int64_t>(mode)))) {
1017LOG_WARN("fail to execute sql", KR(ret), "sql", upgrade_sql);
1018} else if (0 != affected_rows) {
1019ret = OB_ERR_UNEXPECTED;
1020LOG_WARN("affected_rows expected to be zero", KR(ret), K(affected_rows));
1021}
1022FLOG_INFO("[UPGRADE] finish run upgrade oracle system package job",
1023KR(ret), "cost", ObTimeUtility::current_time() - start_ts);
1024return ret;
1025}
1026#endif
1027
1028int ObUpgradeExecutor::run_upgrade_all_post_action_(
1029const common::ObIArray<uint64_t> &tenant_ids)
1030{
1031int ret = OB_SUCCESS;
1032if (OB_FAIL(check_inner_stat_())) {
1033LOG_WARN("fail to check inner stat", KR(ret));
1034} else if (OB_FAIL(check_stop())) {
1035LOG_WARN("executor should stopped", KR(ret));
1036} else {
1037int64_t backup_ret = OB_SUCCESS;
1038int tmp_ret = OB_SUCCESS;
1039for (int64_t i = tenant_ids.count() - 1; OB_SUCC(ret) && i >= 0; i--) {
1040const uint64_t tenant_id = tenant_ids.at(i);
1041int64_t start_ts = ObTimeUtility::current_time();
1042FLOG_INFO("[UPGRADE] start to run upgrade all post action", K(tenant_id));
1043if (OB_FAIL(check_stop())) {
1044LOG_WARN("executor should stopped", KR(ret));
1045} else if (OB_TMP_FAIL(run_upgrade_all_post_action_(tenant_id))) {
1046LOG_WARN("fail to upgrade all post action", KR(ret), K(tenant_id));
1047backup_ret = OB_SUCCESS == backup_ret ? tmp_ret : backup_ret;
1048}
1049FLOG_INFO("[UPGRADE] finish run upgrade all post action",
1050KR(ret), K(tenant_id), "cost", ObTimeUtility::current_time() - start_ts);
1051} // end for
1052ret = OB_SUCC(ret) ? backup_ret : ret;
1053}
1054return ret;
1055}
1056
1057int ObUpgradeExecutor::run_upgrade_all_post_action_(
1058const uint64_t tenant_id)
1059{
1060int ret = OB_SUCCESS;
1061if (OB_FAIL(check_inner_stat_())) {
1062LOG_WARN("fail to check inner stat", KR(ret));
1063} else if (OB_FAIL(check_stop())) {
1064LOG_WARN("executor should stopped", KR(ret));
1065} else {
1066uint64_t current_data_version = 0;
1067int64_t start_idx = OB_INVALID_INDEX;
1068int64_t end_idx = OB_INVALID_INDEX;
1069ObGlobalStatProxy proxy(*sql_proxy_, tenant_id);
1070if (OB_FAIL(proxy.get_current_data_version(current_data_version))) {
1071LOG_WARN("fail to get current data version",
1072KR(ret), K(tenant_id), K(current_data_version));
1073} else if (OB_FAIL(upgrade_processors_.get_processor_idx_by_range(
1074current_data_version, DATA_CURRENT_VERSION,
1075start_idx, end_idx))) {
1076LOG_WARN("fail to get processor by version", KR(ret), K(current_data_version));
1077}
1078for (int64_t i = start_idx + 1; OB_SUCC(ret) && i <= end_idx; i++) {
1079ObBaseUpgradeProcessor *processor = NULL;
1080int64_t version = OB_INVALID_VERSION;
1081if (OB_FAIL(check_stop())) {
1082LOG_WARN("executor should stopped", KR(ret));
1083} else if (OB_FAIL(upgrade_processors_.get_processor_by_idx(i, processor))) {
1084LOG_WARN("fail to get processor", KR(ret), K(current_data_version), K(i));
1085} else if (FALSE_IT(version = processor->get_version())) {
1086} else if (FALSE_IT(processor->set_tenant_id(tenant_id))) {
1087} else if (OB_FAIL(check_schema_sync_(tenant_id))) {
1088LOG_WARN("fail to check schema sync", KR(ret), K(tenant_id));
1089} else if (OB_FAIL(processor->post_upgrade())) {
1090LOG_WARN("run post upgrade by version failed", KR(ret), K(tenant_id), K(version));
1091} else if (OB_FAIL(check_schema_sync_(tenant_id))) {
1092LOG_WARN("fail to check schema sync", KR(ret), K(tenant_id));
1093} else if (OB_FAIL(proxy.update_current_data_version(version))) {
1094LOG_WARN("fail to update current data version", KR(ret), K(tenant_id), K(version));
1095}
1096} // end for
1097}
1098return ret;
1099}
1100
1101int ObUpgradeExecutor::run_upgrade_inspection_job_(
1102const common::ObIArray<uint64_t> &tenant_ids)
1103{
1104int ret = OB_SUCCESS;
1105if (OB_FAIL(check_inner_stat_())) {
1106LOG_WARN("fail to check inner stat", KR(ret));
1107} else if (OB_FAIL(check_stop())) {
1108LOG_WARN("executor should stopped", KR(ret));
1109} else {
1110int backup_ret = OB_SUCCESS;
1111int tmp_ret = OB_SUCCESS;
1112for (int64_t i = tenant_ids.count() - 1; OB_SUCC(ret) && i >= 0; i--) {
1113const uint64_t tenant_id = tenant_ids.at(i);
1114int64_t start_ts = ObTimeUtility::current_time();
1115FLOG_INFO("[UPGRADE] start to run upgrade inspection job", K(tenant_id));
1116if (OB_FAIL(check_stop())) {
1117LOG_WARN("executor should stopped", KR(ret));
1118} else if (OB_FAIL(check_schema_sync_(tenant_id))) {
1119LOG_WARN("fail to check schema sync", KR(ret), K(tenant_id));
1120} else if (OB_TMP_FAIL(root_inspection_->check_tenant(tenant_id))) {
1121LOG_WARN("fail to do upgrade inspection", KR(tmp_ret), K(tenant_id));
1122backup_ret = OB_SUCCESS == backup_ret ? tmp_ret : backup_ret;
1123}
1124FLOG_INFO("[UPGRADE] finish run upgrade inspection job",
1125KR(tmp_ret), K(tenant_id), "cost", ObTimeUtility::current_time() - start_ts);
1126} // end for
1127ret = OB_SUCC(ret) ? backup_ret : ret;
1128}
1129return ret;
1130}
1131
1132int ObUpgradeExecutor::run_upgrade_end_action_(
1133const common::ObIArray<uint64_t> &tenant_ids)
1134{
1135int ret = OB_SUCCESS;
1136if (OB_FAIL(check_inner_stat_())) {
1137LOG_WARN("fail to check inner stat", KR(ret));
1138} else if (OB_FAIL(check_stop())) {
1139LOG_WARN("executor should stopped", KR(ret));
1140} else {
1141int64_t backup_ret = OB_SUCCESS;
1142int tmp_ret = OB_SUCCESS;
1143for (int64_t i = tenant_ids.count() - 1; OB_SUCC(ret) && i >= 0; i--) {
1144const uint64_t tenant_id = tenant_ids.at(i);
1145int64_t start_ts = ObTimeUtility::current_time();
1146FLOG_INFO("[UPGRADE] start to run upgrade end action", K(tenant_id));
1147if (OB_FAIL(check_stop())) {
1148LOG_WARN("executor should stopped", KR(ret));
1149} else if (OB_FAIL(check_schema_sync_(tenant_id))) {
1150LOG_WARN("fail to check schema sync", KR(ret), K(tenant_id));
1151} else if (OB_TMP_FAIL(run_upgrade_end_action_(tenant_id))) {
1152LOG_WARN("fail to upgrade end action", KR(ret), K(tenant_id));
1153backup_ret = OB_SUCCESS == backup_ret ? tmp_ret : backup_ret;
1154}
1155FLOG_INFO("[UPGRADE] finish run upgrade end action",
1156KR(ret), K(tenant_id), "cost", ObTimeUtility::current_time() - start_ts);
1157} // end for
1158ret = OB_SUCC(ret) ? backup_ret : ret;
1159}
1160return ret;
1161}
1162
1163int ObUpgradeExecutor::run_upgrade_end_action_(
1164const uint64_t tenant_id)
1165{
1166int ret = OB_SUCCESS;
1167if (OB_FAIL(check_inner_stat_())) {
1168LOG_WARN("fail to check inner stat", KR(ret));
1169} else if (OB_FAIL(check_stop())) {
1170LOG_WARN("executor should stopped", KR(ret));
1171} else {
1172ObGlobalStatProxy proxy(*sql_proxy_, tenant_id);
1173uint64_t target_data_version = 0;
1174uint64_t current_data_version = 0;
1175bool for_update = false;
1176uint64_t data_version = 0;
1177if (OB_FAIL(proxy.get_target_data_version(for_update, target_data_version))) {
1178LOG_WARN("fail to get target data version", KR(ret), K(tenant_id));
1179} else if (OB_FAIL(proxy.get_current_data_version(current_data_version))) {
1180LOG_WARN("fail to get current data version", KR(ret), K(tenant_id));
1181} else if (target_data_version != current_data_version
1182|| target_data_version != DATA_CURRENT_VERSION) {
1183ret = OB_STATE_NOT_MATCH;
1184LOG_WARN("data_version not match, upgrade process should be run",
1185KR(ret), K(tenant_id), K(target_data_version), K(current_data_version));
1186} else {
1187// target_data_version == current_data_version == DATA_CURRENT_VERSION
1188if (OB_FAIL(GET_MIN_DATA_VERSION(tenant_id, data_version))) {
1189LOG_WARN("fail to get min data version", KR(ret), K(tenant_id));
1190} else if (data_version >= current_data_version) {
1191LOG_INFO("[UPGRADE] data version is not less than current data version, just skip",
1192K(tenant_id), K(data_version), K(current_data_version));
1193} else {
1194HEAP_VAR(obrpc::ObAdminSetConfigItem, item) {
1195ObSchemaGetterGuard guard;
1196const ObSimpleTenantSchema *tenant = NULL;
1197obrpc::ObAdminSetConfigArg arg;
1198item.exec_tenant_id_ = OB_SYS_TENANT_ID;
1199const int64_t timeout = GCONF.internal_sql_execute_timeout;
1200int64_t pos = ObClusterVersion::print_version_str(
1201item.value_.ptr(), item.value_.capacity(),
1202current_data_version);
1203if (pos <= 0) {
1204ret = OB_INVALID_ARGUMENT;
1205LOG_WARN("current_data_version is invalid",
1206KR(ret), K(tenant_id), K(current_data_version));
1207} else if (OB_FAIL(GSCHEMASERVICE.get_tenant_schema_guard(OB_SYS_TENANT_ID, guard))) {
1208LOG_WARN("fail to get schema guard", KR(ret));
1209} else if (OB_FAIL(guard.get_tenant_info(tenant_id, tenant))) {
1210LOG_WARN("fail to get tenant info", KR(ret), K(tenant_id));
1211} else if (OB_ISNULL(tenant)) {
1212ret = OB_TENANT_NOT_EXIST;
1213LOG_WARN("tenant not exist", KR(ret), K(tenant_id));
1214} else if (OB_FAIL(item.tenant_name_.assign(tenant->get_tenant_name()))) {
1215LOG_WARN("fail to assign tenant name", KR(ret), K(tenant_id));
1216} else if (OB_FAIL(item.name_.assign("compatible"))) {
1217LOG_WARN("fail to assign config name", KR(ret), K(tenant_id));
1218} else if (OB_FAIL(arg.items_.push_back(item))) {
1219LOG_WARN("fail to push back item", KR(ret), K(item));
1220} else if (OB_FAIL(common_rpc_proxy_->timeout(timeout).admin_set_config(arg))) {
1221LOG_WARN("fail to set config", KR(ret), K(arg), K(timeout));
1222} else {
1223int64_t start_ts = ObTimeUtility::current_time();
1224while (OB_SUCC(ret)) {
1225if (OB_FAIL(check_stop())) {
1226LOG_WARN("executor should stopped", KR(ret));
1227} else if (ObTimeUtility::current_time() - start_ts >= timeout) {
1228ret = OB_TIMEOUT;
1229LOG_WARN("wait config taking effective failed",
1230KR(ret), K(tenant_id), K(timeout));
1231} else if (OB_FAIL(GET_MIN_DATA_VERSION(tenant_id, data_version))) {
1232LOG_WARN("fail to get min data version", KR(ret), K(tenant_id));
1233} else if (data_version >= current_data_version) {
1234LOG_INFO("[UPGRADE] config take effective", K(tenant_id),
1235"cost", ObTimeUtility::current_time() - start_ts);
1236break;
1237} else {
1238LOG_INFO("[UPGRADE] config doesn't take effective", K(tenant_id));
1239usleep(1 * 1000 * 1000L); // 1s
1240}
1241}
1242}
1243} // end HEAP_VAR
1244}
1245}
1246}
1247return ret;
1248}
1249
1250int ObUpgradeExecutor::run_upgrade_all_(
1251const common::ObIArray<uint64_t> &tenant_ids)
1252{
1253int ret = OB_SUCCESS;
1254int64_t start_ts = ObTimeUtility::current_time();
1255FLOG_INFO("[UPGRADE] start to run upgrade all action");
1256if (OB_FAIL(check_inner_stat_())) {
1257LOG_WARN("fail to check inner stat", KR(ret));
1258} else if (OB_FAIL(check_stop())) {
1259LOG_WARN("executor should stopped", KR(ret));
1260} else if (OB_FAIL(run_upgrade_begin_action_(tenant_ids))) {
1261LOG_WARN("fail to run upgrade begin job", KR(ret));
1262} else if (OB_FAIL(run_upgrade_system_variable_job_(tenant_ids))) {
1263LOG_WARN("fail to run upgrade system variable job", KR(ret));
1264} else if (OB_FAIL(run_upgrade_system_table_job_(tenant_ids))) {
1265LOG_WARN("fail to run upgrade system table job", KR(ret));
1266} else if (OB_FAIL(run_upgrade_virtual_schema_job_(tenant_ids))) {
1267LOG_WARN("fail to run upgrade virtual schema job", KR(ret));
1268} else if (has_exist_in_array(tenant_ids, OB_SYS_TENANT_ID)
1269&& OB_FAIL(run_upgrade_system_package_job_())) {
1270LOG_WARN("fail to run upgrade system package job", KR(ret));
1271} else if (OB_FAIL(run_upgrade_all_post_action_(tenant_ids))) {
1272LOG_WARN("fail to run upgrade all post action", KR(ret));
1273} else if (OB_FAIL(run_upgrade_inspection_job_(tenant_ids))) {
1274LOG_WARN("fail to run upgrade inspection job", KR(ret));
1275} else if (OB_FAIL(run_upgrade_end_action_(tenant_ids))) {
1276LOG_WARN("fail to run upgrade end job", KR(ret));
1277}
1278FLOG_INFO("[UPGRADE] finish run upgrade all action",
1279KR(ret), "cost", ObTimeUtility::current_time() - start_ts);
1280return ret;
1281}
1282
1283int ObUpgradeExecutor::construct_tenant_ids_(
1284const common::ObIArray<uint64_t> &src_tenant_ids,
1285common::ObIArray<uint64_t> &dst_tenant_ids)
1286{
1287int ret = OB_SUCCESS;
1288ObArray<uint64_t> standby_tenants;
1289ObTenantRole tenant_role(share::ObTenantRole::INVALID_TENANT);
1290ObSchemaGetterGuard schema_guard;
1291if (OB_FAIL(check_inner_stat_())) {
1292LOG_WARN("fail to check inner stat", KR(ret));
1293} else if (OB_FAIL(schema_service_->get_tenant_schema_guard(OB_SYS_TENANT_ID, schema_guard))) {
1294LOG_WARN("fail to get sys tenant schema guard", KR(ret));
1295} else if (src_tenant_ids.count() > 0) {
1296for (int64_t i = 0; OB_SUCC(ret) && i < src_tenant_ids.count(); i++) {
1297const uint64_t tenant_id = src_tenant_ids.at(i);
1298const ObSimpleTenantSchema *tenant_schema = nullptr;
1299if (OB_FAIL(schema_guard.get_tenant_info(tenant_id, tenant_schema))) {
1300LOG_WARN("fail to get tenant info", KR(ret), K(tenant_id));
1301} else if (OB_ISNULL(tenant_schema)) {
1302ret = OB_ERR_UNEXPECTED;
1303LOG_WARN("tenant schema is null", KR(ret), KP(tenant_schema));
1304} else if (!tenant_schema->is_normal()) {
1305ret = OB_NOT_SUPPORTED;
1306LOG_WARN("tenant is not normal, can not do upgrade", KR(ret), K(tenant_id), KPC(tenant_schema));
1307} else if (OB_FAIL(ObAllTenantInfoProxy::get_tenant_role(sql_proxy_, tenant_id, tenant_role))) {
1308LOG_WARN("fail to get tenant role", KR(ret), K(tenant_id), K(tenant_role));
1309} else if (!tenant_role.is_primary()) {
1310ret = OB_NOT_SUPPORTED;
1311LOG_WARN("not support to upgrade a non-primary tenant", KR(ret), K(tenant_id), K(tenant_role));
1312}
1313} // end for
1314// tenant_list is specified
1315if (FAILEDx(dst_tenant_ids.assign(src_tenant_ids))) {
1316LOG_WARN("fail to assign tenant_ids", KR(ret));
1317}
1318} else {
1319ObArray<uint64_t> tenant_ids;
1320if (OB_FAIL(schema_service_->get_tenant_ids(tenant_ids))) {
1321LOG_WARN("fail to get tenant_ids", KR(ret));
1322}
1323for (int64_t i = 0; OB_SUCC(ret) && i < tenant_ids.count(); i++) {
1324const uint64_t tenant_id = tenant_ids.at(i);
1325const ObSimpleTenantSchema *tenant_schema = nullptr;
1326if (OB_FAIL(schema_guard.get_tenant_info(tenant_id, tenant_schema))) {
1327LOG_WARN("fail to get tenant info", KR(ret), K(tenant_id));
1328} else if (OB_ISNULL(tenant_schema)) {
1329ret = OB_ERR_UNEXPECTED;
1330LOG_WARN("tenant schema is null", KR(ret), KP(tenant_schema));
1331} else if (!tenant_schema->is_normal()) {
1332ret = OB_NOT_SUPPORTED;
1333LOG_WARN("tenant is not normal, can not do upgrade", KR(ret), K(tenant_id), KPC(tenant_schema));
1334} else if (OB_FAIL(ObAllTenantInfoProxy::get_tenant_role(sql_proxy_, tenant_id, tenant_role))) {
1335LOG_WARN("fail to get tenant role", KR(ret), K(tenant_id), K(tenant_role));
1336} else if (tenant_role.is_standby()) {
1337// skip
1338} else if (!tenant_role.is_primary()) {
1339ret = OB_NOT_SUPPORTED;
1340LOG_WARN("not support do upgrade with tenant role is neither primary nor standby",
1341KR(ret), K(tenant_id), K(tenant_role));
1342} else if (OB_FAIL(dst_tenant_ids.push_back(tenant_id))) {
1343LOG_WARN("fail to push back tenant_id", KR(ret), K(tenant_id));
1344}
1345} // end for
1346}
1347return ret;
1348}
1349
1350ObRsJobType ObUpgradeExecutor::convert_to_job_type_(
1351const obrpc::ObUpgradeJobArg::Action &action)
1352{
1353ObRsJobType job_type = JOB_TYPE_INVALID;
1354switch (action) {
1355case obrpc::ObUpgradeJobArg::UPGRADE_POST_ACTION: {
1356job_type = ObRsJobType::JOB_TYPE_UPGRADE_POST_ACTION;
1357break;
1358}
1359case obrpc::ObUpgradeJobArg::UPGRADE_BEGIN: {
1360job_type = ObRsJobType::JOB_TYPE_UPGRADE_BEGIN;
1361break;
1362}
1363case obrpc::ObUpgradeJobArg::UPGRADE_SYSTEM_VARIABLE: {
1364job_type = ObRsJobType::JOB_TYPE_UPGRADE_SYSTEM_VARIABLE;
1365break;
1366}
1367case obrpc::ObUpgradeJobArg::UPGRADE_SYSTEM_TABLE: {
1368job_type = ObRsJobType::JOB_TYPE_UPGRADE_SYSTEM_TABLE;
1369break;
1370}
1371case obrpc::ObUpgradeJobArg::UPGRADE_VIRTUAL_SCHEMA: {
1372job_type = ObRsJobType::JOB_TYPE_UPGRADE_VIRTUAL_SCHEMA;
1373break;
1374}
1375case obrpc::ObUpgradeJobArg::UPGRADE_SYSTEM_PACKAGE: {
1376job_type = ObRsJobType::JOB_TYPE_UPGRADE_SYSTEM_PACKAGE;
1377break;
1378}
1379case obrpc::ObUpgradeJobArg::UPGRADE_ALL_POST_ACTION: {
1380job_type = ObRsJobType::JOB_TYPE_UPGRADE_ALL_POST_ACTION;
1381break;
1382}
1383case obrpc::ObUpgradeJobArg::UPGRADE_INSPECTION: {
1384job_type = ObRsJobType::JOB_TYPE_UPGRADE_INSPECTION;
1385break;
1386}
1387case obrpc::ObUpgradeJobArg::UPGRADE_END: {
1388job_type = ObRsJobType::JOB_TYPE_UPGRADE_END;
1389break;
1390}
1391case obrpc::ObUpgradeJobArg::UPGRADE_ALL: {
1392job_type = ObRsJobType::JOB_TYPE_UPGRADE_ALL;
1393break;
1394}
1395default: {
1396job_type = JOB_TYPE_INVALID;
1397break;
1398}
1399}
1400return job_type;
1401}
1402
1403}//end rootserver
1404}//end oceanbase
1405