oceanbase

Форк
0
/
ob_system_admin_util.cpp 
2255 строк · 89.0 Кб
1
/**
2
 * Copyright (c) 2021 OceanBase
3
 * OceanBase CE is licensed under Mulan PubL v2.
4
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
5
 * You may obtain a copy of Mulan PubL v2 at:
6
 *          http://license.coscl.org.cn/MulanPubL-2.0
7
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
 * See the Mulan PubL v2 for more details.
11
 */
12

13
#include "io/easy_connection.h"
14
#include "lib/list/ob_dlist.h"
15
#include "lib/ob_errno.h"
16
#include "lib/string/ob_string_holder.h"
17
#include "logservice/leader_coordinator/failure_event.h"
18
#include "logservice/leader_coordinator/ob_failure_detector.h"
19
#include "share/inner_table/ob_inner_table_schema_constants.h"
20
#include "share/ob_table_access_helper.h"
21
#include "share/rc/ob_tenant_base.h"
22
#define USING_LOG_PREFIX RS
23

24
#include "ob_system_admin_util.h"
25

26
#include "lib/time/ob_time_utility.h"
27
#include "lib/container/ob_array_iterator.h"
28
#include "share/ob_srv_rpc_proxy.h"
29
#include "share/ob_rpc_struct.h"
30
#include "share/schema/ob_schema_getter_guard.h"
31
#include "share/schema/ob_multi_version_schema_service.h"
32
#include "share/config/ob_server_config.h"
33
#include "share/config/ob_config_manager.h"
34
#include "share/ob_dml_sql_splicer.h"
35
#include "share/ob_cluster_version.h"
36
#include "share/ob_upgrade_utils.h"
37
#include "share/ob_share_util.h" // ObShareUtil
38
#include "storage/ob_file_system_router.h"
39
#include "observer/ob_server_struct.h"
40
#include "observer/omt/ob_tenant_config_mgr.h"
41
#include "observer/omt/ob_multi_tenant.h"
42
#include "observer/ob_srv_network_frame.h"
43
#include "ob_server_manager.h"
44
#include "ob_ddl_operator.h"
45
#include "ob_zone_manager.h"
46
#include "ob_ddl_service.h"
47
#include "ob_unit_manager.h"
48
#include "ob_root_inspection.h"
49
#include "ob_root_service.h"
50
#include "storage/ob_file_system_router.h"
51
#include "logservice/leader_coordinator/table_accessor.h"
52
#include "rootserver/freeze/ob_major_freeze_helper.h"
53
#include "share/ob_cluster_event_history_table_operator.h"//CLUSTER_EVENT_INSTANCE
54
#include "observer/ob_service.h"
55
namespace oceanbase
56
{
57
using namespace common;
58
using namespace common::hash;
59
using namespace share;
60
using namespace share::schema;
61
using namespace obrpc;
62

63
namespace rootserver
64
{
65

66
int ObSystemAdminUtil::check_service() const
67
{
68
  int ret = OB_SUCCESS;
69
  if (!ctx_.is_inited()) {
70
    ret = OB_NOT_INIT;
71
    LOG_WARN("not init", KR(ret));
72
  } else {
73
    ret = ctx_.rs_status_->in_service()? OB_SUCCESS : OB_CANCELED;
74
  }
75
  return ret;
76
}
77

78
int ObAdminSwitchReplicaRole::execute(const ObAdminSwitchReplicaRoleArg &arg)
79
{
80
  LOG_INFO("execute switch replica role request", K(arg));
81
  ObArenaAllocator allocator(ObModIds::OB_RS_PARTITION_TABLE_TEMP);
82
  int ret = OB_SUCCESS;
83
  const ObLSID ls_id(arg.ls_id_);
84
  uint64_t tenant_id = OB_INVALID_TENANT_ID;
85
  ObLSInfo ls_info;
86
  auto get_tenant_id_by_name = [this](const ObAdminSwitchReplicaRoleArg &arg, uint64_t &tenant_id) -> int {
87
    int ret = OB_SUCCESS;
88
    ObSchemaGetterGuard schema_guard;
89
    ObString tenant_name;
90
    tenant_name.assign_ptr(arg.tenant_name_.ptr(),
91
        static_cast<int32_t>(strlen(arg.tenant_name_.ptr())));
92
    if (tenant_name.empty()) {
93
      tenant_id = OB_INVALID_TENANT_ID;
94
    } else if (OB_FAIL(ctx_.schema_service_->get_tenant_schema_guard(OB_SYS_TENANT_ID, schema_guard))) {
95
      LOG_WARN("get schema manager failed", KR(ret));
96
    } else if (OB_FAIL(schema_guard.get_tenant_id(tenant_name, tenant_id))
97
        || OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id)) {
98
      ret = OB_TENANT_NOT_EXIST;
99
      LOG_WARN("tenant not exist", K(tenant_name), KR(ret));
100
    }
101
    return ret;
102
  };
103
  auto check_server_valid = [](const ObAddr &server) -> int {
104
    int ret = OB_SUCCESS;
105
    const char *columns[1] = {"status"};
106
    constexpr int64_t buffer_size = 128;
107
    char where_condition[buffer_size] = {0};
108
    char ip_str_buffer[buffer_size] = {0};
109
    if (!server.ip_to_string(ip_str_buffer, buffer_size)) {
110
      ret = OB_ERR_UNEXPECTED;
111
      LOG_WARN("ip to string failed", K(ip_str_buffer), K(server));
112
    } else {
113
      if (OB_FAIL(databuff_printf(where_condition, buffer_size, "where svr_ip='%s' and svr_port=%d", ip_str_buffer, server.get_port()))) {
114
        LOG_WARN("fail to create where confition", K(ip_str_buffer), K(server));
115
      } else {
116
        ObStringHolder server_status;
117
        if (OB_FAIL(ObTableAccessHelper::read_single_row(OB_SYS_TENANT_ID, columns, OB_ALL_SERVER_TNAME, where_condition, server_status))) {
118
          if (OB_ITER_END == ret) {
119
            ret = OB_ENTRY_NOT_EXIST;
120
            LOG_USER_ERROR(OB_ENTRY_NOT_EXIST, "server not in cluster");
121
            LOG_WARN("server not in __all_server table", K(server), KR(ret));
122
          } else {
123
            LOG_WARN("fail to read all_server table", K(server), KR(ret));
124
          }
125
        } else if (server_status.get_ob_string().compare("ACTIVE") != 0) {
126
          ret = OB_OP_NOT_ALLOW;
127
          LOG_USER_ERROR(OB_OP_NOT_ALLOW, "server not active");
128
          LOG_WARN("server status not valid", K(server), K(server_status));
129
        }
130
      }
131
    }
132
    return ret;
133
  };
134
  auto update_ls_election_reference_info_table = [](const ObAdminSwitchReplicaRoleArg &arg, const int64_t tenant_id, const ObLSInfo &info) -> int {
135
    int ret = OB_SUCCESS;
136
    ObSwitchLeaderArg switch_leader_arg(arg.ls_id_, arg.role_, tenant_id, arg.server_);
137
    const ObLSReplica *ls_replica = nullptr;
138
    if (switch_leader_arg.ls_id_ < 0 ||
139
        OB_INVALID_TENANT_ID == switch_leader_arg.tenant_id_ ||
140
        !switch_leader_arg.dest_server_.is_valid()) {
141
      ret = OB_INVALID_ARGUMENT;
142
      LOG_WARN("invalid argument", KR(ret), K(switch_leader_arg));
143
    } else if (switch_leader_arg.role_ == ObRole::LEADER) {
144
      logservice::coordinator::LsElectionReferenceInfoRow row(tenant_id, share::ObLSID(arg.ls_id_));
145
      if (OB_FAIL(row.change_manual_leader(arg.server_))) {
146
        LOG_WARN("fail to change manual leader in __all_ls_election_reference_info", K(ret), K(arg));
147
      } else {
148
        LOG_INFO("successfully to change manual leader in __all_ls_election_reference_info", K(ret), K(arg));
149
      }
150
    } else if (switch_leader_arg.role_ == ObRole::FOLLOWER) {
151
      logservice::coordinator::LsElectionReferenceInfoRow row(tenant_id, share::ObLSID(arg.ls_id_));
152
      if (OB_FAIL(row.add_server_to_blacklist(arg.server_, logservice::coordinator::InsertElectionBlacklistReason::SWITCH_REPLICA))) {
153
        LOG_WARN("fail to add remove member info in __all_ls_election_reference_info", K(ret), K(arg));
154
      } else {
155
        LOG_INFO("successfully to add remove member info in __all_ls_election_reference_info", K(ret), K(arg));
156
      }
157
    } else if (switch_leader_arg.role_ == ObRole::INVALID_ROLE) {
158
      logservice::coordinator::LsElectionReferenceInfoRow row(tenant_id, share::ObLSID(arg.ls_id_));
159
      if (OB_FAIL(row.change_manual_leader(ObAddr()))) {
160
        LOG_WARN("fail to change manual leader in __all_ls_election_reference_info", K(ret), K(arg));
161
      } else if (OB_FAIL(row.delete_server_from_blacklist(arg.server_))) {
162
        if (OB_ENTRY_NOT_EXIST != ret) {
163
          LOG_WARN("fail to del remove member info in __all_ls_election_reference_info", K(ret), K(arg));
164
        } else {
165
          ret = OB_SUCCESS;
166
        }
167
      }
168
      if (OB_SUCC(ret)) {
169
        LOG_INFO("successfully to reset server status in __all_ls_election_reference_info", K(ret), K(arg));
170
      }
171
    }
172
    return ret;
173
  };
174
  if (!ctx_.is_inited()) {
175
    ret = OB_NOT_INIT;
176
    LOG_WARN("not init", KR(ret));
177
  } else if (!arg.is_valid()) {
178
    ret = OB_INVALID_ARGUMENT;
179
    LOG_WARN("invalid arg", K(arg), KR(ret));
180
  } else if (!ls_id.is_valid()) {// 表示需要改变server上或者zone中所有日志流的状态
181
    ret = OB_NOT_SUPPORTED;
182
    LOG_USER_ERROR(OB_NOT_SUPPORTED, "switch server's role or zone's role");
183
  } else if (!arg.server_.is_valid()) {
184
    ret = OB_INVALID_ARGUMENT;
185
    LOG_WARN("server must set", K(arg), KR(ret));
186
  } else if (OB_FAIL(check_server_valid(arg.server_))) {
187
    LOG_WARN("check server valid state failed", K(arg), KR(ret));
188
  } else if (OB_FAIL(get_tenant_id_by_name(arg, tenant_id))) {
189
    if (OB_ENTRY_NOT_EXIST == ret) {
190
      LOG_USER_ERROR(OB_ENTRY_NOT_EXIST, "invalid tenant");
191
    }
192
    LOG_WARN("fail to convert tenant name to id", K(arg), KR(ret));
193
  } else if (OB_ISNULL(GCTX.lst_operator_)) {
194
    ret = OB_ERR_UNEXPECTED;
195
    LOG_WARN("GCTX.lst_operator_ is NULL", K(arg), KR(ret), K(tenant_id));
196
  } else if (OB_FAIL(GCTX.lst_operator_->get(GCONF.cluster_id, tenant_id,
197
                     ls_id, share::ObLSTable::DEFAULT_MODE, ls_info))) {
198
    LOG_WARN("get ls info from GCTX.lst_operator_ failed", K(arg), KR(ret), K(tenant_id));
199
  } else if (OB_FAIL(update_ls_election_reference_info_table(arg, tenant_id, ls_info))) {
200
    LOG_WARN("fail to update ls election reference info", K(arg), KR(ret), K(tenant_id));
201
  } else {
202
    int tmp_ret = OB_SUCCESS;//ignore ret
203
    if (OB_TMP_FAIL(ObRootUtils::try_notify_switch_ls_leader(ctx_.rpc_proxy_, ls_info,
204
          obrpc::ObNotifySwitchLeaderArg::SwitchLeaderComment::MANUAL_SWITCH))) {
205
      LOG_WARN("failed to notify switch ls leader", KR(ret), K(ls_info));
206
    }
207
  }
208
  LOG_INFO("switch leader done", KR(ret), K(arg), K(tenant_id), K(ls_info));
209
  return ret;
210
}
211

212
// Makes `tenant_id_set` ready for use: creates it with TENANT_BUCKET_NUM buckets
// when it has not been created yet, otherwise clears it.
//
// @param tenant_id_set  set to create or clear.
// @return OB_SUCCESS, or the error code from create()/clear().
int ObAdminSwitchReplicaRole::alloc_tenant_id_set(common::hash::ObHashSet<uint64_t> &tenant_id_set)
{
  int ret = OB_SUCCESS;
  if (!tenant_id_set.created()) {
    if (OB_FAIL(tenant_id_set.create(TENANT_BUCKET_NUM))) {
      LOG_WARN("create tenant id set failed", LITERAL_K(TENANT_BUCKET_NUM), KR(ret));
    }
  } else if (OB_FAIL(tenant_id_set.clear())) {
    LOG_WARN("clear tenant id set failed", KR(ret));
  }
  return ret;
}
224

225
template<typename T>
226
int ObAdminSwitchReplicaRole::convert_set_to_array(const common::hash::ObHashSet<T> &set,
227
    ObArray<T> &array)
228
{
229
  int ret = common::OB_SUCCESS;
230
  array.reuse();
231
  if (!set.created()) {
232
    ret = OB_INVALID_ARGUMENT;
233
    LOG_WARN("set not created", "set created", set.created(), KR(ret));
234
  } else if (OB_FAIL(array.reserve(set.size()))) {
235
    LOG_WARN("array reserver failed", "capacity", set.size(), KR(ret));
236
  } else {
237
    for (typename common::hash::ObHashSet<T>::const_iterator iter = set.begin();
238
        OB_SUCCESS == ret && iter != set.end(); ++iter) {
239
      if (OB_FAIL(array.push_back(iter->first))) {
240
        LOG_WARN("push_back failed", KR(ret));
241
      }
242
    }
243
  }
244
  return ret;
245
}
246

247
int ObAdminSwitchReplicaRole::get_tenants_of_zone(const ObZone &zone,
248
    common::hash::ObHashSet<uint64_t> &tenant_id_set)
249
{
250
  int ret = OB_SUCCESS;
251
  ObArray<ObAddr> server_array;
252
  if (!ctx_.is_inited()) {
253
    ret = OB_NOT_INIT;
254
    LOG_WARN("not init", KR(ret));
255
  } else if (zone.is_empty() || !tenant_id_set.created()) {
256
    ret = OB_INVALID_ARGUMENT;
257
    LOG_WARN("invalid argument", K(zone),
258
        "tenant_id_set created", tenant_id_set.created(), KR(ret));
259
  } else if (OB_FAIL(SVR_TRACER.get_alive_servers(zone, server_array))) {
260
    LOG_WARN("get alive servers failed", K(zone), KR(ret));
261
  } else {
262
    FOREACH_CNT_X(server, server_array, OB_SUCCESS == ret) {
263
      if (OB_FAIL(ctx_.unit_mgr_->get_tenants_of_server(*server, tenant_id_set))) {
264
        LOG_WARN("get tenants of server failed", "server", *server, KR(ret));
265
      }
266
    }
267
  }
268

269
  return ret;
270
}
271

272
int ObAdminSwitchReplicaRole::get_switch_replica_tenants(const ObZone &zone, const ObAddr &server,
273
    const uint64_t &tenant_id, ObArray<uint64_t> &tenant_ids)
274
{
275
  int ret = OB_SUCCESS;
276
  if (!ctx_.is_inited()) {
277
    ret = OB_NOT_INIT;
278
    LOG_WARN("not init", KR(ret));
279
  } else if (zone.is_empty() && !server.is_valid() && OB_INVALID_ID == tenant_id) {
280
    ret = OB_INVALID_ARGUMENT;
281
    LOG_WARN("zone, server and tenant_id are all invalid",
282
        K(zone), K(server), K(tenant_id), KR(ret));
283
  } else if (OB_INVALID_ID != tenant_id) {
284
    if (OB_FAIL(tenant_ids.push_back(tenant_id))) {
285
      LOG_WARN("push back tenant id failed", KR(ret));
286
    }
287
  } else if (server.is_valid() || !zone.is_empty()) {
288
    ObHashSet<uint64_t> tenant_id_set;
289
    if (OB_FAIL(alloc_tenant_id_set(tenant_id_set))) {
290
      LOG_WARN("alloc tenant id set failed", KR(ret));
291
    } else {
292
      if (server.is_valid()) {
293
        if (OB_FAIL(ctx_.unit_mgr_->get_tenants_of_server(server, tenant_id_set))) {
294
          LOG_WARN("get tenants of server failed", K(server), KR(ret));
295
        }
296
      } else {
297
        if (OB_FAIL(get_tenants_of_zone(zone, tenant_id_set))) {
298
          LOG_WARN("get tenants of zone failed", K(zone), KR(ret));
299
        }
300
      }
301
    }
302
    if (OB_SUCC(ret)) {
303
      if (OB_FAIL(convert_set_to_array(tenant_id_set, tenant_ids))) {
304
        LOG_WARN("convert set to array failed", KR(ret));
305
      }
306
    }
307
  }
308

309
  return ret;
310
}
311

312
int ObAdminCallServer::get_server_list(const ObServerZoneArg &arg, ObIArray<ObAddr> &server_list)
313
{
314
  int ret = OB_SUCCESS;
315
  server_list.reset();
316
  if (!ctx_.is_inited()) {
317
    ret = OB_NOT_INIT;
318
    LOG_WARN("not init", KR(ret));
319
  } else if (!arg.is_valid()) {
320
    ret = OB_INVALID_ARGUMENT;
321
    LOG_WARN("invalid arg", K(arg), KR(ret));
322
  } else if (arg.server_.is_valid()) {
323
    bool is_alive = false;
324
    if (OB_FAIL(SVR_TRACER.check_server_alive(arg.server_, is_alive))) {
325
      LOG_WARN("fail to check server alive", KR(ret), "server", arg.server_);
326
    } else if (!is_alive) {
327
      ret = OB_INVALID_ARGUMENT;
328
      LOG_WARN("server is not alive", KR(ret), "server", arg.server_);
329
    } else if (OB_FAIL(server_list.push_back(arg.server_))) {
330
      LOG_WARN("push back server failed", KR(ret));
331
    }
332
  } else {
333
    bool zone_exist = true;
334
    if (!arg.zone_.is_empty() && OB_FAIL(ctx_.zone_mgr_->check_zone_exist(arg.zone_, zone_exist))) {
335
      LOG_WARN("fail to check zone exist", KR(ret));
336
    } else if (!zone_exist) {
337
      ret = OB_ZONE_INFO_NOT_EXIST;
338
      LOG_WARN("zone info not exist", KR(ret), K(arg.zone_));
339
    } else if (OB_FAIL(SVR_TRACER.get_alive_servers(arg.zone_, server_list))) {
340
      LOG_WARN("get alive servers failed", KR(ret), K(arg));
341
    }
342
  }
343
  return ret;
344
}
345

346
int ObAdminCallServer::call_all(const ObServerZoneArg &arg)
347
{
348
  int ret = OB_SUCCESS;
349
  ObArray<ObAddr> server_list;
350
  if (OB_FAIL(get_server_list(arg, server_list))) {
351
    LOG_WARN("get server list failed", K(ret), K(arg));
352
  } else {
353
    FOREACH_CNT(server, server_list) {
354
      int tmp_ret = call_server(*server);
355
      if (OB_SUCCESS != tmp_ret) {
356
        LOG_WARN("call server failed", KR(ret), "server", *server);
357
        ret = OB_SUCCESS == ret ? tmp_ret : ret;
358
      }
359
    }
360
  }
361
  return ret;
362
}
363

364
// Entry point of the "report replica" admin command: validates the request and
// fans it out with call_all().
//
// @param arg  request; also used as the server/zone selector.
// @return OB_NOT_INIT, OB_INVALID_ARGUMENT, or the result of call_all().
int ObAdminReportReplica::execute(const obrpc::ObAdminReportReplicaArg &arg)
{
  LOG_INFO("execute report request", K(arg));
  int ret = OB_SUCCESS;
  if (ctx_.is_inited()) {
    if (arg.is_valid()) {
      if (OB_FAIL(call_all(arg))) {
        LOG_WARN("execute report replica failed", KR(ret), K(arg));
      }
    } else {
      ret = OB_INVALID_ARGUMENT;
      LOG_WARN("invalid arg", K(arg), KR(ret));
    }
  } else {
    ret = OB_NOT_INIT;
    LOG_WARN("not init", KR(ret));
  }
  return ret;
}
379

380
int ObAdminReportReplica::call_server(const ObAddr &server)
381
{
382
  int ret = OB_SUCCESS;
383
  if (!ctx_.is_inited()) {
384
    ret = OB_NOT_INIT;
385
    LOG_WARN("not init", KR(ret));
386
  } else if (!server.is_valid()) {
387
    ret = OB_INVALID_ARGUMENT;
388
    LOG_WARN("invalid server", K(server), KR(ret));
389
  } else if (OB_FAIL(ctx_.rpc_proxy_->to(server).report_replica())) {
390
    LOG_WARN("request server report replica failed", KR(ret), K(server));
391
  }
392
  return ret;
393
}
394

395
// Entry point of the "recycle replica" admin command: validates the request and
// fans it out with call_all().
//
// @param arg  request; also used as the server/zone selector.
// @return OB_NOT_INIT, OB_INVALID_ARGUMENT, or the result of call_all().
int ObAdminRecycleReplica::execute(const obrpc::ObAdminRecycleReplicaArg &arg)
{
  LOG_INFO("execute recycle request", K(arg));
  int ret = OB_SUCCESS;
  const bool ctx_ready = ctx_.is_inited();
  if (!ctx_ready) {
    ret = OB_NOT_INIT;
    LOG_WARN("not init", KR(ret));
  } else if (arg.is_valid()) {
    if (OB_FAIL(call_all(arg))) {
      LOG_WARN("execute recycle replica failed", KR(ret), K(arg));
    }
  } else {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid arg", K(arg), KR(ret));
  }
  return ret;
}
410

411
int ObAdminRecycleReplica::call_server(const ObAddr &server)
412
{
413
  int ret = OB_SUCCESS;
414
  if (!ctx_.is_inited()) {
415
    ret = OB_NOT_INIT;
416
    LOG_WARN("not init", KR(ret));
417
  } else if (!server.is_valid()) {
418
    ret = OB_INVALID_ARGUMENT;
419
    LOG_WARN("invalid server", K(server), KR(ret));
420
  } else if (OB_FAIL(ctx_.rpc_proxy_->to(server).recycle_replica())) {
421
    LOG_WARN("request server recycle replica failed", KR(ret), K(server));
422
  }
423
  return ret;
424
}
425

426
// Entry point of the "clear location cache" admin command: validates the request
// and fans it out with call_all().
//
// @param arg  request; also used as the server/zone selector.
// @return OB_NOT_INIT, OB_INVALID_ARGUMENT, or the result of call_all().
int ObAdminClearLocationCache::execute(const obrpc::ObAdminClearLocationCacheArg &arg)
{
  LOG_INFO("execute clear location cache request", K(arg));
  int ret = OB_SUCCESS;
  if (ctx_.is_inited()) {
    if (!arg.is_valid()) {
      ret = OB_INVALID_ARGUMENT;
      LOG_WARN("invalid arg", K(arg), KR(ret));
    } else if (OB_FAIL(call_all(arg))) {
      LOG_WARN("execute clear location cache failed", KR(ret), K(arg));
    }
  } else {
    ret = OB_NOT_INIT;
    LOG_WARN("not init", KR(ret));
  }
  return ret;
}
441

442
int ObAdminClearLocationCache::call_server(const ObAddr &server)
443
{
444
  int ret = OB_SUCCESS;
445
  if (!ctx_.is_inited()) {
446
    ret = OB_NOT_INIT;
447
    LOG_WARN("not init", KR(ret));
448
  } else if (!server.is_valid()) {
449
    ret = OB_INVALID_ARGUMENT;
450
    LOG_WARN("invalid server", K(server), KR(ret));
451
  } else if (OB_FAIL(ctx_.rpc_proxy_->to(server).clear_location_cache())) {
452
    LOG_WARN("request clear location cache failed", KR(ret), K(server));
453
  }
454
  return ret;
455
}
456

457
int ObAdminReloadUnit::execute()
458
{
459
  LOG_INFO("execute reload unit request");
460
  int ret = OB_SUCCESS;
461
  if (!ctx_.is_inited()) {
462
    ret = OB_NOT_INIT;
463
    LOG_WARN("not init", KR(ret));
464
  } else if (OB_FAIL(ctx_.unit_mgr_->load())) {
465
    LOG_WARN("unit manager load failed", KR(ret));
466
  }
467
  LOG_INFO("finish execute reload unit request", KR(ret));
468
  return ret;
469
}
470

471
int ObAdminReloadServer::execute()
472
{
473
  LOG_INFO("execute reload server request");
474
  int ret = OB_SUCCESS;
475
  if (!ctx_.is_inited()) {
476
    ret = OB_NOT_INIT;
477
    LOG_WARN("not init", KR(ret));
478
  } else if (OB_ISNULL(ctx_.server_mgr_)) {
479
    ret = OB_ERR_UNEXPECTED;
480
    LOG_WARN("ctx_.server_mgr_ is null", KR(ret), KP(ctx_.server_mgr_));
481
  } else if (OB_FAIL(ctx_.server_mgr_->load_server_manager())) {
482
    LOG_WARN("build server status failed", KR(ret));
483
  }
484
  return ret;
485
}
486

487
int ObAdminReloadZone::execute()
488
{
489
  LOG_INFO("execute reload zone request");
490
  int ret = OB_SUCCESS;
491
  if (!ctx_.is_inited()) {
492
    ret = OB_NOT_INIT;
493
    LOG_WARN("not init", KR(ret));
494
  } else if (OB_FAIL(ctx_.zone_mgr_->reload())) {
495
    LOG_ERROR("zone manager reload failed", KR(ret));
496
  }
497
  return ret;
498
}
499

500
// Entry point of the "clear merge error" admin command.
//
// Builds an ObTenantAdminMergeParam from `arg` and passes it to
// ObMajorFreezeHelper::clear_merge_error(). The tenant scope is either exactly one
// of affect_all_ / affect_all_user_ / affect_all_meta_, or, when none is set, the
// explicit arg.tenant_ids_ list.
//
// @param arg  request describing the tenant scope.
// @return OB_NOT_INIT, OB_INVALID_ARGUMENT, OB_ERR_UNEXPECTED (GCTX.net_frame_ is
//         null, or more than one affect_all* flag is set), or the error from
//         assign()/clear_merge_error().
int ObAdminClearMergeError::execute(const obrpc::ObAdminMergeArg &arg)
{
  LOG_INFO("execute clear merge error request", K(arg));
  int ret = OB_SUCCESS;
  if (!ctx_.is_inited()) {
    ret = OB_NOT_INIT;
    LOG_WARN("not init", KR(ret));
  } else if (!arg.is_valid()) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid arg", K(arg), KR(ret));
  } else if (OB_ISNULL(GCTX.net_frame_)) {
    // Guard the dereference below, as done for other GCTX pointers in this file.
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("GCTX.net_frame_ is NULL", KR(ret), K(arg));
  } else {
    ObTenantAdminMergeParam param;
    param.transport_ = GCTX.net_frame_->get_req_transport();
    if (arg.affect_all_ || arg.affect_all_user_ || arg.affect_all_meta_) {
      if ((true == arg.affect_all_ && true == arg.affect_all_user_) ||
          (true == arg.affect_all_ && true == arg.affect_all_meta_) ||
          (true == arg.affect_all_user_ && true == arg.affect_all_meta_)) {
        // The three scope flags are mutually exclusive.
        ret = OB_ERR_UNEXPECTED;
        LOG_WARN("only one of affect_all,affect_all_user,affect_all_meta can be true",
                 KR(ret), "affect_all", arg.affect_all_, "affect_all_user",
                 arg.affect_all_user_, "affect_all_meta", arg.affect_all_meta_);
      } else {
        if (arg.affect_all_) {
          param.need_all_ = true;
        } else if (arg.affect_all_user_) {
          param.need_all_user_ = true;
        } else {
          param.need_all_meta_ = true;
        }
      }
    } else if (OB_FAIL(param.tenant_array_.assign(arg.tenant_ids_))) {
      LOG_WARN("fail to assign tenant_ids", KR(ret), K(arg));
    }
    // Only issue the request when the parameter was built successfully.
    if (FAILEDx(ObMajorFreezeHelper::clear_merge_error(param))) {
      LOG_WARN("fail to clear merge error", KR(ret), K(param));
    }
  }
  return ret;
}
539

