oceanbase
2255 строк · 89.0 Кб
1/**
2* Copyright (c) 2021 OceanBase
3* OceanBase CE is licensed under Mulan PubL v2.
4* You can use this software according to the terms and conditions of the Mulan PubL v2.
5* You may obtain a copy of Mulan PubL v2 at:
6* http://license.coscl.org.cn/MulanPubL-2.0
7* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10* See the Mulan PubL v2 for more details.
11*/
12
13#include "io/easy_connection.h"
14#include "lib/list/ob_dlist.h"
15#include "lib/ob_errno.h"
16#include "lib/string/ob_string_holder.h"
17#include "logservice/leader_coordinator/failure_event.h"
18#include "logservice/leader_coordinator/ob_failure_detector.h"
19#include "share/inner_table/ob_inner_table_schema_constants.h"
20#include "share/ob_table_access_helper.h"
21#include "share/rc/ob_tenant_base.h"
22#define USING_LOG_PREFIX RS
23
24#include "ob_system_admin_util.h"
25
26#include "lib/time/ob_time_utility.h"
27#include "lib/container/ob_array_iterator.h"
28#include "share/ob_srv_rpc_proxy.h"
29#include "share/ob_rpc_struct.h"
30#include "share/schema/ob_schema_getter_guard.h"
31#include "share/schema/ob_multi_version_schema_service.h"
32#include "share/config/ob_server_config.h"
33#include "share/config/ob_config_manager.h"
34#include "share/ob_dml_sql_splicer.h"
35#include "share/ob_cluster_version.h"
36#include "share/ob_upgrade_utils.h"
37#include "share/ob_share_util.h" // ObShareUtil
38#include "storage/ob_file_system_router.h"
39#include "observer/ob_server_struct.h"
40#include "observer/omt/ob_tenant_config_mgr.h"
41#include "observer/omt/ob_multi_tenant.h"
42#include "observer/ob_srv_network_frame.h"
43#include "ob_server_manager.h"
44#include "ob_ddl_operator.h"
45#include "ob_zone_manager.h"
46#include "ob_ddl_service.h"
47#include "ob_unit_manager.h"
48#include "ob_root_inspection.h"
49#include "ob_root_service.h"
50#include "storage/ob_file_system_router.h"
51#include "logservice/leader_coordinator/table_accessor.h"
52#include "rootserver/freeze/ob_major_freeze_helper.h"
53#include "share/ob_cluster_event_history_table_operator.h"//CLUSTER_EVENT_INSTANCE
54#include "observer/ob_service.h"
55namespace oceanbase
56{
57using namespace common;
58using namespace common::hash;
59using namespace share;
60using namespace share::schema;
61using namespace obrpc;
62
63namespace rootserver
64{
65
66int ObSystemAdminUtil::check_service() const
67{
68int ret = OB_SUCCESS;
69if (!ctx_.is_inited()) {
70ret = OB_NOT_INIT;
71LOG_WARN("not init", KR(ret));
72} else {
73ret = ctx_.rs_status_->in_service()? OB_SUCCESS : OB_CANCELED;
74}
75return ret;
76}
77
78int ObAdminSwitchReplicaRole::execute(const ObAdminSwitchReplicaRoleArg &arg)
79{
80LOG_INFO("execute switch replica role request", K(arg));
81ObArenaAllocator allocator(ObModIds::OB_RS_PARTITION_TABLE_TEMP);
82int ret = OB_SUCCESS;
83const ObLSID ls_id(arg.ls_id_);
84uint64_t tenant_id = OB_INVALID_TENANT_ID;
85ObLSInfo ls_info;
86auto get_tenant_id_by_name = [this](const ObAdminSwitchReplicaRoleArg &arg, uint64_t &tenant_id) -> int {
87int ret = OB_SUCCESS;
88ObSchemaGetterGuard schema_guard;
89ObString tenant_name;
90tenant_name.assign_ptr(arg.tenant_name_.ptr(),
91static_cast<int32_t>(strlen(arg.tenant_name_.ptr())));
92if (tenant_name.empty()) {
93tenant_id = OB_INVALID_TENANT_ID;
94} else if (OB_FAIL(ctx_.schema_service_->get_tenant_schema_guard(OB_SYS_TENANT_ID, schema_guard))) {
95LOG_WARN("get schema manager failed", KR(ret));
96} else if (OB_FAIL(schema_guard.get_tenant_id(tenant_name, tenant_id))
97|| OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id)) {
98ret = OB_TENANT_NOT_EXIST;
99LOG_WARN("tenant not exist", K(tenant_name), KR(ret));
100}
101return ret;
102};
103auto check_server_valid = [](const ObAddr &server) -> int {
104int ret = OB_SUCCESS;
105const char *columns[1] = {"status"};
106constexpr int64_t buffer_size = 128;
107char where_condition[buffer_size] = {0};
108char ip_str_buffer[buffer_size] = {0};
109if (!server.ip_to_string(ip_str_buffer, buffer_size)) {
110ret = OB_ERR_UNEXPECTED;
111LOG_WARN("ip to string failed", K(ip_str_buffer), K(server));
112} else {
113if (OB_FAIL(databuff_printf(where_condition, buffer_size, "where svr_ip='%s' and svr_port=%d", ip_str_buffer, server.get_port()))) {
114LOG_WARN("fail to create where confition", K(ip_str_buffer), K(server));
115} else {
116ObStringHolder server_status;
117if (OB_FAIL(ObTableAccessHelper::read_single_row(OB_SYS_TENANT_ID, columns, OB_ALL_SERVER_TNAME, where_condition, server_status))) {
118if (OB_ITER_END == ret) {
119ret = OB_ENTRY_NOT_EXIST;
120LOG_USER_ERROR(OB_ENTRY_NOT_EXIST, "server not in cluster");
121LOG_WARN("server not in __all_server table", K(server), KR(ret));
122} else {
123LOG_WARN("fail to read all_server table", K(server), KR(ret));
124}
125} else if (server_status.get_ob_string().compare("ACTIVE") != 0) {
126ret = OB_OP_NOT_ALLOW;
127LOG_USER_ERROR(OB_OP_NOT_ALLOW, "server not active");
128LOG_WARN("server status not valid", K(server), K(server_status));
129}
130}
131}
132return ret;
133};
134auto update_ls_election_reference_info_table = [](const ObAdminSwitchReplicaRoleArg &arg, const int64_t tenant_id, const ObLSInfo &info) -> int {
135int ret = OB_SUCCESS;
136ObSwitchLeaderArg switch_leader_arg(arg.ls_id_, arg.role_, tenant_id, arg.server_);
137const ObLSReplica *ls_replica = nullptr;
138if (switch_leader_arg.ls_id_ < 0 ||
139OB_INVALID_TENANT_ID == switch_leader_arg.tenant_id_ ||
140!switch_leader_arg.dest_server_.is_valid()) {
141ret = OB_INVALID_ARGUMENT;
142LOG_WARN("invalid argument", KR(ret), K(switch_leader_arg));
143} else if (switch_leader_arg.role_ == ObRole::LEADER) {
144logservice::coordinator::LsElectionReferenceInfoRow row(tenant_id, share::ObLSID(arg.ls_id_));
145if (OB_FAIL(row.change_manual_leader(arg.server_))) {
146LOG_WARN("fail to change manual leader in __all_ls_election_reference_info", K(ret), K(arg));
147} else {
148LOG_INFO("successfully to change manual leader in __all_ls_election_reference_info", K(ret), K(arg));
149}
150} else if (switch_leader_arg.role_ == ObRole::FOLLOWER) {
151logservice::coordinator::LsElectionReferenceInfoRow row(tenant_id, share::ObLSID(arg.ls_id_));
152if (OB_FAIL(row.add_server_to_blacklist(arg.server_, logservice::coordinator::InsertElectionBlacklistReason::SWITCH_REPLICA))) {
153LOG_WARN("fail to add remove member info in __all_ls_election_reference_info", K(ret), K(arg));
154} else {
155LOG_INFO("successfully to add remove member info in __all_ls_election_reference_info", K(ret), K(arg));
156}
157} else if (switch_leader_arg.role_ == ObRole::INVALID_ROLE) {
158logservice::coordinator::LsElectionReferenceInfoRow row(tenant_id, share::ObLSID(arg.ls_id_));
159if (OB_FAIL(row.change_manual_leader(ObAddr()))) {
160LOG_WARN("fail to change manual leader in __all_ls_election_reference_info", K(ret), K(arg));
161} else if (OB_FAIL(row.delete_server_from_blacklist(arg.server_))) {
162if (OB_ENTRY_NOT_EXIST != ret) {
163LOG_WARN("fail to del remove member info in __all_ls_election_reference_info", K(ret), K(arg));
164} else {
165ret = OB_SUCCESS;
166}
167}
168if (OB_SUCC(ret)) {
169LOG_INFO("successfully to reset server status in __all_ls_election_reference_info", K(ret), K(arg));
170}
171}
172return ret;
173};
174if (!ctx_.is_inited()) {
175ret = OB_NOT_INIT;
176LOG_WARN("not init", KR(ret));
177} else if (!arg.is_valid()) {
178ret = OB_INVALID_ARGUMENT;
179LOG_WARN("invalid arg", K(arg), KR(ret));
180} else if (!ls_id.is_valid()) {// 表示需要改变server上或者zone中所有日志流的状态
181ret = OB_NOT_SUPPORTED;
182LOG_USER_ERROR(OB_NOT_SUPPORTED, "switch server's role or zone's role");
183} else if (!arg.server_.is_valid()) {
184ret = OB_INVALID_ARGUMENT;
185LOG_WARN("server must set", K(arg), KR(ret));
186} else if (OB_FAIL(check_server_valid(arg.server_))) {
187LOG_WARN("check server valid state failed", K(arg), KR(ret));
188} else if (OB_FAIL(get_tenant_id_by_name(arg, tenant_id))) {
189if (OB_ENTRY_NOT_EXIST == ret) {
190LOG_USER_ERROR(OB_ENTRY_NOT_EXIST, "invalid tenant");
191}
192LOG_WARN("fail to convert tenant name to id", K(arg), KR(ret));
193} else if (OB_ISNULL(GCTX.lst_operator_)) {
194ret = OB_ERR_UNEXPECTED;
195LOG_WARN("GCTX.lst_operator_ is NULL", K(arg), KR(ret), K(tenant_id));
196} else if (OB_FAIL(GCTX.lst_operator_->get(GCONF.cluster_id, tenant_id,
197ls_id, share::ObLSTable::DEFAULT_MODE, ls_info))) {
198LOG_WARN("get ls info from GCTX.lst_operator_ failed", K(arg), KR(ret), K(tenant_id));
199} else if (OB_FAIL(update_ls_election_reference_info_table(arg, tenant_id, ls_info))) {
200LOG_WARN("fail to update ls election reference info", K(arg), KR(ret), K(tenant_id));
201} else {
202int tmp_ret = OB_SUCCESS;//ignore ret
203if (OB_TMP_FAIL(ObRootUtils::try_notify_switch_ls_leader(ctx_.rpc_proxy_, ls_info,
204obrpc::ObNotifySwitchLeaderArg::SwitchLeaderComment::MANUAL_SWITCH))) {
205LOG_WARN("failed to notify switch ls leader", KR(ret), K(ls_info));
206}
207}
208LOG_INFO("switch leader done", KR(ret), K(arg), K(tenant_id), K(ls_info));
209return ret;
210}
211
212int ObAdminSwitchReplicaRole::alloc_tenant_id_set(common::hash::ObHashSet<uint64_t> &tenant_id_set)
213{
214int ret = OB_SUCCESS;
215if (tenant_id_set.created()) {
216if(OB_FAIL(tenant_id_set.clear())) {
217LOG_WARN("clear tenant id set failed", KR(ret));
218}
219} else if (OB_FAIL(tenant_id_set.create(TENANT_BUCKET_NUM))) {
220LOG_WARN("create tenant id set failed", LITERAL_K(TENANT_BUCKET_NUM), KR(ret));
221}
222return ret;
223}
224
225template<typename T>
226int ObAdminSwitchReplicaRole::convert_set_to_array(const common::hash::ObHashSet<T> &set,
227ObArray<T> &array)
228{
229int ret = common::OB_SUCCESS;
230array.reuse();
231if (!set.created()) {
232ret = OB_INVALID_ARGUMENT;
233LOG_WARN("set not created", "set created", set.created(), KR(ret));
234} else if (OB_FAIL(array.reserve(set.size()))) {
235LOG_WARN("array reserver failed", "capacity", set.size(), KR(ret));
236} else {
237for (typename common::hash::ObHashSet<T>::const_iterator iter = set.begin();
238OB_SUCCESS == ret && iter != set.end(); ++iter) {
239if (OB_FAIL(array.push_back(iter->first))) {
240LOG_WARN("push_back failed", KR(ret));
241}
242}
243}
244return ret;
245}
246
247int ObAdminSwitchReplicaRole::get_tenants_of_zone(const ObZone &zone,
248common::hash::ObHashSet<uint64_t> &tenant_id_set)
249{
250int ret = OB_SUCCESS;
251ObArray<ObAddr> server_array;
252if (!ctx_.is_inited()) {
253ret = OB_NOT_INIT;
254LOG_WARN("not init", KR(ret));
255} else if (zone.is_empty() || !tenant_id_set.created()) {
256ret = OB_INVALID_ARGUMENT;
257LOG_WARN("invalid argument", K(zone),
258"tenant_id_set created", tenant_id_set.created(), KR(ret));
259} else if (OB_FAIL(SVR_TRACER.get_alive_servers(zone, server_array))) {
260LOG_WARN("get alive servers failed", K(zone), KR(ret));
261} else {
262FOREACH_CNT_X(server, server_array, OB_SUCCESS == ret) {
263if (OB_FAIL(ctx_.unit_mgr_->get_tenants_of_server(*server, tenant_id_set))) {
264LOG_WARN("get tenants of server failed", "server", *server, KR(ret));
265}
266}
267}
268
269return ret;
270}
271
272int ObAdminSwitchReplicaRole::get_switch_replica_tenants(const ObZone &zone, const ObAddr &server,
273const uint64_t &tenant_id, ObArray<uint64_t> &tenant_ids)
274{
275int ret = OB_SUCCESS;
276if (!ctx_.is_inited()) {
277ret = OB_NOT_INIT;
278LOG_WARN("not init", KR(ret));
279} else if (zone.is_empty() && !server.is_valid() && OB_INVALID_ID == tenant_id) {
280ret = OB_INVALID_ARGUMENT;
281LOG_WARN("zone, server and tenant_id are all invalid",
282K(zone), K(server), K(tenant_id), KR(ret));
283} else if (OB_INVALID_ID != tenant_id) {
284if (OB_FAIL(tenant_ids.push_back(tenant_id))) {
285LOG_WARN("push back tenant id failed", KR(ret));
286}
287} else if (server.is_valid() || !zone.is_empty()) {
288ObHashSet<uint64_t> tenant_id_set;
289if (OB_FAIL(alloc_tenant_id_set(tenant_id_set))) {
290LOG_WARN("alloc tenant id set failed", KR(ret));
291} else {
292if (server.is_valid()) {
293if (OB_FAIL(ctx_.unit_mgr_->get_tenants_of_server(server, tenant_id_set))) {
294LOG_WARN("get tenants of server failed", K(server), KR(ret));
295}
296} else {
297if (OB_FAIL(get_tenants_of_zone(zone, tenant_id_set))) {
298LOG_WARN("get tenants of zone failed", K(zone), KR(ret));
299}
300}
301}
302if (OB_SUCC(ret)) {
303if (OB_FAIL(convert_set_to_array(tenant_id_set, tenant_ids))) {
304LOG_WARN("convert set to array failed", KR(ret));
305}
306}
307}
308
309return ret;
310}
311
312int ObAdminCallServer::get_server_list(const ObServerZoneArg &arg, ObIArray<ObAddr> &server_list)
313{
314int ret = OB_SUCCESS;
315server_list.reset();
316if (!ctx_.is_inited()) {
317ret = OB_NOT_INIT;
318LOG_WARN("not init", KR(ret));
319} else if (!arg.is_valid()) {
320ret = OB_INVALID_ARGUMENT;
321LOG_WARN("invalid arg", K(arg), KR(ret));
322} else if (arg.server_.is_valid()) {
323bool is_alive = false;
324if (OB_FAIL(SVR_TRACER.check_server_alive(arg.server_, is_alive))) {
325LOG_WARN("fail to check server alive", KR(ret), "server", arg.server_);
326} else if (!is_alive) {
327ret = OB_INVALID_ARGUMENT;
328LOG_WARN("server is not alive", KR(ret), "server", arg.server_);
329} else if (OB_FAIL(server_list.push_back(arg.server_))) {
330LOG_WARN("push back server failed", KR(ret));
331}
332} else {
333bool zone_exist = true;
334if (!arg.zone_.is_empty() && OB_FAIL(ctx_.zone_mgr_->check_zone_exist(arg.zone_, zone_exist))) {
335LOG_WARN("fail to check zone exist", KR(ret));
336} else if (!zone_exist) {
337ret = OB_ZONE_INFO_NOT_EXIST;
338LOG_WARN("zone info not exist", KR(ret), K(arg.zone_));
339} else if (OB_FAIL(SVR_TRACER.get_alive_servers(arg.zone_, server_list))) {
340LOG_WARN("get alive servers failed", KR(ret), K(arg));
341}
342}
343return ret;
344}
345
346int ObAdminCallServer::call_all(const ObServerZoneArg &arg)
347{
348int ret = OB_SUCCESS;
349ObArray<ObAddr> server_list;
350if (OB_FAIL(get_server_list(arg, server_list))) {
351LOG_WARN("get server list failed", K(ret), K(arg));
352} else {
353FOREACH_CNT(server, server_list) {
354int tmp_ret = call_server(*server);
355if (OB_SUCCESS != tmp_ret) {
356LOG_WARN("call server failed", KR(ret), "server", *server);
357ret = OB_SUCCESS == ret ? tmp_ret : ret;
358}
359}
360}
361return ret;
362}
363
364int ObAdminReportReplica::execute(const obrpc::ObAdminReportReplicaArg &arg)
365{
366LOG_INFO("execute report request", K(arg));
367int ret = OB_SUCCESS;
368if (!ctx_.is_inited()) {
369ret = OB_NOT_INIT;
370LOG_WARN("not init", KR(ret));
371} else if (!arg.is_valid()) {
372ret = OB_INVALID_ARGUMENT;
373LOG_WARN("invalid arg", K(arg), KR(ret));
374} else if (OB_FAIL(call_all(arg))) {
375LOG_WARN("execute report replica failed", KR(ret), K(arg));
376}
377return ret;
378}
379
380int ObAdminReportReplica::call_server(const ObAddr &server)
381{
382int ret = OB_SUCCESS;
383if (!ctx_.is_inited()) {
384ret = OB_NOT_INIT;
385LOG_WARN("not init", KR(ret));
386} else if (!server.is_valid()) {
387ret = OB_INVALID_ARGUMENT;
388LOG_WARN("invalid server", K(server), KR(ret));
389} else if (OB_FAIL(ctx_.rpc_proxy_->to(server).report_replica())) {
390LOG_WARN("request server report replica failed", KR(ret), K(server));
391}
392return ret;
393}
394
395int ObAdminRecycleReplica::execute(const obrpc::ObAdminRecycleReplicaArg &arg)
396{
397LOG_INFO("execute recycle request", K(arg));
398int ret = OB_SUCCESS;
399if (!ctx_.is_inited()) {
400ret = OB_NOT_INIT;
401LOG_WARN("not init", KR(ret));
402} else if (!arg.is_valid()) {
403ret = OB_INVALID_ARGUMENT;
404LOG_WARN("invalid arg", K(arg), KR(ret));
405} else if (OB_FAIL(call_all(arg))) {
406LOG_WARN("execute recycle replica failed", KR(ret), K(arg));
407}
408return ret;
409}
410
411int ObAdminRecycleReplica::call_server(const ObAddr &server)
412{
413int ret = OB_SUCCESS;
414if (!ctx_.is_inited()) {
415ret = OB_NOT_INIT;
416LOG_WARN("not init", KR(ret));
417} else if (!server.is_valid()) {
418ret = OB_INVALID_ARGUMENT;
419LOG_WARN("invalid server", K(server), KR(ret));
420} else if (OB_FAIL(ctx_.rpc_proxy_->to(server).recycle_replica())) {
421LOG_WARN("request server recycle replica failed", KR(ret), K(server));
422}
423return ret;
424}
425
426int ObAdminClearLocationCache::execute(const obrpc::ObAdminClearLocationCacheArg &arg)
427{
428LOG_INFO("execute clear location cache request", K(arg));
429int ret = OB_SUCCESS;
430if (!ctx_.is_inited()) {
431ret = OB_NOT_INIT;
432LOG_WARN("not init", KR(ret));
433} else if (!arg.is_valid()) {
434ret = OB_INVALID_ARGUMENT;
435LOG_WARN("invalid arg", K(arg), KR(ret));
436} else if (OB_FAIL(call_all(arg))) {
437LOG_WARN("execute clear location cache failed", KR(ret), K(arg));
438}
439return ret;
440}
441
442int ObAdminClearLocationCache::call_server(const ObAddr &server)
443{
444int ret = OB_SUCCESS;
445if (!ctx_.is_inited()) {
446ret = OB_NOT_INIT;
447LOG_WARN("not init", KR(ret));
448} else if (!server.is_valid()) {
449ret = OB_INVALID_ARGUMENT;
450LOG_WARN("invalid server", K(server), KR(ret));
451} else if (OB_FAIL(ctx_.rpc_proxy_->to(server).clear_location_cache())) {
452LOG_WARN("request clear location cache failed", KR(ret), K(server));
453}
454return ret;
455}
456
457int ObAdminReloadUnit::execute()
458{
459LOG_INFO("execute reload unit request");
460int ret = OB_SUCCESS;
461if (!ctx_.is_inited()) {
462ret = OB_NOT_INIT;
463LOG_WARN("not init", KR(ret));
464} else if (OB_FAIL(ctx_.unit_mgr_->load())) {
465LOG_WARN("unit manager load failed", KR(ret));
466}
467LOG_INFO("finish execute reload unit request", KR(ret));
468return ret;
469}
470
471int ObAdminReloadServer::execute()
472{
473LOG_INFO("execute reload server request");
474int ret = OB_SUCCESS;
475if (!ctx_.is_inited()) {
476ret = OB_NOT_INIT;
477LOG_WARN("not init", KR(ret));
478} else if (OB_ISNULL(ctx_.server_mgr_)) {
479ret = OB_ERR_UNEXPECTED;
480LOG_WARN("ctx_.server_mgr_ is null", KR(ret), KP(ctx_.server_mgr_));
481} else if (OB_FAIL(ctx_.server_mgr_->load_server_manager())) {
482LOG_WARN("build server status failed", KR(ret));
483}
484return ret;
485}
486
487int ObAdminReloadZone::execute()
488{
489LOG_INFO("execute reload zone request");
490int ret = OB_SUCCESS;
491if (!ctx_.is_inited()) {
492ret = OB_NOT_INIT;
493LOG_WARN("not init", KR(ret));
494} else if (OB_FAIL(ctx_.zone_mgr_->reload())) {
495LOG_ERROR("zone manager reload failed", KR(ret));
496}
497return ret;
498}
499
500int ObAdminClearMergeError::execute(const obrpc::ObAdminMergeArg &arg)
501{
502LOG_INFO("execute clear merge error request", K(arg));
503int ret = OB_SUCCESS;
504if (!ctx_.is_inited()) {
505ret = OB_NOT_INIT;
506LOG_WARN("not init", KR(ret));
507} else if (!arg.is_valid()) {
508ret = OB_INVALID_ARGUMENT;
509LOG_WARN("invalid arg", K(arg), KR(ret));
510} else {
511ObTenantAdminMergeParam param;
512param.transport_ = GCTX.net_frame_->get_req_transport();
513if (arg.affect_all_ || arg.affect_all_user_ || arg.affect_all_meta_) {
514if ((true == arg.affect_all_ && true == arg.affect_all_user_) ||
515(true == arg.affect_all_ && true == arg.affect_all_meta_) ||
516(true == arg.affect_all_user_ && true == arg.affect_all_meta_)) {
517ret = OB_ERR_UNEXPECTED;
518LOG_WARN("only one of affect_all,affect_all_user,affect_all_meta can be true",
519KR(ret), "affect_all", arg.affect_all_, "affect_all_user",
520arg.affect_all_user_, "affect_all_meta", arg.affect_all_meta_);
521} else {
522if (arg.affect_all_) {
523param.need_all_ = true;
524} else if (arg.affect_all_user_) {
525param.need_all_user_ = true;
526} else {
527param.need_all_meta_ = true;
528}
529}
530} else if (OB_FAIL(param.tenant_array_.assign(arg.tenant_ids_))) {
531LOG_WARN("fail to assign tenant_ids", KR(ret), K(arg));
532}
533if (FAILEDx(ObMajorFreezeHelper::clear_merge_error(param))) {
534LOG_WARN("fail to clear merge error", KR(ret), K(param));
535}
536}
537return ret;
538}
539
540int ObAdminZoneFastRecovery::execute(const obrpc::ObAdminRecoveryArg &arg)
541{
542LOG_INFO("execute zone fast recovery admin request", K(arg));
543int ret = OB_SUCCESS;
544if (!ctx_.is_inited()) {
545ret = OB_NOT_INIT;
546LOG_WARN("not init", KR(ret));
547} else if (!arg.is_valid()) {
548ret = OB_INVALID_ARGUMENT;
549LOG_WARN("invalid arg", K(arg), KR(ret));
550} else {
551switch (arg.type_) {
552case ObAdminRecoveryArg::SUSPEND_RECOVERY:
553if (OB_FAIL(ctx_.zone_mgr_->update_recovery_status(
554arg.zone_, share::ObZoneInfo::RECOVERY_STATUS_SUSPEND))) {
555LOG_WARN("fail to update zone fast recovery status", KR(ret));
556}
557break;
558case ObAdminRecoveryArg::RESUME_RECOVERY:
559if (OB_FAIL(ctx_.zone_mgr_->update_recovery_status(
560arg.zone_, share::ObZoneInfo::RECOVERY_STATUS_NORMAL))) {
561LOG_WARN("fail to update zone fast recovery status", KR(ret));
562}
563break;
564default:
565ret = OB_ERR_UNEXPECTED;
566LOG_WARN("arg type unexpected", KR(ret), "type", arg.type_);
567break;
568}
569}
570return ret;
571}
572
573int ObAdminMerge::execute(const obrpc::ObAdminMergeArg &arg)
574{
575LOG_INFO("execute merge admin request", K(arg));
576int ret = OB_SUCCESS;
577if (!ctx_.is_inited()) {
578ret = OB_NOT_INIT;
579LOG_WARN("not init", KR(ret));
580} else if (!arg.is_valid()) {
581ret = OB_INVALID_ARGUMENT;
582LOG_WARN("invalid arg", K(arg), KR(ret));
583} else {
584switch(arg.type_) {
585case ObAdminMergeArg::START_MERGE: {
586/* if (OB_FAIL(ctx_.daily_merge_scheduler_->manual_start_merge(arg.zone_))) {
587LOG_WARN("start merge zone failed", K(ret), K(arg));
588}*/
589break;
590}
591case ObAdminMergeArg::SUSPEND_MERGE: {
592ObTenantAdminMergeParam param;
593param.transport_ = GCTX.net_frame_->get_req_transport();
594if (arg.affect_all_ || arg.affect_all_user_ || arg.affect_all_meta_) {
595if ((true == arg.affect_all_ && true == arg.affect_all_user_) ||
596(true == arg.affect_all_ && true == arg.affect_all_meta_) ||
597(true == arg.affect_all_user_ && true == arg.affect_all_meta_)) {
598ret = OB_ERR_UNEXPECTED;
599LOG_WARN("only one of affect_all,affect_all_user,affect_all_meta can be true",
600KR(ret), "affect_all", arg.affect_all_, "affect_all_user",
601arg.affect_all_user_, "affect_all_meta", arg.affect_all_meta_);
602} else {
603if (arg.affect_all_) {
604param.need_all_ = true;
605} else if (arg.affect_all_user_) {
606param.need_all_user_ = true;
607} else {
608param.need_all_meta_ = true;
609}
610}
611} else if (OB_FAIL(param.tenant_array_.assign(arg.tenant_ids_))) {
612LOG_WARN("fail to assign tenant_ids", KR(ret), K(arg));
613}
614if (FAILEDx(ObMajorFreezeHelper::suspend_merge(param))) {
615LOG_WARN("fail to suspend merge", KR(ret), K(param));
616}
617break;
618}
619case ObAdminMergeArg::RESUME_MERGE: {
620ObTenantAdminMergeParam param;
621param.transport_ = GCTX.net_frame_->get_req_transport();
622if (arg.affect_all_ || arg.affect_all_user_ || arg.affect_all_meta_) {
623if ((true == arg.affect_all_ && true == arg.affect_all_user_) ||
624(true == arg.affect_all_ && true == arg.affect_all_meta_) ||
625(true == arg.affect_all_user_ && true == arg.affect_all_meta_)) {
626ret = OB_ERR_UNEXPECTED;
627LOG_WARN("only one of affect_all,affect_all_user,affect_all_meta can be true",
628KR(ret), "affect_all", arg.affect_all_, "affect_all_user",
629arg.affect_all_user_, "affect_all_meta", arg.affect_all_meta_);
630} else {
631if (arg.affect_all_) {
632param.need_all_ = true;
633} else if (arg.affect_all_user_) {
634param.need_all_user_ = true;
635} else {
636param.need_all_meta_ = true;
637}
638}
639} else if (OB_FAIL(param.tenant_array_.assign(arg.tenant_ids_))) {
640LOG_WARN("fail to assign tenant_ids", KR(ret), K(arg));
641}
642if (FAILEDx(ObMajorFreezeHelper::resume_merge(param))) {
643LOG_WARN("fail to resume merge", KR(ret), K(param));
644}
645break;
646}
647default: {
648ret = OB_NOT_SUPPORTED;
649LOG_WARN("unsupported merge admin type", "type", arg.type_, KR(ret));
650}
651}
652}
653return ret;
654}
655
656int ObAdminClearRoottable::execute(const obrpc::ObAdminClearRoottableArg &arg)
657{
658int ret = OB_NOT_SUPPORTED;
659UNUSED(arg);
660return ret;
661}
662
663//FIXME: flush schemas of all tenants
664int ObAdminRefreshSchema::execute(const obrpc::ObAdminRefreshSchemaArg &arg)
665{
666LOG_INFO("execute refresh schema", K(arg));
667int ret = OB_SUCCESS;
668if (!ctx_.is_inited()) {
669ret = OB_NOT_INIT;
670LOG_WARN("not init", KR(ret));
671} else if (!arg.is_valid()) {
672ret = OB_INVALID_ARGUMENT;
673LOG_WARN("invalid arg", K(arg), KR(ret));
674} else if (OB_FAIL(ctx_.ddl_service_->refresh_schema(OB_SYS_TENANT_ID))) {
675LOG_WARN("refresh schema failed", KR(ret));
676} else {
677if (OB_FAIL(ctx_.schema_service_->get_tenant_schema_version(OB_SYS_TENANT_ID, schema_version_))) {
678LOG_WARN("fail to get schema version", KR(ret));
679} else if (OB_FAIL(ctx_.schema_service_->get_refresh_schema_info(schema_info_))) {
680LOG_WARN("fail to get refresh schema info", KR(ret), K(schema_info_));
681} else if (!schema_info_.is_valid()) {
682schema_info_.set_schema_version(schema_version_);
683}
684if (OB_FAIL(ret)) {
685} else if (OB_FAIL(call_all(arg))) {
686LOG_WARN("execute notify refresh schema failed", KR(ret), K(arg));
687}
688}
689return ret;
690}
691
692int ObAdminRefreshSchema::call_server(const ObAddr &server)
693{
694int ret = OB_SUCCESS;
695ObTimeoutCtx ctx;
696if (OB_UNLIKELY(!ctx_.is_inited())) {
697ret = OB_NOT_INIT;
698LOG_WARN("not init", KR(ret));
699} else if (OB_UNLIKELY(!server.is_valid())) {
700ret = OB_INVALID_ARGUMENT;
701LOG_WARN("invalid server", KR(ret), K(server));
702} else if (OB_ISNULL(GCTX.srv_rpc_proxy_)) {
703ret = OB_ERR_UNEXPECTED;
704LOG_WARN("GCTX.srv_rpc_proxy_ is null", KR(ret));
705} else if (OB_FAIL(ObShareUtil::set_default_timeout_ctx(ctx, GCONF.rpc_timeout))) {
706LOG_WARN("fail to set timeout ctx", KR(ret));
707} else {
708ObSwitchSchemaArg arg;
709arg.schema_info_ = schema_info_;
710ObArray<int> return_code_array;
711ObSwitchSchemaProxy proxy(*GCTX.srv_rpc_proxy_, &ObSrvRpcProxy::switch_schema);
712int tmp_ret = OB_SUCCESS;
713const int64_t timeout_ts = ctx.get_timeout(0);
714if (OB_FAIL(proxy.call(server, timeout_ts, arg))) {
715LOG_WARN("notify switch schema failed", KR(ret), K(server), K_(schema_version), K_(schema_info));
716}
717if (OB_TMP_FAIL(proxy.wait_all(return_code_array))) {
718ret = OB_SUCC(ret) ? tmp_ret : ret;
719LOG_WARN("fail to wait all", KR(ret), KR(tmp_ret), K(server));
720} else if (OB_FAIL(ret)) {
721} else if (OB_FAIL(proxy.check_return_cnt(return_code_array.count()))) {
722LOG_WARN("fail to check return cnt", KR(ret), K(server), "return_cnt", return_code_array.count());
723} else if (OB_UNLIKELY(1 != return_code_array.count())) {
724ret = OB_ERR_UNEXPECTED;
725LOG_WARN("return_code_array count shoud be 1", KR(ret), K(server), "return_cnt", return_code_array.count());
726} else {
727ret = return_code_array.at(0);
728}
729}
730return ret;
731}
732
733int ObAdminRefreshMemStat::execute(const ObAdminRefreshMemStatArg &arg)
734{
735LOG_INFO("execute refresh memory stat");
736int ret = OB_SUCCESS;
737if (!ctx_.is_inited()) {
738ret = OB_NOT_INIT;
739LOG_WARN("not init", KR(ret));
740} else if (OB_FAIL(call_all(arg))) {
741LOG_WARN("execute notify refresh memory stat failed", KR(ret));
742}
743return ret;
744}
745
746int ObAdminRefreshMemStat::call_server(const ObAddr &server)
747{
748int ret = OB_SUCCESS;
749if (!ctx_.is_inited()) {
750ret = OB_NOT_INIT;
751LOG_WARN("not init", KR(ret));
752} else if (!server.is_valid()) {
753ret = OB_INVALID_ARGUMENT;
754LOG_WARN("invalid server", K(server), KR(ret));
755} else if (OB_FAIL(ctx_.rpc_proxy_->to(server).refresh_memory_stat())) {
756LOG_WARN("notify refresh memory stat failed", KR(ret), K(server));
757}
758return ret;
759}
760
761int ObAdminWashMemFragmentation::execute(const ObAdminWashMemFragmentationArg &arg)
762{
763LOG_INFO("execute sync wash fragment");
764int ret = OB_SUCCESS;
765if (!ctx_.is_inited()) {
766ret = OB_NOT_INIT;
767LOG_WARN("not init", K(ret));
768} else if (OB_FAIL(call_all(arg))) {
769LOG_WARN("execute notify sync wash fragment failed", K(ret));
770}
771return ret;
772}
773
774int ObAdminWashMemFragmentation::call_server(const ObAddr &server)
775{
776int ret = OB_SUCCESS;
777if (!ctx_.is_inited()) {
778ret = OB_NOT_INIT;
779LOG_WARN("not init", K(ret));
780} else if (!server.is_valid()) {
781ret = OB_INVALID_ARGUMENT;
782LOG_WARN("invalid server", K(server), K(ret));
783} else if (OB_FAIL(ctx_.rpc_proxy_->to(server).wash_memory_fragmentation())) {
784LOG_WARN("notify sync wash fragment failed", K(ret), K(server));
785}
786return ret;
787}
788
789int ObAdminSetConfig::verify_config(obrpc::ObAdminSetConfigArg &arg)
790{
791int ret = OB_SUCCESS;
792void *ptr = nullptr, *cfg_ptr = nullptr;
793ObServerConfigChecker *cfg = nullptr;
794ObTenantConfigChecker *tenant_cfg = nullptr;
795if (!ctx_.is_inited()) {
796ret = OB_NOT_INIT;
797LOG_WARN("not init", KR(ret));
798} else if (!arg.is_valid()) {
799ret = OB_INVALID_ARGUMENT;
800LOG_WARN("invalid arg", K(arg), KR(ret));
801}
802FOREACH_X(item, arg.items_, OB_SUCCESS == ret) {
803if (item->name_.is_empty()) {
804ret = OB_INVALID_ARGUMENT;
805LOG_WARN("empty config name", "item", *item, KR(ret));
806} else {
807ObConfigItem *ci = nullptr;
808if (OB_SYS_TENANT_ID != item->exec_tenant_id_ || item->tenant_name_.size() > 0) {
809// tenants(user or sys tenants) modify tenant level configuration
810item->want_to_set_tenant_config_ = true;
811if (nullptr == tenant_cfg) {
812if (OB_ISNULL(cfg_ptr = ob_malloc(sizeof(ObTenantConfigChecker),
813ObModIds::OB_RS_PARTITION_TABLE_TEMP))) {
814ret = OB_ALLOCATE_MEMORY_FAILED;
815LOG_WARN("fail to alloc memory", KR(ret));
816} else if (OB_ISNULL(tenant_cfg = new (cfg_ptr) ObTenantConfigChecker())) {
817ret = OB_ERR_UNEXPECTED;
818LOG_WARN("new tenant_cfg failed", KR(ret));
819}
820} // if
821
822if (OB_SUCC(ret)) {
823ObConfigItem * const *ci_ptr = tenant_cfg->get_container().get(
824ObConfigStringKey(item->name_.ptr()));
825if (OB_ISNULL(ci_ptr)) {
826ret = OB_ERR_SYS_CONFIG_UNKNOWN;
827LOG_WARN("can't found config item", KR(ret), "item", *item);
828} else {
829ci = *ci_ptr;
830share::schema::ObSchemaGetterGuard schema_guard;
831const char *const NAME_ALL = "all";
832const char *const NAME_ALL_USER = "all_user";
833const char *const NAME_ALL_META = "all_meta";
834if (OB_FAIL(ctx_.ddl_service_->get_tenant_schema_guard_with_version_in_inner_table(OB_SYS_TENANT_ID, schema_guard))) {
835LOG_WARN("get_schema_guard failed", KR(ret));
836} else if (OB_SYS_TENANT_ID == item->exec_tenant_id_ &&
837(0 == item->tenant_name_.str().case_compare(NAME_ALL) ||
8380 == item->tenant_name_.str().case_compare(NAME_ALL_USER) ||
8390 == item->tenant_name_.str().case_compare(NAME_ALL_META))) {
840common::ObArray<uint64_t> tenant_ids;
841if (OB_FAIL(schema_guard.get_tenant_ids(tenant_ids))) {
842LOG_WARN("get_tenant_ids failed", KR(ret));
843} else {
844using FUNC_TYPE = bool (*) (const uint64_t);
845FUNC_TYPE condition_func = nullptr;
846if (GET_MIN_CLUSTER_VERSION() < CLUSTER_VERSION_4_2_1_0) {
847if (0 == item->tenant_name_.str().case_compare(NAME_ALL_USER) ||
8480 == item->tenant_name_.str().case_compare(NAME_ALL_META)) {
849ret = OB_NOT_SUPPORTED;
850LOG_WARN("all_user/all_meta are not supported when min_cluster_version is less than 4.2.1.0",
851KR(ret), "tenant_name", item->tenant_name_);
852} else {
853condition_func = is_not_virtual_tenant_id;
854}
855} else {
856if (0 == item->tenant_name_.str().case_compare(NAME_ALL) ||
8570 == item->tenant_name_.str().case_compare(NAME_ALL_USER)) {
858condition_func = is_user_tenant;
859} else {
860condition_func = is_meta_tenant;
861}
862}
863if (OB_SUCC(ret) && (nullptr != condition_func)) {
864for (const uint64_t tenant_id: tenant_ids) {
865if (condition_func(tenant_id) &&
866OB_FAIL(item->tenant_ids_.push_back(tenant_id))) {
867LOG_WARN("add tenant_id failed", K(tenant_id), KR(ret));
868break;
869}
870} // for
871}
872}
873} else if (OB_SYS_TENANT_ID == item->exec_tenant_id_
874&& item->tenant_name_ == ObFixedLengthString<common::OB_MAX_TENANT_NAME_LENGTH + 1>("seed")) {
875uint64_t tenant_id = OB_PARAMETER_SEED_ID;
876if (OB_FAIL(item->tenant_ids_.push_back(tenant_id))) {
877LOG_WARN("add seed tenant_id failed", KR(ret));
878break;
879}
880} else {
881uint64_t tenant_id = OB_INVALID_TENANT_ID;
882if (OB_SYS_TENANT_ID != item->exec_tenant_id_) {
883tenant_id = item->exec_tenant_id_;
884} else {
885if (OB_FAIL(schema_guard.get_tenant_id(
886ObString(item->tenant_name_.ptr()), tenant_id))
887|| OB_INVALID_ID == tenant_id) {
888ret = OB_ERR_INVALID_TENANT_NAME;
889LOG_WARN("get_tenant_id failed", KR(ret), "tenant", item->tenant_name_);
890}
891}
892if (OB_SUCC(ret) && OB_FAIL(item->tenant_ids_.push_back(tenant_id))) {
893LOG_WARN("add tenant_id failed", K(tenant_id), KR(ret));
894}
895} // else
896} // else
897} // if
898} else {
899// sys tenant try to modify configration(cluster level or sys tenant level)
900if (nullptr == cfg) {
901if (OB_ISNULL(ptr = ob_malloc(sizeof(ObServerConfigChecker),
902ObModIds::OB_RS_PARTITION_TABLE_TEMP))) {
903ret = OB_ALLOCATE_MEMORY_FAILED;
904LOG_WARN("fail to alloc memory", KR(ret));
905} else if (OB_ISNULL(cfg = new (ptr) ObServerConfigChecker)) {
906ret = OB_ERR_UNEXPECTED;
907LOG_WARN("new cfg failed", KR(ret));
908}
909} // if
910if (OB_SUCC(ret) && nullptr == tenant_cfg) {
911if (OB_ISNULL(cfg_ptr = ob_malloc(sizeof(ObTenantConfigChecker),
912ObModIds::OB_RS_PARTITION_TABLE_TEMP))) {
913ret = OB_ALLOCATE_MEMORY_FAILED;
914LOG_WARN("fail to alloc memory", KR(ret));
915} else if (OB_ISNULL(tenant_cfg = new (cfg_ptr) ObTenantConfigChecker())) {
916ret = OB_ERR_UNEXPECTED;
917LOG_WARN("new tenant_cfg failed", KR(ret));
918}
919} // if
920
921if (OB_SUCC(ret)) {
922ObConfigItem * const *sys_ci_ptr = cfg->get_container().get(
923ObConfigStringKey(item->name_.ptr()));
924ObConfigItem * const *tenant_ci_ptr = tenant_cfg->get_container().get(
925ObConfigStringKey(item->name_.ptr()));
926if (OB_NOT_NULL(sys_ci_ptr)) {
927ci = *sys_ci_ptr;
928} else if (OB_NOT_NULL(tenant_ci_ptr)) {
929ci = *tenant_ci_ptr;
930item->want_to_set_tenant_config_ = true;
931if (OB_FAIL(item->tenant_ids_.push_back(OB_SYS_TENANT_ID))) {
932LOG_WARN("add tenant_id failed", KR(ret));
933}
934} else {
935ret = OB_ERR_SYS_CONFIG_UNKNOWN;
936LOG_WARN("can't found config item", KR(ret), "item", *item);
937}
938} // if
939} // else
940
941if (OB_SUCC(ret)) {
942const char *err = NULL;
943if (ci->is_not_editable() && !arg.is_inner_) {
944ret = OB_INVALID_CONFIG; //TODO: specific report not editable
945LOG_WARN("config is not editable", "item", *item, KR(ret));
946} else if (!ci->check_unit(item->value_.ptr())) {
947ret = OB_INVALID_CONFIG;
948LOG_ERROR("invalid config", "item", *item, KR(ret));
949} else if (!ci->set_value(item->value_.ptr())) {
950ret = OB_INVALID_CONFIG;
951LOG_WARN("invalid config", "item", *item, KR(ret));
952} else if (!ci->check()) {
953ret = OB_INVALID_CONFIG;
954LOG_WARN("invalid value range", "item", *item, KR(ret));
955} else if (!ctx_.root_service_->check_config(*ci, err)) {
956ret = OB_INVALID_CONFIG;
957LOG_WARN("invalid value range", "item", *item, KR(ret));
958}
959if (OB_FAIL(ret)) {
960if (nullptr != err) {
961LOG_USER_ERROR(OB_INVALID_CONFIG, err);
962}
963}
964} // if
965} // else
966} // FOREACH_X
967
968if (nullptr != cfg) {
969cfg->~ObServerConfigChecker();
970ob_free(cfg);
971cfg = nullptr;
972ptr = nullptr;
973} else if (nullptr != ptr) {
974ob_free(ptr);
975ptr = nullptr;
976}
977if (nullptr != tenant_cfg) {
978tenant_cfg->~ObTenantConfigChecker();
979ob_free(tenant_cfg);
980tenant_cfg = nullptr;
981cfg_ptr = nullptr;
982} else if (nullptr != cfg_ptr) {
983ob_free(cfg_ptr);
984cfg_ptr = nullptr;
985}
986return ret;
987}
988
989int ObAdminSetConfig::update_config(obrpc::ObAdminSetConfigArg &arg, int64_t new_version)
990{
991int ret = OB_SUCCESS;
992if (!ctx_.is_inited()) {
993ret = OB_NOT_INIT;
994LOG_WARN("not init", KR(ret));
995} else if (!arg.is_valid()) {
996ret = OB_INVALID_ARGUMENT;
997LOG_WARN("invalid arg", K(arg), KR(ret));
998} else {
999FOREACH_X(item, arg.items_, OB_SUCCESS == ret) {
1000char svr_ip[OB_MAX_SERVER_ADDR_SIZE] = "ANY";
1001int64_t svr_port = 0;
1002if (item->server_.is_valid()) {
1003if (false == item->server_.ip_to_string(svr_ip, sizeof(svr_ip))) {
1004ret = OB_INVALID_ARGUMENT;
1005LOG_WARN("convert server addr to ip failed", KR(ret), "server", item->server_);
1006} else {
1007svr_port = item->server_.get_port();
1008ObAddr addr;
1009bool is_server_exist = false;
1010if (false == addr.set_ip_addr(svr_ip, static_cast<int32_t>(svr_port))){
1011ret = OB_ERR_UNEXPECTED;
1012LOG_WARN("set addr fail", KR(ret), "svr_ip", svr_ip, K(svr_port));
1013} else if (OB_FAIL(SVR_TRACER.is_server_exist(addr, is_server_exist))) {
1014LOG_WARN("check server exist fail", K(addr));
1015} else if (!is_server_exist) {
1016ret = OB_INVALID_ARGUMENT;
1017LOG_WARN("server is not exist", KR(ret), K(addr));
1018LOG_USER_ERROR(OB_INVALID_ARGUMENT, "server");
1019}
1020} // else
1021} // if
1022
1023if (OB_FAIL(ret)) {
1024} else if (!item->zone_.is_empty()) {
1025bool is_zone_exist = false;
1026if (OB_FAIL(ctx_.zone_mgr_->check_zone_exist(item->zone_, is_zone_exist))) {
1027ret = OB_ERR_UNEXPECTED;
1028LOG_WARN("check zone exist fail", KR(ret), "zone", item->zone_);
1029} else if(!is_zone_exist) {
1030ret = OB_INVALID_ARGUMENT;
1031LOG_WARN("zone is not exist", KR(ret), "zone", item->zone_);
1032LOG_USER_ERROR(OB_INVALID_ARGUMENT, "zone");
1033}
1034}
1035
1036if (OB_SUCC(ret))
1037{
1038new_version = std::max(new_version + 1, ObTimeUtility::current_time());
1039}
1040
1041if (OB_FAIL(ret)) {
1042} else if (item->want_to_set_tenant_config_) {
1043// tenant config
1044ObDMLSqlSplicer dml;
1045share::schema::ObSchemaGetterGuard schema_guard;
1046const share::schema::ObSimpleTenantSchema *tenant_schema = NULL;
1047if (OB_FAIL(GSCHEMASERVICE.get_tenant_schema_guard(
1048OB_SYS_TENANT_ID, schema_guard))) {
1049LOG_WARN("fail to get sys tenant schema guard", KR(ret));
1050} else {
1051for (uint64_t tenant_id : item->tenant_ids_) {
1052const char *table_name = (ObAdminSetConfig::OB_PARAMETER_SEED_ID == tenant_id ?
1053OB_ALL_SEED_PARAMETER_TNAME : OB_TENANT_PARAMETER_TNAME);
1054tenant_id = (ObAdminSetConfig::OB_PARAMETER_SEED_ID == tenant_id ? OB_SYS_TENANT_ID : tenant_id);
1055uint64_t exec_tenant_id = gen_meta_tenant_id(tenant_id);
1056dml.reset();
1057if (OB_FAIL(schema_guard.get_tenant_info(exec_tenant_id, tenant_schema))) {
1058LOG_WARN("failed to get tenant ids", KR(ret), K(exec_tenant_id));
1059} else if (OB_ISNULL(tenant_schema)) {
1060ret = OB_TENANT_NOT_EXIST;
1061LOG_WARN("tenant not exist", KR(ret), K(tenant_id));
1062} else if (!tenant_schema->is_normal()) {
1063//tenant not normal, maybe tenant not ready, cannot add tenant config
1064} else if (0 == ObString(table_name).case_compare(OB_TENANT_PARAMETER_TNAME) &&
1065OB_FAIL(dml.add_pk_column("tenant_id", tenant_id))) {
1066// __all_seed_parameter does not have column 'tenant_id'
1067LOG_WARN("add column failed", KR(ret));
1068} else if (OB_FAIL(dml.add_pk_column("zone", item->zone_.ptr()))
1069|| OB_FAIL(dml.add_pk_column("svr_type", print_server_role(OB_SERVER)))
1070|| OB_FAIL(dml.add_pk_column(K(svr_ip)))
1071|| OB_FAIL(dml.add_pk_column(K(svr_port)))
1072|| OB_FAIL(dml.add_pk_column("name", item->name_.ptr()))
1073|| OB_FAIL(dml.add_column("value", item->value_.ptr()))
1074|| OB_FAIL(dml.add_column("info", item->comment_.ptr()))
1075|| OB_FAIL(dml.add_column("config_version", new_version))) {
1076LOG_WARN("add column failed", KR(ret));
1077} else if (OB_FAIL(dml.get_values().append_fmt("usec_to_time(%ld)", new_version))) {
1078LOG_WARN("append valued failed", KR(ret));
1079} else if (OB_FAIL(dml.add_column(false, "gmt_modified"))) {
1080LOG_WARN("add column failed", KR(ret));
1081} else {
1082int64_t affected_rows = 0;
1083ObDMLExecHelper exec(*ctx_.sql_proxy_, exec_tenant_id);
1084ObConfigItem *ci = nullptr;
1085// tenant not exist in RS, use SYS instead
1086omt::ObTenantConfigGuard tenant_config(TENANT_CONF(OB_SYS_TENANT_ID));
1087if (!tenant_config.is_valid()) {
1088ret = OB_ERR_UNEXPECTED;
1089LOG_WARN("failed to get tenant config",K(tenant_id), KR(ret));
1090} else if (OB_ISNULL(tenant_config->get_container().get(
1091ObConfigStringKey(item->name_.ptr())))) {
1092ret = OB_ERR_SYS_CONFIG_UNKNOWN;
1093LOG_WARN("can't found config item", KR(ret), K(tenant_id), "item", *item);
1094} else {
1095ci = *(tenant_config->get_container().get(
1096ObConfigStringKey(item->name_.ptr())));
1097if (OB_FAIL(dml.add_column("section", ci->section()))
1098|| OB_FAIL(dml.add_column("scope", ci->scope()))
1099|| OB_FAIL(dml.add_column("source", ci->source()))
1100|| OB_FAIL(dml.add_column("edit_level", ci->edit_level()))
1101|| OB_FAIL(dml.add_column("data_type", ci->data_type()))) {
1102LOG_WARN("add column failed", KR(ret));
1103} else if (OB_FAIL(exec.exec_insert_update(table_name,
1104dml, affected_rows))) {
1105LOG_WARN("execute insert update failed", K(tenant_id), KR(ret), "item", *item);
1106} else if (is_zero_row(affected_rows) || affected_rows > 2) {
1107ret = OB_ERR_UNEXPECTED;
1108LOG_WARN("unexpected affected rows", K(tenant_id), K(affected_rows), KR(ret));
1109} else {
1110// set config_version to config_version_map and trigger parameter update
1111if (ObAdminSetConfig::OB_PARAMETER_SEED_ID == tenant_id) {
1112} else if (OB_FAIL(OTC_MGR.set_tenant_config_version(tenant_id, new_version))) {
1113LOG_WARN("failed to set tenant config version",
1114K(tenant_id), KR(ret), "item", *item);
1115} else if(GCTX.omt_->has_tenant(tenant_id) &&
1116OB_FAIL(OTC_MGR.got_version(tenant_id, new_version))) {
1117LOG_WARN("failed to got version", K(tenant_id), KR(ret), "item", *item);
1118} else {
1119LOG_INFO("got new tenant config version",
1120K(new_version), K(tenant_id), "item", *item);
1121}
1122}
1123}
1124}
1125if (OB_FAIL(ret)) {
1126break;
1127}
1128} // for
1129}
1130} else {
1131// sys config
1132ObDMLSqlSplicer dml;
1133dml.reset();
1134if (OB_SYS_TENANT_ID != item->exec_tenant_id_) {
1135uint64_t tenant_id = item->exec_tenant_id_;
1136ret = OB_ERR_UNEXPECTED;
1137LOG_WARN("unexpected tenant_id", K(tenant_id), KR(ret));
1138} else if (OB_FAIL(dml.add_pk_column("zone", item->zone_.ptr()))
1139|| OB_FAIL(dml.add_pk_column("svr_type", print_server_role(OB_SERVER)))
1140|| OB_FAIL(dml.add_pk_column(K(svr_ip)))
1141|| OB_FAIL(dml.add_pk_column(K(svr_port)))
1142|| OB_FAIL(dml.add_pk_column("name", item->name_.ptr()))
1143|| OB_FAIL(dml.add_column("value", item->value_.ptr()))
1144|| OB_FAIL(dml.add_column("info", item->comment_.ptr()))
1145|| OB_FAIL(dml.add_column("config_version", new_version))) {
1146LOG_WARN("add column failed", KR(ret));
1147} else if (OB_FAIL(dml.get_values().append_fmt("usec_to_time(%ld)", new_version))) {
1148LOG_WARN("append valued failed", KR(ret));
1149} else if (OB_FAIL(dml.add_column(false, "gmt_modified"))) {
1150LOG_WARN("add column failed", KR(ret));
1151} else {
1152int64_t affected_rows = 0;
1153ObDMLExecHelper exec(*ctx_.sql_proxy_, OB_SYS_TENANT_ID);
1154ObConfigItem *ci = nullptr;
1155ObConfigItem *const *ci_ptr = GCONF.get_container().get(
1156ObConfigStringKey(item->name_.ptr()));
1157if (OB_ISNULL(ci_ptr)) {
1158ret = OB_ERR_SYS_CONFIG_UNKNOWN;
1159LOG_WARN("can't found config item", KR(ret), "item", *item);
1160} else {
1161ci = *ci_ptr;
1162if (OB_FAIL(dml.add_column("section", ci->section()))
1163|| OB_FAIL(dml.add_column("scope", ci->scope()))
1164|| OB_FAIL(dml.add_column("source", ci->source()))
1165|| OB_FAIL(dml.add_column("edit_level", ci->edit_level()))
1166|| OB_FAIL(dml.add_column("data_type", ci->data_type()))) {
1167LOG_WARN("add column failed", KR(ret));
1168} else if (OB_FAIL(exec.exec_insert_update(OB_ALL_SYS_PARAMETER_TNAME,
1169dml, affected_rows))) {
1170LOG_WARN("execute insert update failed", KR(ret), "item", *item);
1171} else if (is_zero_row(affected_rows) || affected_rows > 2) {
1172ret = OB_ERR_UNEXPECTED;
1173LOG_WARN("unexpected affected rows", K(affected_rows), KR(ret));
1174} else {
1175// set config_version to __all_zone and trigger parameter update
1176if (OB_FAIL(ctx_.zone_mgr_->update_config_version(new_version))) {
1177LOG_WARN("set new config version failed", KR(ret), K(new_version));
1178} else if (OB_FAIL(ctx_.config_mgr_->got_version(new_version))) {
1179LOG_WARN("config mgr got version failed", KR(ret), K(new_version));
1180} else {
1181LOG_INFO("got new sys config version", K(new_version), "item", *item);
1182}
1183} // else trigger update
1184} // else
1185} // else
1186} // else sys config
1187} // FOREACH_X
1188}
1189
1190return ret;
1191}
1192
1193int ObAdminSetConfig::execute(obrpc::ObAdminSetConfigArg &arg)
1194{
1195LOG_INFO("execute set config request", K(arg));
1196int ret = OB_SUCCESS;
1197int64_t config_version = 0;
1198if (!ctx_.is_inited()) {
1199ret = OB_NOT_INIT;
1200LOG_WARN("not init", KR(ret));
1201} else if (!arg.is_valid()) {
1202ret = OB_INVALID_ARGUMENT;
1203LOG_WARN("invalid arg", K(arg), KR(ret));
1204} else if (OB_FAIL(verify_config(arg))) {
1205LOG_WARN("verify config failed", KR(ret), K(arg));
1206} else if (OB_FAIL(ctx_.zone_mgr_->get_config_version(config_version))) {
1207LOG_WARN("get_config_version failed", KR(ret));
1208} else {
1209if (OB_FAIL(ctx_.root_service_->set_config_pre_hook(arg))) {
1210LOG_WARN("fail to process pre hook", K(arg), KR(ret));
1211} else if (OB_FAIL(update_config(arg, config_version))) {
1212LOG_WARN("update config failed", KR(ret), K(arg));
1213} else if (OB_ISNULL(ctx_.root_service_)) {
1214ret = OB_ERR_UNEXPECTED;
1215LOG_WARN("error inner stat", KR(ret), K(ctx_.root_service_));
1216} else if (OB_FAIL(ctx_.root_service_->set_config_post_hook(arg))) {
1217LOG_WARN("fail to set config callback", KR(ret));
1218} else {
1219LOG_INFO("set config succ", K(arg));
1220}
1221}
1222return ret;
1223}
1224
1225int ObAdminMigrateUnit::execute(const ObAdminMigrateUnitArg &arg)
1226{
1227int ret = OB_SUCCESS;
1228LOG_INFO("execute migrate unit request", K(arg));
1229if (!ctx_.is_inited()) {
1230ret = OB_NOT_INIT;
1231LOG_WARN("not init", KR(ret));
1232} else if (!arg.is_valid()) {
1233ret = OB_INVALID_ARGUMENT;
1234LOG_WARN("invalid arg", K(arg), KR(ret));
1235} else {
1236const uint64_t unit_id = arg.unit_id_;
1237const ObAddr &dst = arg.destination_;
1238if (OB_FAIL(ctx_.unit_mgr_->admin_migrate_unit(unit_id, dst, arg.is_cancel_))) {
1239LOG_WARN("migrate unit failed", K(unit_id), K(dst), KR(ret));
1240} else {
1241ctx_.root_balancer_->wakeup();
1242}
1243}
1244return ret;
1245}
1246
1247int ObAdminUpgradeVirtualSchema::execute()
1248{
1249int ret = OB_SUCCESS;
1250LOG_INFO("execute upgrade virtual schema request");
1251int64_t upgrade_cnt = 0;
1252ObSchemaGetterGuard schema_guard;
1253ObArray<uint64_t> tenant_ids;
1254if (OB_UNLIKELY(!ctx_.is_inited())) {
1255ret = OB_NOT_INIT;
1256LOG_WARN("not init", KR(ret));
1257} else if (GCTX.is_standby_cluster()) {
1258// standby cluster cannot upgrade virtual schema independently,
1259// need to get these information from the primary cluster
1260ret = OB_OP_NOT_ALLOW;
1261LOG_WARN("upgrade virtual schema in standby cluster not allow", KR(ret));
1262LOG_USER_ERROR(OB_OP_NOT_ALLOW, "upgrade virtual schema in standby cluster");
1263} else if (OB_ISNULL(ctx_.root_inspection_)
1264|| OB_ISNULL(ctx_.ddl_service_)) {
1265ret = OB_ERR_UNEXPECTED;
1266LOG_WARN("ptr is null", KR(ret), KP(ctx_.root_inspection_), KP(ctx_.ddl_service_));
1267} else if (OB_FAIL(ctx_.ddl_service_->get_tenant_schema_guard_with_version_in_inner_table(
1268OB_SYS_TENANT_ID, schema_guard))) {
1269LOG_WARN("get_schema_guard failed", KR(ret));
1270} else if (OB_FAIL(schema_guard.get_tenant_ids(tenant_ids))) {
1271LOG_WARN("fail to get tenant ids", KR(ret));
1272} else {
1273FOREACH(tenant_id, tenant_ids) { // ignore ret
1274int tmp_ret = OB_SUCCESS;
1275if (OB_SUCCESS != (tmp_ret = execute(*tenant_id, upgrade_cnt))) {
1276LOG_WARN("fail to execute upgrade virtual table by tenant", KR(tmp_ret), K(*tenant_id));
1277}
1278ret = OB_SUCC(ret) ? tmp_ret : ret;
1279}
1280}
1281if (OB_SUCC(ret) && upgrade_cnt > 0) {
1282// if schema upgraded, inspect schema again
1283int tmp_ret = ctx_.root_inspection_->check_all();
1284if (OB_SUCCESS != tmp_ret) {
1285LOG_WARN("root inspection failed", KR(tmp_ret));
1286}
1287}
1288return ret;
1289}
1290
1291int ObAdminUpgradeVirtualSchema::execute(
1292const uint64_t tenant_id,
1293int64_t &upgrade_cnt)
1294{
1295int ret = OB_SUCCESS;
1296if (OB_UNLIKELY(!ctx_.is_inited())) {
1297ret = OB_NOT_INIT;
1298LOG_WARN("not init", KR(ret));
1299} else if (OB_UNLIKELY(
1300is_virtual_tenant_id(tenant_id)
1301|| OB_INVALID_TENANT_ID == tenant_id)) {
1302ret = OB_INVALID_ARGUMENT;
1303LOG_WARN("invalid tenant_id", KR(ret), K(tenant_id));
1304} else if (OB_ISNULL(ctx_.root_inspection_)
1305|| OB_ISNULL(ctx_.ddl_service_)) {
1306ret = OB_ERR_UNEXPECTED;
1307LOG_WARN("ptr is null", KR(ret), KP(ctx_.root_inspection_), KP(ctx_.ddl_service_));
1308}
1309
1310const schema_create_func *creator_ptr_array[] = {
1311share::virtual_table_schema_creators,
1312share::sys_view_schema_creators, NULL };
1313ObArray<ObTableSchema> hard_code_tables;
1314ObTableSchema table_schema;
1315
1316for (const schema_create_func **creator_ptr_ptr = creator_ptr_array;
1317OB_SUCC(ret) && OB_NOT_NULL(*creator_ptr_ptr); ++creator_ptr_ptr) {
1318for (const schema_create_func *creator_ptr = *creator_ptr_ptr;
1319OB_SUCC(ret) && OB_NOT_NULL(*creator_ptr); ++creator_ptr) {
1320table_schema.reset();
1321bool exist = false;
1322if (OB_FAIL((*creator_ptr)(table_schema))) {
1323LOG_WARN("create table schema failed", KR(ret));
1324} else if (!is_sys_tenant(tenant_id)
1325&& OB_FAIL(ObSchemaUtils::construct_tenant_space_full_table(
1326tenant_id, table_schema))) {
1327LOG_WARN("fail to construct tenant space table", KR(ret), K(tenant_id));
1328} else if (OB_FAIL(ObSysTableChecker::is_inner_table_exist(
1329tenant_id, table_schema, exist))) {
1330LOG_WARN("fail to check inner table exist",
1331KR(ret), K(tenant_id), K(table_schema));
1332} else if (!exist) {
1333// skip
1334} else if (is_sys_table(table_schema.get_table_id())) {
1335// only check and upgrade virtual table && sys views
1336} else if (OB_FAIL(hard_code_tables.push_back(table_schema))) {
1337LOG_WARN("push_back failed", KR(ret), K(tenant_id));
1338}
1339}
1340}
1341
1342// remove tables not exist on hard code tables
1343ObSchemaGetterGuard schema_guard;
1344ObArray<uint64_t> tids;
1345if (FAILEDx(ctx_.ddl_service_->get_tenant_schema_guard_with_version_in_inner_table(tenant_id, schema_guard))) {
1346LOG_WARN("get_schema_guard failed", KR(ret), K(tenant_id));
1347} else if (OB_FAIL(schema_guard.get_table_ids_in_tenant(tenant_id, tids))) {
1348LOG_WARN("get_table_ids_in_tenant failed", KR(ret), K(tenant_id));
1349} else {
1350FOREACH_CNT_X(tid, tids, OB_SUCC(ret)) {
1351const ObTableSchema *in_mem_table = NULL;
1352if (!is_inner_table(*tid) || is_sys_table(*tid)) {
1353continue;
1354} else if (OB_FAIL(schema_guard.get_table_schema(tenant_id, *tid, in_mem_table))) {
1355LOG_WARN("fail to get table schema", KR(ret), K(tenant_id), K(*tid));
1356} else if (OB_ISNULL(in_mem_table)) {
1357ret = OB_TABLE_NOT_EXIST;
1358LOG_WARN("table not exist", KR(ret), K(tenant_id), K(*tid));
1359} else {
1360bool exist = false;
1361FOREACH_CNT_X(hard_code_table, hard_code_tables, OB_SUCC(ret) && !exist) {
1362if (OB_ISNULL(hard_code_table)) {
1363ret = OB_ERR_UNEXPECTED;
1364LOG_WARN("hard code table is null", KR(ret), K(tenant_id));
1365} else if (in_mem_table->get_table_id() == hard_code_table->get_table_id()) {
1366exist = true;
1367}
1368}
1369if (!exist) {
1370if (FAILEDx(ctx_.ddl_service_->drop_inner_table(*in_mem_table))) {
1371LOG_WARN("drop table schema failed", KR(ret), K(tenant_id), KPC(in_mem_table));
1372} else if (OB_FAIL(ctx_.ddl_service_->refresh_schema(tenant_id))) {
1373LOG_WARN("refresh_schema failed", KR(ret), K(tenant_id));
1374}
1375}
1376}
1377}
1378}
1379
1380// upgrade tables
1381FOREACH_CNT_X(hard_code_table, hard_code_tables, OB_SUCC(ret)) {
1382if (OB_ISNULL(hard_code_table)) {
1383ret = OB_ERR_UNEXPECTED;
1384LOG_WARN("hard code table is null", KR(ret), K(tenant_id));
1385} else if (OB_FAIL(ctx_.root_inspection_->check_table_schema(tenant_id, *hard_code_table))) {
1386if (OB_SCHEMA_ERROR != ret) {
1387LOG_WARN("check table schema failed", KR(ret), K(tenant_id), K(*hard_code_table));
1388} else {
1389LOG_INFO("table schema need upgrade", K(tenant_id), K(*hard_code_table));
1390if (OB_FAIL(upgrade_(tenant_id, *hard_code_table))) {
1391LOG_WARN("upgrade failed", KR(ret), K(tenant_id), K(*hard_code_table));
1392} else {
1393LOG_INFO("update table schema success", K(tenant_id), K(*hard_code_table));
1394upgrade_cnt++;
1395}
1396}
1397}
1398}
1399return ret;
1400}
1401
1402int ObAdminUpgradeVirtualSchema::upgrade_(
1403const uint64_t tenant_id,
1404share::schema::ObTableSchema &table)
1405{
1406int ret = OB_SUCCESS;
1407const ObTableSchema *exist_schema = NULL;
1408ObSchemaGetterGuard schema_guard;
1409if (OB_UNLIKELY(!ctx_.is_inited())) {
1410ret = OB_NOT_INIT;
1411LOG_WARN("not init", KR(ret));
1412} else if (OB_UNLIKELY(
1413is_virtual_tenant_id(tenant_id)
1414|| OB_INVALID_TENANT_ID == tenant_id)) {
1415ret = OB_INVALID_ARGUMENT;
1416LOG_WARN("invalid tenant_id", KR(ret), K(tenant_id));
1417} else if (OB_UNLIKELY(
1418!table.is_valid()
1419|| is_sys_table(table.get_table_id())
1420|| table.get_tenant_id() != tenant_id)) {
1421ret = OB_INVALID_ARGUMENT;
1422LOG_WARN("invalid table", KR(ret), K(tenant_id), K(table));
1423} else if (OB_ISNULL(ctx_.ddl_service_)) {
1424ret = OB_ERR_UNEXPECTED;
1425LOG_WARN("ddl service is null", KR(ret));
1426}
1427// 1. check table name duplicated
1428if (FAILEDx(ctx_.ddl_service_->get_tenant_schema_guard_with_version_in_inner_table(
1429tenant_id, schema_guard))) {
1430LOG_WARN("get schema guard in inner table failed", KR(ret), K(tenant_id));
1431} else {
1432ObArenaAllocator allocator;
1433ObString index_name;
1434if (table.is_index_table()) {
1435// In the early version, table name of oracle virtual table index is not right
1436// (data_table_id is mysql virtual table id), which may cause we can't find duplicate index
1437// with table name and duplicate name conflict occur.
1438//
1439// ETC:
1440// OB_ALL_VIRTUAL_PLAN_CACHE_STAT_ORA_TID = 15034;
1441// OB_ALL_VIRTUAL_PLAN_CACHE_STAT_ORA_ALL_VIRTUAL_PLAN_CACHE_STAT_I1_TID = 19998;
1442// OB_ALL_VIRTUAL_PLAN_CACHE_STAT_ALL_VIRTUAL_PLAN_CACHE_STAT_I1_TNAME = "__idx_1099511638779_all_virtual_plan_cache_stat_i1";
1443// OB_ALL_VIRTUAL_PLAN_CACHE_STAT_ORA_ALL_VIRTUAL_PLAN_CACHE_STAT_I1_TNAME = "__idx_1099511642810_all_virtual_plan_cache_stat_i1";
1444//
1445// For oracle virtual table index which data_table_id is (1 << 40) | 15034 = 1099511642810,
1446// but it use OB_ALL_VIRTUAL_PLAN_CACHE_STAT_ALL_VIRTUAL_PLAN_CACHE_STAT_I1_TNAME as table name.
1447if (OB_FAIL(table.generate_origin_index_name())) {
1448LOG_WARN("fail to generate origin index name", KR(ret), K(table));
1449} else if (OB_FAIL(ObTableSchema::build_index_table_name(
1450allocator, table.get_data_table_id(),
1451table.get_origin_index_name_str(), index_name))) {
1452LOG_WARN("fail to build index table name", KR(ret), K(table));
1453}
1454}
1455if (FAILEDx(schema_guard.get_table_schema(
1456tenant_id,
1457table.get_database_id(),
1458table.is_index_table() ? index_name : table.get_table_name(),
1459table.is_index_table(),
1460exist_schema))) {
1461LOG_WARN("get table schema failed", KR(ret), K(tenant_id), "table", table.get_table_name());
1462if (OB_TABLE_NOT_EXIST == ret) {
1463ret = OB_SUCCESS;
1464}
1465} else if (OB_ISNULL(exist_schema)) {
1466// no duplicate table name
1467} else if (OB_FAIL(ctx_.ddl_service_->drop_inner_table(*exist_schema))) {
1468LOG_WARN("get table schema failed", KR(ret), K(tenant_id),
1469"table", table.get_table_name(), "table_id", table.get_table_id());
1470} else if (OB_FAIL(ctx_.ddl_service_->get_tenant_schema_guard_with_version_in_inner_table(
1471tenant_id, schema_guard))) {
1472LOG_WARN("get schema guard in inner table failed", KR(ret), K(tenant_id));
1473}
1474}
1475// 2. try drop table first
1476exist_schema = NULL;
1477if (FAILEDx(schema_guard.get_table_schema(tenant_id,
1478table.get_table_id(),
1479exist_schema))) {
1480LOG_WARN("get table schema failed", KR(ret), "table", table.get_table_name(),
1481"table_id", table.get_table_id());
1482if (OB_TABLE_NOT_EXIST == ret) {
1483ret = OB_SUCCESS;
1484}
1485} else if (OB_ISNULL(exist_schema)) {
1486// missed table
1487} else if (OB_FAIL(ctx_.ddl_service_->drop_inner_table(*exist_schema))) {
1488LOG_WARN("drop table schema failed", KR(ret), "table_schema", *exist_schema);
1489} else if (OB_FAIL(ctx_.ddl_service_->get_tenant_schema_guard_with_version_in_inner_table(
1490tenant_id, schema_guard))) {
1491LOG_WARN("get schema guard in inner table failed", KR(ret), K(tenant_id));
1492}
1493// 3. create table
1494if (FAILEDx(ctx_.ddl_service_->add_table_schema(table, schema_guard))) {
1495LOG_WARN("add table schema failed", KR(ret), K(tenant_id), K(table));
1496} else if (OB_FAIL(ctx_.ddl_service_->refresh_schema(tenant_id))) {
1497LOG_WARN("refresh schema failed", KR(ret), K(tenant_id));
1498}
1499
1500return ret;
1501}
1502
1503int ObAdminUpgradeCmd::execute(const Bool &upgrade)
1504{
1505int ret = OB_SUCCESS;
1506// set enable_upgrade_mode
1507HEAP_VAR(ObAdminSetConfigItem, item) {
1508obrpc::ObAdminSetConfigArg set_config_arg;
1509set_config_arg.is_inner_ = true;
1510const char *enable_upgrade_name = "enable_upgrade_mode";
1511ObAdminSetConfig admin_set_config(ctx_);
1512char min_server_version[OB_SERVER_VERSION_LENGTH] = {'\0'};
1513uint64_t cluster_version = GET_MIN_CLUSTER_VERSION();
1514
1515if (OB_INVALID_INDEX == ObClusterVersion::print_version_str(
1516min_server_version, OB_SERVER_VERSION_LENGTH, cluster_version)) {
1517ret = OB_INVALID_ARGUMENT;
1518LOG_WARN("fail to print version str", KR(ret), K(cluster_version));
1519} else if (OB_FAIL(item.name_.assign(enable_upgrade_name))) {
1520LOG_WARN("assign enable_upgrade_mode config name failed", KR(ret));
1521} else if (OB_FAIL(item.value_.assign((upgrade ? "true" : "false")))) {
1522LOG_WARN("assign enable_upgrade_mode config value failed", KR(ret));
1523} else if (OB_FAIL(set_config_arg.items_.push_back(item))) {
1524LOG_WARN("add enable_upgrade_mode config item failed", KR(ret));
1525} else {
1526const char *upgrade_stage_name = "_upgrade_stage";
1527obrpc::ObUpgradeStage stage = upgrade ?
1528obrpc::OB_UPGRADE_STAGE_PREUPGRADE :
1529obrpc::OB_UPGRADE_STAGE_NONE;
1530if (OB_FAIL(item.name_.assign(upgrade_stage_name))) {
1531LOG_WARN("assign _upgrade_stage config name failed", KR(ret), K(upgrade));
1532} else if (OB_FAIL(item.value_.assign(obrpc::get_upgrade_stage_str(stage)))) {
1533LOG_WARN("assign _upgrade_stage config value failed", KR(ret), K(stage), K(upgrade));
1534} else if (OB_FAIL(set_config_arg.items_.push_back(item))) {
1535LOG_WARN("add _upgrade_stage config item failed", KR(ret), K(stage), K(upgrade));
1536}
1537}
1538share::ObServerInfoInTable::ObBuildVersion build_version;
1539if (FAILEDx(admin_set_config.execute(set_config_arg))) {
1540LOG_WARN("execute set config failed", KR(ret));
1541} else if (OB_FAIL(observer::ObService::get_build_version(build_version))) {
1542LOG_WARN("fail to get build version", KR(ret));
1543} else {
1544CLUSTER_EVENT_SYNC_ADD("UPGRADE",
1545upgrade ? "BEGIN_UPGRADE" : "END_UPGRADE",
1546"cluster_version", min_server_version,
1547"build_version", build_version.ptr());
1548LOG_INFO("change upgrade parameters",
1549"enable_upgrade_mode", upgrade,
1550"in_major_version_upgrade_mode", GCONF.in_major_version_upgrade_mode());
1551
1552}
1553}
1554return ret;
1555}
1556
1557int ObAdminRollingUpgradeCmd::execute(const obrpc::ObAdminRollingUpgradeArg &arg)
1558{
1559int ret = OB_SUCCESS;
1560HEAP_VAR(ObAdminSetConfigItem, item) {
1561obrpc::ObAdminSetConfigArg set_config_arg;
1562set_config_arg.is_inner_ = true;
1563const char *upgrade_stage_name = "_upgrade_stage";
1564ObAdminSetConfig admin_set_config(ctx_);
1565char ori_min_server_version[OB_SERVER_VERSION_LENGTH] = {'\0'};
1566char min_server_version[OB_SERVER_VERSION_LENGTH] = {'\0'};
1567uint64_t ori_cluster_version = GET_MIN_CLUSTER_VERSION();
1568
1569if (!arg.is_valid()) {
1570ret = OB_INVALID_ARGUMENT;
1571LOG_WARN("invalid arg", KR(ret), K(arg));
1572} else if (OB_INVALID_INDEX == ObClusterVersion::print_version_str(
1573ori_min_server_version, OB_SERVER_VERSION_LENGTH, ori_cluster_version)) {
1574ret = OB_INVALID_ARGUMENT;
1575LOG_WARN("fail to print version str", KR(ret), K(ori_cluster_version));
1576} else if (OB_FAIL(item.name_.assign(upgrade_stage_name))) {
1577LOG_WARN("assign _upgrade_stage config name failed", KR(ret), K(arg));
1578} else if (OB_FAIL(item.value_.assign(obrpc::get_upgrade_stage_str(arg.stage_)))) {
1579LOG_WARN("assign _upgrade_stage config value failed", KR(ret), K(arg));
1580} else if (OB_FAIL(set_config_arg.items_.push_back(item))) {
1581LOG_WARN("add _upgrade_stage config item failed", KR(ret), K(arg));
1582} else if (obrpc::OB_UPGRADE_STAGE_POSTUPGRADE == arg.stage_) {
1583// wait min_observer_version to report to inner table
1584ObTimeoutCtx ctx;
1585if (OB_FAIL(ObShareUtil::set_default_timeout_ctx(ctx, GCONF.rpc_timeout))) {
1586LOG_WARN("fail to set default timeout", KR(ret));
1587} else {
1588const int64_t CHECK_INTERVAL = 100 * 1000L; // 100ms
1589while (OB_SUCC(ret)) {
1590uint64_t min_observer_version = 0;
1591if (ctx.is_timeouted()) {
1592ret = OB_TIMEOUT;
1593LOG_WARN("wait min_server_version report to inner table failed",
1594KR(ret), "abs_timeout", ctx.get_abs_timeout());
1595} else if (OB_FAIL(SVR_TRACER.get_min_server_version(
1596min_server_version, min_observer_version))) {
1597LOG_WARN("failed to get the min server version", KR(ret));
1598} else if (min_observer_version > CLUSTER_CURRENT_VERSION) {
1599ret = OB_ERR_UNEXPECTED;
1600LOG_WARN("min_observer_version is larger than CLUSTER_CURRENT_VERSION",
1601KR(ret), "min_server_version", min_server_version,
1602K(min_observer_version), "CLUSTER_CURRENT_VERSION", CLUSTER_CURRENT_VERSION);
1603} else if (min_observer_version < CLUSTER_CURRENT_VERSION) {
1604if (REACH_TIME_INTERVAL(1 * 1000 * 1000L)) { // 1s
1605LOG_INFO("min_observer_version is not reported yet, just wait",
1606KR(ret), "min_server_version", min_server_version,
1607K(min_observer_version), "CLUSTER_CURRENT_VERSION", CLUSTER_CURRENT_VERSION);
1608}
1609ob_usleep(CHECK_INTERVAL);
1610} else {
1611break;
1612}
1613} // end while
1614}
1615// end rolling upgrade, should raise min_observer_version
1616const char *min_obs_version_name = "min_observer_version";
1617if (FAILEDx(item.name_.assign(min_obs_version_name))) {
1618LOG_WARN("assign min_observer_version config name failed",
1619KR(ret), K(min_obs_version_name));
1620} else if (OB_FAIL(item.value_.assign(min_server_version))) {
1621LOG_WARN("assign min_observer_version config value failed",
1622KR(ret), K(min_server_version));
1623} else if (OB_FAIL(set_config_arg.items_.push_back(item))) {
1624LOG_WARN("add min_observer_version config item failed", KR(ret), K(item));
1625}
1626}
1627if (FAILEDx(admin_set_config.execute(set_config_arg))) {
1628LOG_WARN("execute set config failed", KR(ret));
1629} else {
1630share::ObServerInfoInTable::ObBuildVersion build_version;
1631if (OB_FAIL(observer::ObService::get_build_version(build_version))) {
1632LOG_WARN("fail to get build version", KR(ret));
1633} else if (obrpc::OB_UPGRADE_STAGE_POSTUPGRADE != arg.stage_) {
1634CLUSTER_EVENT_SYNC_ADD("UPGRADE", "BEGIN_ROLLING_UPGRADE",
1635"cluster_version", ori_min_server_version,
1636"build_version", build_version.ptr());
1637} else {
1638CLUSTER_EVENT_SYNC_ADD("UPGRADE", "END_ROLLING_UPGRADE",
1639"cluster_version", min_server_version,
1640"ori_cluster_version", ori_min_server_version,
1641"build_version", build_version.ptr());
1642}
1643LOG_INFO("change upgrade parameters", KR(ret), "_upgrade_stage", arg.stage_);
1644}
1645}
1646return ret;
1647}
1648
// Expands DEFINE_ENUM_FUNC for the ObInnerJob enum, driven by the
// OB_INNER_JOB_DEF list.
// NOTE(review): presumably this expansion is what provides
// get_inner_job_value(), which the executors in this file use to map
// arg.job_ to an ObInnerJob -- confirm against the macro definition.
1649DEFINE_ENUM_FUNC(ObInnerJob, inner_job, OB_INNER_JOB_DEF);
1650
1651int ObAdminRunJob::execute(const ObRunJobArg &arg)
1652{
1653int ret = OB_SUCCESS;
1654ObInnerJob job = INVALID_INNER_JOB;
1655if (!ctx_.is_inited()) {
1656ret = OB_NOT_INIT;
1657LOG_WARN("not init", KR(ret));
1658} else if (!arg.is_valid()) {
1659ret = OB_INVALID_ARGUMENT;
1660LOG_WARN("invalid arg", K(arg), KR(ret));
1661} else if (INVALID_INNER_JOB == (job = get_inner_job_value(arg.job_))) {
1662ret = OB_INVALID_ARGUMENT;
1663LOG_WARN("invalid inner job", K(arg), KR(ret));
1664} else {
1665switch(job) {
1666case CHECK_PARTITION_TABLE: {
1667ObAdminCheckPartitionTable job_executor(ctx_);
1668if (OB_FAIL(job_executor.execute(arg))) {
1669LOG_WARN("execute job failed", K(arg), KR(ret));
1670}
1671break;
1672}
1673case ROOT_INSPECTION: {
1674ObAdminRootInspection job_executor(ctx_);
1675if (OB_FAIL(job_executor.execute(arg))) {
1676LOG_WARN("execute job failed", K(arg), KR(ret));
1677}
1678break;
1679}
1680case UPGRADE_STORAGE_FORMAT_VERSION:
1681case STOP_UPGRADE_STORAGE_FORMAT_VERSION: {
1682ObAdminUpgradeStorageFormatVersionExecutor job_executor(ctx_);
1683if (OB_FAIL(job_executor.execute(arg))) {
1684LOG_WARN("fail to execute upgrade storage format version job", KR(ret));
1685}
1686break;
1687}
1688case CREATE_INNER_SCHEMA: {
1689ObAdminCreateInnerSchema job_executor(ctx_);
1690if (OB_FAIL(job_executor.execute(arg))) {
1691LOG_WARN("execute job failed", KR(ret));
1692}
1693break;
1694}
1695case IO_CALIBRATION: {
1696ObAdminIOCalibration job_executor(ctx_);
1697if (OB_FAIL(job_executor.execute(arg))) {
1698LOG_WARN("execute job failed", KR(ret));
1699}
1700break;
1701}
1702default: {
1703ret = OB_ERR_UNEXPECTED;
1704LOG_WARN("not known job", K(job), KR(ret));
1705break;
1706}
1707}
1708}
1709return ret;
1710}
1711
1712int ObAdminCheckPartitionTable::execute(const obrpc::ObRunJobArg &arg)
1713{
1714UNUSEDx(arg);
1715return OB_NOT_SUPPORTED;
1716}
1717
1718int ObAdminCheckPartitionTable::call_server(const ObAddr &server)
1719{
1720int ret = OB_SUCCESS;
1721if (!ctx_.is_inited()) {
1722ret = OB_NOT_INIT;
1723LOG_WARN("not init", KR(ret));
1724} else if (!server.is_valid()) {
1725ret = OB_INVALID_ARGUMENT;
1726LOG_WARN("invalid server", K(server), KR(ret));
1727} else if (OB_FAIL(ctx_.rpc_proxy_->to(server).check_partition_table())) {
1728LOG_WARN("request check partition table failed", KR(ret), K(server));
1729}
1730return ret;
1731}
1732
1733int ObAdminCreateInnerSchema::execute(const obrpc::ObRunJobArg &arg)
1734{
1735int ret = OB_SUCCESS;
1736LOG_INFO("execute create inner role request", KR(ret));
1737if (!ctx_.is_inited()) {
1738ret = OB_NOT_INIT;
1739LOG_WARN("not init", KR(ret));
1740} else if (!arg.is_valid()) {
1741ret = OB_INVALID_ARGUMENT;
1742LOG_WARN("invalid argument", KR(ret), K(arg));
1743} else if (CREATE_INNER_SCHEMA != get_inner_job_value(arg.job_)) {
1744ret = OB_ERR_UNEXPECTED;
1745LOG_WARN("job to run not create inner role", KR(ret), K(arg));
1746} else if (OB_UNLIKELY(nullptr == ctx_.root_service_)) {
1747ret = OB_ERR_UNEXPECTED;
1748LOG_WARN("root service ptr is null", KR(ret));
1749} else if (OB_FAIL(ctx_.root_service_->submit_create_inner_schema_task())) {
1750LOG_WARN("fail to submit create inner role task", KR(ret));
1751}
1752return ret;
1753}
1754
1755int ObAdminIOCalibration::execute(const obrpc::ObRunJobArg &arg)
1756{
1757int ret = OB_SUCCESS;
1758LOG_INFO("execute io calibration quest", KR(ret));
1759if (!ctx_.is_inited()) {
1760ret = OB_NOT_INIT;
1761LOG_WARN("not init", KR(ret));
1762} else if (!arg.is_valid()) {
1763ret = OB_INVALID_ARGUMENT;
1764LOG_WARN("invalid argument", KR(ret), K(arg));
1765} else if (IO_CALIBRATION != get_inner_job_value(arg.job_)) {
1766ret = OB_ERR_UNEXPECTED;
1767LOG_WARN("unexpected job type", KR(ret), K(arg));
1768} else if (OB_FAIL(call_all(arg))) {
1769LOG_WARN("call all server failed", K(ret), K(arg));
1770}
1771return ret;
1772}
1773
1774int ObAdminIOCalibration::call_server(const common::ObAddr &server)
1775{
1776int ret = OB_SUCCESS;
1777if (OB_UNLIKELY(!ctx_.is_inited())) {
1778ret = OB_NOT_INIT;
1779LOG_WARN("not init", K(ret));
1780} else if (OB_UNLIKELY(!server.is_valid())) {
1781ret = OB_INVALID_ARGUMENT;
1782LOG_WARN("invalid server", K(server), K(ret));
1783} else if (OB_FAIL(ctx_.rpc_proxy_->to(server).execute_io_benchmark())) {
1784LOG_WARN("request io calibration failed", KR(ret), K(server));
1785}
1786return ret;
1787}
1788
1789int ObAdminRefreshIOCalibration::execute(const obrpc::ObAdminRefreshIOCalibrationArg &arg)
1790{
1791int ret = OB_SUCCESS;
1792ObArray<ObAddr> server_list;
1793if (OB_UNLIKELY(!ctx_.is_inited())) {
1794ret = OB_NOT_INIT;
1795LOG_WARN("not init", K(ret));
1796} else if (OB_UNLIKELY(!arg.is_valid())) {
1797ret = OB_INVALID_ARGUMENT;
1798LOG_WARN("invalid argument", K(ret), K(arg));
1799} else if (OB_FAIL(get_server_list(arg, server_list))) {
1800LOG_WARN("get server list failed", K(ret), K(arg));
1801} else if (arg.only_refresh_) {
1802// do nothing
1803} else {
1804ObIOAbility io_ability;
1805for (int64_t i = 0; OB_SUCC(ret) && i < arg.calibration_list_.count(); ++i) {
1806const ObIOBenchResult &item = arg.calibration_list_.at(i);
1807if (OB_FAIL(io_ability.add_measure_item(item))) {
1808LOG_WARN("add item failed", K(ret), K(item));
1809}
1810}
1811if (OB_SUCC(ret)) {
1812if (arg.calibration_list_.count() > 0 && !io_ability.is_valid()) {
1813ret = OB_INVALID_ARGUMENT;
1814LOG_WARN("invalid calibration list", K(ret), K(arg), K(io_ability));
1815}
1816}
1817if (OB_SUCC(ret)) {
1818ObMySQLTransaction trans;
1819if (OB_FAIL(trans.start(ctx_.sql_proxy_, OB_SYS_TENANT_ID))) {
1820LOG_WARN("start transaction failed", K(ret));
1821} else {
1822for (int64_t i = 0; OB_SUCC(ret) && i < server_list.count(); ++i) {
1823if (OB_FAIL(ObIOCalibration::get_instance().write_into_table(trans, server_list.at(i), io_ability))) {
1824LOG_WARN("write io ability failed", K(ret), K(io_ability), K(server_list.at(i)));
1825}
1826}
1827bool is_commit = OB_SUCCESS == ret;
1828int tmp_ret = trans.end(is_commit);
1829if (OB_UNLIKELY(OB_SUCCESS != tmp_ret)) {
1830LOG_WARN("end transaction failed", K(tmp_ret), K(is_commit));
1831ret = OB_SUCC(ret) ? tmp_ret : ret;
1832}
1833}
1834}
1835}
1836if (OB_SUCC(ret)) {
1837ObRefreshIOCalibrationArg refresh_arg;
1838refresh_arg.storage_name_ = arg.storage_name_;
1839refresh_arg.only_refresh_ = arg.only_refresh_;
1840if (OB_FAIL(refresh_arg.calibration_list_.assign(arg.calibration_list_))) {
1841LOG_WARN("assign calibration list failed", K(ret), K(arg.calibration_list_));
1842} else {
1843int64_t succ_count = 0;
1844FOREACH_CNT(server, server_list) {
1845int tmp_ret = ctx_.rpc_proxy_->to(*server).refresh_io_calibration(refresh_arg);
1846if (OB_UNLIKELY(OB_SUCCESS != tmp_ret)) {
1847LOG_WARN("request io calibration failed", KR(tmp_ret), K(*server), K(refresh_arg));
1848} else {
1849++succ_count;
1850}
1851}
1852if (server_list.count() != succ_count) {
1853ret = OB_PARTIAL_FAILED;
1854LOG_USER_ERROR(OB_PARTIAL_FAILED);
1855}
1856}
1857}
1858LOG_INFO("admin refresh io calibration", K(ret), K(arg), K(server_list));
1859return ret;
1860}
1861
1862int ObAdminRefreshIOCalibration::call_server(const common::ObAddr &server)
1863{
1864// should never go here
1865UNUSED(server);
1866return OB_NOT_SUPPORTED;
1867}
1868
1869int ObAdminRootInspection::execute(const obrpc::ObRunJobArg &arg)
1870{
1871int ret = OB_SUCCESS;
1872LOG_INFO("execute root inspection request", K(arg));
1873ObAddr rs_addr;
1874if (!ctx_.is_inited()) {
1875ret = OB_NOT_INIT;
1876LOG_WARN("not init", KR(ret));
1877} else if (!arg.is_valid()) {
1878ret = OB_INVALID_ARGUMENT;
1879LOG_WARN("invalid arg", K(arg), KR(ret));
1880} else if (ROOT_INSPECTION != get_inner_job_value(arg.job_)) {
1881ret = OB_ERR_UNEXPECTED;
1882LOG_WARN("job to run not root inspection", K(arg), KR(ret));
1883} else if (OB_ISNULL(GCTX.rs_mgr_)) {
1884ret = OB_ERR_UNEXPECTED;
1885LOG_WARN("GCTX.rs_mgr_ is null", KR(ret), KP(GCTX.rs_mgr_));
1886} else if (OB_FAIL(GCTX.rs_mgr_->get_master_root_server(rs_addr))) {
1887LOG_WARN("fail to get master root server", KR(ret));
1888} else if (OB_UNLIKELY(!rs_addr.is_valid())) {
1889ret = OB_INVALID_ARGUMENT;
1890LOG_WARN("rs_addr is invalid", KR(ret), K(rs_addr));
1891} else if (!ctx_.root_inspection_->is_inited()) {
1892ret = OB_INNER_STAT_ERROR;
1893LOG_WARN("root_inspection not inited", KR(ret));
1894} else if (!arg.zone_.is_empty()) {
1895ret = OB_INVALID_ARGUMENT;
1896LOG_WARN("root inspection can't execute by zone", K(arg), KR(ret));
1897} else if (arg.server_.is_valid() && arg.server_ != rs_addr) {
1898ret = OB_INVALID_ARGUMENT;
1899LOG_WARN("only rs can execute root inspection", K(arg),
1900"rs", rs_addr, KR(ret));
1901} else if (OB_FAIL(ctx_.root_inspection_->check_all())) {
1902LOG_WARN("root_inspection check_all failed", KR(ret));
1903}
1904
1905return ret;
1906}
1907
1908int ObAdminUpgradeStorageFormatVersionExecutor::execute(const obrpc::ObRunJobArg &arg)
1909{
1910int ret = OB_SUCCESS;
1911ObInnerJob job = INVALID_INNER_JOB;
1912LOG_INFO("execute upgrade storage format version request", K(arg));
1913if (OB_UNLIKELY(!ctx_.is_inited())) {
1914ret = OB_INVALID_ARGUMENT;
1915LOG_WARN("ObAdminUpgradeStorageFormatVersionExecutor has not been inited", KR(ret));
1916} else if (OB_UNLIKELY(!arg.is_valid())) {
1917ret = OB_INVALID_ARGUMENT;
1918LOG_WARN("invalid arguments", KR(ret), K(arg));
1919} else {
1920job = get_inner_job_value(arg.job_);
1921if (UPGRADE_STORAGE_FORMAT_VERSION == job) {
1922if (OB_ISNULL(ctx_.root_service_)) {
1923ret = OB_ERR_UNEXPECTED;
1924LOG_WARN("error unexpected, root service must not be NULL", KR(ret));
1925} else if (OB_FAIL(ctx_.root_service_->submit_upgrade_storage_format_version_task())) {
1926LOG_WARN("fail to submit upgrade storage format version task", KR(ret));
1927}
1928} else if (STOP_UPGRADE_STORAGE_FORMAT_VERSION == job) {
1929if (OB_ISNULL(ctx_.upgrade_storage_format_executor_)) {
1930ret = OB_ERR_UNEXPECTED;
1931LOG_WARN("executor is null", KR(ret));
1932} else if (OB_FAIL(ctx_.upgrade_storage_format_executor_->stop())) {
1933LOG_WARN("fail to stop upgrade_storage_format task", KR(ret));
1934} else {
1935ctx_.upgrade_storage_format_executor_->start();
1936}
1937} else {
1938ret = OB_ERR_UNEXPECTED;
1939LOG_WARN("invalid job type", KR(ret), K(job));
1940}
1941}
1942return ret;
1943}
1944
1945int ObAdminFlushCache::execute(const obrpc::ObAdminFlushCacheArg &arg)
1946{
1947int ret = OB_SUCCESS;
1948int64_t tenant_num = arg.tenant_ids_.count();
1949ObSEArray<ObAddr, 8> server_list;
1950ObFlushCacheArg fc_arg;
1951// fine-grained plan evict only will pass this way.
1952// This because fine-grained plan evict must specify tenant
1953// if tenant num is 0, flush all tenant, else, flush appointed tenant
1954if (tenant_num != 0) { //flush appointed tenant
1955for (int64_t i = 0; OB_SUCC(ret) && i < tenant_num; ++i) {
1956//get tenant server list;
1957if (OB_FAIL(get_tenant_servers(arg.tenant_ids_.at(i), server_list))) {
1958LOG_WARN("fail to get tenant servers", "tenant_id", arg.tenant_ids_.at(i));
1959} else {
1960//call tenant servers;
1961fc_arg.is_all_tenant_ = false;
1962fc_arg.cache_type_ = arg.cache_type_;
1963fc_arg.ns_type_ = arg.ns_type_;
1964// fine-grained plan evict args
1965if (arg.is_fine_grained_) {
1966fc_arg.sql_id_ = arg.sql_id_;
1967fc_arg.is_fine_grained_ = arg.is_fine_grained_;
1968fc_arg.schema_id_ = arg.schema_id_;
1969for(int64_t j=0; OB_SUCC(ret) && j<arg.db_ids_.count(); j++) {
1970if (OB_FAIL(fc_arg.push_database(arg.db_ids_.at(j)))) {
1971LOG_WARN("fail to add db ids", KR(ret));
1972}
1973}
1974}
1975for (int64_t j = 0; OB_SUCC(ret) && j < server_list.count(); ++j) {
1976fc_arg.tenant_id_ = arg.tenant_ids_.at(i);
1977LOG_INFO("flush server cache", K(fc_arg), K(server_list.at(j)));
1978if (OB_FAIL(call_server(server_list.at(j), fc_arg))) {
1979LOG_WARN("fail to call tenant server",
1980"tenant_id", arg.tenant_ids_.at(i),
1981"server addr", server_list.at(j));
1982}
1983}
1984}
1985server_list.reset();
1986}
1987} else { // flush all tenant
1988//get all server list, server_mgr_.get_alive_servers
1989if (OB_FAIL(get_all_servers(server_list))) {
1990LOG_WARN("fail to get all servers", KR(ret));
1991} else {
1992fc_arg.is_all_tenant_ = true;
1993fc_arg.tenant_id_ = common::OB_INVALID_TENANT_ID;
1994fc_arg.cache_type_ = arg.cache_type_;
1995fc_arg.ns_type_ = arg.ns_type_;
1996for (int64_t j = 0; OB_SUCC(ret) && j < server_list.count(); ++j) {
1997LOG_INFO("flush server cache", K(fc_arg), K(server_list.at(j)));
1998if (OB_FAIL(call_server(server_list.at(j), fc_arg))) {
1999LOG_WARN("fail to call tenant server",
2000"server addr", server_list.at(j));
2001}
2002}
2003}
2004}
2005return ret;
2006}
2007
2008#ifdef OB_BUILD_SPM
2009int ObAdminLoadBaseline::execute(const obrpc::ObLoadPlanBaselineArg &arg)
2010{
2011int ret = OB_SUCCESS;
2012ObSEArray<ObAddr, 8> server_list;
2013if (OB_FAIL(get_tenant_servers(arg.tenant_id_, server_list))) {
2014LOG_WARN("fail to get tenant servers", "tenant_id", arg.tenant_id_, KR(ret));
2015} else {
2016//call tenant servers;
2017for (int64_t j = 0; OB_SUCC(ret) && j < server_list.count(); ++j) {
2018if (OB_FAIL(call_server(server_list.at(j), arg))) {
2019LOG_WARN("fail to call tenant server",
2020"tenant_id", arg.tenant_id_,
2021"server addr", server_list.at(j),
2022KR(ret));
2023}
2024}
2025}
2026server_list.reset();
2027return ret;
2028}
2029
2030int ObAdminLoadBaseline::call_server(const common::ObAddr &server,
2031const obrpc::ObLoadPlanBaselineArg &arg)
2032{
2033int ret = OB_SUCCESS;
2034if (!ctx_.is_inited()) {
2035ret = OB_NOT_INIT;
2036LOG_WARN("not init", KR(ret));
2037} else if (!server.is_valid()) {
2038ret = OB_INVALID_ARGUMENT;
2039LOG_WARN("invalid server", K(server), KR(ret));
2040} else if (OB_FAIL(ctx_.rpc_proxy_->to(server)
2041.by(arg.tenant_id_)
2042.as(arg.tenant_id_)
2043.load_baseline(arg))) {
2044LOG_WARN("request server load baseline failed", KR(ret), K(server));
2045}
2046
2047return ret;
2048}
2049
2050int ObAdminLoadBaselineV2::execute(const obrpc::ObLoadPlanBaselineArg &arg, uint64_t &total_load_count)
2051{
2052int ret = OB_SUCCESS;
2053ObSEArray<ObAddr, 8> server_list;
2054if (OB_FAIL(get_tenant_servers(arg.tenant_id_, server_list))) {
2055LOG_WARN("fail to get tenant servers", "tenant_id", arg.tenant_id_, KR(ret));
2056} else {
2057//call tenant servers;
2058for (int64_t j = 0; OB_SUCC(ret) && j < server_list.count(); ++j) {
2059obrpc::ObLoadBaselineRes res;
2060if (OB_FAIL(call_server(server_list.at(j), arg, res))) {
2061LOG_WARN("fail to call tenant server",
2062"tenant_id", arg.tenant_id_,
2063"server addr", server_list.at(j),
2064KR(ret));
2065} else {
2066total_load_count += res.load_count_;
2067}
2068}
2069}
2070server_list.reset();
2071return ret;
2072}
2073
2074int ObAdminLoadBaselineV2::call_server(const common::ObAddr &server,
2075const obrpc::ObLoadPlanBaselineArg &arg,
2076obrpc::ObLoadBaselineRes &res)
2077{
2078int ret = OB_SUCCESS;
2079if (!ctx_.is_inited()) {
2080ret = OB_NOT_INIT;
2081LOG_WARN("not init", KR(ret));
2082} else if (!server.is_valid()) {
2083ret = OB_INVALID_ARGUMENT;
2084LOG_WARN("invalid server", K(server), KR(ret));
2085} else if (OB_FAIL(ctx_.rpc_proxy_->to(server)
2086.by(arg.tenant_id_)
2087.as(arg.tenant_id_)
2088.load_baseline_v2(arg, res))) {
2089LOG_WARN("request server load baseline failed", KR(ret), K(server));
2090}
2091return ret;
2092}
2093#endif
2094
2095int ObTenantServerAdminUtil::get_tenant_servers(const uint64_t tenant_id, common::ObIArray<ObAddr> &servers)
2096{
2097int ret = OB_SUCCESS;
2098// sys tenant, get all servers directly
2099if (OB_SYS_TENANT_ID == tenant_id) {
2100if (OB_FAIL(get_all_servers(servers))) {
2101LOG_WARN("fail to get all servers", KR(ret));
2102}
2103} else {
2104ObArray<uint64_t> pool_ids;
2105if (OB_ISNULL(ctx_.unit_mgr_)) {
2106ret = OB_INVALID_ARGUMENT;
2107LOG_WARN("invalid argument", K(ctx_.unit_mgr_), KR(ret));
2108} else if (!SVR_TRACER.has_build() || !ctx_.unit_mgr_->check_inner_stat()) {
2109ret = OB_SERVER_IS_INIT;
2110LOG_WARN("server manager or unit manager hasn't built",
2111"unit_mgr built", ctx_.unit_mgr_->check_inner_stat(), KR(ret));
2112} else if (OB_FAIL(ctx_.unit_mgr_->get_pool_ids_of_tenant(tenant_id, pool_ids))) {
2113LOG_WARN("get_pool_ids_of_tenant failed", K(tenant_id), KR(ret));
2114} else {
2115ObArray<ObUnitInfo> unit_infos;
2116for (int64_t i = 0; OB_SUCC(ret) && i < pool_ids.count(); ++i) {
2117unit_infos.reuse();
2118if (OB_FAIL(ctx_.unit_mgr_->get_unit_infos_of_pool(pool_ids.at(i), unit_infos))) {
2119LOG_WARN("get_unit_infos_of_pool failed", "pool_id", pool_ids.at(i), KR(ret));
2120} else {
2121for (int64_t j = 0; OB_SUCC(ret) && j < unit_infos.count(); ++j) {
2122bool is_alive = false;
2123const ObUnit &unit = unit_infos.at(j).unit_;
2124if (OB_FAIL(SVR_TRACER.check_server_alive(unit.server_, is_alive))) {
2125LOG_WARN("check_server_alive failed", "server", unit.server_, KR(ret));
2126} else if (is_alive) {
2127if (OB_FAIL(servers.push_back(unit.server_))) {
2128LOG_WARN("push_back failed", KR(ret));
2129}
2130}
2131if (OB_SUCC(ret)) {
2132if (unit.migrate_from_server_.is_valid()) {
2133if (OB_FAIL(SVR_TRACER.check_server_alive(
2134unit.migrate_from_server_, is_alive))) {
2135LOG_WARN("check_server_alive failed", "server",
2136unit.migrate_from_server_, KR(ret));
2137} else if (is_alive) {
2138if (OB_FAIL(servers.push_back(unit.migrate_from_server_))) {
2139LOG_WARN("push_back failed", KR(ret));
2140}
2141}
2142}
2143}
2144} // for unit infos end
2145}
2146} // for pool ids end
2147}
2148}
2149
2150return ret;
2151}
2152
2153int ObTenantServerAdminUtil::get_all_servers(common::ObIArray<ObAddr> &servers)
2154{
2155int ret = OB_SUCCESS;
2156ObZone empty_zone;
2157if (OB_FAIL(SVR_TRACER.get_alive_servers(empty_zone, servers))) {
2158//if zone is empty, get all servers
2159LOG_WARN("fail to get all servers", KR(ret));
2160}
2161return ret;
2162}
2163
2164int ObAdminFlushCache::call_server(const common::ObAddr &server, const obrpc::ObFlushCacheArg &arg)
2165{
2166int ret = OB_SUCCESS;
2167if (!ctx_.is_inited()) {
2168ret = OB_NOT_INIT;
2169LOG_WARN("not init", KR(ret));
2170} else if (!server.is_valid()) {
2171ret = OB_INVALID_ARGUMENT;
2172LOG_WARN("invalid server", K(server), KR(ret));
2173} else if (OB_FAIL(ctx_.rpc_proxy_->to(server).flush_cache(arg))) {
2174LOG_WARN("request server flush cache failed", KR(ret), K(server));
2175}
2176return ret;
2177}
2178
2179int ObAdminSetTP::execute(const obrpc::ObAdminSetTPArg &arg)
2180{
2181LOG_INFO("start execute set_tp request", K(arg));
2182int ret = OB_SUCCESS;
2183if (!ctx_.is_inited()) {
2184ret = OB_NOT_INIT;
2185LOG_WARN("not init", KR(ret));
2186} else if (!arg.is_valid()) {
2187ret = OB_INVALID_ARGUMENT;
2188LOG_WARN("invalid arg", K(arg), KR(ret));
2189} else if (OB_FAIL(call_all(arg))) {
2190LOG_WARN("execute report replica failed", KR(ret), K(arg));
2191}
2192LOG_INFO("end execute set_tp request", K(arg));
2193return ret;
2194}
2195
2196int ObAdminSetTP::call_server(const ObAddr &server)
2197{
2198int ret = OB_SUCCESS;
2199if (!ctx_.is_inited()) {
2200ret = OB_NOT_INIT;
2201LOG_WARN("not init", KR(ret));
2202} else if (!server.is_valid()) {
2203ret = OB_INVALID_ARGUMENT;
2204LOG_WARN("invalid server", K(server), KR(ret));
2205} else if (OB_FAIL(ctx_.rpc_proxy_->to(server).set_tracepoint(arg_))) {
2206LOG_WARN("request server report replica failed", KR(ret), K(server));
2207}
2208return ret;
2209}
2210
2211int ObAdminSyncRewriteRules::execute(const obrpc::ObSyncRewriteRuleArg &arg)
2212{
2213int ret = OB_SUCCESS;
2214ObSEArray<ObAddr, 8> server_list;
2215if (OB_FAIL(get_tenant_servers(arg.tenant_id_, server_list))) {
2216LOG_WARN("fail to get tenant servers", "tenant_id", arg.tenant_id_, KR(ret));
2217} else {
2218//call tenant servers;
2219for (int64_t j = 0; OB_SUCC(ret) && j < server_list.count(); ++j) {
2220if (OB_FAIL(call_server(server_list.at(j), arg))) {
2221LOG_WARN("fail to call tenant server",
2222"tenant_id", arg.tenant_id_,
2223"server addr", server_list.at(j),
2224KR(ret));
2225}
2226}
2227}
2228server_list.reset();
2229return ret;
2230}
2231
2232int ObAdminSyncRewriteRules::call_server(const common::ObAddr &server,
2233const obrpc::ObSyncRewriteRuleArg &arg)
2234{
2235int ret = OB_SUCCESS;
2236if (!ctx_.is_inited()) {
2237ret = OB_NOT_INIT;
2238LOG_WARN("not init", KR(ret));
2239} else if (!server.is_valid()) {
2240ret = OB_INVALID_ARGUMENT;
2241LOG_WARN("invalid server", K(server), KR(ret));
2242} else if (OB_ISNULL(ctx_.rpc_proxy_)) {
2243ret = OB_ERR_UNEXPECTED;
2244LOG_WARN("get unexpected null", K(ret));
2245} else if (OB_FAIL(ctx_.rpc_proxy_->to(server)
2246.by(arg.tenant_id_)
2247.as(arg.tenant_id_)
2248.sync_rewrite_rules(arg))) {
2249LOG_WARN("request server sync rewrite rules failed", KR(ret), K(server));
2250}
2251return ret;
2252}
2253
2254} // end namespace rootserver
2255} // end namespace oceanbase
2256