oceanbase

Форк
0
/
ob_admin_drtask_util.cpp 
653 строки · 29.2 Кб
1
/**
2
 * Copyright (c) 2021 OceanBase
3
 * OceanBase CE is licensed under Mulan PubL v2.
4
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
5
 * You may obtain a copy of Mulan PubL v2 at:
6
 *          http://license.coscl.org.cn/MulanPubL-2.0
7
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
 * See the Mulan PubL v2 for more details.
11
 */
12

13
#define USING_LOG_PREFIX RS
14
#include "ob_admin_drtask_util.h"
15
#include "logservice/ob_log_service.h" // for ObLogService
16
#include "share/ob_locality_parser.h" // for ObLocalityParser
17
#include "storage/tx_storage/ob_ls_service.h" // for ObLSService
18
#include "storage/ls/ob_ls.h" // for ObLS
19
#include "observer/ob_server_event_history_table_operator.h" // for SERVER_EVENT_ADD
20

21
namespace oceanbase
22
{
23
namespace rootserver
24
{
25
static const char* obadmin_drtask_ret_comment_strs[] = {
26
  "succeed to send ob_admin command",
27
  "invalid tenant_id or ls_id in command",
28
  "expect leader to execute this ob_admin command",
29
  "fail to send rpc",
30
  "fail to execute ob_admin command",
31
  ""/*default max*/
32
};
33

34
const char* ob_admin_drtask_ret_comment_strs(const rootserver::ObAdminDRTaskRetComment ret_comment)
35
{
36
  STATIC_ASSERT(ARRAYSIZEOF(obadmin_drtask_ret_comment_strs) == (int64_t)rootserver::ObAdminDRTaskRetComment::MAX_COMMENT + 1,
37
                "ret_comment string array size mismatch enum ObAdminDRTaskRetComment count");
38
  const char *str = NULL;
39
  if (ret_comment >= rootserver::ObAdminDRTaskRetComment::SUCCEED_TO_SEND_COMMAND && ret_comment <= rootserver::ObAdminDRTaskRetComment::MAX_COMMENT) {
40
    str = obadmin_drtask_ret_comment_strs[static_cast<int64_t>(ret_comment)];
41
  } else {
42
    str = obadmin_drtask_ret_comment_strs[static_cast<int64_t>(rootserver::ObAdminDRTaskRetComment::MAX_COMMENT)];
43
    LOG_WARN_RET(OB_INVALID_ARGUMENT, "invalid ObAdminDRTaskRetComment", K(ret_comment));
44
  }
45
  return str;
46
}
47

48
int ObAdminDRTaskUtil::handle_obadmin_command(const ObAdminCommandArg &command_arg)
49
{
50
  int ret = OB_SUCCESS;
51
  int tmp_ret = OB_SUCCESS;
52
  FLOG_INFO("begin to handle ob_admin command", K(command_arg));
53
  uint64_t tenant_id = OB_INVALID_TENANT_ID;
54
  share::ObLSID ls_id;
55
  ObSqlString result_comment("ResCmmt");
56
  ObAdminDRTaskRetComment ret_comment = FAIL_TO_EXECUTE_COMMAND;
57
  int64_t check_begin_time = ObTimeUtility::current_time();
58

59
  if (OB_UNLIKELY(!command_arg.is_valid())) {
60
    ret = OB_INVALID_ARGUMENT;
61
    LOG_WARN("invalid argument", KR(ret), K(command_arg));
62
  } else if (command_arg.is_remove_task()) {
63
    if (OB_FAIL(handle_remove_command_(command_arg, tenant_id, ls_id, ret_comment))) {
64
      LOG_WARN("fail to handle remove command", KR(ret), K(command_arg));
65
    }
66
  } else if (command_arg.is_add_task()) {
67
    if (OB_FAIL(handle_add_command_(command_arg, tenant_id, ls_id, ret_comment))) {
68
      LOG_WARN("fail to handle add command", KR(ret), K(command_arg));
69
    }
70
  } else {
71
    ret = OB_INVALID_ARGUMENT;
72
    LOG_WARN("invalid task type", KR(ret), K(command_arg));
73
  }
74

75
  if (OB_SUCCESS != (tmp_ret = try_construct_result_comment_(ret, ret_comment, result_comment))) {
76
    LOG_WARN("fail to construct result comment", K(tmp_ret), KR(ret), K(ret_comment));
77
  }
78
  SERVER_EVENT_ADD("ob_admin", command_arg.get_type_str(),
79
                   "tenant_id", tenant_id,
80
                   "ls_id", ls_id.id(),
81
                   "arg", command_arg,
82
                   "result", result_comment,
83
                   "trace_id", ObCurTraceId::get_trace_id_str(),
84
                   "comment", command_arg.get_comment());
85

86
  int64_t cost = ObTimeUtility::current_time() - check_begin_time;
87
  FLOG_INFO("finish handle ob_admin command", K(command_arg), K(tenant_id), K(ls_id),
88
            K(result_comment), K(ret_comment), K(cost));
89
  return ret;
90
}
91

92
int ObAdminDRTaskUtil::handle_add_command_(
93
    const ObAdminCommandArg &command_arg,
94
    uint64_t &tenant_id,
95
    share::ObLSID &ls_id,
96
    ObAdminDRTaskRetComment &ret_comment)
97
{
98
  int ret = OB_SUCCESS;
99
  tenant_id = OB_INVALID_TENANT_ID;
100
  ret_comment = FAIL_TO_EXECUTE_COMMAND;
101
  ObLSAddReplicaArg arg;
102

103
  if (OB_UNLIKELY(!command_arg.is_valid())
104
      || OB_UNLIKELY(!command_arg.is_add_task())) {
105
    ret = OB_INVALID_ARGUMENT;
106
    LOG_WARN("invalid argument", KR(ret), K(command_arg));
107
  } else if (OB_FAIL(construct_arg_for_add_command_(command_arg, arg, ret_comment))) {
108
    LOG_WARN("fail to construct arg for add command", KR(ret), K(command_arg),
109
             K(arg), K(ret_comment));
110
  } else if (OB_FAIL(execute_task_for_add_command_(command_arg, arg, ret_comment))) {
111
    LOG_WARN("fail to execute task for add command", KR(ret), K(command_arg), K(arg), K(ret_comment));
112
  } else {
113
    tenant_id = arg.tenant_id_;
114
    ls_id = arg.ls_id_;
115
    ret_comment = SUCCEED_TO_SEND_COMMAND;
116
  }
117
  return ret;
118
}
119

120
int ObAdminDRTaskUtil::construct_arg_for_add_command_(
121
    const ObAdminCommandArg &command_arg,
122
    ObLSAddReplicaArg &arg,
123
    ObAdminDRTaskRetComment &ret_comment)
124
{
125
  int ret = OB_SUCCESS;
126
  ret_comment = FAIL_TO_EXECUTE_COMMAND;
127
  uint64_t tenant_id = OB_INVALID_TENANT_ID;
128
  share::ObLSID ls_id;
129
  ObReplicaType replica_type = REPLICA_TYPE_FULL;
130
  common::ObAddr data_source_server;
131
  common::ObAddr target_server;
132
  int64_t orig_paxos_replica_number = 0;
133
  int64_t new_paxos_replica_number = 0;
134

135
  if (OB_UNLIKELY(!command_arg.is_valid())
136
      || OB_UNLIKELY(!command_arg.is_add_task())) {
137
    ret = OB_INVALID_ARGUMENT;
138
    LOG_WARN("invalid argument", KR(ret), K(command_arg));
139
  // STEP 1: parse parameters from ob_admin command directly
140
  } else if (OB_FAIL(parse_params_from_obadmin_command_arg(
141
                         command_arg, tenant_id, ls_id, replica_type, data_source_server,
142
                         target_server, orig_paxos_replica_number, new_paxos_replica_number))) {
143
    LOG_WARN("fail to parse parameters provided in ob_admin command", KR(ret), K(command_arg));
144
  } else if (OB_UNLIKELY(!ls_id.is_valid_with_tenant(tenant_id))) {
145
    ret = OB_INVALID_ARGUMENT;
146
    ret_comment = ObAdminDRTaskRetComment::TENANT_ID_OR_LS_ID_NOT_VALID;
147
    LOG_WARN("invalid tenant_id or ls_id", KR(ret), K(command_arg), K(tenant_id), K(ls_id));
148
  } else if (OB_UNLIKELY(!target_server.is_valid())
149
             || OB_UNLIKELY(REPLICA_TYPE_FULL != replica_type && REPLICA_TYPE_READONLY != replica_type)) {
150
    ret = OB_INVALID_ARGUMENT;
151
    LOG_WARN("invalid argument", KR(ret), K(replica_type), K(target_server));
152
  // STEP 2: construct orig_paxos_replica_number and data_source_server if not specified by ob_admin command
153
  } else if (0 == orig_paxos_replica_number || !data_source_server.is_valid()) {
154
    if (OB_FAIL(construct_default_params_for_add_command_(
155
                    tenant_id,
156
                    ls_id,
157
                    orig_paxos_replica_number,
158
                    data_source_server))) {
159
      LOG_WARN("fail to fetch ls info and construct related parameters", KR(ret), K(tenant_id),
160
               K(ls_id), K(orig_paxos_replica_number), K(data_source_server));
161
    }
162
  }
163

164
  if (OB_SUCC(ret)) {
165
    new_paxos_replica_number = 0 == new_paxos_replica_number
166
                               ? orig_paxos_replica_number
167
                               : new_paxos_replica_number;
168
    ObReplicaMember data_source_member(data_source_server, 0/*timstamp*/);
169
    ObReplicaMember add_member(target_server, ObTimeUtility::current_time(), replica_type);
170
    // STEP 3: construct arg
171
    if (OB_ISNULL(ObCurTraceId::get_trace_id())) {
172
      ret = OB_INVALID_ARGUMENT;
173
      LOG_WARN("invalid argument", KR(ret));
174
    } else if (OB_FAIL(arg.init(
175
                      *ObCurTraceId::get_trace_id()/*task_id*/,
176
                      tenant_id,
177
                      ls_id,
178
                      add_member,
179
                      data_source_member,
180
                      orig_paxos_replica_number,
181
                      new_paxos_replica_number,
182
                      false/*is_skip_change_member_list-not used*/))) {
183
      LOG_WARN("fail to init arg", KR(ret), K(tenant_id), K(ls_id), K(add_member),
184
               K(data_source_member), K(orig_paxos_replica_number), K(new_paxos_replica_number));
185
    }
186
  }
187
  return ret;
188
}
189

190
int ObAdminDRTaskUtil::construct_default_params_for_add_command_(
191
    const uint64_t &tenant_id,
192
    const share::ObLSID &ls_id,
193
    int64_t &orig_paxos_replica_number,
194
    common::ObAddr &data_source_server)
195
{
196
  int ret = OB_SUCCESS;
197
  share::ObLSInfo ls_info;
198
  const share::ObLSReplica *leader_replica = nullptr;
199

200
  if (OB_UNLIKELY(!ls_id.is_valid_with_tenant(tenant_id))) {
201
    ret = OB_INVALID_ARGUMENT;
202
    LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(ls_id));
203
  } else if (OB_ISNULL(GCTX.lst_operator_)) {
204
    ret = OB_INVALID_ARGUMENT;
205
    LOG_WARN("invalid ls table operator", KR(ret));
206
  } else if (OB_FAIL(GCTX.lst_operator_->get(GCONF.cluster_id, tenant_id, ls_id,
207
                     share::ObLSTable::COMPOSITE_MODE, ls_info))) {
208
    LOG_WARN("fail to get ls info", KR(ret), K(tenant_id), K(ls_id), K(ls_info));
209
  } else if (OB_FAIL(ls_info.find_leader(leader_replica))) {
210
    LOG_WARN("fail to get ls leader replica", KR(ret), K(ls_info));
211
  } else if (OB_ISNULL(leader_replica)) {
212
    ret = OB_INVALID_ARGUMENT;
213
    LOG_WARN("invalid leader replica", KR(ret), K(ls_info));
214
  } else {
215
    //   If [orig_paxos_replica_number] or [data_source_server] not specified in obadmin command,
216
    //   need to construct from leader_replica, use leader replica as default
217
    if (0 == orig_paxos_replica_number) {
218
      orig_paxos_replica_number = leader_replica->get_paxos_replica_number();
219
    }
220
    if (!data_source_server.is_valid()) {
221
      data_source_server = leader_replica->get_server();
222
    }
223
  }
224
  return ret;
225
}
226

227
int ObAdminDRTaskUtil::execute_task_for_add_command_(
228
    const ObAdminCommandArg &command_arg,
229
    const ObLSAddReplicaArg &arg,
230
    ObAdminDRTaskRetComment &ret_comment)
231
{
232
  int ret = OB_SUCCESS;
233
  ret_comment = FAIL_TO_EXECUTE_COMMAND;
234
  const int64_t add_timeout = GCONF.rpc_timeout * 5;
235

236
  if (OB_UNLIKELY(!arg.is_valid())
237
      || OB_UNLIKELY(!command_arg.is_valid())
238
      || OB_UNLIKELY(!command_arg.is_add_task())) {
239
    ret = OB_INVALID_ARGUMENT;
240
    LOG_WARN("invalid argument", KR(ret), K(arg), K(command_arg));
241
  } else if (GCTX.self_addr() == arg.dst_.get_server()) {
242
    // do not need to send rpc, execute locally
243
    MTL_SWITCH(arg.tenant_id_) {
244
      if (OB_FAIL(observer::ObService::do_add_ls_replica(arg))) {
245
        LOG_WARN("fail to execute add replica rpc locally", KR(ret), K(arg));
246
      }
247
    }
248
  } else if (OB_ISNULL(GCTX.srv_rpc_proxy_)) {
249
    ret = OB_INVALID_ARGUMENT;
250
    LOG_WARN("svr rpc proxy is nullptr", KR(ret));
251
  } else if (OB_FAIL(GCTX.srv_rpc_proxy_->to(arg.dst_.get_server()).by(arg.tenant_id_).timeout(add_timeout).ls_add_replica(arg))) {
252
    ret_comment = ObAdminDRTaskRetComment::FAILED_TO_SEND_RPC;
253
    LOG_WARN("fail to execute add replica rpc", KR(ret), K(arg), K(add_timeout));
254
  }
255

256
  if (OB_SUCC(ret)) {
257
    // local execute or rpc is send, log task start, task finish will be recorded later
258
    ROOTSERVICE_EVENT_ADD("disaster_recovery", drtasklog::START_ADD_LS_REPLICA_STR,
259
                          "tenant_id", arg.tenant_id_,
260
                          "ls_id", arg.ls_id_.id(),
261
                          "task_id", ObCurTraceId::get_trace_id_str(),
262
                          "destination", arg.dst_,
263
                          "comment", command_arg.get_comment());
264
  }
265
  return ret;
266
}
267

268
int ObAdminDRTaskUtil::handle_remove_command_(
269
    const ObAdminCommandArg &command_arg,
270
    uint64_t &tenant_id,
271
    share::ObLSID &ls_id,
272
    ObAdminDRTaskRetComment &ret_comment)
273
{
274
  int ret = OB_SUCCESS;
275
  tenant_id = OB_INVALID_TENANT_ID;
276
  ret_comment = FAIL_TO_EXECUTE_COMMAND;
277
  ObReplicaType replica_type = REPLICA_TYPE_FULL;
278
  common::ObAddr data_source_server;
279
  common::ObAddr target_server;
280
  int64_t orig_paxos_replica_number = 0;
281
  int64_t new_paxos_replica_number = 0;
282

283
  if (OB_UNLIKELY(!command_arg.is_valid())
284
      || OB_UNLIKELY(!command_arg.is_remove_task())) {
285
    ret = OB_INVALID_ARGUMENT;
286
    LOG_WARN("invalid argument", KR(ret), K(command_arg));
287
  // STEP 1: parse parameters from ob_admin command directly
288
  } else if (OB_FAIL(parse_params_from_obadmin_command_arg(
289
                         command_arg, tenant_id, ls_id, replica_type, data_source_server,
290
                         target_server, orig_paxos_replica_number, new_paxos_replica_number))) {
291
    LOG_WARN("fail to parse parameters provided in ob_admin command", KR(ret), K(command_arg));
292
  } else if (OB_UNLIKELY(!ls_id.is_valid_with_tenant(tenant_id))) {
293
    ret = OB_INVALID_ARGUMENT;
294
    ret_comment = ObAdminDRTaskRetComment::TENANT_ID_OR_LS_ID_NOT_VALID;
295
    LOG_WARN("invalid tenant_id or ls_id", KR(ret), K(command_arg), K(tenant_id), K(ls_id));
296
  } else if (OB_UNLIKELY(!target_server.is_valid())
297
             || OB_UNLIKELY(REPLICA_TYPE_FULL != replica_type && REPLICA_TYPE_READONLY != replica_type)) {
298
    ret = OB_INVALID_ARGUMENT;
299
    LOG_WARN("invalid argument", KR(ret), K(replica_type), K(target_server));
300
  } else {
301
    // STEP 2: construct args and execute
302
    if (REPLICA_TYPE_FULL == replica_type) {
303
      ObLSDropPaxosReplicaArg remove_paxos_arg;
304
      if (OB_FAIL(construct_remove_paxos_task_arg_(
305
                      tenant_id, ls_id, target_server, orig_paxos_replica_number,
306
                      new_paxos_replica_number, ret_comment, remove_paxos_arg))) {
307
        LOG_WARN("fail to construct remove paxos task arg", KR(ret), K(tenant_id), K(ls_id),
308
                 K(target_server), K(orig_paxos_replica_number), K(new_paxos_replica_number),
309
                 K(ret_comment), K(remove_paxos_arg));
310
      } else if (OB_FAIL(execute_remove_paxos_task_(command_arg, remove_paxos_arg))) {
311
        LOG_WARN("fail to execute remove paxos replica task", KR(ret), K(command_arg), K(remove_paxos_arg));
312
      } else {
313
        ret_comment = SUCCEED_TO_SEND_COMMAND;
314
      }
315
    } else if (REPLICA_TYPE_READONLY == replica_type) {
316
      ObLSDropNonPaxosReplicaArg remove_nonpaxos_arg;
317
      if (OB_FAIL(construct_remove_nonpaxos_task_arg_(
318
                      tenant_id, ls_id, target_server, ret_comment, remove_nonpaxos_arg))) {
319
        LOG_WARN("fail to construct remove non-paxos replica task arg", KR(ret), K(tenant_id),
320
                 K(ls_id), K(target_server), K(ret_comment), K(remove_nonpaxos_arg));
321
      } else if (OB_FAIL(execute_remove_nonpaxos_task_(command_arg, remove_nonpaxos_arg))) {
322
        LOG_WARN("fail to execute remove nonpaxos replica task", KR(ret), K(command_arg), K(remove_nonpaxos_arg));
323
      } else {
324
        ret_comment = SUCCEED_TO_SEND_COMMAND;
325
      }
326
    } else {
327
      ret = OB_INVALID_ARGUMENT;
328
      LOG_WARN("unexpected replica type", KR(ret), K(replica_type), K(tenant_id), K(ls_id), K(command_arg));
329
    }
330
  }
331
  return ret;
332
}
333

334
int ObAdminDRTaskUtil::construct_remove_paxos_task_arg_(
335
    const uint64_t &tenant_id,
336
    const share::ObLSID &ls_id,
337
    const common::ObAddr &target_server,
338
    int64_t &orig_paxos_replica_number,
339
    int64_t &new_paxos_replica_number,
340
    ObAdminDRTaskRetComment &ret_comment,
341
    ObLSDropPaxosReplicaArg &remove_paxos_arg)
342
{
343
  int ret = OB_SUCCESS;
344
  ret_comment = FAIL_TO_EXECUTE_COMMAND;
345
  common::ObMember member;
346
  ObReplicaMember member_to_remove;
347
  palf::PalfStat palf_stat;
348

349
  if (OB_UNLIKELY(!ls_id.is_valid_with_tenant(tenant_id))
350
      || OB_UNLIKELY(!target_server.is_valid())) {
351
    ret = OB_INVALID_ARGUMENT;
352
    LOG_WARN("invalid tenant_id or ls_id", KR(ret), K(tenant_id), K(ls_id), K(target_server));
353
  } else if (OB_FAIL(get_local_palf_stat_(tenant_id, ls_id, palf_stat, ret_comment))) {
354
    LOG_WARN("fail to get local palf stat", KR(ret), K(tenant_id), K(ls_id));
355
  } else if (OB_UNLIKELY(!palf_stat.is_valid())) {
356
    ret = OB_INVALID_ARGUMENT;
357
    LOG_WARN("invalid argument", KR(ret), K(palf_stat));
358
  } else if (OB_UNLIKELY(!palf_stat.paxos_member_list_.contains(target_server))) {
359
    ret = OB_ENTRY_NOT_EXIST;
360
    LOG_WARN("replica not found in member_list", KR(ret), K(target_server), K(palf_stat));
361
  } else if (OB_FAIL(palf_stat.paxos_member_list_.get_member_by_addr(target_server, member))) {
362
    LOG_WARN("fail to get member from paxos_member_list", KR(ret), K(palf_stat), K(target_server));
363
  } else {
364
    member_to_remove = ObReplicaMember(member);
365
    if (OB_FAIL(member_to_remove.set_replica_type(REPLICA_TYPE_FULL))) {
366
      LOG_WARN("fail to set replica type for member to remove", KR(ret));
367
    } else {
368
      //  If [orig_paxos_replica_number] not specified in obadmin command,
369
      //  use leader replica's info as default
370
      orig_paxos_replica_number = 0 == orig_paxos_replica_number
371
                                ? palf_stat.paxos_replica_num_
372
                                : orig_paxos_replica_number;
373
      new_paxos_replica_number = 0 == new_paxos_replica_number
374
                               ? orig_paxos_replica_number
375
                               : new_paxos_replica_number;
376
    }
377
  }
378
  if (OB_FAIL(ret)) {
379
  } else if (OB_ISNULL(ObCurTraceId::get_trace_id())) {
380
    ret = OB_INVALID_ARGUMENT;
381
    LOG_WARN("invalid argument", KR(ret));
382
  } else if (OB_FAIL(remove_paxos_arg.init(
383
                         *ObCurTraceId::get_trace_id()/*task_id*/, tenant_id, ls_id,
384
                         member_to_remove, orig_paxos_replica_number, new_paxos_replica_number))) {
385
    LOG_WARN("fail to init arg", KR(ret), K(tenant_id), K(ls_id), K(member_to_remove),
386
             K(orig_paxos_replica_number), K(new_paxos_replica_number));
387
  }
388
  return ret;
389
}
390

391
int ObAdminDRTaskUtil::construct_remove_nonpaxos_task_arg_(
392
    const uint64_t &tenant_id,
393
    const share::ObLSID &ls_id,
394
    const common::ObAddr &target_server,
395
    ObAdminDRTaskRetComment &ret_comment,
396
    ObLSDropNonPaxosReplicaArg &remove_nonpaxos_arg)
397
{
398
  int ret = OB_SUCCESS;
399
  ret_comment = FAIL_TO_EXECUTE_COMMAND;
400
  common::ObMember member;
401
  ObReplicaMember member_to_remove;
402
  palf::PalfStat palf_stat;
403

404
  if (OB_UNLIKELY(!ls_id.is_valid_with_tenant(tenant_id))
405
      || OB_UNLIKELY(!target_server.is_valid())) {
406
    ret = OB_INVALID_ARGUMENT;
407
    LOG_WARN("invalid tenant_id or ls_id", KR(ret), K(tenant_id), K(ls_id), K(target_server));
408
  } else if (OB_FAIL(get_local_palf_stat_(tenant_id, ls_id, palf_stat, ret_comment))) {
409
    LOG_WARN("fail to get local palf stat", KR(ret), K(tenant_id), K(ls_id));
410
  } else if (OB_UNLIKELY(!palf_stat.is_valid())) {
411
    ret = OB_INVALID_ARGUMENT;
412
    LOG_WARN("invalid argument", KR(ret), K(palf_stat));
413
  } else if (OB_UNLIKELY(!palf_stat.learner_list_.contains(target_server))) {
414
    ret = OB_ENTRY_NOT_EXIST;
415
    LOG_WARN("replica not found in learner_list", KR(ret), K(target_server), K(palf_stat));
416
  } else if (OB_FAIL(palf_stat.learner_list_.get_learner_by_addr(target_server, member))) {
417
    LOG_WARN("fail to get member from learner_list", KR(ret), K(palf_stat), K(target_server));
418
  } else {
419
    member_to_remove = ObReplicaMember(member);
420
    if (OB_FAIL(member_to_remove.set_replica_type(REPLICA_TYPE_READONLY))) {
421
      LOG_WARN("fail to set replica type for member to remove", KR(ret));
422
    } else if (OB_ISNULL(ObCurTraceId::get_trace_id())) {
423
      ret = OB_INVALID_ARGUMENT;
424
      LOG_WARN("invalid argument", KR(ret));
425
    } else if (OB_FAIL(remove_nonpaxos_arg.init(
426
                           *ObCurTraceId::get_trace_id()/*task_id*/, tenant_id,
427
                           ls_id, member_to_remove))) {
428
      LOG_WARN("fail to init arg", KR(ret), K(tenant_id), K(ls_id), K(member_to_remove));
429
    }
430
  }
431
  return ret;
432
}
433

434
int ObAdminDRTaskUtil::get_local_palf_stat_(
435
    const uint64_t &tenant_id,
436
    const share::ObLSID &ls_id,
437
    palf::PalfStat &palf_stat,
438
    ObAdminDRTaskRetComment &ret_comment)
439
{
440
  int ret = OB_SUCCESS;
441
  ret_comment = FAIL_TO_EXECUTE_COMMAND;
442
  palf_stat.reset();
443

444
  if (OB_UNLIKELY(!ls_id.is_valid_with_tenant(tenant_id))) {
445
    ret = OB_INVALID_ARGUMENT;
446
    LOG_WARN("invalid tenant_id or ls_id", KR(ret), K(tenant_id), K(ls_id));
447
  } else {
448
    MTL_SWITCH(tenant_id) {
449
      logservice::ObLogService *log_service = NULL;
450
      palf::PalfHandleGuard palf_handle_guard;
451
      if (OB_ISNULL(log_service = MTL(logservice::ObLogService*))) {
452
        ret = OB_ERR_UNEXPECTED;
453
        LOG_WARN("MTL ObLogService is null", KR(ret), K(tenant_id));
454
      } else if (OB_FAIL(log_service->open_palf(ls_id, palf_handle_guard))) {
455
        LOG_WARN("failed to open palf", KR(ret), K(tenant_id), K(ls_id));
456
      } else if (OB_FAIL(palf_handle_guard.stat(palf_stat))) {
457
        LOG_WARN("get palf_stat failed", KR(ret), K(tenant_id), K(ls_id));
458
      } else if (LEADER != palf_stat.role_) {
459
        ret = OB_STATE_NOT_MATCH;
460
        ret_comment = ObAdminDRTaskRetComment::SERVER_TO_EXECUTE_COMMAND_NOT_LEADER;
461
        LOG_WARN("invalid argument, expect self address is leader replica", KR(ret),
462
                 K(tenant_id), K(ls_id), K(palf_stat));
463
      }
464
    }
465
  }
466
  return ret;
467
}
468

469
int ObAdminDRTaskUtil::execute_remove_paxos_task_(
470
    const ObAdminCommandArg &command_arg,
471
    const ObLSDropPaxosReplicaArg &remove_paxos_arg)
472
{
473
  int ret = OB_SUCCESS;
474
  if (OB_UNLIKELY(!command_arg.is_valid())
475
      || OB_UNLIKELY(!command_arg.is_remove_task())
476
      || OB_UNLIKELY(!remove_paxos_arg.is_valid())) {
477
    ret = OB_INVALID_ARGUMENT;
478
    LOG_WARN("invalid argument", KR(ret), K(command_arg), K(remove_paxos_arg));
479
  } else {
480
    // do not need to send rpc, just execute locally
481
    LOG_INFO("start to remove member from member_list", K(remove_paxos_arg));
482
    MTL_SWITCH(remove_paxos_arg.tenant_id_) {
483
      if (OB_FAIL(observer::ObService::do_remove_ls_paxos_replica(remove_paxos_arg))) {
484
        LOG_WARN("fail to execute remove paxos replica rpc locally", KR(ret), K(remove_paxos_arg));
485
      }
486
    }
487
  }
488
  if (OB_SUCC(ret)) {
489
    // rpc is send, log task start, task finish will be recorded later
490
    ROOTSERVICE_EVENT_ADD("disaster_recovery", drtasklog::START_REMOVE_LS_PAXOS_REPLICA_STR,
491
                          "tenant_id", remove_paxos_arg.tenant_id_,
492
                          "ls_id", remove_paxos_arg.ls_id_.id(),
493
                          "task_id", ObCurTraceId::get_trace_id_str(),
494
                          "remove_server", remove_paxos_arg.remove_member_,
495
                          "comment", command_arg.get_comment());
496
  }
497
  return ret;
498
}
499

500
int ObAdminDRTaskUtil::execute_remove_nonpaxos_task_(
501
    const ObAdminCommandArg &command_arg,
502
    const ObLSDropNonPaxosReplicaArg &remove_non_paxos_arg)
503
{
504
  int ret = OB_SUCCESS;
505
  if (OB_UNLIKELY(!command_arg.is_valid())
506
      || OB_UNLIKELY(!command_arg.is_remove_task())
507
      || OB_UNLIKELY(!remove_non_paxos_arg.is_valid())) {
508
    ret = OB_INVALID_ARGUMENT;
509
    LOG_WARN("invalid argument", KR(ret), K(command_arg), K(remove_non_paxos_arg));
510
  } else {
511
    // do not need to send rpc, just execute locally
512
    LOG_INFO("start to remove learner from learner_list", K(remove_non_paxos_arg));
513
    MTL_SWITCH(remove_non_paxos_arg.tenant_id_) {
514
      if (OB_FAIL(observer::ObService::do_remove_ls_nonpaxos_replica(remove_non_paxos_arg))) {
515
        LOG_WARN("fail to execute remove non-paxos replica rpc locally", KR(ret), K(remove_non_paxos_arg));
516
      }
517
    }
518
  }
519
  if (OB_SUCC(ret)) {
520
    // rpc is send, log task start, task finish will be recorded later
521
    ROOTSERVICE_EVENT_ADD("disaster_recovery", drtasklog::START_REMOVE_LS_PAXOS_REPLICA_STR,
522
                          "tenant_id", remove_non_paxos_arg.tenant_id_,
523
                          "ls_id", remove_non_paxos_arg.ls_id_.id(),
524
                          "task_id", ObCurTraceId::get_trace_id_str(),
525
                          "remove_server", remove_non_paxos_arg.remove_member_,
526
                          "comment", command_arg.get_comment());
527
  }
528
  return ret;
529
}
530

531
int ObAdminDRTaskUtil::parse_params_from_obadmin_command_arg(
532
    const ObAdminCommandArg &command_arg,
533
    uint64_t &tenant_id,
534
    share::ObLSID &ls_id,
535
    ObReplicaType &replica_type,
536
    common::ObAddr &data_source_server,
537
    common::ObAddr &target_server,
538
    int64_t &orig_paxos_replica_number,
539
    int64_t &new_paxos_replica_number)
540
{
541
  int ret = OB_SUCCESS;
542
  // reset output params
543
  tenant_id = OB_INVALID_TENANT_ID;
544
  ls_id.reset();
545
  replica_type = REPLICA_TYPE_FULL;
546
  data_source_server.reset();
547
  target_server.reset();
548
  orig_paxos_replica_number = 0;
549
  new_paxos_replica_number = 0;
550
  // construct items to use
551
  ObArenaAllocator allocator("ObAdminDRTask");
552
  ObString admin_command_before_trim;
553
  ObString admin_command_after_trim;
554
  ObArray<ObString> command_params_array;
555
  if (OB_UNLIKELY(!command_arg.is_valid())) {
556
    ret = OB_INVALID_ARGUMENT;
557
    LOG_WARN("invalid argument", KR(ret), K(command_arg));
558
  } else if (OB_FAIL(ob_write_string(allocator, command_arg.get_admin_command_str(), admin_command_before_trim))) {
559
    LOG_WARN("fail to write string", KR(ret), K(command_arg));
560
  } else if (FALSE_IT(admin_command_after_trim = admin_command_before_trim.trim())) {
561
  } else if (OB_FAIL(split_on(admin_command_after_trim, ',', command_params_array))) {
562
    LOG_WARN("fail to split string", KR(ret), K(admin_command_after_trim), K(admin_command_before_trim));
563
  } else {
564
    LOG_INFO("start to parse parameters from command", K(command_arg), K(command_params_array));
565
    ObSqlString data_source_string("DtStr");
566
    for (int64_t param_index = 0;
567
         param_index < command_params_array.count() && OB_SUCC(ret);
568
         param_index++) {
569
      ObString param_name_with_value_str = command_params_array.at(param_index);
570
      ObArray<ObString> param_name_with_value;
571
      ObSqlString param_name("ParamN");
572
      ObSqlString param_value("ParamV");
573
      int64_t pos = 0;
574
      if (OB_FAIL(split_on(param_name_with_value_str, '=', param_name_with_value))) {
575
        LOG_WARN("fail to split param name and value", KR(ret), K(param_name_with_value_str));
576
      } else if (OB_UNLIKELY(2 != param_name_with_value.count())) {
577
        ret = OB_INVALID_ARGUMENT;
578
        LOG_WARN("invalid argument", KR(ret), K(param_name_with_value));
579
      } else if (OB_FAIL(param_name.assign(param_name_with_value.at(0).trim()))) {
580
        LOG_WARN("fail to construct parameter name", KR(ret), K(param_name_with_value));
581
      } else if (OB_FAIL(param_value.assign(param_name_with_value.at(1).trim()))) {
582
        LOG_WARN("fail to construct parameter value", KR(ret), K(param_name_with_value));
583
      } else if (0 == param_name.string().case_compare("tenant_id")) {
584
        int64_t tenant_id_to_set = OB_INVALID_TENANT_ID;
585
        if (OB_FAIL(extract_int(param_value.string(), 0, pos, tenant_id_to_set))) {
586
          LOG_WARN("fail to extract int from string", KR(ret), K(param_name_with_value), K(tenant_id_to_set));
587
        } else {
588
          tenant_id = tenant_id_to_set;
589
        }
590
      } else if (0 == param_name.string().case_compare("ls_id")) {
591
        int64_t ls_id_to_set;
592
        if (OB_FAIL(extract_int(param_value.string(), 0, pos, ls_id_to_set))) {
593
          LOG_WARN("fail to extract int from string", KR(ret), K(param_name_with_value), K(ls_id_to_set));
594
        } else {
595
          ls_id = share::ObLSID(ls_id_to_set);
596
        }
597
      } else if (0 == param_name.string().case_compare("replica_type")) {
598
        if (OB_FAIL(share::ObLocalityParser::parse_type(
599
                        param_value.ptr(),
600
                        param_value.length(),
601
                        replica_type))) {
602
          LOG_WARN("fail to parse replica type", KR(ret), K(param_name_with_value), K(replica_type));
603
        }
604
      } else if (0 == param_name.string().case_compare("orig_paxos_replica_number")) {
605
        if (OB_FAIL(extract_int(param_value.string(), 0, pos, orig_paxos_replica_number))) {
606
          LOG_WARN("fail to extract int from string", KR(ret), K(param_name_with_value), K(orig_paxos_replica_number));
607
        }
608
      } else if (0 == param_name.string().case_compare("new_paxos_replica_number")) {
609
        if (OB_FAIL(extract_int(param_value.string(), 0, pos, new_paxos_replica_number))) {
610
          LOG_WARN("fail to extract int from string", KR(ret), K(param_name_with_value), K(new_paxos_replica_number));
611
        }
612
      } else if (0 == param_name.string().case_compare("server")) {
613
        if (OB_FAIL(target_server.parse_from_string(param_value.string()))) {
614
          LOG_WARN("fail to construct target server from string", KR(ret), K(param_value));
615
        }
616
      } else if (0 == param_name.string().case_compare("data_source")) {
617
        if (OB_FAIL(data_source_server.parse_from_string(param_value.string()))) {
618
          LOG_WARN("fail to construct data source server from string", KR(ret), K(param_value));
619
        }
620
      } else {
621
        ret = OB_INVALID_ARGUMENT;
622
        LOG_WARN("invalid argument", KR(ret), K(param_name_with_value_str), K(param_name_with_value));
623
      }
624
    }
625

626
    if (OB_SUCC(ret)) {
627
      // if [server] not specified, use local as default
628
      target_server = target_server.is_valid() ? target_server : GCTX.self_addr();
629
    }
630

631
    LOG_INFO("finish parse parameters from command", KR(ret), K(command_arg), K(command_params_array), K(tenant_id),
632
             K(ls_id), K(replica_type), K(data_source_server), K(target_server), K(orig_paxos_replica_number),
633
             K(new_paxos_replica_number));
634
  }
635
  return ret;
636
}
637

638
int ObAdminDRTaskUtil::try_construct_result_comment_(
639
    const int &ret_code,
640
    const ObAdminDRTaskRetComment &ret_comment,
641
    ObSqlString &result_comment)
642
{
643
  int ret = OB_SUCCESS;
644
  result_comment.reset();
645
  if (OB_FAIL(result_comment.assign_fmt("ret:%d, %s; ret_comment:%s;",
646
                                         ret_code, common::ob_error_name(ret_code),
647
                                         ob_admin_drtask_ret_comment_strs(ret_comment)))) {
648
    LOG_WARN("fail to construct result comment", KR(ret), K(ret_code), K(ret_comment));
649
  }
650
  return ret;
651
}
652
} // end namespace rootserver
653
} // end namespace oceanbase
654

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.