540
// Entry point of the zone fast recovery admin command.
//
// SUSPEND_RECOVERY sets the zone's recovery status to RECOVERY_STATUS_SUSPEND and
// RESUME_RECOVERY sets it to RECOVERY_STATUS_NORMAL, both through
// ctx_.zone_mgr_->update_recovery_status(). Any other type is rejected.
//
// @param arg  request; type_ and zone_ are read here.
// @return OB_NOT_INIT, OB_INVALID_ARGUMENT, OB_ERR_UNEXPECTED for an unknown
//         type, or the result of update_recovery_status().
int ObAdminZoneFastRecovery::execute(const obrpc::ObAdminRecoveryArg &arg)
{
  LOG_INFO("execute zone fast recovery admin request", K(arg));
  int ret = OB_SUCCESS;
  if (!ctx_.is_inited()) {
    ret = OB_NOT_INIT;
    LOG_WARN("not init", KR(ret));
  } else if (!arg.is_valid()) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid arg", K(arg), KR(ret));
  } else if (ObAdminRecoveryArg::SUSPEND_RECOVERY == arg.type_) {
    if (OB_FAIL(ctx_.zone_mgr_->update_recovery_status(
            arg.zone_, share::ObZoneInfo::RECOVERY_STATUS_SUSPEND))) {
      LOG_WARN("fail to update zone fast recovery status", KR(ret));
    }
  } else if (ObAdminRecoveryArg::RESUME_RECOVERY == arg.type_) {
    if (OB_FAIL(ctx_.zone_mgr_->update_recovery_status(
            arg.zone_, share::ObZoneInfo::RECOVERY_STATUS_NORMAL))) {
      LOG_WARN("fail to update zone fast recovery status", KR(ret));
    }
  } else {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("arg type unexpected", KR(ret), "type", arg.type_);
  }
  return ret;
}
572

573
// Fills `param` for a suspend/resume merge request from `arg`.
//
// The tenant scope is either exactly one of affect_all_ / affect_all_user_ /
// affect_all_meta_, or, when none is set, the explicit arg.tenant_ids_ list.
//
// @return OB_ERR_UNEXPECTED when GCTX.net_frame_ is null or more than one
//         affect_all* flag is set, otherwise OB_SUCCESS or the error from assign().
static int fill_admin_merge_param_from_arg(const obrpc::ObAdminMergeArg &arg,
                                           ObTenantAdminMergeParam &param)
{
  int ret = OB_SUCCESS;
  if (OB_ISNULL(GCTX.net_frame_)) {
    // Guard the dereference below, as done for other GCTX pointers in this file.
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("GCTX.net_frame_ is NULL", KR(ret), K(arg));
  } else {
    param.transport_ = GCTX.net_frame_->get_req_transport();
    if (arg.affect_all_ || arg.affect_all_user_ || arg.affect_all_meta_) {
      if ((true == arg.affect_all_ && true == arg.affect_all_user_) ||
          (true == arg.affect_all_ && true == arg.affect_all_meta_) ||
          (true == arg.affect_all_user_ && true == arg.affect_all_meta_)) {
        // The three scope flags are mutually exclusive.
        ret = OB_ERR_UNEXPECTED;
        LOG_WARN("only one of affect_all,affect_all_user,affect_all_meta can be true",
                 KR(ret), "affect_all", arg.affect_all_, "affect_all_user",
                 arg.affect_all_user_, "affect_all_meta", arg.affect_all_meta_);
      } else if (arg.affect_all_) {
        param.need_all_ = true;
      } else if (arg.affect_all_user_) {
        param.need_all_user_ = true;
      } else {
        param.need_all_meta_ = true;
      }
    } else if (OB_FAIL(param.tenant_array_.assign(arg.tenant_ids_))) {
      LOG_WARN("fail to assign tenant_ids", KR(ret), K(arg));
    }
  }
  return ret;
}

