oceanbase
653 строки · 29.2 Кб
1/**
2* Copyright (c) 2021 OceanBase
3* OceanBase CE is licensed under Mulan PubL v2.
4* You can use this software according to the terms and conditions of the Mulan PubL v2.
5* You may obtain a copy of Mulan PubL v2 at:
6* http://license.coscl.org.cn/MulanPubL-2.0
7* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10* See the Mulan PubL v2 for more details.
11*/
12
13#define USING_LOG_PREFIX RS
14#include "ob_admin_drtask_util.h"
15#include "logservice/ob_log_service.h" // for ObLogService
16#include "share/ob_locality_parser.h" // for ObLocalityParser
17#include "storage/tx_storage/ob_ls_service.h" // for ObLSService
18#include "storage/ls/ob_ls.h" // for ObLS
19#include "observer/ob_server_event_history_table_operator.h" // for SERVER_EVENT_ADD
20
21namespace oceanbase
22{
23namespace rootserver
24{
25static const char* obadmin_drtask_ret_comment_strs[] = {
26"succeed to send ob_admin command",
27"invalid tenant_id or ls_id in command",
28"expect leader to execute this ob_admin command",
29"fail to send rpc",
30"fail to execute ob_admin command",
31""/*default max*/
32};
33
34const char* ob_admin_drtask_ret_comment_strs(const rootserver::ObAdminDRTaskRetComment ret_comment)
35{
36STATIC_ASSERT(ARRAYSIZEOF(obadmin_drtask_ret_comment_strs) == (int64_t)rootserver::ObAdminDRTaskRetComment::MAX_COMMENT + 1,
37"ret_comment string array size mismatch enum ObAdminDRTaskRetComment count");
38const char *str = NULL;
39if (ret_comment >= rootserver::ObAdminDRTaskRetComment::SUCCEED_TO_SEND_COMMAND && ret_comment <= rootserver::ObAdminDRTaskRetComment::MAX_COMMENT) {
40str = obadmin_drtask_ret_comment_strs[static_cast<int64_t>(ret_comment)];
41} else {
42str = obadmin_drtask_ret_comment_strs[static_cast<int64_t>(rootserver::ObAdminDRTaskRetComment::MAX_COMMENT)];
43LOG_WARN_RET(OB_INVALID_ARGUMENT, "invalid ObAdminDRTaskRetComment", K(ret_comment));
44}
45return str;
46}
47
48int ObAdminDRTaskUtil::handle_obadmin_command(const ObAdminCommandArg &command_arg)
49{
50int ret = OB_SUCCESS;
51int tmp_ret = OB_SUCCESS;
52FLOG_INFO("begin to handle ob_admin command", K(command_arg));
53uint64_t tenant_id = OB_INVALID_TENANT_ID;
54share::ObLSID ls_id;
55ObSqlString result_comment("ResCmmt");
56ObAdminDRTaskRetComment ret_comment = FAIL_TO_EXECUTE_COMMAND;
57int64_t check_begin_time = ObTimeUtility::current_time();
58
59if (OB_UNLIKELY(!command_arg.is_valid())) {
60ret = OB_INVALID_ARGUMENT;
61LOG_WARN("invalid argument", KR(ret), K(command_arg));
62} else if (command_arg.is_remove_task()) {
63if (OB_FAIL(handle_remove_command_(command_arg, tenant_id, ls_id, ret_comment))) {
64LOG_WARN("fail to handle remove command", KR(ret), K(command_arg));
65}
66} else if (command_arg.is_add_task()) {
67if (OB_FAIL(handle_add_command_(command_arg, tenant_id, ls_id, ret_comment))) {
68LOG_WARN("fail to handle add command", KR(ret), K(command_arg));
69}
70} else {
71ret = OB_INVALID_ARGUMENT;
72LOG_WARN("invalid task type", KR(ret), K(command_arg));
73}
74
75if (OB_SUCCESS != (tmp_ret = try_construct_result_comment_(ret, ret_comment, result_comment))) {
76LOG_WARN("fail to construct result comment", K(tmp_ret), KR(ret), K(ret_comment));
77}
78SERVER_EVENT_ADD("ob_admin", command_arg.get_type_str(),
79"tenant_id", tenant_id,
80"ls_id", ls_id.id(),
81"arg", command_arg,
82"result", result_comment,
83"trace_id", ObCurTraceId::get_trace_id_str(),
84"comment", command_arg.get_comment());
85
86int64_t cost = ObTimeUtility::current_time() - check_begin_time;
87FLOG_INFO("finish handle ob_admin command", K(command_arg), K(tenant_id), K(ls_id),
88K(result_comment), K(ret_comment), K(cost));
89return ret;
90}
91
92int ObAdminDRTaskUtil::handle_add_command_(
93const ObAdminCommandArg &command_arg,
94uint64_t &tenant_id,
95share::ObLSID &ls_id,
96ObAdminDRTaskRetComment &ret_comment)
97{
98int ret = OB_SUCCESS;
99tenant_id = OB_INVALID_TENANT_ID;
100ret_comment = FAIL_TO_EXECUTE_COMMAND;
101ObLSAddReplicaArg arg;
102
103if (OB_UNLIKELY(!command_arg.is_valid())
104|| OB_UNLIKELY(!command_arg.is_add_task())) {
105ret = OB_INVALID_ARGUMENT;
106LOG_WARN("invalid argument", KR(ret), K(command_arg));
107} else if (OB_FAIL(construct_arg_for_add_command_(command_arg, arg, ret_comment))) {
108LOG_WARN("fail to construct arg for add command", KR(ret), K(command_arg),
109K(arg), K(ret_comment));
110} else if (OB_FAIL(execute_task_for_add_command_(command_arg, arg, ret_comment))) {
111LOG_WARN("fail to execute task for add command", KR(ret), K(command_arg), K(arg), K(ret_comment));
112} else {
113tenant_id = arg.tenant_id_;
114ls_id = arg.ls_id_;
115ret_comment = SUCCEED_TO_SEND_COMMAND;
116}
117return ret;
118}
119
120int ObAdminDRTaskUtil::construct_arg_for_add_command_(
121const ObAdminCommandArg &command_arg,
122ObLSAddReplicaArg &arg,
123ObAdminDRTaskRetComment &ret_comment)
124{
125int ret = OB_SUCCESS;
126ret_comment = FAIL_TO_EXECUTE_COMMAND;
127uint64_t tenant_id = OB_INVALID_TENANT_ID;
128share::ObLSID ls_id;
129ObReplicaType replica_type = REPLICA_TYPE_FULL;
130common::ObAddr data_source_server;
131common::ObAddr target_server;
132int64_t orig_paxos_replica_number = 0;
133int64_t new_paxos_replica_number = 0;
134
135if (OB_UNLIKELY(!command_arg.is_valid())
136|| OB_UNLIKELY(!command_arg.is_add_task())) {
137ret = OB_INVALID_ARGUMENT;
138LOG_WARN("invalid argument", KR(ret), K(command_arg));
139// STEP 1: parse parameters from ob_admin command directly
140} else if (OB_FAIL(parse_params_from_obadmin_command_arg(
141command_arg, tenant_id, ls_id, replica_type, data_source_server,
142target_server, orig_paxos_replica_number, new_paxos_replica_number))) {
143LOG_WARN("fail to parse parameters provided in ob_admin command", KR(ret), K(command_arg));
144} else if (OB_UNLIKELY(!ls_id.is_valid_with_tenant(tenant_id))) {
145ret = OB_INVALID_ARGUMENT;
146ret_comment = ObAdminDRTaskRetComment::TENANT_ID_OR_LS_ID_NOT_VALID;
147LOG_WARN("invalid tenant_id or ls_id", KR(ret), K(command_arg), K(tenant_id), K(ls_id));
148} else if (OB_UNLIKELY(!target_server.is_valid())
149|| OB_UNLIKELY(REPLICA_TYPE_FULL != replica_type && REPLICA_TYPE_READONLY != replica_type)) {
150ret = OB_INVALID_ARGUMENT;
151LOG_WARN("invalid argument", KR(ret), K(replica_type), K(target_server));
152// STEP 2: construct orig_paxos_replica_number and data_source_server if not specified by ob_admin command
153} else if (0 == orig_paxos_replica_number || !data_source_server.is_valid()) {
154if (OB_FAIL(construct_default_params_for_add_command_(
155tenant_id,
156ls_id,
157orig_paxos_replica_number,
158data_source_server))) {
159LOG_WARN("fail to fetch ls info and construct related parameters", KR(ret), K(tenant_id),
160K(ls_id), K(orig_paxos_replica_number), K(data_source_server));
161}
162}
163
164if (OB_SUCC(ret)) {
165new_paxos_replica_number = 0 == new_paxos_replica_number
166? orig_paxos_replica_number
167: new_paxos_replica_number;
168ObReplicaMember data_source_member(data_source_server, 0/*timstamp*/);
169ObReplicaMember add_member(target_server, ObTimeUtility::current_time(), replica_type);
170// STEP 3: construct arg
171if (OB_ISNULL(ObCurTraceId::get_trace_id())) {
172ret = OB_INVALID_ARGUMENT;
173LOG_WARN("invalid argument", KR(ret));
174} else if (OB_FAIL(arg.init(
175*ObCurTraceId::get_trace_id()/*task_id*/,
176tenant_id,
177ls_id,
178add_member,
179data_source_member,
180orig_paxos_replica_number,
181new_paxos_replica_number,
182false/*is_skip_change_member_list-not used*/))) {
183LOG_WARN("fail to init arg", KR(ret), K(tenant_id), K(ls_id), K(add_member),
184K(data_source_member), K(orig_paxos_replica_number), K(new_paxos_replica_number));
185}
186}
187return ret;
188}
189
190int ObAdminDRTaskUtil::construct_default_params_for_add_command_(
191const uint64_t &tenant_id,
192const share::ObLSID &ls_id,
193int64_t &orig_paxos_replica_number,
194common::ObAddr &data_source_server)
195{
196int ret = OB_SUCCESS;
197share::ObLSInfo ls_info;
198const share::ObLSReplica *leader_replica = nullptr;
199
200if (OB_UNLIKELY(!ls_id.is_valid_with_tenant(tenant_id))) {
201ret = OB_INVALID_ARGUMENT;
202LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(ls_id));
203} else if (OB_ISNULL(GCTX.lst_operator_)) {
204ret = OB_INVALID_ARGUMENT;
205LOG_WARN("invalid ls table operator", KR(ret));
206} else if (OB_FAIL(GCTX.lst_operator_->get(GCONF.cluster_id, tenant_id, ls_id,
207share::ObLSTable::COMPOSITE_MODE, ls_info))) {
208LOG_WARN("fail to get ls info", KR(ret), K(tenant_id), K(ls_id), K(ls_info));
209} else if (OB_FAIL(ls_info.find_leader(leader_replica))) {
210LOG_WARN("fail to get ls leader replica", KR(ret), K(ls_info));
211} else if (OB_ISNULL(leader_replica)) {
212ret = OB_INVALID_ARGUMENT;
213LOG_WARN("invalid leader replica", KR(ret), K(ls_info));
214} else {
215// If [orig_paxos_replica_number] or [data_source_server] not specified in obadmin command,
216// need to construct from leader_replica, use leader replica as default
217if (0 == orig_paxos_replica_number) {
218orig_paxos_replica_number = leader_replica->get_paxos_replica_number();
219}
220if (!data_source_server.is_valid()) {
221data_source_server = leader_replica->get_server();
222}
223}
224return ret;
225}
226
227int ObAdminDRTaskUtil::execute_task_for_add_command_(
228const ObAdminCommandArg &command_arg,
229const ObLSAddReplicaArg &arg,
230ObAdminDRTaskRetComment &ret_comment)
231{
232int ret = OB_SUCCESS;
233ret_comment = FAIL_TO_EXECUTE_COMMAND;
234const int64_t add_timeout = GCONF.rpc_timeout * 5;
235
236if (OB_UNLIKELY(!arg.is_valid())
237|| OB_UNLIKELY(!command_arg.is_valid())
238|| OB_UNLIKELY(!command_arg.is_add_task())) {
239ret = OB_INVALID_ARGUMENT;
240LOG_WARN("invalid argument", KR(ret), K(arg), K(command_arg));
241} else if (GCTX.self_addr() == arg.dst_.get_server()) {
242// do not need to send rpc, execute locally
243MTL_SWITCH(arg.tenant_id_) {
244if (OB_FAIL(observer::ObService::do_add_ls_replica(arg))) {
245LOG_WARN("fail to execute add replica rpc locally", KR(ret), K(arg));
246}
247}
248} else if (OB_ISNULL(GCTX.srv_rpc_proxy_)) {
249ret = OB_INVALID_ARGUMENT;
250LOG_WARN("svr rpc proxy is nullptr", KR(ret));
251} else if (OB_FAIL(GCTX.srv_rpc_proxy_->to(arg.dst_.get_server()).by(arg.tenant_id_).timeout(add_timeout).ls_add_replica(arg))) {
252ret_comment = ObAdminDRTaskRetComment::FAILED_TO_SEND_RPC;
253LOG_WARN("fail to execute add replica rpc", KR(ret), K(arg), K(add_timeout));
254}
255
256if (OB_SUCC(ret)) {
257// local execute or rpc is send, log task start, task finish will be recorded later
258ROOTSERVICE_EVENT_ADD("disaster_recovery", drtasklog::START_ADD_LS_REPLICA_STR,
259"tenant_id", arg.tenant_id_,
260"ls_id", arg.ls_id_.id(),
261"task_id", ObCurTraceId::get_trace_id_str(),
262"destination", arg.dst_,
263"comment", command_arg.get_comment());
264}
265return ret;
266}
267
268int ObAdminDRTaskUtil::handle_remove_command_(
269const ObAdminCommandArg &command_arg,
270uint64_t &tenant_id,
271share::ObLSID &ls_id,
272ObAdminDRTaskRetComment &ret_comment)
273{
274int ret = OB_SUCCESS;
275tenant_id = OB_INVALID_TENANT_ID;
276ret_comment = FAIL_TO_EXECUTE_COMMAND;
277ObReplicaType replica_type = REPLICA_TYPE_FULL;
278common::ObAddr data_source_server;
279common::ObAddr target_server;
280int64_t orig_paxos_replica_number = 0;
281int64_t new_paxos_replica_number = 0;
282
283if (OB_UNLIKELY(!command_arg.is_valid())
284|| OB_UNLIKELY(!command_arg.is_remove_task())) {
285ret = OB_INVALID_ARGUMENT;
286LOG_WARN("invalid argument", KR(ret), K(command_arg));
287// STEP 1: parse parameters from ob_admin command directly
288} else if (OB_FAIL(parse_params_from_obadmin_command_arg(
289command_arg, tenant_id, ls_id, replica_type, data_source_server,
290target_server, orig_paxos_replica_number, new_paxos_replica_number))) {
291LOG_WARN("fail to parse parameters provided in ob_admin command", KR(ret), K(command_arg));
292} else if (OB_UNLIKELY(!ls_id.is_valid_with_tenant(tenant_id))) {
293ret = OB_INVALID_ARGUMENT;
294ret_comment = ObAdminDRTaskRetComment::TENANT_ID_OR_LS_ID_NOT_VALID;
295LOG_WARN("invalid tenant_id or ls_id", KR(ret), K(command_arg), K(tenant_id), K(ls_id));
296} else if (OB_UNLIKELY(!target_server.is_valid())
297|| OB_UNLIKELY(REPLICA_TYPE_FULL != replica_type && REPLICA_TYPE_READONLY != replica_type)) {
298ret = OB_INVALID_ARGUMENT;
299LOG_WARN("invalid argument", KR(ret), K(replica_type), K(target_server));
300} else {
301// STEP 2: construct args and execute
302if (REPLICA_TYPE_FULL == replica_type) {
303ObLSDropPaxosReplicaArg remove_paxos_arg;
304if (OB_FAIL(construct_remove_paxos_task_arg_(
305tenant_id, ls_id, target_server, orig_paxos_replica_number,
306new_paxos_replica_number, ret_comment, remove_paxos_arg))) {
307LOG_WARN("fail to construct remove paxos task arg", KR(ret), K(tenant_id), K(ls_id),
308K(target_server), K(orig_paxos_replica_number), K(new_paxos_replica_number),
309K(ret_comment), K(remove_paxos_arg));
310} else if (OB_FAIL(execute_remove_paxos_task_(command_arg, remove_paxos_arg))) {
311LOG_WARN("fail to execute remove paxos replica task", KR(ret), K(command_arg), K(remove_paxos_arg));
312} else {
313ret_comment = SUCCEED_TO_SEND_COMMAND;
314}
315} else if (REPLICA_TYPE_READONLY == replica_type) {
316ObLSDropNonPaxosReplicaArg remove_nonpaxos_arg;
317if (OB_FAIL(construct_remove_nonpaxos_task_arg_(
318tenant_id, ls_id, target_server, ret_comment, remove_nonpaxos_arg))) {
319LOG_WARN("fail to construct remove non-paxos replica task arg", KR(ret), K(tenant_id),
320K(ls_id), K(target_server), K(ret_comment), K(remove_nonpaxos_arg));
321} else if (OB_FAIL(execute_remove_nonpaxos_task_(command_arg, remove_nonpaxos_arg))) {
322LOG_WARN("fail to execute remove nonpaxos replica task", KR(ret), K(command_arg), K(remove_nonpaxos_arg));
323} else {
324ret_comment = SUCCEED_TO_SEND_COMMAND;
325}
326} else {
327ret = OB_INVALID_ARGUMENT;
328LOG_WARN("unexpected replica type", KR(ret), K(replica_type), K(tenant_id), K(ls_id), K(command_arg));
329}
330}
331return ret;
332}
333
334int ObAdminDRTaskUtil::construct_remove_paxos_task_arg_(
335const uint64_t &tenant_id,
336const share::ObLSID &ls_id,
337const common::ObAddr &target_server,
338int64_t &orig_paxos_replica_number,
339int64_t &new_paxos_replica_number,
340ObAdminDRTaskRetComment &ret_comment,
341ObLSDropPaxosReplicaArg &remove_paxos_arg)
342{
343int ret = OB_SUCCESS;
344ret_comment = FAIL_TO_EXECUTE_COMMAND;
345common::ObMember member;
346ObReplicaMember member_to_remove;
347palf::PalfStat palf_stat;
348
349if (OB_UNLIKELY(!ls_id.is_valid_with_tenant(tenant_id))
350|| OB_UNLIKELY(!target_server.is_valid())) {
351ret = OB_INVALID_ARGUMENT;
352LOG_WARN("invalid tenant_id or ls_id", KR(ret), K(tenant_id), K(ls_id), K(target_server));
353} else if (OB_FAIL(get_local_palf_stat_(tenant_id, ls_id, palf_stat, ret_comment))) {
354LOG_WARN("fail to get local palf stat", KR(ret), K(tenant_id), K(ls_id));
355} else if (OB_UNLIKELY(!palf_stat.is_valid())) {
356ret = OB_INVALID_ARGUMENT;
357LOG_WARN("invalid argument", KR(ret), K(palf_stat));
358} else if (OB_UNLIKELY(!palf_stat.paxos_member_list_.contains(target_server))) {
359ret = OB_ENTRY_NOT_EXIST;
360LOG_WARN("replica not found in member_list", KR(ret), K(target_server), K(palf_stat));
361} else if (OB_FAIL(palf_stat.paxos_member_list_.get_member_by_addr(target_server, member))) {
362LOG_WARN("fail to get member from paxos_member_list", KR(ret), K(palf_stat), K(target_server));
363} else {
364member_to_remove = ObReplicaMember(member);
365if (OB_FAIL(member_to_remove.set_replica_type(REPLICA_TYPE_FULL))) {
366LOG_WARN("fail to set replica type for member to remove", KR(ret));
367} else {
368// If [orig_paxos_replica_number] not specified in obadmin command,
369// use leader replica's info as default
370orig_paxos_replica_number = 0 == orig_paxos_replica_number
371? palf_stat.paxos_replica_num_
372: orig_paxos_replica_number;
373new_paxos_replica_number = 0 == new_paxos_replica_number
374? orig_paxos_replica_number
375: new_paxos_replica_number;
376}
377}
378if (OB_FAIL(ret)) {
379} else if (OB_ISNULL(ObCurTraceId::get_trace_id())) {
380ret = OB_INVALID_ARGUMENT;
381LOG_WARN("invalid argument", KR(ret));
382} else if (OB_FAIL(remove_paxos_arg.init(
383*ObCurTraceId::get_trace_id()/*task_id*/, tenant_id, ls_id,
384member_to_remove, orig_paxos_replica_number, new_paxos_replica_number))) {
385LOG_WARN("fail to init arg", KR(ret), K(tenant_id), K(ls_id), K(member_to_remove),
386K(orig_paxos_replica_number), K(new_paxos_replica_number));
387}
388return ret;
389}
390
391int ObAdminDRTaskUtil::construct_remove_nonpaxos_task_arg_(
392const uint64_t &tenant_id,
393const share::ObLSID &ls_id,
394const common::ObAddr &target_server,
395ObAdminDRTaskRetComment &ret_comment,
396ObLSDropNonPaxosReplicaArg &remove_nonpaxos_arg)
397{
398int ret = OB_SUCCESS;
399ret_comment = FAIL_TO_EXECUTE_COMMAND;
400common::ObMember member;
401ObReplicaMember member_to_remove;
402palf::PalfStat palf_stat;
403
404if (OB_UNLIKELY(!ls_id.is_valid_with_tenant(tenant_id))
405|| OB_UNLIKELY(!target_server.is_valid())) {
406ret = OB_INVALID_ARGUMENT;
407LOG_WARN("invalid tenant_id or ls_id", KR(ret), K(tenant_id), K(ls_id), K(target_server));
408} else if (OB_FAIL(get_local_palf_stat_(tenant_id, ls_id, palf_stat, ret_comment))) {
409LOG_WARN("fail to get local palf stat", KR(ret), K(tenant_id), K(ls_id));
410} else if (OB_UNLIKELY(!palf_stat.is_valid())) {
411ret = OB_INVALID_ARGUMENT;
412LOG_WARN("invalid argument", KR(ret), K(palf_stat));
413} else if (OB_UNLIKELY(!palf_stat.learner_list_.contains(target_server))) {
414ret = OB_ENTRY_NOT_EXIST;
415LOG_WARN("replica not found in learner_list", KR(ret), K(target_server), K(palf_stat));
416} else if (OB_FAIL(palf_stat.learner_list_.get_learner_by_addr(target_server, member))) {
417LOG_WARN("fail to get member from learner_list", KR(ret), K(palf_stat), K(target_server));
418} else {
419member_to_remove = ObReplicaMember(member);
420if (OB_FAIL(member_to_remove.set_replica_type(REPLICA_TYPE_READONLY))) {
421LOG_WARN("fail to set replica type for member to remove", KR(ret));
422} else if (OB_ISNULL(ObCurTraceId::get_trace_id())) {
423ret = OB_INVALID_ARGUMENT;
424LOG_WARN("invalid argument", KR(ret));
425} else if (OB_FAIL(remove_nonpaxos_arg.init(
426*ObCurTraceId::get_trace_id()/*task_id*/, tenant_id,
427ls_id, member_to_remove))) {
428LOG_WARN("fail to init arg", KR(ret), K(tenant_id), K(ls_id), K(member_to_remove));
429}
430}
431return ret;
432}
433
434int ObAdminDRTaskUtil::get_local_palf_stat_(
435const uint64_t &tenant_id,
436const share::ObLSID &ls_id,
437palf::PalfStat &palf_stat,
438ObAdminDRTaskRetComment &ret_comment)
439{
440int ret = OB_SUCCESS;
441ret_comment = FAIL_TO_EXECUTE_COMMAND;
442palf_stat.reset();
443
444if (OB_UNLIKELY(!ls_id.is_valid_with_tenant(tenant_id))) {
445ret = OB_INVALID_ARGUMENT;
446LOG_WARN("invalid tenant_id or ls_id", KR(ret), K(tenant_id), K(ls_id));
447} else {
448MTL_SWITCH(tenant_id) {
449logservice::ObLogService *log_service = NULL;
450palf::PalfHandleGuard palf_handle_guard;
451if (OB_ISNULL(log_service = MTL(logservice::ObLogService*))) {
452ret = OB_ERR_UNEXPECTED;
453LOG_WARN("MTL ObLogService is null", KR(ret), K(tenant_id));
454} else if (OB_FAIL(log_service->open_palf(ls_id, palf_handle_guard))) {
455LOG_WARN("failed to open palf", KR(ret), K(tenant_id), K(ls_id));
456} else if (OB_FAIL(palf_handle_guard.stat(palf_stat))) {
457LOG_WARN("get palf_stat failed", KR(ret), K(tenant_id), K(ls_id));
458} else if (LEADER != palf_stat.role_) {
459ret = OB_STATE_NOT_MATCH;
460ret_comment = ObAdminDRTaskRetComment::SERVER_TO_EXECUTE_COMMAND_NOT_LEADER;
461LOG_WARN("invalid argument, expect self address is leader replica", KR(ret),
462K(tenant_id), K(ls_id), K(palf_stat));
463}
464}
465}
466return ret;
467}
468
469int ObAdminDRTaskUtil::execute_remove_paxos_task_(
470const ObAdminCommandArg &command_arg,
471const ObLSDropPaxosReplicaArg &remove_paxos_arg)
472{
473int ret = OB_SUCCESS;
474if (OB_UNLIKELY(!command_arg.is_valid())
475|| OB_UNLIKELY(!command_arg.is_remove_task())
476|| OB_UNLIKELY(!remove_paxos_arg.is_valid())) {
477ret = OB_INVALID_ARGUMENT;
478LOG_WARN("invalid argument", KR(ret), K(command_arg), K(remove_paxos_arg));
479} else {
480// do not need to send rpc, just execute locally
481LOG_INFO("start to remove member from member_list", K(remove_paxos_arg));
482MTL_SWITCH(remove_paxos_arg.tenant_id_) {
483if (OB_FAIL(observer::ObService::do_remove_ls_paxos_replica(remove_paxos_arg))) {
484LOG_WARN("fail to execute remove paxos replica rpc locally", KR(ret), K(remove_paxos_arg));
485}
486}
487}
488if (OB_SUCC(ret)) {
489// rpc is send, log task start, task finish will be recorded later
490ROOTSERVICE_EVENT_ADD("disaster_recovery", drtasklog::START_REMOVE_LS_PAXOS_REPLICA_STR,
491"tenant_id", remove_paxos_arg.tenant_id_,
492"ls_id", remove_paxos_arg.ls_id_.id(),
493"task_id", ObCurTraceId::get_trace_id_str(),
494"remove_server", remove_paxos_arg.remove_member_,
495"comment", command_arg.get_comment());
496}
497return ret;
498}
499
500int ObAdminDRTaskUtil::execute_remove_nonpaxos_task_(
501const ObAdminCommandArg &command_arg,
502const ObLSDropNonPaxosReplicaArg &remove_non_paxos_arg)
503{
504int ret = OB_SUCCESS;
505if (OB_UNLIKELY(!command_arg.is_valid())
506|| OB_UNLIKELY(!command_arg.is_remove_task())
507|| OB_UNLIKELY(!remove_non_paxos_arg.is_valid())) {
508ret = OB_INVALID_ARGUMENT;
509LOG_WARN("invalid argument", KR(ret), K(command_arg), K(remove_non_paxos_arg));
510} else {
511// do not need to send rpc, just execute locally
512LOG_INFO("start to remove learner from learner_list", K(remove_non_paxos_arg));
513MTL_SWITCH(remove_non_paxos_arg.tenant_id_) {
514if (OB_FAIL(observer::ObService::do_remove_ls_nonpaxos_replica(remove_non_paxos_arg))) {
515LOG_WARN("fail to execute remove non-paxos replica rpc locally", KR(ret), K(remove_non_paxos_arg));
516}
517}
518}
519if (OB_SUCC(ret)) {
520// rpc is send, log task start, task finish will be recorded later
521ROOTSERVICE_EVENT_ADD("disaster_recovery", drtasklog::START_REMOVE_LS_PAXOS_REPLICA_STR,
522"tenant_id", remove_non_paxos_arg.tenant_id_,
523"ls_id", remove_non_paxos_arg.ls_id_.id(),
524"task_id", ObCurTraceId::get_trace_id_str(),
525"remove_server", remove_non_paxos_arg.remove_member_,
526"comment", command_arg.get_comment());
527}
528return ret;
529}
530
531int ObAdminDRTaskUtil::parse_params_from_obadmin_command_arg(
532const ObAdminCommandArg &command_arg,
533uint64_t &tenant_id,
534share::ObLSID &ls_id,
535ObReplicaType &replica_type,
536common::ObAddr &data_source_server,
537common::ObAddr &target_server,
538int64_t &orig_paxos_replica_number,
539int64_t &new_paxos_replica_number)
540{
541int ret = OB_SUCCESS;
542// reset output params
543tenant_id = OB_INVALID_TENANT_ID;
544ls_id.reset();
545replica_type = REPLICA_TYPE_FULL;
546data_source_server.reset();
547target_server.reset();
548orig_paxos_replica_number = 0;
549new_paxos_replica_number = 0;
550// construct items to use
551ObArenaAllocator allocator("ObAdminDRTask");
552ObString admin_command_before_trim;
553ObString admin_command_after_trim;
554ObArray<ObString> command_params_array;
555if (OB_UNLIKELY(!command_arg.is_valid())) {
556ret = OB_INVALID_ARGUMENT;
557LOG_WARN("invalid argument", KR(ret), K(command_arg));
558} else if (OB_FAIL(ob_write_string(allocator, command_arg.get_admin_command_str(), admin_command_before_trim))) {
559LOG_WARN("fail to write string", KR(ret), K(command_arg));
560} else if (FALSE_IT(admin_command_after_trim = admin_command_before_trim.trim())) {
561} else if (OB_FAIL(split_on(admin_command_after_trim, ',', command_params_array))) {
562LOG_WARN("fail to split string", KR(ret), K(admin_command_after_trim), K(admin_command_before_trim));
563} else {
564LOG_INFO("start to parse parameters from command", K(command_arg), K(command_params_array));
565ObSqlString data_source_string("DtStr");
566for (int64_t param_index = 0;
567param_index < command_params_array.count() && OB_SUCC(ret);
568param_index++) {
569ObString param_name_with_value_str = command_params_array.at(param_index);
570ObArray<ObString> param_name_with_value;
571ObSqlString param_name("ParamN");
572ObSqlString param_value("ParamV");
573int64_t pos = 0;
574if (OB_FAIL(split_on(param_name_with_value_str, '=', param_name_with_value))) {
575LOG_WARN("fail to split param name and value", KR(ret), K(param_name_with_value_str));
576} else if (OB_UNLIKELY(2 != param_name_with_value.count())) {
577ret = OB_INVALID_ARGUMENT;
578LOG_WARN("invalid argument", KR(ret), K(param_name_with_value));
579} else if (OB_FAIL(param_name.assign(param_name_with_value.at(0).trim()))) {
580LOG_WARN("fail to construct parameter name", KR(ret), K(param_name_with_value));
581} else if (OB_FAIL(param_value.assign(param_name_with_value.at(1).trim()))) {
582LOG_WARN("fail to construct parameter value", KR(ret), K(param_name_with_value));
583} else if (0 == param_name.string().case_compare("tenant_id")) {
584int64_t tenant_id_to_set = OB_INVALID_TENANT_ID;
585if (OB_FAIL(extract_int(param_value.string(), 0, pos, tenant_id_to_set))) {
586LOG_WARN("fail to extract int from string", KR(ret), K(param_name_with_value), K(tenant_id_to_set));
587} else {
588tenant_id = tenant_id_to_set;
589}
590} else if (0 == param_name.string().case_compare("ls_id")) {
591int64_t ls_id_to_set;
592if (OB_FAIL(extract_int(param_value.string(), 0, pos, ls_id_to_set))) {
593LOG_WARN("fail to extract int from string", KR(ret), K(param_name_with_value), K(ls_id_to_set));
594} else {
595ls_id = share::ObLSID(ls_id_to_set);
596}
597} else if (0 == param_name.string().case_compare("replica_type")) {
598if (OB_FAIL(share::ObLocalityParser::parse_type(
599param_value.ptr(),
600param_value.length(),
601replica_type))) {
602LOG_WARN("fail to parse replica type", KR(ret), K(param_name_with_value), K(replica_type));
603}
604} else if (0 == param_name.string().case_compare("orig_paxos_replica_number")) {
605if (OB_FAIL(extract_int(param_value.string(), 0, pos, orig_paxos_replica_number))) {
606LOG_WARN("fail to extract int from string", KR(ret), K(param_name_with_value), K(orig_paxos_replica_number));
607}
608} else if (0 == param_name.string().case_compare("new_paxos_replica_number")) {
609if (OB_FAIL(extract_int(param_value.string(), 0, pos, new_paxos_replica_number))) {
610LOG_WARN("fail to extract int from string", KR(ret), K(param_name_with_value), K(new_paxos_replica_number));
611}
612} else if (0 == param_name.string().case_compare("server")) {
613if (OB_FAIL(target_server.parse_from_string(param_value.string()))) {
614LOG_WARN("fail to construct target server from string", KR(ret), K(param_value));
615}
616} else if (0 == param_name.string().case_compare("data_source")) {
617if (OB_FAIL(data_source_server.parse_from_string(param_value.string()))) {
618LOG_WARN("fail to construct data source server from string", KR(ret), K(param_value));
619}
620} else {
621ret = OB_INVALID_ARGUMENT;
622LOG_WARN("invalid argument", KR(ret), K(param_name_with_value_str), K(param_name_with_value));
623}
624}
625
626if (OB_SUCC(ret)) {
627// if [server] not specified, use local as default
628target_server = target_server.is_valid() ? target_server : GCTX.self_addr();
629}
630
631LOG_INFO("finish parse parameters from command", KR(ret), K(command_arg), K(command_params_array), K(tenant_id),
632K(ls_id), K(replica_type), K(data_source_server), K(target_server), K(orig_paxos_replica_number),
633K(new_paxos_replica_number));
634}
635return ret;
636}
637
638int ObAdminDRTaskUtil::try_construct_result_comment_(
639const int &ret_code,
640const ObAdminDRTaskRetComment &ret_comment,
641ObSqlString &result_comment)
642{
643int ret = OB_SUCCESS;
644result_comment.reset();
645if (OB_FAIL(result_comment.assign_fmt("ret:%d, %s; ret_comment:%s;",
646ret_code, common::ob_error_name(ret_code),
647ob_admin_drtask_ret_comment_strs(ret_comment)))) {
648LOG_WARN("fail to construct result comment", KR(ret), K(ret_code), K(ret_comment));
649}
650return ret;
651}
652} // end namespace rootserver
653} // end namespace oceanbase
654