// Entry point of the merge admin command.
//
//   START_MERGE   -> no-op that returns OB_SUCCESS (the old implementation is
//                    disabled in the source).
//   SUSPEND_MERGE -> ObMajorFreezeHelper::suspend_merge()
//   RESUME_MERGE  -> ObMajorFreezeHelper::resume_merge()
//   anything else -> OB_NOT_SUPPORTED
//
// @param arg  request; type_ selects the action, the remaining fields select the
//             tenant scope.
// @return OB_NOT_INIT, OB_INVALID_ARGUMENT, OB_NOT_SUPPORTED, or the error from
//         building the parameter / the helper call.
int ObAdminMerge::execute(const obrpc::ObAdminMergeArg &arg)
{
  LOG_INFO("execute merge admin request", K(arg));
  int ret = OB_SUCCESS;
  if (!ctx_.is_inited()) {
    ret = OB_NOT_INIT;
    LOG_WARN("not init", KR(ret));
  } else if (!arg.is_valid()) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid arg", K(arg), KR(ret));
  } else {
    switch(arg.type_) {
      case ObAdminMergeArg::START_MERGE: {
        // NOTE(review): intentionally does nothing and reports success; the
        // former manual_start_merge() call was commented out in the source.
        break;
      }
      case ObAdminMergeArg::SUSPEND_MERGE: {
        ObTenantAdminMergeParam param;
        if (OB_FAIL(fill_admin_merge_param_from_arg(arg, param))) {
          LOG_WARN("fail to build merge param", KR(ret), K(arg));
        } else if (OB_FAIL(ObMajorFreezeHelper::suspend_merge(param))) {
          LOG_WARN("fail to suspend merge", KR(ret), K(param));
        }
        break;
      }
      case ObAdminMergeArg::RESUME_MERGE: {
        ObTenantAdminMergeParam param;
        if (OB_FAIL(fill_admin_merge_param_from_arg(arg, param))) {
          LOG_WARN("fail to build merge param", KR(ret), K(arg));
        } else if (OB_FAIL(ObMajorFreezeHelper::resume_merge(param))) {
          LOG_WARN("fail to resume merge", KR(ret), K(param));
        }
        break;
      }
      default: {
        ret = OB_NOT_SUPPORTED;
        LOG_WARN("unsupported merge admin type", "type", arg.type_, KR(ret));
        break;
      }
    }
  }
  return ret;
}
655

656
// The "clear roottable" admin command is not supported.
//
// @param arg  ignored.
// @return always OB_NOT_SUPPORTED.
int ObAdminClearRoottable::execute(const obrpc::ObAdminClearRoottableArg &arg)
{
  UNUSED(arg);
  return OB_NOT_SUPPORTED;
}
662

663
//FIXME: flush schemas of all tenants
664
// Entry point of the "refresh schema" admin command.
//
// Refreshes the sys tenant schema through ctx_.ddl_service_, caches the sys tenant
// schema version in schema_version_ and the refresh info in schema_info_ (falling
// back to schema_version_ when the fetched info is not valid), then fans the
// request out with call_all().
//
// @param arg  request; also used as the server/zone selector.
// @return OB_NOT_INIT, OB_INVALID_ARGUMENT, or the first error from the steps above.
int ObAdminRefreshSchema::execute(const obrpc::ObAdminRefreshSchemaArg &arg)
{
  LOG_INFO("execute refresh schema", K(arg));
  int ret = OB_SUCCESS;
  if (!ctx_.is_inited()) {
    ret = OB_NOT_INIT;
    LOG_WARN("not init", KR(ret));
  } else if (!arg.is_valid()) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid arg", K(arg), KR(ret));
  } else if (OB_FAIL(ctx_.ddl_service_->refresh_schema(OB_SYS_TENANT_ID))) {
    LOG_WARN("refresh schema failed", KR(ret));
  } else if (OB_FAIL(ctx_.schema_service_->get_tenant_schema_version(OB_SYS_TENANT_ID, schema_version_))) {
    LOG_WARN("fail to get schema version", KR(ret));
  } else if (OB_FAIL(ctx_.schema_service_->get_refresh_schema_info(schema_info_))) {
    LOG_WARN("fail to get refresh schema info", KR(ret), K(schema_info_));
  } else {
    if (!schema_info_.is_valid()) {
      // No usable refresh info: carry at least the schema version.
      schema_info_.set_schema_version(schema_version_);
    }
    if (OB_FAIL(call_all(arg))) {
      LOG_WARN("execute notify refresh schema failed", KR(ret), K(arg));
    }
  }
  return ret;
}
691

692
int ObAdminRefreshSchema::call_server(const ObAddr &server)
693
{
694
  int ret = OB_SUCCESS;
695
  ObTimeoutCtx ctx;
696
  if (OB_UNLIKELY(!ctx_.is_inited())) {
697
    ret = OB_NOT_INIT;
698
    LOG_WARN("not init", KR(ret));
699
  } else if (OB_UNLIKELY(!server.is_valid())) {
700
    ret = OB_INVALID_ARGUMENT;
701
    LOG_WARN("invalid server", KR(ret), K(server));
702
  } else if (OB_ISNULL(GCTX.srv_rpc_proxy_)) {
703
    ret = OB_ERR_UNEXPECTED;
704
    LOG_WARN("GCTX.srv_rpc_proxy_ is null", KR(ret));
705
  } else if (OB_FAIL(ObShareUtil::set_default_timeout_ctx(ctx, GCONF.rpc_timeout))) {
706
    LOG_WARN("fail to set timeout ctx", KR(ret));
707
  } else {
708
    ObSwitchSchemaArg arg;
709
    arg.schema_info_ = schema_info_;
710
    ObArray<int> return_code_array;
711
    ObSwitchSchemaProxy proxy(*GCTX.srv_rpc_proxy_, &ObSrvRpcProxy::switch_schema);
712
    int tmp_ret = OB_SUCCESS;
713
    const int64_t timeout_ts = ctx.get_timeout(0);
714
    if (OB_FAIL(proxy.call(server, timeout_ts, arg))) {
715
      LOG_WARN("notify switch schema failed", KR(ret), K(server), K_(schema_version), K_(schema_info));
716
    }
717
    if (OB_TMP_FAIL(proxy.wait_all(return_code_array))) {
718
      ret = OB_SUCC(ret) ? tmp_ret : ret;
719
      LOG_WARN("fail to wait all", KR(ret), KR(tmp_ret), K(server));
720
    } else if (OB_FAIL(ret)) {
721
    } else if (OB_FAIL(proxy.check_return_cnt(return_code_array.count()))) {
722
      LOG_WARN("fail to check return cnt", KR(ret), K(server), "return_cnt", return_code_array.count());
723
    } else if (OB_UNLIKELY(1 != return_code_array.count())) {
724
      ret = OB_ERR_UNEXPECTED;
725
      LOG_WARN("return_code_array count shoud be 1", KR(ret), K(server), "return_cnt", return_code_array.count());
726
    } else {
727
      ret = return_code_array.at(0);
728
    }
729
  }
730
  return ret;
731
}
732

733
int ObAdminRefreshMemStat::execute(const ObAdminRefreshMemStatArg &arg)
734
{
735
  LOG_INFO("execute refresh memory stat");
736
  int ret = OB_SUCCESS;
737
  if (!ctx_.is_inited()) {
738
    ret = OB_NOT_INIT;
739
    LOG_WARN("not init", KR(ret));
740
  } else if (OB_FAIL(call_all(arg))) {
741
   LOG_WARN("execute notify refresh memory stat failed", KR(ret));
742
  }
743
  return ret;
744
}
745

746
int ObAdminRefreshMemStat::call_server(const ObAddr &server)
747
{
748
  int ret = OB_SUCCESS;
749
  if (!ctx_.is_inited()) {
750
    ret = OB_NOT_INIT;
751
    LOG_WARN("not init", KR(ret));
752
  } else if (!server.is_valid()) {
753
    ret = OB_INVALID_ARGUMENT;
754
    LOG_WARN("invalid server", K(server), KR(ret));
755
  } else if (OB_FAIL(ctx_.rpc_proxy_->to(server).refresh_memory_stat())) {
756
    LOG_WARN("notify refresh memory stat failed", KR(ret), K(server));
757
  }
758
  return ret;
759
}
760

761
int ObAdminWashMemFragmentation::execute(const ObAdminWashMemFragmentationArg &arg)
762
{
763
  LOG_INFO("execute sync wash fragment");
764
  int ret = OB_SUCCESS;
765
  if (!ctx_.is_inited()) {
766
    ret = OB_NOT_INIT;
767
    LOG_WARN("not init", K(ret));
768
  } else if (OB_FAIL(call_all(arg))) {
769
    LOG_WARN("execute notify sync wash fragment failed", K(ret));
770
  }
771
  return ret;
772
}
773

774
int ObAdminWashMemFragmentation::call_server(const ObAddr &server)
775
{
776
  int ret = OB_SUCCESS;
777
  if (!ctx_.is_inited()) {
778
    ret = OB_NOT_INIT;
779
    LOG_WARN("not init", K(ret));
780
  } else if (!server.is_valid()) {
781
    ret = OB_INVALID_ARGUMENT;
782
    LOG_WARN("invalid server", K(server), K(ret));
783
  } else if (OB_FAIL(ctx_.rpc_proxy_->to(server).wash_memory_fragmentation())) {
784
    LOG_WARN("notify sync wash fragment failed", K(ret), K(server));
785
  }
786
  return ret;
787
}
788

789
// Validates every item of a set-config request before anything is persisted.
//
// For each item the function
//   1. rejects an empty parameter name;
//   2. treats the item as a tenant-level change when the executing tenant is not
//      the sys tenant or a tenant name was given, otherwise as a sys-tenant request
//      that may hit either a cluster-level or a tenant-level parameter;
//   3. looks the parameter up in scratch checker objects (ObServerConfigChecker /
//      ObTenantConfigChecker) that are created lazily and destroyed before returning;
//   4. for tenant-level parameters sets item->want_to_set_tenant_config_ and fills
//      item->tenant_ids_ ("all" / "all_user" / "all_meta" expand to a filtered list of
//      tenants, "seed" maps to OB_PARAMETER_SEED_ID, anything else is resolved by name
//      or taken from exec_tenant_id_);
//   5. runs the editability / unit / value / range checks on the scratch item and
//      finally asks ctx_.root_service_->check_config().
// The loop stops at the first failing item.
//
// @param arg  request to verify; its items are updated in place (see step 4).
// @return OB_SUCCESS, or the first error encountered (OB_NOT_INIT, OB_INVALID_ARGUMENT,
//         OB_ALLOCATE_MEMORY_FAILED, OB_ERR_SYS_CONFIG_UNKNOWN, OB_NOT_SUPPORTED,
//         OB_ERR_INVALID_TENANT_NAME, OB_INVALID_CONFIG, ...).
int ObAdminSetConfig::verify_config(obrpc::ObAdminSetConfigArg &arg)
{
  int ret = OB_SUCCESS;
  void *ptr = nullptr;
  void *cfg_ptr = nullptr;
  ObServerConfigChecker *cfg = nullptr;
  ObTenantConfigChecker *tenant_cfg = nullptr;

  // Lazily creates the scratch tenant checker. Both branches below need it, so the
  // allocation lives in one place. It is a no-op when ret already carries an error
  // or the checker exists.
  auto prepare_tenant_cfg = [&ret, &cfg_ptr, &tenant_cfg]() {
    if (OB_SUCC(ret) && nullptr == tenant_cfg) {
      if (OB_ISNULL(cfg_ptr = ob_malloc(sizeof(ObTenantConfigChecker),
                                        ObModIds::OB_RS_PARTITION_TABLE_TEMP))) {
        ret = OB_ALLOCATE_MEMORY_FAILED;
        LOG_WARN("fail to alloc memory", KR(ret));
      } else if (OB_ISNULL(tenant_cfg = new (cfg_ptr) ObTenantConfigChecker())) {
        ret = OB_ERR_UNEXPECTED;
        LOG_WARN("new tenant_cfg failed", KR(ret));
      }
    }
  };

  if (!ctx_.is_inited()) {
    ret = OB_NOT_INIT;
    LOG_WARN("not init", KR(ret));
  } else if (!arg.is_valid()) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid arg", K(arg), KR(ret));
  }
  FOREACH_X(item, arg.items_, OB_SUCCESS == ret) {
    if (item->name_.is_empty()) {
      ret = OB_INVALID_ARGUMENT;
      LOG_WARN("empty config name", "item", *item, KR(ret));
    } else {
      ObConfigItem *ci = nullptr;
      if (OB_SYS_TENANT_ID != item->exec_tenant_id_ || item->tenant_name_.size() > 0) {
        // tenants(user or sys tenants) modify tenant level configuration
        item->want_to_set_tenant_config_ = true;
        prepare_tenant_cfg();

        if (OB_SUCC(ret)) {
          ObConfigItem * const *ci_ptr = tenant_cfg->get_container().get(
                                          ObConfigStringKey(item->name_.ptr()));
          if (OB_ISNULL(ci_ptr)) {
            ret = OB_ERR_SYS_CONFIG_UNKNOWN;
            LOG_WARN("can't found config item", KR(ret), "item", *item);
          } else {
            ci = *ci_ptr;
            share::schema::ObSchemaGetterGuard schema_guard;
            const char *const NAME_ALL = "all";
            const char *const NAME_ALL_USER = "all_user";
            const char *const NAME_ALL_META = "all_meta";
            if (OB_FAIL(ctx_.ddl_service_->get_tenant_schema_guard_with_version_in_inner_table(OB_SYS_TENANT_ID, schema_guard))) {
              LOG_WARN("get_schema_guard failed", KR(ret));
            } else if (OB_SYS_TENANT_ID == item->exec_tenant_id_ &&
                      (0 == item->tenant_name_.str().case_compare(NAME_ALL) ||
                       0 == item->tenant_name_.str().case_compare(NAME_ALL_USER) ||
                       0 == item->tenant_name_.str().case_compare(NAME_ALL_META))) {
              // sys tenant addressing a group of tenants: expand the group to tenant ids
              common::ObArray<uint64_t> tenant_ids;
              if (OB_FAIL(schema_guard.get_tenant_ids(tenant_ids))) {
                LOG_WARN("get_tenant_ids failed", KR(ret));
              } else {
                using FUNC_TYPE = bool (*) (const uint64_t);
                FUNC_TYPE condition_func = nullptr;
                if (GET_MIN_CLUSTER_VERSION() < CLUSTER_VERSION_4_2_1_0) {
                  // before 4.2.1.0 only "all" is accepted
                  if (0 == item->tenant_name_.str().case_compare(NAME_ALL_USER) ||
                      0 == item->tenant_name_.str().case_compare(NAME_ALL_META)) {
                    ret = OB_NOT_SUPPORTED;
                    LOG_WARN("all_user/all_meta are not supported when min_cluster_version is less than 4.2.1.0",
                             KR(ret), "tenant_name", item->tenant_name_);
                  } else {
                    condition_func = is_not_virtual_tenant_id;
                  }
                } else {
                  // from 4.2.1.0 on, "all" and "all_user" both select user tenants
                  if (0 == item->tenant_name_.str().case_compare(NAME_ALL) ||
                      0 == item->tenant_name_.str().case_compare(NAME_ALL_USER)) {
                    condition_func = is_user_tenant;
                  } else {
                    condition_func = is_meta_tenant;
                  }
                }
                if (OB_SUCC(ret) && (nullptr != condition_func)) {
                  for (const uint64_t tenant_id: tenant_ids) {
                    if (condition_func(tenant_id) &&
                        OB_FAIL(item->tenant_ids_.push_back(tenant_id))) {
                      LOG_WARN("add tenant_id failed", K(tenant_id), KR(ret));
                      break;
                    }
                  } // for
                }
              }
            } else if (OB_SYS_TENANT_ID == item->exec_tenant_id_
                       && item->tenant_name_ == ObFixedLengthString<common::OB_MAX_TENANT_NAME_LENGTH + 1>("seed")) {
              uint64_t tenant_id = OB_PARAMETER_SEED_ID;
              if (OB_FAIL(item->tenant_ids_.push_back(tenant_id))) {
                LOG_WARN("add seed tenant_id failed", KR(ret));
                break;
              }
            } else {
              uint64_t tenant_id = OB_INVALID_TENANT_ID;
              if (OB_SYS_TENANT_ID != item->exec_tenant_id_) {
                // a non-sys tenant can only change its own parameters
                tenant_id = item->exec_tenant_id_;
              } else {
                // any lookup failure is reported to the caller as an invalid tenant name
                if (OB_FAIL(schema_guard.get_tenant_id(
                                   ObString(item->tenant_name_.ptr()), tenant_id))
                                   || OB_INVALID_ID == tenant_id) {
                  ret = OB_ERR_INVALID_TENANT_NAME;
                  LOG_WARN("get_tenant_id failed", KR(ret), "tenant", item->tenant_name_);
                }
              }
              if (OB_SUCC(ret) && OB_FAIL(item->tenant_ids_.push_back(tenant_id))) {
                LOG_WARN("add tenant_id failed", K(tenant_id), KR(ret));
              }
            } // else
          } // else
        } // if
      } else {
        // sys tenant try to modify configuration(cluster level or sys tenant level)
        if (nullptr == cfg) {
          if (OB_ISNULL(ptr = ob_malloc(sizeof(ObServerConfigChecker),
                                      ObModIds::OB_RS_PARTITION_TABLE_TEMP))) {
            ret = OB_ALLOCATE_MEMORY_FAILED;
            LOG_WARN("fail to alloc memory", KR(ret));
          } else if (OB_ISNULL(cfg = new (ptr) ObServerConfigChecker)) {
            ret = OB_ERR_UNEXPECTED;
            LOG_WARN("new cfg failed", KR(ret));
          }
        } // if
        prepare_tenant_cfg();

        if (OB_SUCC(ret)) {
          ObConfigItem * const *sys_ci_ptr = cfg->get_container().get(
                                             ObConfigStringKey(item->name_.ptr()));
          ObConfigItem * const *tenant_ci_ptr = tenant_cfg->get_container().get(
                                                ObConfigStringKey(item->name_.ptr()));
          if (OB_NOT_NULL(sys_ci_ptr)) {
            // a cluster-level parameter wins when the name exists in both containers
            ci = *sys_ci_ptr;
          } else if (OB_NOT_NULL(tenant_ci_ptr)) {
            // tenant-level parameter without a tenant name: applies to the sys tenant
            ci = *tenant_ci_ptr;
            item->want_to_set_tenant_config_ = true;
            if (OB_FAIL(item->tenant_ids_.push_back(OB_SYS_TENANT_ID))) {
              LOG_WARN("add tenant_id failed", KR(ret));
            }
          } else {
            ret = OB_ERR_SYS_CONFIG_UNKNOWN;
            LOG_WARN("can't found config item", KR(ret), "item", *item);
          }
        } // if
      } // else

      if (OB_SUCC(ret)) {
        const char *err = nullptr;
        if (OB_ISNULL(ci)) {
          // defensive: the container handed back a null item pointer
          ret = OB_ERR_UNEXPECTED;
          LOG_WARN("config item is null", KR(ret), "item", *item);
        } else if (ci->is_not_editable() && !arg.is_inner_) {
          ret = OB_INVALID_CONFIG; //TODO: specific report not editable
          LOG_WARN("config is not editable", "item", *item, KR(ret));
        } else if (!ci->check_unit(item->value_.ptr())) {
          ret = OB_INVALID_CONFIG;
          LOG_ERROR("invalid config", "item", *item, KR(ret));
        } else if (!ci->set_value(item->value_.ptr())) {
          ret = OB_INVALID_CONFIG;
          LOG_WARN("invalid config", "item", *item, KR(ret));
        } else if (!ci->check()) {
          ret = OB_INVALID_CONFIG;
          LOG_WARN("invalid value range", "item", *item, KR(ret));
        } else if (!ctx_.root_service_->check_config(*ci, err)) {
          ret = OB_INVALID_CONFIG;
          LOG_WARN("invalid value range", "item", *item, KR(ret));
        }
        if (OB_FAIL(ret)) {
          // check_config() may hand back a user-facing explanation through err
          if (nullptr != err) {
            LOG_USER_ERROR(OB_INVALID_CONFIG, err);
          }
        }
      } // if
    } // else
  } // FOREACH_X

  // Release the scratch checkers. The else-if arms cover raw memory whose
  // placement-new did not yield an object.
  if (nullptr != cfg) {
    cfg->~ObServerConfigChecker();
    ob_free(cfg);
    cfg = nullptr;
    ptr = nullptr;
  } else if (nullptr != ptr) {
    ob_free(ptr);
    ptr = nullptr;
  }
  if (nullptr != tenant_cfg) {
    tenant_cfg->~ObTenantConfigChecker();
    ob_free(tenant_cfg);
    tenant_cfg = nullptr;
    cfg_ptr = nullptr;
  } else if (nullptr != cfg_ptr) {
    ob_free(cfg_ptr);
    cfg_ptr = nullptr;
  }
  return ret;
}
988

989
// Persists every item of a set-config request and publishes the new config version.
//
// For each item, stopping at the first failure:
//   1. if a server address was given, checks through SVR_TRACER that the server exists;
//   2. if a zone was given, checks through ctx_.zone_mgr_ that the zone exists;
//   3. advances new_version to max(new_version + 1, current time);
//   4. tenant-level item: for every id in item->tenant_ids_ writes one row with
//      insert-update into OB_TENANT_PARAMETER_TNAME (or OB_ALL_SEED_PARAMETER_TNAME for
//      the OB_PARAMETER_SEED_ID pseudo tenant) and, for real tenants, records the new
//      version in OTC_MGR;
//      cluster-level item: writes one row into OB_ALL_SYS_PARAMETER_TNAME and pushes the
//      new version to ctx_.zone_mgr_ and ctx_.config_mgr_.
//
// @param arg          request to apply; items are expected to carry the
//                     want_to_set_tenant_config_ / tenant_ids_ values this function reads.
// @param new_version  starting config version; taken by value and advanced per item.
// @return OB_SUCCESS or the first error encountered.
int ObAdminSetConfig::update_config(obrpc::ObAdminSetConfigArg &arg, int64_t new_version)
{
  int ret = OB_SUCCESS;
  if (!ctx_.is_inited()) {
    ret = OB_NOT_INIT;
    LOG_WARN("not init", KR(ret));
  } else if (!arg.is_valid()) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid arg", K(arg), KR(ret));
  } else {
    FOREACH_X(item, arg.items_, OB_SUCCESS == ret) {
      // NOTE(review): svr_ip / svr_port are passed to add_pk_column() through K(), which
      // presumably derives the column name from the variable name -- do not rename them.
      char svr_ip[OB_MAX_SERVER_ADDR_SIZE] = "ANY";
      int64_t svr_port = 0;
      if (item->server_.is_valid()) {
        if (false == item->server_.ip_to_string(svr_ip, sizeof(svr_ip))) {
          ret = OB_INVALID_ARGUMENT;
          LOG_WARN("convert server addr to ip failed", KR(ret), "server", item->server_);
        } else {
          svr_port = item->server_.get_port();
          ObAddr addr;
          bool is_server_exist = false;
          if (false == addr.set_ip_addr(svr_ip, static_cast<int32_t>(svr_port))){
            ret = OB_ERR_UNEXPECTED;
            LOG_WARN("set addr fail", KR(ret), "svr_ip", svr_ip, K(svr_port));
          } else if (OB_FAIL(SVR_TRACER.is_server_exist(addr, is_server_exist))) {
            LOG_WARN("check server exist fail", KR(ret), K(addr));
          } else if (!is_server_exist) {
            ret = OB_INVALID_ARGUMENT;
            LOG_WARN("server is not exist", KR(ret), K(addr));
            LOG_USER_ERROR(OB_INVALID_ARGUMENT, "server");
          }
        } // else
      } // if

      if (OB_FAIL(ret)) {
      } else if (!item->zone_.is_empty()) {
        bool is_zone_exist = false;
        if (OB_FAIL(ctx_.zone_mgr_->check_zone_exist(item->zone_, is_zone_exist))) {
          // The returned code stays OB_ERR_UNEXPECTED; the underlying error is kept
          // in the log so the real cause is not lost.
          const int check_ret = ret;
          ret = OB_ERR_UNEXPECTED;
          LOG_WARN("check zone exist fail", KR(ret), KR(check_ret), "zone", item->zone_);
        } else if(!is_zone_exist) {
          ret = OB_INVALID_ARGUMENT;
          LOG_WARN("zone is not exist", KR(ret), "zone", item->zone_);
          LOG_USER_ERROR(OB_INVALID_ARGUMENT, "zone");
        }
      }

      if (OB_SUCC(ret))
      {
        // each item gets its own, strictly increasing version
        new_version = std::max(new_version + 1, ObTimeUtility::current_time());
      }

      if (OB_FAIL(ret)) {
      } else if (item->want_to_set_tenant_config_) {
        // tenant config
        ObDMLSqlSplicer dml;
        share::schema::ObSchemaGetterGuard schema_guard;
        const share::schema::ObSimpleTenantSchema *tenant_schema = NULL;
        if (OB_FAIL(GSCHEMASERVICE.get_tenant_schema_guard(
                OB_SYS_TENANT_ID, schema_guard))) {
          LOG_WARN("fail to get sys tenant schema guard", KR(ret));
        } else {
          for (const uint64_t target_id : item->tenant_ids_) {
            // Remember whether this is the seed pseudo tenant BEFORE the id is remapped
            // to the sys tenant; comparing the remapped id would never match.
            const bool is_seed = (ObAdminSetConfig::OB_PARAMETER_SEED_ID == target_id);
            const char *table_name = (is_seed ?
                                      OB_ALL_SEED_PARAMETER_TNAME : OB_TENANT_PARAMETER_TNAME);
            uint64_t tenant_id = (is_seed ? OB_SYS_TENANT_ID : target_id);
            uint64_t exec_tenant_id = gen_meta_tenant_id(tenant_id);
            dml.reset();
            if (OB_FAIL(schema_guard.get_tenant_info(exec_tenant_id, tenant_schema))) {
              LOG_WARN("failed to get tenant info", KR(ret), K(exec_tenant_id));
            } else if (OB_ISNULL(tenant_schema)) {
              ret = OB_TENANT_NOT_EXIST;
              LOG_WARN("tenant not exist", KR(ret), K(tenant_id));
            } else if (!tenant_schema->is_normal()) {
              //tenant not normal, maybe tenant not ready, cannot add tenant config
            } else if (!is_seed && OB_FAIL(dml.add_pk_column("tenant_id", tenant_id))) {
              // __all_seed_parameter does not have column 'tenant_id'
              LOG_WARN("add column failed", KR(ret));
            } else if (OB_FAIL(dml.add_pk_column("zone", item->zone_.ptr()))
                || OB_FAIL(dml.add_pk_column("svr_type", print_server_role(OB_SERVER)))
                || OB_FAIL(dml.add_pk_column(K(svr_ip)))
                || OB_FAIL(dml.add_pk_column(K(svr_port)))
                || OB_FAIL(dml.add_pk_column("name", item->name_.ptr()))
                || OB_FAIL(dml.add_column("value", item->value_.ptr()))
                || OB_FAIL(dml.add_column("info", item->comment_.ptr()))
                || OB_FAIL(dml.add_column("config_version", new_version))) {
              LOG_WARN("add column failed", KR(ret));
            } else if (OB_FAIL(dml.get_values().append_fmt("usec_to_time(%ld)", new_version))) {
              LOG_WARN("append valued failed", KR(ret));
            } else if (OB_FAIL(dml.add_column(false, "gmt_modified"))) {
              LOG_WARN("add column failed", KR(ret));
            } else {
              int64_t affected_rows = 0;
              ObDMLExecHelper exec(*ctx_.sql_proxy_, exec_tenant_id);
              // tenant not exist in RS, use SYS instead
              omt::ObTenantConfigGuard tenant_config(TENANT_CONF(OB_SYS_TENANT_ID));
              if (!tenant_config.is_valid()) {
                ret = OB_ERR_UNEXPECTED;
                LOG_WARN("failed to get tenant config",K(tenant_id),  KR(ret));
              } else {
                // single lookup; the item only supplies the metadata columns below
                ObConfigItem *const *ci_ptr = tenant_config->get_container().get(
                                               ObConfigStringKey(item->name_.ptr()));
                if (OB_ISNULL(ci_ptr)) {
                  ret = OB_ERR_SYS_CONFIG_UNKNOWN;
                  LOG_WARN("can't found config item", KR(ret), K(tenant_id), "item", *item);
                } else {
                  ObConfigItem *ci = *ci_ptr;
                  if (OB_FAIL(dml.add_column("section", ci->section()))
                              || OB_FAIL(dml.add_column("scope", ci->scope()))
                              || OB_FAIL(dml.add_column("source", ci->source()))
                              || OB_FAIL(dml.add_column("edit_level", ci->edit_level()))
                              || OB_FAIL(dml.add_column("data_type", ci->data_type()))) {
                    LOG_WARN("add column failed", KR(ret));
                  } else if (OB_FAIL(exec.exec_insert_update(table_name,
                                                            dml, affected_rows))) {
                    LOG_WARN("execute insert update failed", K(tenant_id), KR(ret), "item", *item);
                  } else if (is_zero_row(affected_rows) || affected_rows > 2) {
                    ret = OB_ERR_UNEXPECTED;
                    LOG_WARN("unexpected affected rows", K(tenant_id), K(affected_rows), KR(ret));
                  } else {
                    // set config_version to config_version_map and trigger parameter update
                    if (is_seed) {
                      // seed rows do not belong to a tenant: no version to publish
                    } else if (OB_FAIL(OTC_MGR.set_tenant_config_version(tenant_id, new_version))) {
                      LOG_WARN("failed to set tenant config version",
                          K(tenant_id), KR(ret), "item", *item);
                    } else if(GCTX.omt_->has_tenant(tenant_id) &&
                        OB_FAIL(OTC_MGR.got_version(tenant_id, new_version))) {
                      LOG_WARN("failed to got version", K(tenant_id), KR(ret), "item", *item);
                    } else {
                      LOG_INFO("got new tenant config version",
                          K(new_version), K(tenant_id), "item", *item);
                    }
                  }
                }
              }
            }
            if (OB_FAIL(ret)) {
              break;
            }
          } // for
        }
      } else {
        // sys config
        ObDMLSqlSplicer dml;
        dml.reset();
        if (OB_SYS_TENANT_ID != item->exec_tenant_id_) {
          // cluster-level parameters may only be changed from the sys tenant
          uint64_t tenant_id = item->exec_tenant_id_;
          ret = OB_ERR_UNEXPECTED;
          LOG_WARN("unexpected tenant_id", K(tenant_id), KR(ret));
        } else if (OB_FAIL(dml.add_pk_column("zone", item->zone_.ptr()))
                  || OB_FAIL(dml.add_pk_column("svr_type", print_server_role(OB_SERVER)))
                  || OB_FAIL(dml.add_pk_column(K(svr_ip)))
                  || OB_FAIL(dml.add_pk_column(K(svr_port)))
                  || OB_FAIL(dml.add_pk_column("name", item->name_.ptr()))
                  || OB_FAIL(dml.add_column("value", item->value_.ptr()))
                  || OB_FAIL(dml.add_column("info", item->comment_.ptr()))
                  || OB_FAIL(dml.add_column("config_version", new_version))) {
          LOG_WARN("add column failed", KR(ret));
        } else if (OB_FAIL(dml.get_values().append_fmt("usec_to_time(%ld)", new_version))) {
          LOG_WARN("append valued failed", KR(ret));
        } else if (OB_FAIL(dml.add_column(false, "gmt_modified"))) {
          LOG_WARN("add column failed", KR(ret));
        } else {
          int64_t affected_rows = 0;
          ObDMLExecHelper exec(*ctx_.sql_proxy_, OB_SYS_TENANT_ID);
          ObConfigItem *ci = nullptr;
          ObConfigItem *const *ci_ptr = GCONF.get_container().get(
                                         ObConfigStringKey(item->name_.ptr()));
          if (OB_ISNULL(ci_ptr)) {
            ret = OB_ERR_SYS_CONFIG_UNKNOWN;
            LOG_WARN("can't found config item", KR(ret), "item", *item);
          } else {
            ci = *ci_ptr;
            if (OB_FAIL(dml.add_column("section", ci->section()))
                        || OB_FAIL(dml.add_column("scope", ci->scope()))
                        || OB_FAIL(dml.add_column("source", ci->source()))
                        || OB_FAIL(dml.add_column("edit_level", ci->edit_level()))
                        || OB_FAIL(dml.add_column("data_type", ci->data_type()))) {
              LOG_WARN("add column failed", KR(ret));
            } else if (OB_FAIL(exec.exec_insert_update(OB_ALL_SYS_PARAMETER_TNAME,
                                                       dml, affected_rows))) {
              LOG_WARN("execute insert update failed", KR(ret), "item", *item);
            } else if (is_zero_row(affected_rows) || affected_rows > 2) {
              ret = OB_ERR_UNEXPECTED;
              LOG_WARN("unexpected affected rows", K(affected_rows), KR(ret));
            } else {
              // set config_version to __all_zone and trigger parameter update
              if (OB_FAIL(ctx_.zone_mgr_->update_config_version(new_version))) {
                LOG_WARN("set new config version failed", KR(ret), K(new_version));
              } else if (OB_FAIL(ctx_.config_mgr_->got_version(new_version))) {
                LOG_WARN("config mgr got version failed", KR(ret), K(new_version));
              } else {
                LOG_INFO("got new sys config version", K(new_version), "item", *item);
              }
            } // else trigger update
          } // else
        } // else
      } // else sys config
    } // FOREACH_X
  }

  return ret;
}
1192

1193
// Entry point of ALTER SYSTEM SET <parameter>.
//
// Steps, each one run only if the previous succeeded:
//   verify_config()  -> read current config version from ctx_.zone_mgr_
//   -> root_service_->set_config_pre_hook() -> update_config()
//   -> root_service_->set_config_post_hook().
//
// @param arg  the set-config request; verify_config() updates its items in place.
// @return OB_SUCCESS, OB_NOT_INIT, OB_INVALID_ARGUMENT, OB_ERR_UNEXPECTED when
//         ctx_.root_service_ is null, or the error of the first failing step.
int ObAdminSetConfig::execute(obrpc::ObAdminSetConfigArg &arg)
{
  LOG_INFO("execute set config request", K(arg));
  int ret = OB_SUCCESS;
  int64_t config_version = 0;
  if (!ctx_.is_inited()) {
    ret = OB_NOT_INIT;
    LOG_WARN("not init", KR(ret));
  } else if (!arg.is_valid()) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid arg", K(arg), KR(ret));
  } else if (OB_ISNULL(ctx_.root_service_)) {
    // Checked before first use: verify_config() and both hooks dereference root_service_.
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("error inner stat", KR(ret), K(ctx_.root_service_));
  } else if (OB_FAIL(verify_config(arg))) {
    LOG_WARN("verify config failed", KR(ret), K(arg));
  } else if (OB_FAIL(ctx_.zone_mgr_->get_config_version(config_version))) {
    LOG_WARN("get_config_version failed", KR(ret));
  } else if (OB_FAIL(ctx_.root_service_->set_config_pre_hook(arg))) {
    LOG_WARN("fail to process pre hook", K(arg), KR(ret));
  } else if (OB_FAIL(update_config(arg, config_version))) {
    LOG_WARN("update config failed", KR(ret), K(arg));
  } else if (OB_FAIL(ctx_.root_service_->set_config_post_hook(arg))) {
    LOG_WARN("fail to set config callback", KR(ret));
  } else {
    LOG_INFO("set config succ", K(arg));
  }
  return ret;
}
1224

1225
int ObAdminMigrateUnit::execute(const ObAdminMigrateUnitArg &arg)
1226
{
1227
  int ret = OB_SUCCESS;
1228
  LOG_INFO("execute migrate unit request", K(arg));
1229
  if (!ctx_.is_inited()) {
1230
    ret = OB_NOT_INIT;
1231
    LOG_WARN("not init", KR(ret));
1232
  } else if (!arg.is_valid()) {
1233
    ret = OB_INVALID_ARGUMENT;
1234
    LOG_WARN("invalid arg", K(arg), KR(ret));
1235
  } else {
1236
    const uint64_t unit_id = arg.unit_id_;
1237
    const ObAddr &dst = arg.destination_;
1238
    if (OB_FAIL(ctx_.unit_mgr_->admin_migrate_unit(unit_id, dst, arg.is_cancel_))) {
1239
      LOG_WARN("migrate unit failed", K(unit_id), K(dst), KR(ret));
1240
    } else {
1241
      ctx_.root_balancer_->wakeup();
1242
    }
1243
  }
1244
  return ret;
1245
}
1246

1247
int ObAdminUpgradeVirtualSchema::execute()
1248
{
1249
  int ret = OB_SUCCESS;
1250
  LOG_INFO("execute upgrade virtual schema request");
1251
  int64_t upgrade_cnt = 0;
1252
  ObSchemaGetterGuard schema_guard;
1253
  ObArray<uint64_t> tenant_ids;
1254
  if (OB_UNLIKELY(!ctx_.is_inited())) {
1255
    ret = OB_NOT_INIT;
1256
    LOG_WARN("not init", KR(ret));
1257
  } else if (GCTX.is_standby_cluster()) {
1258
    // standby cluster cannot upgrade virtual schema independently,
1259
    // need to get these information from the primary cluster
1260
    ret = OB_OP_NOT_ALLOW;
1261
    LOG_WARN("upgrade virtual schema in standby cluster not allow", KR(ret));
1262
    LOG_USER_ERROR(OB_OP_NOT_ALLOW, "upgrade virtual schema in standby cluster");
1263
  } else if (OB_ISNULL(ctx_.root_inspection_)
1264
             || OB_ISNULL(ctx_.ddl_service_)) {
1265
    ret = OB_ERR_UNEXPECTED;
1266
    LOG_WARN("ptr is null", KR(ret), KP(ctx_.root_inspection_), KP(ctx_.ddl_service_));
1267
  } else if (OB_FAIL(ctx_.ddl_service_->get_tenant_schema_guard_with_version_in_inner_table(
1268
             OB_SYS_TENANT_ID, schema_guard))) {
1269
    LOG_WARN("get_schema_guard failed", KR(ret));
1270
  } else if (OB_FAIL(schema_guard.get_tenant_ids(tenant_ids))) {
1271
    LOG_WARN("fail to get tenant ids", KR(ret));
1272
  } else {
1273
    FOREACH(tenant_id, tenant_ids) { // ignore ret
1274
      int tmp_ret = OB_SUCCESS;
1275
      if (OB_SUCCESS != (tmp_ret = execute(*tenant_id, upgrade_cnt))) {
1276
        LOG_WARN("fail to execute upgrade virtual table by tenant", KR(tmp_ret), K(*tenant_id));
1277
      }
1278
      ret = OB_SUCC(ret) ? tmp_ret : ret;
1279
    }
1280
  }
1281
  if (OB_SUCC(ret) && upgrade_cnt > 0) {
1282
    // if schema upgraded, inspect schema again
1283
    int tmp_ret = ctx_.root_inspection_->check_all();
1284
    if (OB_SUCCESS != tmp_ret) {
1285
      LOG_WARN("root inspection failed", KR(tmp_ret));
1286
    }
1287
  }
1288
  return ret;
1289
}
1290

1291
int ObAdminUpgradeVirtualSchema::execute(
1292
    const uint64_t tenant_id,
1293
    int64_t &upgrade_cnt)
1294
{
1295
  int ret = OB_SUCCESS;
1296
  if (OB_UNLIKELY(!ctx_.is_inited())) {
1297
    ret = OB_NOT_INIT;
1298
    LOG_WARN("not init", KR(ret));
1299
  } else if (OB_UNLIKELY(
1300
             is_virtual_tenant_id(tenant_id)
1301
             || OB_INVALID_TENANT_ID == tenant_id)) {
1302
    ret = OB_INVALID_ARGUMENT;
1303
    LOG_WARN("invalid tenant_id", KR(ret), K(tenant_id));
1304
  } else if (OB_ISNULL(ctx_.root_inspection_)
1305
             || OB_ISNULL(ctx_.ddl_service_)) {
1306
    ret = OB_ERR_UNEXPECTED;
1307
    LOG_WARN("ptr is null", KR(ret), KP(ctx_.root_inspection_), KP(ctx_.ddl_service_));
1308
  }
1309

1310
  const schema_create_func *creator_ptr_array[] = {
1311
        share::virtual_table_schema_creators,
1312
        share::sys_view_schema_creators, NULL };
1313
  ObArray<ObTableSchema> hard_code_tables;
1314
  ObTableSchema table_schema;
1315

1316
  for (const schema_create_func **creator_ptr_ptr = creator_ptr_array;
1317
       OB_SUCC(ret) && OB_NOT_NULL(*creator_ptr_ptr); ++creator_ptr_ptr) {
1318
    for (const schema_create_func *creator_ptr = *creator_ptr_ptr;
1319
        OB_SUCC(ret) && OB_NOT_NULL(*creator_ptr); ++creator_ptr) {
1320
      table_schema.reset();
1321
      bool exist = false;
1322
      if (OB_FAIL((*creator_ptr)(table_schema))) {
1323
        LOG_WARN("create table schema failed", KR(ret));
1324
      } else if (!is_sys_tenant(tenant_id)
1325
                 && OB_FAIL(ObSchemaUtils::construct_tenant_space_full_table(
1326
                            tenant_id, table_schema))) {
1327
        LOG_WARN("fail to construct tenant space table", KR(ret), K(tenant_id));
1328
      } else if (OB_FAIL(ObSysTableChecker::is_inner_table_exist(
1329
                 tenant_id, table_schema, exist))) {
1330
        LOG_WARN("fail to check inner table exist",
1331
                 KR(ret), K(tenant_id), K(table_schema));
1332
      } else if (!exist) {
1333
        // skip
1334
      } else if (is_sys_table(table_schema.get_table_id())) {
1335
        // only check and upgrade virtual table && sys views
1336
      } else if (OB_FAIL(hard_code_tables.push_back(table_schema))) {
1337
        LOG_WARN("push_back failed", KR(ret), K(tenant_id));
1338
      }
1339
    }
1340
  }
1341

1342
  // remove tables not exist on hard code tables
1343
  ObSchemaGetterGuard schema_guard;
1344
  ObArray<uint64_t> tids;
1345
  if (FAILEDx(ctx_.ddl_service_->get_tenant_schema_guard_with_version_in_inner_table(tenant_id, schema_guard))) {
1346
    LOG_WARN("get_schema_guard failed", KR(ret), K(tenant_id));
1347
  } else if (OB_FAIL(schema_guard.get_table_ids_in_tenant(tenant_id, tids))) {
1348
    LOG_WARN("get_table_ids_in_tenant failed", KR(ret), K(tenant_id));
1349
  } else {
1350
    FOREACH_CNT_X(tid, tids, OB_SUCC(ret)) {
1351
      const ObTableSchema *in_mem_table = NULL;
1352
      if (!is_inner_table(*tid) || is_sys_table(*tid)) {
1353
        continue;
1354
      } else if (OB_FAIL(schema_guard.get_table_schema(tenant_id, *tid, in_mem_table))) {
1355
        LOG_WARN("fail to get table schema", KR(ret), K(tenant_id), K(*tid));
1356
      } else if (OB_ISNULL(in_mem_table)) {
1357
        ret = OB_TABLE_NOT_EXIST;
1358
        LOG_WARN("table not exist", KR(ret), K(tenant_id), K(*tid));
1359
      } else {
1360
        bool exist = false;
1361
        FOREACH_CNT_X(hard_code_table, hard_code_tables, OB_SUCC(ret) && !exist) {
1362
          if (OB_ISNULL(hard_code_table)) {
1363
            ret = OB_ERR_UNEXPECTED;
1364
            LOG_WARN("hard code table is null", KR(ret), K(tenant_id));
1365
          } else if (in_mem_table->get_table_id() == hard_code_table->get_table_id()) {
1366
            exist = true;
1367
          }
1368
        }
1369
        if (!exist) {
1370
          if (FAILEDx(ctx_.ddl_service_->drop_inner_table(*in_mem_table))) {
1371
            LOG_WARN("drop table schema failed", KR(ret), K(tenant_id), KPC(in_mem_table));
1372
          } else if (OB_FAIL(ctx_.ddl_service_->refresh_schema(tenant_id))) {
1373
            LOG_WARN("refresh_schema failed", KR(ret), K(tenant_id));
1374
          }
1375
        }
1376
      }
1377
    }
1378
  }
1379

1380
  // upgrade tables
1381
  FOREACH_CNT_X(hard_code_table, hard_code_tables, OB_SUCC(ret)) {
1382
    if (OB_ISNULL(hard_code_table)) {
1383
      ret = OB_ERR_UNEXPECTED;
1384
      LOG_WARN("hard code table is null", KR(ret), K(tenant_id));
1385
    } else if (OB_FAIL(ctx_.root_inspection_->check_table_schema(tenant_id, *hard_code_table))) {
1386
      if (OB_SCHEMA_ERROR != ret) {
1387
        LOG_WARN("check table schema failed", KR(ret), K(tenant_id), K(*hard_code_table));
1388
      } else {
1389
        LOG_INFO("table schema need upgrade", K(tenant_id), K(*hard_code_table));
1390
        if (OB_FAIL(upgrade_(tenant_id, *hard_code_table))) {
1391
          LOG_WARN("upgrade failed", KR(ret), K(tenant_id), K(*hard_code_table));
1392
        } else {
1393
          LOG_INFO("update table schema success", K(tenant_id), K(*hard_code_table));
1394
          upgrade_cnt++;
1395
        }
1396
      }
1397
    }
1398
  }
1399
  return ret;
1400
}
1401

1402
int ObAdminUpgradeVirtualSchema::upgrade_(
1403
    const uint64_t tenant_id,
1404
    share::schema::ObTableSchema &table)
1405
{
1406
  int ret = OB_SUCCESS;
1407
  const ObTableSchema *exist_schema = NULL;
1408
  ObSchemaGetterGuard schema_guard;
1409
  if (OB_UNLIKELY(!ctx_.is_inited())) {
1410
    ret = OB_NOT_INIT;
1411
    LOG_WARN("not init", KR(ret));
1412
  } else if (OB_UNLIKELY(
1413
             is_virtual_tenant_id(tenant_id)
1414
             || OB_INVALID_TENANT_ID == tenant_id)) {
1415
    ret = OB_INVALID_ARGUMENT;
1416
    LOG_WARN("invalid tenant_id", KR(ret), K(tenant_id));
1417
  } else if (OB_UNLIKELY(
1418
             !table.is_valid()
1419
             || is_sys_table(table.get_table_id())
1420
             || table.get_tenant_id() != tenant_id)) {
1421
    ret = OB_INVALID_ARGUMENT;
1422
    LOG_WARN("invalid table", KR(ret), K(tenant_id), K(table));
1423
  } else if (OB_ISNULL(ctx_.ddl_service_)) {
1424
    ret = OB_ERR_UNEXPECTED;
1425
    LOG_WARN("ddl service is null", KR(ret));
1426
  }
1427
  // 1. check table name duplicated
1428
  if (FAILEDx(ctx_.ddl_service_->get_tenant_schema_guard_with_version_in_inner_table(
1429
      tenant_id, schema_guard))) {
1430
    LOG_WARN("get schema guard in inner table failed", KR(ret), K(tenant_id));
1431
  } else {
1432
    ObArenaAllocator allocator;
1433
    ObString index_name;
1434
    if (table.is_index_table()) {
1435
      // In the early version, table name of oracle virtual table index is not right
1436
      // (data_table_id is mysql virtual table id), which may cause we can't find duplicate index
1437
      // with table name and duplicate name conflict occur.
1438
      //
1439
      // ETC:
1440
      // OB_ALL_VIRTUAL_PLAN_CACHE_STAT_ORA_TID = 15034;
1441
      // OB_ALL_VIRTUAL_PLAN_CACHE_STAT_ORA_ALL_VIRTUAL_PLAN_CACHE_STAT_I1_TID = 19998;
1442
      // OB_ALL_VIRTUAL_PLAN_CACHE_STAT_ALL_VIRTUAL_PLAN_CACHE_STAT_I1_TNAME = "__idx_1099511638779_all_virtual_plan_cache_stat_i1";
1443
      // OB_ALL_VIRTUAL_PLAN_CACHE_STAT_ORA_ALL_VIRTUAL_PLAN_CACHE_STAT_I1_TNAME = "__idx_1099511642810_all_virtual_plan_cache_stat_i1";
1444
      //
1445
      // For oracle virtual table index which data_table_id is (1 << 40) | 15034 = 1099511642810,
1446
      // but it use OB_ALL_VIRTUAL_PLAN_CACHE_STAT_ALL_VIRTUAL_PLAN_CACHE_STAT_I1_TNAME as table name.
1447
      if (OB_FAIL(table.generate_origin_index_name())) {
1448
        LOG_WARN("fail to generate origin index name", KR(ret), K(table));
1449
      } else if (OB_FAIL(ObTableSchema::build_index_table_name(
1450
                 allocator, table.get_data_table_id(),
1451
                 table.get_origin_index_name_str(), index_name))) {
1452
        LOG_WARN("fail to build index table name", KR(ret), K(table));
1453
      }
1454
    }
1455
    if (FAILEDx(schema_guard.get_table_schema(
1456
                tenant_id,
1457
                table.get_database_id(),
1458
                table.is_index_table() ? index_name : table.get_table_name(),
1459
                table.is_index_table(),
1460
                exist_schema))) {
1461
      LOG_WARN("get table schema failed", KR(ret), K(tenant_id), "table", table.get_table_name());
1462
      if (OB_TABLE_NOT_EXIST == ret) {
1463
        ret = OB_SUCCESS;
1464
      }
1465
    } else if (OB_ISNULL(exist_schema)) {
1466
      // no duplicate table name
1467
    } else if (OB_FAIL(ctx_.ddl_service_->drop_inner_table(*exist_schema))) {
1468
      LOG_WARN("get table schema failed", KR(ret), K(tenant_id),
1469
               "table", table.get_table_name(), "table_id", table.get_table_id());
1470
    } else if (OB_FAIL(ctx_.ddl_service_->get_tenant_schema_guard_with_version_in_inner_table(
1471
               tenant_id, schema_guard))) {
1472
      LOG_WARN("get schema guard in inner table failed", KR(ret), K(tenant_id));
1473
    }
1474
  }
1475
  // 2. try drop table first
1476
  exist_schema = NULL;
1477
  if (FAILEDx(schema_guard.get_table_schema(tenant_id,
1478
                                            table.get_table_id(),
1479
                                            exist_schema))) {
1480
    LOG_WARN("get table schema failed", KR(ret), "table", table.get_table_name(),
1481
             "table_id", table.get_table_id());
1482
    if (OB_TABLE_NOT_EXIST == ret) {
1483
      ret = OB_SUCCESS;
1484
    }
1485
  } else if (OB_ISNULL(exist_schema)) {
1486
    // missed table
1487
  } else if (OB_FAIL(ctx_.ddl_service_->drop_inner_table(*exist_schema))) {
1488
    LOG_WARN("drop table schema failed", KR(ret), "table_schema", *exist_schema);
1489
  } else if (OB_FAIL(ctx_.ddl_service_->get_tenant_schema_guard_with_version_in_inner_table(
1490
             tenant_id, schema_guard))) {
1491
    LOG_WARN("get schema guard in inner table failed", KR(ret), K(tenant_id));
1492
  }
1493
  // 3. create table
1494
  if (FAILEDx(ctx_.ddl_service_->add_table_schema(table, schema_guard))) {
1495
    LOG_WARN("add table schema failed", KR(ret), K(tenant_id), K(table));
1496
  } else if (OB_FAIL(ctx_.ddl_service_->refresh_schema(tenant_id))) {
1497
    LOG_WARN("refresh schema failed", KR(ret), K(tenant_id));
1498
  }
1499

1500
  return ret;
1501
}
1502

1503
int ObAdminUpgradeCmd::execute(const Bool &upgrade)
1504
{
1505
  int ret = OB_SUCCESS;
1506
  // set enable_upgrade_mode
1507
  HEAP_VAR(ObAdminSetConfigItem, item) {
1508
    obrpc::ObAdminSetConfigArg set_config_arg;
1509
    set_config_arg.is_inner_ = true;
1510
    const char *enable_upgrade_name = "enable_upgrade_mode";
1511
    ObAdminSetConfig admin_set_config(ctx_);
1512
    char min_server_version[OB_SERVER_VERSION_LENGTH] = {'\0'};
1513
    uint64_t cluster_version = GET_MIN_CLUSTER_VERSION();
1514

1515
    if (OB_INVALID_INDEX == ObClusterVersion::print_version_str(
1516
        min_server_version, OB_SERVER_VERSION_LENGTH, cluster_version)) {
1517
       ret = OB_INVALID_ARGUMENT;
1518
       LOG_WARN("fail to print version str", KR(ret), K(cluster_version));
1519
    } else if (OB_FAIL(item.name_.assign(enable_upgrade_name))) {
1520
      LOG_WARN("assign enable_upgrade_mode config name failed", KR(ret));
1521
    } else if (OB_FAIL(item.value_.assign((upgrade ? "true" : "false")))) {
1522
      LOG_WARN("assign enable_upgrade_mode config value failed", KR(ret));
1523
    } else if (OB_FAIL(set_config_arg.items_.push_back(item))) {
1524
      LOG_WARN("add enable_upgrade_mode config item failed", KR(ret));
1525
    } else {
1526
      const char *upgrade_stage_name = "_upgrade_stage";
1527
      obrpc::ObUpgradeStage stage = upgrade ?
1528
                                    obrpc::OB_UPGRADE_STAGE_PREUPGRADE :
1529
                                    obrpc::OB_UPGRADE_STAGE_NONE;
1530
      if (OB_FAIL(item.name_.assign(upgrade_stage_name))) {
1531
        LOG_WARN("assign _upgrade_stage config name failed", KR(ret), K(upgrade));
1532
      } else if (OB_FAIL(item.value_.assign(obrpc::get_upgrade_stage_str(stage)))) {
1533
        LOG_WARN("assign _upgrade_stage config value failed", KR(ret), K(stage), K(upgrade));
1534
      } else if (OB_FAIL(set_config_arg.items_.push_back(item))) {
1535
        LOG_WARN("add _upgrade_stage config item failed", KR(ret), K(stage), K(upgrade));
1536
      }
1537
    }
1538
    share::ObServerInfoInTable::ObBuildVersion build_version;
1539
    if (FAILEDx(admin_set_config.execute(set_config_arg))) {
1540
      LOG_WARN("execute set config failed", KR(ret));
1541
    } else if (OB_FAIL(observer::ObService::get_build_version(build_version))) {
1542
      LOG_WARN("fail to get build version", KR(ret));
1543
    } else {
1544
      CLUSTER_EVENT_SYNC_ADD("UPGRADE",
1545
                             upgrade ? "BEGIN_UPGRADE" : "END_UPGRADE",
1546
                             "cluster_version", min_server_version,
1547
                             "build_version", build_version.ptr());
1548
      LOG_INFO("change upgrade parameters",
1549
               "enable_upgrade_mode", upgrade,
1550
               "in_major_version_upgrade_mode", GCONF.in_major_version_upgrade_mode());
1551

1552
    }
1553
  }
1554
  return ret;
1555
}
1556

1557
// Move the cluster to the rolling-upgrade stage given in `arg.stage_`.
//
// Always sets `_upgrade_stage`. For OB_UPGRADE_STAGE_POSTUPGRADE it first
// waits (bounded by a timeout context derived from GCONF.rpc_timeout) until
// the min server version obtained from SVR_TRACER equals
// CLUSTER_CURRENT_VERSION, and then additionally sets `min_observer_version`
// to that version. On success a BEGIN_/END_ROLLING_UPGRADE cluster event is
// recorded.
//
// @param arg  rolling upgrade argument; must be valid
// @return OB_SUCCESS, OB_INVALID_ARGUMENT, OB_TIMEOUT (version not reported in
//         time), OB_ERR_UNEXPECTED (reported version is newer than the binary)
//         or the error of the first failing step.
int ObAdminRollingUpgradeCmd::execute(const obrpc::ObAdminRollingUpgradeArg &arg)
{
  int ret = OB_SUCCESS;
  HEAP_VAR(ObAdminSetConfigItem, item) {
    obrpc::ObAdminSetConfigArg set_config_arg;
    set_config_arg.is_inner_ = true;
    const char *upgrade_stage_name = "_upgrade_stage";
    ObAdminSetConfig admin_set_config(ctx_);
    char ori_min_server_version[OB_SERVER_VERSION_LENGTH] = {'\0'};
    char min_server_version[OB_SERVER_VERSION_LENGTH] = {'\0'};
    uint64_t ori_cluster_version = GET_MIN_CLUSTER_VERSION();

    if (!arg.is_valid()) {
      ret = OB_INVALID_ARGUMENT;
      LOG_WARN("invalid arg", KR(ret), K(arg));
    } else if (OB_INVALID_INDEX == ObClusterVersion::print_version_str(
               ori_min_server_version, OB_SERVER_VERSION_LENGTH, ori_cluster_version)) {
      ret = OB_INVALID_ARGUMENT;
      LOG_WARN("fail to print version str", KR(ret), K(ori_cluster_version));
    } else if (OB_FAIL(item.name_.assign(upgrade_stage_name))) {
      LOG_WARN("assign _upgrade_stage config name failed", KR(ret), K(arg));
    } else if (OB_FAIL(item.value_.assign(obrpc::get_upgrade_stage_str(arg.stage_)))) {
      LOG_WARN("assign _upgrade_stage config value failed", KR(ret), K(arg));
    } else if (OB_FAIL(set_config_arg.items_.push_back(item))) {
      LOG_WARN("add _upgrade_stage config item failed", KR(ret), K(arg));
    } else if (obrpc::OB_UPGRADE_STAGE_POSTUPGRADE == arg.stage_) {
      // wait min_observer_version to report to inner table
      ObTimeoutCtx ctx;
      if (OB_FAIL(ObShareUtil::set_default_timeout_ctx(ctx, GCONF.rpc_timeout))) {
        LOG_WARN("fail to set default timeout", KR(ret));
      } else {
        const int64_t CHECK_INTERVAL = 100 * 1000L; // 100ms
        while (OB_SUCC(ret)) {
          uint64_t min_observer_version = 0;
          if (ctx.is_timeouted()) {
            ret = OB_TIMEOUT;
            LOG_WARN("wait min_server_version report to inner table failed",
                     KR(ret), "abs_timeout", ctx.get_abs_timeout());
          } else if (OB_FAIL(SVR_TRACER.get_min_server_version(
                     min_server_version, min_observer_version))) {
            LOG_WARN("failed to get the min server version", KR(ret));
          } else if (min_observer_version > CLUSTER_CURRENT_VERSION) {
            ret = OB_ERR_UNEXPECTED;
            LOG_WARN("min_observer_version is larger than CLUSTER_CURRENT_VERSION",
                     KR(ret), "min_server_version", min_server_version,
                     K(min_observer_version), "CLUSTER_CURRENT_VERSION", CLUSTER_CURRENT_VERSION);
          } else if (min_observer_version < CLUSTER_CURRENT_VERSION) {
            if (REACH_TIME_INTERVAL(1 * 1000 * 1000L)) { // 1s
              LOG_INFO("min_observer_version is not reported yet, just wait",
                       KR(ret), "min_server_version", min_server_version,
                       K(min_observer_version), "CLUSTER_CURRENT_VERSION", CLUSTER_CURRENT_VERSION);
            }
            ob_usleep(CHECK_INTERVAL);
          } else {
            break;
          }
        } // end while
      }
      // end rolling upgrade, should raise min_observer_version
      const char *min_obs_version_name = "min_observer_version";
      if (OB_FAIL(ret)) {
        // The wait above failed (already logged). Stop here: continuing would
        // overwrite ret and push an item that still carries the
        // `_upgrade_stage` name with a version string as its value.
      } else if (OB_FAIL(item.name_.assign(min_obs_version_name))) {
        LOG_WARN("assign min_observer_version config name failed",
                 KR(ret), K(min_obs_version_name));
      } else if (OB_FAIL(item.value_.assign(min_server_version))) {
        LOG_WARN("assign min_observer_version config value failed",
                 KR(ret), K(min_server_version));
      } else if (OB_FAIL(set_config_arg.items_.push_back(item))) {
        LOG_WARN("add min_observer_version config item failed", KR(ret), K(item));
      }
    }
    if (OB_FAIL(ret)) {
      // An earlier step failed (already logged). Neither apply the config nor
      // record an upgrade event, and keep the original error code.
    } else if (OB_FAIL(admin_set_config.execute(set_config_arg))) {
      LOG_WARN("execute set config failed", KR(ret));
    } else {
      share::ObServerInfoInTable::ObBuildVersion build_version;
      if (OB_FAIL(observer::ObService::get_build_version(build_version))) {
        LOG_WARN("fail to get build version", KR(ret));
      } else if (obrpc::OB_UPGRADE_STAGE_POSTUPGRADE != arg.stage_) {
        CLUSTER_EVENT_SYNC_ADD("UPGRADE", "BEGIN_ROLLING_UPGRADE",
                               "cluster_version", ori_min_server_version,
                               "build_version", build_version.ptr());
      } else {
        CLUSTER_EVENT_SYNC_ADD("UPGRADE", "END_ROLLING_UPGRADE",
                               "cluster_version", min_server_version,
                               "ori_cluster_version", ori_min_server_version,
                               "build_version", build_version.ptr());
      }
      LOG_INFO("change upgrade parameters", KR(ret), "_upgrade_stage", arg.stage_);
    }
  }
  return ret;
}
1648

1649
// Instantiates the enum helper functions for ObInnerJob from OB_INNER_JOB_DEF.
// NOTE(review): presumably this is what defines get_inner_job_value(), which
// the executors below use to map arg.job_ to an ObInnerJob - confirm against
// the DEFINE_ENUM_FUNC macro definition.
DEFINE_ENUM_FUNC(ObInnerJob, inner_job, OB_INNER_JOB_DEF);
1650

1651
int ObAdminRunJob::execute(const ObRunJobArg &arg)
1652
{
1653
  int ret = OB_SUCCESS;
1654
  ObInnerJob job = INVALID_INNER_JOB;
1655
  if (!ctx_.is_inited()) {
1656
    ret = OB_NOT_INIT;
1657
    LOG_WARN("not init", KR(ret));
1658
  } else if (!arg.is_valid()) {
1659
    ret = OB_INVALID_ARGUMENT;
1660
    LOG_WARN("invalid arg", K(arg), KR(ret));
1661
  } else if (INVALID_INNER_JOB == (job = get_inner_job_value(arg.job_))) {
1662
    ret = OB_INVALID_ARGUMENT;
1663
    LOG_WARN("invalid inner job", K(arg), KR(ret));
1664
  } else {
1665
    switch(job) {
1666
      case CHECK_PARTITION_TABLE: {
1667
        ObAdminCheckPartitionTable job_executor(ctx_);
1668
        if (OB_FAIL(job_executor.execute(arg))) {
1669
          LOG_WARN("execute job failed", K(arg), KR(ret));
1670
        }
1671
        break;
1672
      }
1673
      case ROOT_INSPECTION: {
1674
        ObAdminRootInspection job_executor(ctx_);
1675
        if (OB_FAIL(job_executor.execute(arg))) {
1676
          LOG_WARN("execute job failed", K(arg), KR(ret));
1677
        }
1678
        break;
1679
      }
1680
      case UPGRADE_STORAGE_FORMAT_VERSION:
1681
      case STOP_UPGRADE_STORAGE_FORMAT_VERSION: {
1682
        ObAdminUpgradeStorageFormatVersionExecutor job_executor(ctx_);
1683
        if (OB_FAIL(job_executor.execute(arg))) {
1684
          LOG_WARN("fail to execute upgrade storage format version job", KR(ret));
1685
        }
1686
        break;
1687
      }
1688
      case CREATE_INNER_SCHEMA: {
1689
        ObAdminCreateInnerSchema job_executor(ctx_);
1690
        if (OB_FAIL(job_executor.execute(arg))) {
1691
          LOG_WARN("execute job failed", KR(ret));
1692
        }
1693
        break;
1694
      }
1695
      case IO_CALIBRATION: {
1696
        ObAdminIOCalibration job_executor(ctx_);
1697
        if (OB_FAIL(job_executor.execute(arg))) {
1698
          LOG_WARN("execute job failed", KR(ret));
1699
        }
1700
        break;
1701
      }
1702
      default: {
1703
        ret = OB_ERR_UNEXPECTED;
1704
        LOG_WARN("not known job", K(job), KR(ret));
1705
        break;
1706
      }
1707
    }
1708
  }
1709
  return ret;
1710
}
1711

1712
// The check-partition-table job is not supported: the argument is ignored and
// OB_NOT_SUPPORTED is returned unconditionally.
int ObAdminCheckPartitionTable::execute(const obrpc::ObRunJobArg &arg)
{
  const int ret = OB_NOT_SUPPORTED;
  UNUSEDx(arg);
  return ret;
}
1717

1718
int ObAdminCheckPartitionTable::call_server(const ObAddr &server)
1719
{
1720
  int ret = OB_SUCCESS;
1721
  if (!ctx_.is_inited()) {
1722
    ret = OB_NOT_INIT;
1723
    LOG_WARN("not init", KR(ret));
1724
  } else if (!server.is_valid()) {
1725
    ret = OB_INVALID_ARGUMENT;
1726
    LOG_WARN("invalid server", K(server), KR(ret));
1727
  } else if (OB_FAIL(ctx_.rpc_proxy_->to(server).check_partition_table())) {
1728
    LOG_WARN("request check partition table failed", KR(ret), K(server));
1729
  }
1730
  return ret;
1731
}
1732

1733
// Handles the CREATE_INNER_SCHEMA job by submitting a create-inner-schema task
// to the root service.
//
// Returns OB_NOT_INIT, OB_INVALID_ARGUMENT, OB_ERR_UNEXPECTED (wrong job type
// or null root service) or the result of the task submission.
int ObAdminCreateInnerSchema::execute(const obrpc::ObRunJobArg &arg)
{
  int ret = OB_SUCCESS;
  LOG_INFO("execute create inner role request", KR(ret));

  // Each check only runs while all previous ones have passed.
  if (!ctx_.is_inited()) {
    ret = OB_NOT_INIT;
    LOG_WARN("not init", KR(ret));
  }
  if (OB_SUCC(ret) && !arg.is_valid()) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid argument", KR(ret), K(arg));
  }
  if (OB_SUCC(ret) && CREATE_INNER_SCHEMA != get_inner_job_value(arg.job_)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("job to run not create inner role", KR(ret), K(arg));
  }
  if (OB_SUCC(ret) && OB_UNLIKELY(nullptr == ctx_.root_service_)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("root service ptr is null", KR(ret));
  }
  if (OB_SUCC(ret)) {
    if (OB_FAIL(ctx_.root_service_->submit_create_inner_schema_task())) {
      LOG_WARN("fail to submit create inner role task", KR(ret));
    }
  }
  return ret;
}
1754

1755
// Handles the IO_CALIBRATION job: after validating the request it is forwarded
// through call_all().
//
// Returns OB_NOT_INIT, OB_INVALID_ARGUMENT, OB_ERR_UNEXPECTED (wrong job type)
// or the result of call_all().
int ObAdminIOCalibration::execute(const obrpc::ObRunJobArg &arg)
{
  int ret = OB_SUCCESS;
  LOG_INFO("execute io calibration quest", KR(ret));
  if (ctx_.is_inited()) {
    if (arg.is_valid()) {
      if (IO_CALIBRATION == get_inner_job_value(arg.job_)) {
        if (OB_FAIL(call_all(arg))) {
          LOG_WARN("call all server failed", K(ret), K(arg));
        }
      } else {
        ret = OB_ERR_UNEXPECTED;
        LOG_WARN("unexpected job type", KR(ret), K(arg));
      }
    } else {
      ret = OB_INVALID_ARGUMENT;
      LOG_WARN("invalid argument", KR(ret), K(arg));
    }
  } else {
    ret = OB_NOT_INIT;
    LOG_WARN("not init", KR(ret));
  }
  return ret;
}
1773

1774
// Sends the execute_io_benchmark RPC to `server`.
// Returns OB_NOT_INIT / OB_INVALID_ARGUMENT when the context or the address is
// unusable, otherwise the RPC result.
int ObAdminIOCalibration::call_server(const common::ObAddr &server)
{
  int ret = OB_SUCCESS;
  if (OB_UNLIKELY(!ctx_.is_inited())) {
    ret = OB_NOT_INIT;
    LOG_WARN("not init", K(ret));
  }
  if (OB_SUCC(ret) && OB_UNLIKELY(!server.is_valid())) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid server", K(server), K(ret));
  }
  if (OB_SUCC(ret)) {
    if (OB_FAIL(ctx_.rpc_proxy_->to(server).execute_io_benchmark())) {
      LOG_WARN("request io calibration failed", KR(ret), K(server));
    }
  }
  return ret;
}
1788

1789
// Refreshes the IO calibration of the servers selected by `arg`.
//
// Unless arg.only_refresh_ is set, the calibration list is folded into one
// ObIOAbility and written for every selected server inside a single
// transaction on the sys tenant. Afterwards every selected server is asked to
// refresh its calibration via RPC.
//
// Returns OB_NOT_INIT, OB_INVALID_ARGUMENT, OB_PARTIAL_FAILED when at least one
// refresh RPC failed, or the error of the first failing step.
int ObAdminRefreshIOCalibration::execute(const obrpc::ObAdminRefreshIOCalibrationArg &arg)
{
  int ret = OB_SUCCESS;
  ObArray<ObAddr> server_list;
  if (OB_UNLIKELY(!ctx_.is_inited())) {
    ret = OB_NOT_INIT;
    LOG_WARN("not init", K(ret));
  } else if (OB_UNLIKELY(!arg.is_valid())) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid argument", K(ret), K(arg));
  } else if (OB_FAIL(get_server_list(arg, server_list))) {
    LOG_WARN("get server list failed", K(ret), K(arg));
  } else if (!arg.only_refresh_) {
    // Persist the calibration result before notifying the servers.
    ObIOAbility io_ability;
    for (int64_t i = 0; OB_SUCC(ret) && i < arg.calibration_list_.count(); ++i) {
      const ObIOBenchResult &item = arg.calibration_list_.at(i);
      if (OB_FAIL(io_ability.add_measure_item(item))) {
        LOG_WARN("add item failed", K(ret), K(item));
      }
    }
    if (OB_FAIL(ret)) {
      // building io_ability failed, already logged
    } else if (arg.calibration_list_.count() > 0 && !io_ability.is_valid()) {
      ret = OB_INVALID_ARGUMENT;
      LOG_WARN("invalid calibration list", K(ret), K(arg), K(io_ability));
    } else {
      ObMySQLTransaction trans;
      if (OB_FAIL(trans.start(ctx_.sql_proxy_, OB_SYS_TENANT_ID))) {
        LOG_WARN("start transaction failed", K(ret));
      } else {
        for (int64_t i = 0; OB_SUCC(ret) && i < server_list.count(); ++i) {
          if (OB_FAIL(ObIOCalibration::get_instance().write_into_table(trans, server_list.at(i), io_ability))) {
            LOG_WARN("write io ability failed", K(ret), K(io_ability), K(server_list.at(i)));
          }
        }
        // commit only when every write succeeded, roll back otherwise
        bool is_commit = OB_SUCCESS == ret;
        int tmp_ret = trans.end(is_commit);
        if (OB_UNLIKELY(OB_SUCCESS != tmp_ret)) {
          LOG_WARN("end transaction failed", K(tmp_ret), K(is_commit));
          // an earlier error takes precedence over the failure of end()
          if (OB_SUCC(ret)) {
            ret = tmp_ret;
          }
        }
      }
    }
  }

  if (OB_SUCC(ret)) {
    ObRefreshIOCalibrationArg refresh_arg;
    refresh_arg.storage_name_ = arg.storage_name_;
    refresh_arg.only_refresh_ = arg.only_refresh_;
    if (OB_FAIL(refresh_arg.calibration_list_.assign(arg.calibration_list_))) {
      LOG_WARN("assign calibration list failed", K(ret), K(arg.calibration_list_));
    } else {
      // Every server is contacted even if an earlier one failed; failures are
      // only counted and reported as a whole afterwards.
      int64_t failed_count = 0;
      for (int64_t idx = 0; idx < server_list.count(); ++idx) {
        const ObAddr &target = server_list.at(idx);
        int tmp_ret = ctx_.rpc_proxy_->to(target).refresh_io_calibration(refresh_arg);
        if (OB_UNLIKELY(OB_SUCCESS != tmp_ret)) {
          ++failed_count;
          LOG_WARN("request io calibration failed", KR(tmp_ret), "*server", target, K(refresh_arg));
        }
      }
      if (0 != failed_count) {
        ret = OB_PARTIAL_FAILED;
        LOG_USER_ERROR(OB_PARTIAL_FAILED);
      }
    }
  }
  LOG_INFO("admin refresh io calibration", K(ret), K(arg), K(server_list));
  return ret;
}
1861

1862
// Not expected to be called for this command (execute() issues its RPCs
// itself); the address is ignored and OB_NOT_SUPPORTED is returned.
int ObAdminRefreshIOCalibration::call_server(const common::ObAddr &server)
{
  const int ret = OB_NOT_SUPPORTED;
  UNUSED(server);
  return ret;
}
1868

1869
// Handles the ROOT_INSPECTION job by running root_inspection_->check_all().
//
// The job may not be addressed to a zone, and if a server is given it must be
// the master root server returned by GCTX.rs_mgr_.
//
// Returns OB_NOT_INIT, OB_INVALID_ARGUMENT, OB_ERR_UNEXPECTED,
// OB_INNER_STAT_ERROR (root inspection not inited) or the error of the first
// failing call.
int ObAdminRootInspection::execute(const obrpc::ObRunJobArg &arg)
{
  int ret = OB_SUCCESS;
  LOG_INFO("execute root inspection request", K(arg));
  ObAddr rs_addr;

  // Step 1: basic request validation.
  if (!ctx_.is_inited()) {
    ret = OB_NOT_INIT;
    LOG_WARN("not init", KR(ret));
  } else if (!arg.is_valid()) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid arg", K(arg), KR(ret));
  } else if (ROOT_INSPECTION != get_inner_job_value(arg.job_)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("job to run not root inspection", K(arg), KR(ret));
  }

  // Step 2: look up the master root server.
  if (OB_FAIL(ret)) {
    // already logged
  } else if (OB_ISNULL(GCTX.rs_mgr_)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("GCTX.rs_mgr_ is null", KR(ret), KP(GCTX.rs_mgr_));
  } else if (OB_FAIL(GCTX.rs_mgr_->get_master_root_server(rs_addr))) {
    LOG_WARN("fail to get master root server", KR(ret));
  } else if (OB_UNLIKELY(!rs_addr.is_valid())) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("rs_addr is invalid", KR(ret), K(rs_addr));
  }

  // Step 3: check the target of the request and run the inspection.
  if (OB_FAIL(ret)) {
    // already logged
  } else if (!ctx_.root_inspection_->is_inited()) {
    ret = OB_INNER_STAT_ERROR;
    LOG_WARN("root_inspection not inited", KR(ret));
  } else if (!arg.zone_.is_empty()) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("root inspection can't execute by zone", K(arg), KR(ret));
  } else if (arg.server_.is_valid() && arg.server_ != rs_addr) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("only rs can execute root inspection", K(arg),
        "rs", rs_addr, KR(ret));
  } else if (OB_FAIL(ctx_.root_inspection_->check_all())) {
    LOG_WARN("root_inspection check_all failed", KR(ret));
  }

  return ret;
}
1907

1908
// Handles the two storage-format-upgrade jobs:
//   - UPGRADE_STORAGE_FORMAT_VERSION: submits the upgrade task to the root service.
//   - STOP_UPGRADE_STORAGE_FORMAT_VERSION: stops the upgrade executor and, once
//     stop() has succeeded, calls start() on it again.
//     NOTE(review): presumably start() re-arms the executor so that a later
//     upgrade job can run - confirm against the executor implementation.
//
// @param arg  job request; must be valid
// @return OB_NOT_INIT if the context is not initialised, OB_INVALID_ARGUMENT
//         for an invalid argument, OB_ERR_UNEXPECTED for a null dependency or
//         an unexpected job type, otherwise the result of the submitted call.
int ObAdminUpgradeStorageFormatVersionExecutor::execute(const obrpc::ObRunJobArg &arg)
{
  int ret = OB_SUCCESS;
  ObInnerJob job = INVALID_INNER_JOB;
  LOG_INFO("execute upgrade storage format version request", K(arg));
  if (OB_UNLIKELY(!ctx_.is_inited())) {
    // same error code as every other admin command for an uninitialised context
    ret = OB_NOT_INIT;
    LOG_WARN("ObAdminUpgradeStorageFormatVersionExecutor has not been inited", KR(ret));
  } else if (OB_UNLIKELY(!arg.is_valid())) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid arguments", KR(ret), K(arg));
  } else {
    job = get_inner_job_value(arg.job_);
    if (UPGRADE_STORAGE_FORMAT_VERSION == job) {
      if (OB_ISNULL(ctx_.root_service_)) {
        ret = OB_ERR_UNEXPECTED;
        LOG_WARN("error unexpected, root service must not be NULL", KR(ret));
      } else if (OB_FAIL(ctx_.root_service_->submit_upgrade_storage_format_version_task())) {
        LOG_WARN("fail to submit upgrade storage format version task", KR(ret));
      }
    } else if (STOP_UPGRADE_STORAGE_FORMAT_VERSION == job) {
      if (OB_ISNULL(ctx_.upgrade_storage_format_executor_)) {
        ret = OB_ERR_UNEXPECTED;
        LOG_WARN("executor is null", KR(ret));
      } else if (OB_FAIL(ctx_.upgrade_storage_format_executor_->stop())) {
        LOG_WARN("fail to stop upgrade_storage_format task", KR(ret));
      } else {
        ctx_.upgrade_storage_format_executor_->start();
      }
    } else {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("invalid job type", KR(ret), K(job));
    }
  }
  return ret;
}
1944

1945
// Sends a flush-cache request to the servers concerned.
//
// If arg.tenant_ids_ is empty the request is sent to all servers with
// is_all_tenant_ = true. Otherwise, for each listed tenant, the request is
// sent to that tenant's servers. Fine-grained plan eviction (sql_id /
// schema_id / database ids) only applies to the per-tenant path, because it
// must specify a tenant.
//
// Processing stops at the first failing lookup or RPC.
//
// @param arg  flush request
// @return OB_SUCCESS or the error of the first failing step.
int ObAdminFlushCache::execute(const obrpc::ObAdminFlushCacheArg &arg)
{
  int ret = OB_SUCCESS;
  int64_t tenant_num = arg.tenant_ids_.count();
  ObSEArray<ObAddr, 8> server_list;
  ObFlushCacheArg fc_arg;
  // fine-grained plan evict only will pass this way.
  // This because fine-grained plan evict must specify tenant
  // if tenant num is 0, flush all tenant, else, flush appointed tenant
  if (tenant_num != 0) { //flush appointed tenant
    // Everything except tenant_id_ is identical for all tenants, so it is
    // filled in once. Doing this inside the tenant loop would append the same
    // database ids to fc_arg again for every tenant.
    fc_arg.is_all_tenant_ = false;
    fc_arg.cache_type_ = arg.cache_type_;
    fc_arg.ns_type_ = arg.ns_type_;
    // fine-grained plan evict args
    if (arg.is_fine_grained_) {
      fc_arg.sql_id_ = arg.sql_id_;
      fc_arg.is_fine_grained_ = arg.is_fine_grained_;
      fc_arg.schema_id_ = arg.schema_id_;
      for (int64_t j = 0; OB_SUCC(ret) && j < arg.db_ids_.count(); j++) {
        if (OB_FAIL(fc_arg.push_database(arg.db_ids_.at(j)))) {
          LOG_WARN("fail to add db ids", KR(ret));
        }
      }
    }
    for (int64_t i = 0; OB_SUCC(ret) && i < tenant_num; ++i) {
      const uint64_t tenant_id = arg.tenant_ids_.at(i);
      //get tenant server list;
      if (OB_FAIL(get_tenant_servers(tenant_id, server_list))) {
        LOG_WARN("fail to get tenant servers", KR(ret), "tenant_id", tenant_id);
      } else {
        //call tenant servers;
        fc_arg.tenant_id_ = tenant_id;
        for (int64_t j = 0; OB_SUCC(ret) && j < server_list.count(); ++j) {
          LOG_INFO("flush server cache", K(fc_arg), K(server_list.at(j)));
          if (OB_FAIL(call_server(server_list.at(j), fc_arg))) {
            LOG_WARN("fail to call tenant server", KR(ret),
                     "tenant_id", tenant_id,
                     "server addr", server_list.at(j));
          }
        }
      }
      // the list is reused for the next tenant
      server_list.reset();
    }
  } else { // flush all tenant
    //get all server list, server_mgr_.get_alive_servers
    if (OB_FAIL(get_all_servers(server_list))) {
      LOG_WARN("fail to get all servers", KR(ret));
    } else {
      fc_arg.is_all_tenant_ = true;
      fc_arg.tenant_id_ = common::OB_INVALID_TENANT_ID;
      fc_arg.cache_type_ = arg.cache_type_;
      fc_arg.ns_type_ = arg.ns_type_;
      for (int64_t j = 0; OB_SUCC(ret) && j < server_list.count(); ++j) {
        LOG_INFO("flush server cache", K(fc_arg), K(server_list.at(j)));
        if (OB_FAIL(call_server(server_list.at(j), fc_arg))) {
          LOG_WARN("fail to call tenant server", KR(ret),
                   "server addr", server_list.at(j));
        }
      }
    }
  }
  return ret;
}
2007

2008
#ifdef OB_BUILD_SPM
2009
// Asks every server of tenant arg.tenant_id_ to load the plan baseline.
// Stops at the first server whose call fails and returns that error.
int ObAdminLoadBaseline::execute(const obrpc::ObLoadPlanBaselineArg &arg)
{
  int ret = OB_SUCCESS;
  ObSEArray<ObAddr, 8> server_list;
  if (OB_FAIL(get_tenant_servers(arg.tenant_id_, server_list))) {
    LOG_WARN("fail to get tenant servers", "tenant_id", arg.tenant_id_, KR(ret));
  }
  // call tenant servers one by one; the loop is skipped if the lookup failed
  for (int64_t idx = 0; OB_SUCC(ret) && idx < server_list.count(); ++idx) {
    const ObAddr &target = server_list.at(idx);
    if (OB_FAIL(call_server(target, arg))) {
      LOG_WARN("fail to call tenant server",
               "tenant_id", arg.tenant_id_,
               "server addr", target,
               KR(ret));
    }
  }
  server_list.reset();
  return ret;
}
2029

2030
// Sends the load_baseline RPC for tenant arg.tenant_id_ to `server`.
// Returns OB_NOT_INIT / OB_INVALID_ARGUMENT when the context or the address is
// unusable, otherwise the RPC result.
int ObAdminLoadBaseline::call_server(const common::ObAddr &server,
                                     const obrpc::ObLoadPlanBaselineArg &arg)
{
  int ret = OB_SUCCESS;
  if (ctx_.is_inited()) {
    if (server.is_valid()) {
      if (OB_FAIL(ctx_.rpc_proxy_->to(server)
                                  .by(arg.tenant_id_)
                                  .as(arg.tenant_id_)
                                  .load_baseline(arg))) {
        LOG_WARN("request server load baseline failed", KR(ret), K(server));
      }
    } else {
      ret = OB_INVALID_ARGUMENT;
      LOG_WARN("invalid server", K(server), KR(ret));
    }
  } else {
    ret = OB_NOT_INIT;
    LOG_WARN("not init", KR(ret));
  }

  return ret;
}
2049

2050
// Asks every server of tenant arg.tenant_id_ to load the plan baseline and
// adds each server's load_count_ to `total_load_count`.
// `total_load_count` is only added to, never reset here.
// Stops at the first server whose call fails and returns that error.
int ObAdminLoadBaselineV2::execute(const obrpc::ObLoadPlanBaselineArg &arg, uint64_t &total_load_count)
{
  int ret = OB_SUCCESS;
  ObSEArray<ObAddr, 8> server_list;
  if (OB_FAIL(get_tenant_servers(arg.tenant_id_, server_list))) {
    LOG_WARN("fail to get tenant servers", "tenant_id", arg.tenant_id_, KR(ret));
  }
  // call tenant servers one by one; the loop is skipped if the lookup failed
  for (int64_t idx = 0; OB_SUCC(ret) && idx < server_list.count(); ++idx) {
    const ObAddr &target = server_list.at(idx);
    obrpc::ObLoadBaselineRes res;
    if (OB_FAIL(call_server(target, arg, res))) {
      LOG_WARN("fail to call tenant server",
               "tenant_id", arg.tenant_id_,
               "server addr", target,
               KR(ret));
    } else {
      total_load_count += res.load_count_;
    }
  }
  server_list.reset();
  return ret;
}
2073

2074
// Sends the load_baseline_v2 RPC for tenant arg.tenant_id_ to `server`; the
// reply is stored in `res`.
// Returns OB_NOT_INIT / OB_INVALID_ARGUMENT when the context or the address is
// unusable, otherwise the RPC result.
int ObAdminLoadBaselineV2::call_server(const common::ObAddr &server,
                                     const obrpc::ObLoadPlanBaselineArg &arg,
                                     obrpc::ObLoadBaselineRes &res)
{
  int ret = OB_SUCCESS;
  if (ctx_.is_inited()) {
    if (server.is_valid()) {
      if (OB_FAIL(ctx_.rpc_proxy_->to(server)
                                  .by(arg.tenant_id_)
                                  .as(arg.tenant_id_)
                                  .load_baseline_v2(arg, res))) {
        LOG_WARN("request server load baseline failed", KR(ret), K(server));
      }
    } else {
      ret = OB_INVALID_ARGUMENT;
      LOG_WARN("invalid server", K(server), KR(ret));
    }
  } else {
    ret = OB_NOT_INIT;
    LOG_WARN("not init", KR(ret));
  }
  return ret;
}
2093
#endif
2094

2095
int ObTenantServerAdminUtil::get_tenant_servers(const uint64_t tenant_id, common::ObIArray<ObAddr> &servers)
2096
{
2097
  int ret = OB_SUCCESS;
2098
  // sys tenant, get all servers directly
2099
  if (OB_SYS_TENANT_ID == tenant_id) {
2100
    if (OB_FAIL(get_all_servers(servers))) {
2101
      LOG_WARN("fail to get all servers", KR(ret));
2102
    }
2103
  } else {
2104
    ObArray<uint64_t> pool_ids;
2105
    if (OB_ISNULL(ctx_.unit_mgr_)) {
2106
      ret = OB_INVALID_ARGUMENT;
2107
      LOG_WARN("invalid argument", K(ctx_.unit_mgr_), KR(ret));
2108
    } else if (!SVR_TRACER.has_build() || !ctx_.unit_mgr_->check_inner_stat()) {
2109
      ret = OB_SERVER_IS_INIT;
2110
      LOG_WARN("server manager or unit manager hasn't built",
2111
               "unit_mgr built", ctx_.unit_mgr_->check_inner_stat(), KR(ret));
2112
    } else if (OB_FAIL(ctx_.unit_mgr_->get_pool_ids_of_tenant(tenant_id, pool_ids))) {
2113
      LOG_WARN("get_pool_ids_of_tenant failed", K(tenant_id), KR(ret));
2114
    } else {
2115
      ObArray<ObUnitInfo> unit_infos;
2116
      for (int64_t i = 0; OB_SUCC(ret) && i < pool_ids.count(); ++i) {
2117
        unit_infos.reuse();
2118
        if (OB_FAIL(ctx_.unit_mgr_->get_unit_infos_of_pool(pool_ids.at(i), unit_infos))) {
2119
          LOG_WARN("get_unit_infos_of_pool failed", "pool_id", pool_ids.at(i), KR(ret));
2120
        } else {
2121
          for (int64_t j = 0; OB_SUCC(ret) && j < unit_infos.count(); ++j) {
2122
            bool is_alive = false;
2123
            const ObUnit &unit = unit_infos.at(j).unit_;
2124
            if (OB_FAIL(SVR_TRACER.check_server_alive(unit.server_, is_alive))) {
2125
              LOG_WARN("check_server_alive failed", "server", unit.server_, KR(ret));
2126
            } else if (is_alive) {
2127
              if (OB_FAIL(servers.push_back(unit.server_))) {
2128
                LOG_WARN("push_back failed", KR(ret));
2129
              }
2130
            }
2131
            if (OB_SUCC(ret)) {
2132
              if (unit.migrate_from_server_.is_valid()) {
2133
                if (OB_FAIL(SVR_TRACER.check_server_alive(
2134
                    unit.migrate_from_server_, is_alive))) {
2135
                  LOG_WARN("check_server_alive failed", "server",
2136
                      unit.migrate_from_server_, KR(ret));
2137
                } else if (is_alive) {
2138
                  if (OB_FAIL(servers.push_back(unit.migrate_from_server_))) {
2139
                    LOG_WARN("push_back failed", KR(ret));
2140
                  }
2141
                }
2142
              }
2143
            }
2144
          } // for unit infos end
2145
        }
2146
      } // for pool ids end
2147
    }
2148
  }
2149

2150
  return ret;
2151
}
2152

2153
/**
 * Fetch alive servers from the server tracer without restricting to a zone.
 *
 * @param servers  receives the addresses reported by SVR_TRACER.get_alive_servers()
 * @return the result of SVR_TRACER.get_alive_servers()
 */
int ObTenantServerAdminUtil::get_all_servers(common::ObIArray<ObAddr> &servers)
{
  int ret = OB_SUCCESS;
  // A default-constructed (empty) zone asks for servers of all zones.
  ObZone unrestricted_zone;
  if (OB_FAIL(SVR_TRACER.get_alive_servers(unrestricted_zone, servers))) {
    LOG_WARN("fail to get all servers", KR(ret));
  }
  return ret;
}
2163

2164
/**
 * Issue the flush_cache RPC to a single server.
 *
 * @param server  destination address; must be valid
 * @param arg     flush-cache request forwarded unchanged
 * @return OB_NOT_INIT if ctx_ is not initialised, OB_INVALID_ARGUMENT for an
 *         invalid address, otherwise the RPC result
 */
int ObAdminFlushCache::call_server(const common::ObAddr &server, const obrpc::ObFlushCacheArg &arg)
{
  int ret = OB_SUCCESS;
  if (ctx_.is_inited()) {
    if (server.is_valid()) {
      if (OB_FAIL(ctx_.rpc_proxy_->to(server).flush_cache(arg))) {
        LOG_WARN("request server flush cache failed", KR(ret), K(server));
      }
    } else {
      ret = OB_INVALID_ARGUMENT;
      LOG_WARN("invalid server", K(server), KR(ret));
    }
  } else {
    ret = OB_NOT_INIT;
    LOG_WARN("not init", KR(ret));
  }
  return ret;
}
2178

2179
/**
 * Handle a set_tp admin request by dispatching it through call_all().
 *
 * @param arg  set-tracepoint request; rejected when arg.is_valid() is false
 * @return OB_NOT_INIT if ctx_ is not initialised, OB_INVALID_ARGUMENT for an
 *         invalid arg, otherwise the result of call_all(arg)
 */
int ObAdminSetTP::execute(const obrpc::ObAdminSetTPArg &arg)
{
  LOG_INFO("start execute set_tp request", K(arg));
  int ret = OB_SUCCESS;
  if (!ctx_.is_inited()) {
    ret = OB_NOT_INIT;
    LOG_WARN("not init", KR(ret));
  } else if (!arg.is_valid()) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid arg", K(arg), KR(ret));
  } else if (OB_FAIL(call_all(arg))) {
    // The message previously said "report replica", copied from another command.
    LOG_WARN("execute set_tp failed", KR(ret), K(arg));
  }
  // Include the result code so the closing log line shows success or failure.
  LOG_INFO("end execute set_tp request", KR(ret), K(arg));
  return ret;
}
2195

2196
int ObAdminSetTP::call_server(const ObAddr &server)
2197
{
2198
  int ret = OB_SUCCESS;
2199
  if (!ctx_.is_inited()) {
2200
    ret = OB_NOT_INIT;
2201
    LOG_WARN("not init", KR(ret));
2202
  } else if (!server.is_valid()) {
2203
    ret = OB_INVALID_ARGUMENT;
2204
    LOG_WARN("invalid server", K(server), KR(ret));
2205
  } else if (OB_FAIL(ctx_.rpc_proxy_->to(server).set_tracepoint(arg_))) {
2206
    LOG_WARN("request server report replica failed", KR(ret), K(server));
2207
  }
2208
  return ret;
2209
}
2210

2211
/**
 * Ask every server returned by get_tenant_servers() for arg.tenant_id_ to sync
 * its rewrite rules.
 *
 * @param arg  sync request; arg.tenant_id_ selects the servers to call
 * @return OB_SUCCESS, or the first error hit while listing or calling servers
 *         (iteration stops at the first failing server)
 */
int ObAdminSyncRewriteRules::execute(const obrpc::ObSyncRewriteRuleArg &arg)
{
  int ret = OB_SUCCESS;
  ObSEArray<ObAddr, 8> tenant_servers;
  if (OB_FAIL(get_tenant_servers(arg.tenant_id_, tenant_servers))) {
    LOG_WARN("fail to get tenant servers", "tenant_id", arg.tenant_id_, KR(ret));
  }
  // The OB_SUCC(ret) guard keeps the loop from running when the lookup above failed.
  for (int64_t idx = 0; OB_SUCC(ret) && idx < tenant_servers.count(); ++idx) {
    const ObAddr &target = tenant_servers.at(idx);
    if (OB_FAIL(call_server(target, arg))) {
      LOG_WARN("fail to call tenant server",
               "tenant_id", arg.tenant_id_,
               "server addr", target,
               KR(ret));
    }
  }
  tenant_servers.reset();
  return ret;
}
2231

2232
/**
 * Issue the sync_rewrite_rules RPC to a single server, routed by/as arg.tenant_id_.
 *
 * @param server  destination address; must be valid
 * @param arg     request forwarded to the server
 * @return OB_NOT_INIT if ctx_ is not initialised, OB_INVALID_ARGUMENT for an
 *         invalid address, OB_ERR_UNEXPECTED when ctx_.rpc_proxy_ is null,
 *         otherwise the RPC result
 */
int ObAdminSyncRewriteRules::call_server(const common::ObAddr &server,
                                         const obrpc::ObSyncRewriteRuleArg &arg)
{
  int ret = OB_SUCCESS;
  if (ctx_.is_inited()) {
    if (!server.is_valid()) {
      ret = OB_INVALID_ARGUMENT;
      LOG_WARN("invalid server", K(server), KR(ret));
    } else if (OB_ISNULL(ctx_.rpc_proxy_)) {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("get unexpected null", K(ret));
    } else if (OB_FAIL(ctx_.rpc_proxy_->to(server).by(arg.tenant_id_).as(arg.tenant_id_)
                           .sync_rewrite_rules(arg))) {
      LOG_WARN("request server sync rewrite rules failed", KR(ret), K(server));
    }
  } else {
    ret = OB_NOT_INIT;
    LOG_WARN("not init", KR(ret));
  }
  return ret;
}
2253

2254
} // end namespace rootserver
2255
} // end namespace oceanbase
2256

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.