oceanbase

Форк
0
/
ob_server_zone_op_service.cpp 
847 строк · 34.3 Кб
1
/**
2
 * Copyright (c) 2022 OceanBase
3
 * OceanBase CE is licensed under Mulan PubL v2.
4
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
5
 * You may obtain a copy of Mulan PubL v2 at:
6
 *          http://license.coscl.org.cn/MulanPubL-2.0
7
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
 * See the Mulan PubL v2 for more details.
11
 */
12

13
#define USING_LOG_PREFIX RS
14

15
#include "ob_server_zone_op_service.h"
16

17
#include "share/ob_zone_table_operation.h"
18
#include "share/ob_service_epoch_proxy.h"
19
#include "share/ob_max_id_fetcher.h"
20
#include "lib/mysqlclient/ob_mysql_transaction.h"  // ObMySQLTransaction
21
#include "lib/utility/ob_tracepoint.h" // ERRSIM
22
#include "rootserver/ob_root_service.h" // callback
23
#include "share/ob_all_server_tracer.h"
24
#include "rootserver/ob_server_manager.h"
25

26
namespace oceanbase
27
{
28
using namespace common;
29
using namespace share;
30
using namespace obrpc;
31
namespace rootserver
32
{
33
// Default constructor.
// Leaves the service in the "not inited" state: is_inited_ is false and all
// collaborator pointers are NULL until init() assigns them.
ObServerZoneOpService::ObServerZoneOpService()
    : is_inited_(false),
      server_change_callback_(NULL),
      rpc_proxy_(NULL),
      sql_proxy_(NULL),
      lst_operator_(NULL),
      unit_manager_(NULL)
#ifdef OB_BUILD_TDE_SECURITY
      , master_key_mgr_()  // value-initialized; assigned in init()
#endif
{}
45
// Destructor. The body is empty: nothing is released explicitly here.
ObServerZoneOpService::~ObServerZoneOpService() {}
48
int ObServerZoneOpService::init(
49
    ObIServerChangeCallback &server_change_callback,
50
    ObSrvRpcProxy &rpc_proxy,
51
    ObLSTableOperator &lst_operator,
52
    ObUnitManager &unit_manager,
53
    ObMySQLProxy &sql_proxy
54
#ifdef OB_BUILD_TDE_SECURITY
55
    , ObRsMasterKeyManager *master_key_mgr
56
#endif
57
)
58
{
59
  int ret = OB_SUCCESS;
60
  if (OB_UNLIKELY(is_inited_)) {
61
    ret = OB_INIT_TWICE;
62
    LOG_WARN("server zone operation service has been inited already", KR(ret), K(is_inited_));
63
#ifdef OB_BUILD_TDE_SECURITY
64
  } else if (OB_ISNULL(master_key_mgr)) {
65
    ret = OB_ERR_UNEXPECTED;
66
    LOG_WARN("master key mgr is null", KR(ret), KP(master_key_mgr));
67
#endif
68
  } else if (OB_FAIL(st_operator_.init(&sql_proxy))) {
69
    LOG_WARN("fail to init server table operator", KR(ret));
70
  } else {
71
    server_change_callback_ = &server_change_callback;
72
    rpc_proxy_ = &rpc_proxy;
73
    sql_proxy_ = &sql_proxy;
74
    lst_operator_ = &lst_operator;
75
    unit_manager_ = &unit_manager;
76
#ifdef OB_BUILD_TDE_SECURITY
77
    master_key_mgr_ = master_key_mgr;
78
#endif
79
    is_inited_ = true;
80
  }
81
  return ret;
82
}
83
int ObServerZoneOpService::add_servers(const ObIArray<ObAddr> &servers, const ObZone &zone, bool is_bootstrap)
84
{
85
  int ret = OB_SUCCESS;
86
  uint64_t sys_tenant_data_version = 0;
87
  ObCheckServerForAddingServerArg rpc_arg;
88
  ObCheckServerForAddingServerResult rpc_result;
89
  ObZone picked_zone;
90
  ObTimeoutCtx ctx;
91
#ifdef OB_BUILD_TDE_SECURITY
92
  ObWaitMasterKeyInSyncArg wms_in_sync_arg;
93
  // master key mgr sync
94
#endif
95
  if (OB_UNLIKELY(!is_inited_)) {
96
    ret = OB_NOT_INIT;
97
    LOG_WARN("not init", KR(ret), K(is_inited_));
98
  } else if (OB_FAIL(GET_MIN_DATA_VERSION(OB_SYS_TENANT_ID, sys_tenant_data_version))) {
99
    LOG_WARN("fail to get sys tenant's min data version", KR(ret));
100
  } else if (OB_ISNULL(rpc_proxy_)) {
101
    ret = OB_ERR_UNEXPECTED;
102
    LOG_WARN("rpc_proxy_ is null", KR(ret), KP(rpc_proxy_));
103
#ifdef OB_BUILD_TDE_SECURITY
104
  } else if (OB_ISNULL(master_key_mgr_)) {
105
    ret = OB_ERR_UNEXPECTED;
106
    LOG_WARN("master_key_mgr_ is null", KR(ret), KP(master_key_mgr_));
107
  } else if (OB_FAIL(construct_rs_list_arg(wms_in_sync_arg.rs_list_arg_))) {
108
    LOG_WARN("fail to construct rs list arg", KR(ret));
109
#endif
110
  } else if (OB_FAIL(rootserver::ObRootUtils::get_rs_default_timeout_ctx(ctx))) {
111
    LOG_WARN("fail to get timeout ctx", KR(ret), K(ctx));
112
  } else {
113
#ifdef OB_BUILD_TDE_SECURITY
114
    SpinRLockGuard sync_guard(master_key_mgr_->sync());
115
#endif
116
    for (int64_t i = 0; OB_SUCC(ret) && i < servers.count(); ++i) {
117
      const ObAddr &addr = servers.at(i);
118
      int64_t timeout = ctx.get_timeout();
119
      uint64_t server_id = OB_INVALID_ID;
120
      const int64_t ERR_MSG_BUF_LEN = OB_MAX_SERVER_ADDR_SIZE + 100;
121
      char non_empty_server_err_msg[ERR_MSG_BUF_LEN] = "";
122
      int64_t pos = 0;
123
      rpc_arg.reset();
124
      if (OB_UNLIKELY(timeout <= 0)) {
125
        ret = OB_TIMEOUT;
126
        LOG_WARN("ctx time out", KR(ret), K(timeout));
127
      } else if (OB_FAIL(databuff_printf(
128
          non_empty_server_err_msg,
129
          ERR_MSG_BUF_LEN,
130
          pos,
131
          "add non-empty server %s",
132
          to_cstring(addr)))) {
133
        LOG_WARN("fail to execute databuff_printf", KR(ret), K(addr));
134
      } else if (OB_FAIL(fetch_new_server_id_(server_id))) {
135
        // fetch a new server id and insert the server into __all_server table
136
        LOG_WARN("fail to fetch new server id", KR(ret));
137
      } else if (OB_UNLIKELY(!is_valid_server_id(server_id))) {
138
        ret = OB_INVALID_ARGUMENT;
139
        LOG_WARN("server id is invalid", KR(ret), K(server_id));
140
      } else if (OB_FAIL(rpc_arg.init(
141
          ObCheckServerForAddingServerArg::ADD_SERVER,
142
          sys_tenant_data_version,
143
          server_id))) {
144
        LOG_WARN("fail to init rpc arg", KR(ret), K(sys_tenant_data_version), K(server_id));
145
      } else if (OB_FAIL(rpc_proxy_->to(addr)
146
          .timeout(timeout)
147
          .check_server_for_adding_server(rpc_arg, rpc_result))) {
148
        LOG_WARN("fail to check whether the server is empty", KR(ret), K(addr));
149
      } else if (!rpc_result.get_is_server_empty()) {
150
        ret = OB_OP_NOT_ALLOW;
151
        LOG_WARN("adding non-empty server is not allowed", KR(ret));
152
        LOG_USER_ERROR(OB_OP_NOT_ALLOW, non_empty_server_err_msg);
153
      } else if (OB_FAIL(zone_checking_for_adding_server_(zone, rpc_result.get_zone(), picked_zone))) {
154
        LOG_WARN("zone checking for adding server is failed", KR(ret), K(zone), K(rpc_result.get_zone()));
155
#ifdef OB_BUILD_TDE_SECURITY
156
      } else if (!is_bootstrap && OB_FAIL(master_key_checking_for_adding_server(addr, picked_zone, wms_in_sync_arg))) {
157
        LOG_WARN("master key checking for adding server is failed", KR(ret), K(addr), K(picked_zone));
158
#endif
159
      } else if (OB_FAIL(add_server_(
160
          addr,
161
          server_id,
162
          picked_zone,
163
          rpc_result.get_sql_port(),
164
          rpc_result.get_build_version()))) {
165
        LOG_WARN("add_server failed", KR(ret), K(addr),  K(server_id), K(picked_zone), "sql_port",
166
            rpc_result.get_sql_port(), "build_version", rpc_result.get_build_version());
167
      } else {}
168
    }
169
  }
170
  return ret;
171
}
172
int ObServerZoneOpService::delete_servers(
173
    const ObIArray<ObAddr> &servers,
174
    const ObZone &zone)
175
{
176
  int ret = OB_SUCCESS;
177
  if (OB_UNLIKELY(!is_inited_)) {
178
    ret = OB_NOT_INIT;
179
    LOG_WARN("not init", KR(ret), K(is_inited_));
180
  } else if (OB_ISNULL(GCTX.root_service_)) {
181
    ret = OB_ERR_UNEXPECTED;
182
    LOG_WARN("root_service_ is null", KR(ret), KP(GCTX.root_service_));
183
  } else if (OB_UNLIKELY(servers.count() <= 0)) {
184
    ret = OB_INVALID_ARGUMENT;
185
    LOG_WARN("invalid argument", KR(ret), K(servers));
186
  } else if (OB_FAIL(check_server_have_enough_resource_for_delete_server_(servers, zone))) {
187
    LOG_WARN("not enough resource, cannot delete servers", KR(ret), K(servers), K(zone));
188
  } else if (OB_FAIL(GCTX.root_service_->check_all_ls_has_leader("delete server"))) {
189
    LOG_WARN("fail to check whether all ls has leader", KR(ret));
190
  } else {
191
    for (int64_t i = 0; OB_SUCC(ret) && i < servers.count(); ++i) {
192
      if (OB_FAIL(delete_server_(servers.at(i), zone))) {
193
        LOG_WARN("delete_server failed", "server", servers.at(i), "zone", zone, KR(ret));
194
      }
195
    }
196
  }
197
  return ret;
198
}
199
int ObServerZoneOpService::cancel_delete_servers(
200
    const ObIArray<ObAddr> &servers,
201
    const ObZone &zone)
202
{
203
  int ret = OB_SUCCESS;
204
  if (OB_UNLIKELY(!is_inited_)) {
205
    ret = OB_NOT_INIT;
206
    LOG_WARN("not init", KR(ret), K(is_inited_));
207
  } else if (OB_ISNULL(unit_manager_) || OB_ISNULL(sql_proxy_)) {
208
    ret = OB_ERR_UNEXPECTED;
209
    LOG_WARN("unit_manager_ or sql_proxy_ or server_change_callback_ is null", KR(ret),
210
        KP(unit_manager_), KP(sql_proxy_));
211
  } else {
212
    ObServerInfoInTable server_info_in_table;
213
    for (int64_t i = 0; OB_SUCC(ret) && i < servers.count(); ++i) {
214
      const ObAddr &server = servers.at(i);
215
      const int64_t now = ObTimeUtility::current_time();
216
      ObMySQLTransaction trans;
217
      server_info_in_table.reset();
218
      if (OB_FAIL(trans.start(sql_proxy_, OB_SYS_TENANT_ID))) {
219
        LOG_WARN("fail to start trans", KR(ret));
220
      } else if (OB_FAIL(check_and_end_delete_server_(trans, server, zone, true /* is_cancel */, server_info_in_table))) {
221
        LOG_WARN("fail to check and end delete server", KR(ret), K(server), K(zone));
222
      } else if (OB_FAIL(ObServerTableOperator::update_status(
223
          trans,
224
          server,
225
          ObServerStatus::OB_SERVER_DELETING,
226
          server_info_in_table.is_alive() ? ObServerStatus::OB_SERVER_ACTIVE : ObServerStatus::OB_SERVER_INACTIVE))) {
227
        LOG_WARN("fail to update status in __all_server table", KR(ret),
228
            K(server), K(server_info_in_table));
229
      } else if (OB_FAIL(unit_manager_->cancel_migrate_out_units(server))) {
230
        LOG_WARN("unit_manager_ cancel_migrate_out_units failed", KR(ret), K(server));
231
      }
232
      (void) end_trans_and_on_server_change_(ret, trans, "cancel_delete_server", server, server_info_in_table.get_zone(), now);
233
    }
234
  }
235
  return ret;
236
}
237
int ObServerZoneOpService::finish_delete_server(
238
    const ObAddr &server,
239
    const ObZone &zone)
240
{
241
  int ret = OB_SUCCESS;
242
  ObServerInfoInTable server_info_in_table;
243
  const int64_t now = ObTimeUtility::current_time();
244
  ObMySQLTransaction trans;
245
  if (OB_UNLIKELY(!is_inited_)) {
246
    ret = OB_NOT_INIT;
247
    LOG_WARN("not init", KR(ret), K(is_inited_));
248
  } else if (OB_ISNULL(sql_proxy_)) {
249
    ret = OB_ERR_UNEXPECTED;
250
    LOG_WARN("sql_proxy_ is null", KR(ret), KP(sql_proxy_));
251
  } else if (OB_FAIL(trans.start(sql_proxy_, OB_SYS_TENANT_ID))) {
252
    LOG_WARN("fail to start trans", KR(ret));
253
  } else if (OB_FAIL(check_and_end_delete_server_(trans, server, zone, false /* is_cancel */, server_info_in_table))) {
254
    LOG_WARN("fail to check and end delete server", KR(ret), K(server), K(zone));
255
  } else if (OB_FAIL(ObServerManager::try_delete_server_working_dir(
256
      server_info_in_table.get_zone(),
257
      server,
258
      server_info_in_table.get_server_id()))) {
259
    LOG_WARN("fail to delete server working dir", KR(ret), K(server_info_in_table));
260
  } else if (OB_FAIL(st_operator_.remove(server, trans))) {
261
    LOG_WARN("fail to remove this server from __all_server table", KR(ret), K(server));
262
  }
263
  (void) end_trans_and_on_server_change_(ret, trans, "finish_delete_server", server, server_info_in_table.get_zone(), now);
264
  return ret;
265
}
266
int ObServerZoneOpService::stop_servers(
267
    const ObIArray<ObAddr> &servers,
268
    const ObZone &zone,
269
    const obrpc::ObAdminServerArg::AdminServerOp &op)
270
{
271
  int ret = OB_SUCCESS;
272
  if (OB_UNLIKELY(!is_inited_)) {
273
    ret = OB_NOT_INIT;
274
    LOG_WARN("not init", KR(ret), K(is_inited_));
275
  } else if (OB_FAIL(stop_server_precheck(servers, op))) {
276
    LOG_WARN("fail to precheck stop server", KR(ret), K(servers), K(zone));
277
  } else {
278
    for (int64_t i = 0; OB_SUCC(ret) && i < servers.count(); i++) {
279
      const ObAddr &server = servers.at(i);
280
      if (OB_FAIL(start_or_stop_server_(server, zone, op))) {
281
        LOG_WARN("fail to stop server", KR(ret), K(server), K(zone));
282
      }
283
    }
284
  }
285
  return ret;
286
}
287
int ObServerZoneOpService::start_servers(
288
    const ObIArray<ObAddr> &servers,
289
    const ObZone &zone)
290
{
291
  int ret = OB_SUCCESS;
292
  if (OB_UNLIKELY(!is_inited_)) {
293
    ret = OB_NOT_INIT;
294
    LOG_WARN("not init", KR(ret), K(is_inited_));
295
  } else if (OB_UNLIKELY(servers.count() <= 0)) {
296
    ret = OB_INVALID_ARGUMENT;
297
    LOG_WARN("servers' count is zero", KR(ret), K(servers));
298
  } else {
299
    for (int64_t i = 0; OB_SUCC(ret) && i < servers.count(); ++i) {
300
      const ObAddr &server = servers.at(i);
301
      if (OB_FAIL(start_or_stop_server_(server, zone, ObAdminServerArg::START))) {
302
        LOG_WARN("fail to start server", KR(ret), K(server), K(zone));
303
      }
304
    }
305
  }
306
  return ret;
307
}
308
#ifdef OB_BUILD_TDE_SECURITY
309
int ObServerZoneOpService::master_key_checking_for_adding_server(
310
    const common::ObAddr &server,
311
    const ObZone &zone,
312
    obrpc::ObWaitMasterKeyInSyncArg &wms_in_sync_arg)
313
{
314
  int ret = OB_SUCCESS;
315
  if (OB_UNLIKELY(!is_inited_)) {
316
    ret = OB_NOT_INIT;
317
    LOG_WARN("not init", KR(ret), K(is_inited_));
318
  } else if (OB_ISNULL(master_key_mgr_)) {
319
    ret = OB_ERR_UNEXPECTED;
320
    LOG_WARN("master_key_mgr_ is null", KR(ret), KP(master_key_mgr_));
321
  } else {
322
    bool master_key_empty = true;
323
    share::ObLeaseResponse tmp_lease_response;
324
    bool encryption = false;
325
    ObTimeoutCtx ctx;
326
    if (OB_FAIL(master_key_mgr_->check_master_key_empty(master_key_empty))) {
327
      LOG_WARN("fail to check whether master key is empty", KR(ret));
328
    } else if (master_key_empty) {
329
      LOG_INFO("empty master key, no need to sync master key info");
330
    } else if (!master_key_empty && zone.is_empty()) {
331
      ret = OB_NOT_SUPPORTED;
332
      LOG_USER_ERROR(OB_NOT_SUPPORTED, "not support to add a server "
333
      "without a specified zone when the master key is valid");
334
    } else if (OB_FAIL(ObZoneTableOperation::check_encryption_zone(*sql_proxy_, zone, encryption))) {
335
      LOG_WARN("fail to check zone encryption", KR(ret), "zone", zone);
336
    } else if (encryption) {
337
      LOG_INFO("server in encrypted zone, no need to sync master key info", "zone", zone);
338
    } else if (OB_FAIL(master_key_mgr_->get_all_tenant_master_key(
339
            zone, wms_in_sync_arg.tenant_max_key_version_))) {
340
      LOG_WARN("fail to get all tenant master key", KR(ret));
341
    } else if (OB_FAIL(OTC_MGR.get_lease_response(tmp_lease_response))) {
342
      LOG_WARN("fail to get lease response", KR(ret));
343
    } else if (OB_FAIL(wms_in_sync_arg.tenant_config_version_.assign(
344
            tmp_lease_response.tenant_config_version_))) {
345
      LOG_WARN("fail to assign tenant config version", KR(ret));
346
    } else if (OB_FAIL(rootserver::ObRootUtils::get_rs_default_timeout_ctx(ctx))) {
347
      LOG_WARN("fail to get timeout ctx", KR(ret), K(ctx));
348
    } else {
349
      int64_t timeout = ctx.get_timeout();
350
      if (OB_UNLIKELY(timeout <= 0)) {
351
        ret = OB_TIMEOUT;
352
        LOG_WARN("ctx time out", KR(ret), K(timeout));
353
      } else if (OB_FAIL(rpc_proxy_->to(server)
354
          .timeout(timeout)
355
          .wait_master_key_in_sync(wms_in_sync_arg))) {
356
        LOG_WARN("fail to wait master key in sync", KR(ret), K(server));
357
      } else {}
358
    }
359
  }
360
  return ret;
361
}
362
#endif
363
int ObServerZoneOpService::stop_server_precheck(
364
    const ObIArray<ObAddr> &servers,
365
    const obrpc::ObAdminServerArg::AdminServerOp &op)
366
{
367
  int ret = OB_SUCCESS;
368
  ObZone zone;
369
  bool is_same_zone = false;
370
  bool is_all_stopped = false;
371
  ObArray<ObServerInfoInTable> all_servers_info_in_table;
372
  ObServerInfoInTable server_info;
373
  if (OB_UNLIKELY(!is_inited_)) {
374
    ret = OB_NOT_INIT;
375
    LOG_WARN("not init", KR(ret), K(is_inited_));
376
  } else if (OB_UNLIKELY(servers.count() <= 0)) {
377
    ret = OB_INVALID_ARGUMENT;
378
    LOG_WARN("servers' count is zero", KR(ret), K(servers));
379
  } else if (OB_ISNULL(GCTX.root_service_) || OB_ISNULL(sql_proxy_)) {
380
    ret = OB_ERR_UNEXPECTED;
381
    LOG_WARN("GCTX.root_service_ or sql_proxy_ is null", KR(ret), KP(GCTX.root_service_), KP(sql_proxy_));
382
  } else if (OB_FAIL(ObServerTableOperator::get(*sql_proxy_, all_servers_info_in_table))) {
383
    LOG_WARN("fail to read __all_server table", KR(ret), KP(sql_proxy_));
384
  } else if (OB_FAIL(check_zone_and_server_(
385
      all_servers_info_in_table,
386
      servers,
387
      is_same_zone,
388
      is_all_stopped))) {
389
    LOG_WARN("fail to check zone and server", KR(ret), K(all_servers_info_in_table), K(servers));
390
  } else if (is_all_stopped) {
391
    //nothing todo
392
  } else if (!is_same_zone) {
393
    ret = OB_STOP_SERVER_IN_MULTIPLE_ZONES;
394
    LOG_WARN("can not stop servers in multiple zones", KR(ret), K(server_info), K(servers));
395
  } else if (OB_FAIL((ObRootUtils::find_server_info(all_servers_info_in_table, servers.at(0), server_info)))) {
396
    LOG_WARN("fail to find server info", KR(ret), K(all_servers_info_in_table), K(servers.at(0)));
397
  } else {
398
    const ObZone &zone = server_info.get_zone();
399
    if (ObAdminServerArg::ISOLATE == op) {
400
      //"Isolate server" does not need to check the total number and status of replicas; it cannot be restarted later;
401
      if (OB_FAIL(GCTX.root_service_->check_can_stop(zone, servers, false /*is_stop_zone*/))) {
402
        LOG_WARN("fail to check can stop", KR(ret), K(zone), K(servers), K(op));
403
        if (OB_OP_NOT_ALLOW == ret) {
404
          LOG_USER_ERROR(OB_OP_NOT_ALLOW, "Stop all servers in primary region is");
405
        }
406
      }
407
    } else {
408
      if (ObRootUtils::have_other_stop_task(zone)) {
409
        ret = OB_STOP_SERVER_IN_MULTIPLE_ZONES;
410
        LOG_WARN("can not stop servers in multiple zones", KR(ret), K(zone), K(servers), K(op));
411
        LOG_USER_ERROR(OB_STOP_SERVER_IN_MULTIPLE_ZONES,
412
            "cannot stop server or stop zone in multiple zones");
413
      } else if (OB_FAIL(GCTX.root_service_->check_majority_and_log_in_sync(
414
          servers,
415
          ObAdminServerArg::FORCE_STOP == op,/*skip_log_sync_check*/
416
          "stop server"))) {
417
        LOG_WARN("fail to check majority and log in-sync", KR(ret), K(zone), K(servers), K(op));
418
      }
419
    }
420
  }
421
  return ret;
422
}
423
int ObServerZoneOpService::zone_checking_for_adding_server_(
424
    const ObZone &command_zone,
425
    const ObZone &rpc_zone,
426
    ObZone &picked_zone)
427
{
428
  int ret = OB_SUCCESS;
429
  // command_zone: the zone specified in the system command ADD SERVER
430
  // rpc_zone: the zone specified in the server's local config and send to rs via rpc
431
  // picked_zone: the zone we will use in add_server
432
  if (OB_UNLIKELY(!is_inited_)) {
433
    ret = OB_NOT_INIT;
434
    LOG_WARN("not init", KR(ret), K(is_inited_));
435
  } else if (OB_UNLIKELY(rpc_zone.is_empty())) {
436
    ret = OB_INVALID_ARGUMENT;
437
    LOG_WARN("rpc_zone cannot be empty. It implies that server's local config zone is empty.",
438
    KR(ret), K(rpc_zone));
439
  } else if (!command_zone.is_empty() && command_zone != rpc_zone) {
440
    ret = OB_SERVER_ZONE_NOT_MATCH;
441
    LOG_WARN("the zone specified in the server's local config is not the same as"
442
        " the zone specified in the command", KR(ret), K(command_zone), K(rpc_zone));
443
  } else if (OB_FAIL(picked_zone.assign(rpc_zone))) {
444
    LOG_WARN("fail to assign picked_zone", KR(ret), K(rpc_zone));
445
  } else {}
446
  return ret;
447
}
448
int ObServerZoneOpService::add_server_(
449
    const ObAddr &server,
450
    const uint64_t server_id,
451
    const ObZone &zone,
452
    const int64_t sql_port,
453
    const ObServerInfoInTable::ObBuildVersion &build_version)
454
{
455
  int ret = OB_SUCCESS;
456
  bool is_active = false;
457
  const int64_t now = ObTimeUtility::current_time();
458
  ObServerInfoInTable server_info_in_table;
459
  ObMySQLTransaction trans;
460
  if (OB_UNLIKELY(!is_inited_)) {
461
    ret = OB_NOT_INIT;
462
    LOG_WARN("not init", KR(ret), K(is_inited_));
463
  } else if (OB_UNLIKELY(!server.is_valid()
464
      || !is_valid_server_id(server_id)
465
      || zone.is_empty()
466
      || sql_port <= 0
467
      || build_version.is_empty())) {
468
    ret = OB_INVALID_ARGUMENT;
469
    LOG_WARN("invalid argument", KR(ret), K(server), K(server_id), K(zone), K(sql_port), K(build_version));
470
  } else if (OB_ISNULL(sql_proxy_) || OB_ISNULL(server_change_callback_)) {
471
    ret = OB_ERR_UNEXPECTED;
472
    LOG_WARN("sql_proxy_ or server_change_callback_ is null", KR(ret),
473
        KP(sql_proxy_), KP(server_change_callback_));
474
  } else if (OB_FAIL(trans.start(sql_proxy_, OB_SYS_TENANT_ID))) {
475
    LOG_WARN("fail to start trans", KR(ret));
476
  } else if (OB_FAIL(check_and_update_service_epoch_(trans))) {
477
    LOG_WARN("fail to check and update service epoch", KR(ret));
478
  } else if (OB_FAIL(ObZoneTableOperation::check_zone_active(trans, zone, is_active))){
479
    // we do not need to lock the zone info in __all_zone table
480
    // all server/zone operations are mutually exclusive since we locked the service epoch
481
    LOG_WARN("fail to check whether the zone is active", KR(ret), K(zone));
482
  } else if (OB_UNLIKELY(!is_active)) {
483
    ret = OB_ZONE_NOT_ACTIVE;
484
    LOG_WARN("the zone is not active", KR(ret), K(zone), K(is_active));
485
  } else if (OB_FAIL(ObServerTableOperator::get(trans, server, server_info_in_table))) {
486
    if (OB_SERVER_NOT_IN_WHITE_LIST == ret) {
487
      ret = OB_SUCCESS;
488
    } else {
489
      LOG_WARN("fail to get server_info in table", KR(ret), K(server));
490
    }
491
  } else {
492
    ret = OB_ENTRY_EXIST;
493
    LOG_WARN("server exists", KR(ret), K(server_info_in_table));
494
  }
495
  if (FAILEDx(server_info_in_table.init(
496
      server,
497
      server_id,
498
      zone,
499
      sql_port,
500
      false, /* with_rootserver */
501
      ObServerStatus::OB_SERVER_ACTIVE,
502
      build_version,
503
      0, /* stop_time */
504
      0, /* start_service_time */
505
      0 /* last_offline_time */))) {
506
    LOG_WARN("fail to init server info in table", KR(ret), K(server), K(server_id), K(zone),
507
        K(sql_port), K(build_version), K(now));
508
  } else if (OB_FAIL(ObServerTableOperator::insert(trans, server_info_in_table))) {
509
    LOG_WARN("fail to insert server info into __all_server table", KR(ret), K(server_info_in_table));
510
  }
511
  (void) end_trans_and_on_server_change_(ret, trans, "add_server", server, zone, now);
512
  return ret;
513
}
514
int ObServerZoneOpService::delete_server_(
515
    const common::ObAddr &server,
516
    const ObZone &zone)
517
{
518
  int ret = OB_SUCCESS;
519
  ObServerInfoInTable server_info_in_table;
520
  const int64_t now = ObTimeUtility::current_time();
521
  char ip[OB_MAX_SERVER_ADDR_SIZE] = "";
522
  ObMySQLTransaction trans;
523
  int64_t job_id = 0;
524
  if (OB_UNLIKELY(!is_inited_)) {
525
    ret = OB_NOT_INIT;
526
    LOG_WARN("not init", KR(ret), K(is_inited_));
527
  } else if (OB_UNLIKELY(!server.is_valid() || !server.ip_to_string(ip, sizeof(ip)))) {
528
    ret = OB_INVALID_ARGUMENT;
529
    LOG_WARN("invalid argument", KR(ret), K(server));
530
  } else if (OB_ISNULL(sql_proxy_) || OB_ISNULL(server_change_callback_)) {
531
    ret = OB_ERR_UNEXPECTED;
532
    LOG_WARN("sql_proxy_ or server_change_callback_ is null", KR(ret),
533
        KP(sql_proxy_), KP(server_change_callback_));
534
  } else if (OB_FAIL(trans.start(sql_proxy_, OB_SYS_TENANT_ID))) {
535
    LOG_WARN("fail to start trans", KR(ret));
536
  } else if (OB_FAIL(check_and_update_service_epoch_(trans))) {
537
    LOG_WARN("fail to check and update service epoch", KR(ret));
538
  } else if (OB_FAIL(ObServerTableOperator::get(trans, server, server_info_in_table))) {
539
    LOG_WARN("fail to get server_info in table", KR(ret), K(server));
540
  } else if (!zone.is_empty() && zone != server_info_in_table.get_zone()) {
541
    ret = OB_SERVER_ZONE_NOT_MATCH;
542
    LOG_WARN("zone not matches", KR(ret), K(server), K(zone), K(server_info_in_table));
543
  } else if (OB_UNLIKELY(server_info_in_table.is_deleting())) {
544
    ret = OB_SERVER_ALREADY_DELETED;
545
    LOG_WARN("the server has been deleted", KR(ret), K(server_info_in_table));
546
  } else if (OB_FAIL(RS_JOB_CREATE_WITH_RET(
547
      job_id,
548
      JOB_TYPE_DELETE_SERVER,
549
      trans,
550
      "svr_ip", ip,
551
      "svr_port", server.get_port()))) {
552
    LOG_WARN("fail to create rs job DELETE_SERVER", KR(ret));
553
  } else if (OB_FAIL(ObServerTableOperator::update_status(
554
      trans,
555
      server,
556
      server_info_in_table.get_status(),
557
      ObServerStatus::OB_SERVER_DELETING))) {
558
    LOG_WARN("fail to update status", KR(ret), K(server), K(server_info_in_table));
559
  }
560
  (void) end_trans_and_on_server_change_(ret, trans, "delete_server", server, server_info_in_table.get_zone(), now);
561
  return ret;
562
}
563
int ObServerZoneOpService::check_and_end_delete_server_(
564
    common::ObMySQLTransaction &trans,
565
    const common::ObAddr &server,
566
    const ObZone &zone,
567
    const bool is_cancel,
568
    share::ObServerInfoInTable &server_info)
569
{
570
  int ret = OB_SUCCESS;
571
  server_info.reset();
572
  char ip[OB_MAX_SERVER_ADDR_SIZE] = "";
573
  if (OB_UNLIKELY(!is_inited_)) {
574
    ret = OB_NOT_INIT;
575
    LOG_WARN("not init", KR(ret), K(is_inited_));
576
  } else if (OB_UNLIKELY(!server.is_valid() || !server.ip_to_string(ip, sizeof(ip)))) {
577
    ret = OB_INVALID_ARGUMENT;
578
    LOG_WARN("invalid argument", KR(ret), K(server));
579
  } else if (OB_FAIL(check_and_update_service_epoch_(trans))) {
580
    LOG_WARN("fail to check and update service epoch", KR(ret));
581
  } else if (OB_FAIL(ObServerTableOperator::get(trans, server, server_info))) {
582
    LOG_WARN("fail to get server_info in table", KR(ret), K(server));
583
  } else if (!zone.is_empty() && zone != server_info.get_zone()) {
584
    ret = OB_SERVER_ZONE_NOT_MATCH;
585
    LOG_WARN("zone not matches", KR(ret), K(server), K(zone), K(server_info));
586
  } else if (OB_UNLIKELY(!server_info.is_deleting())) {
587
    ret = OB_SERVER_NOT_DELETING;
588
    LOG_ERROR("server is not in deleting status, cannot be removed from __all_server table",
589
        KR(ret), K(server_info));
590
  } else {
591
    int64_t job_id = 0;
592
    ret = RS_JOB_FIND(DELETE_SERVER, job_id, trans,
593
                      "svr_ip", ip, "svr_port", server.get_port());
594
    if (OB_SUCC(ret)  && job_id > 0) {
595
      int tmp_ret = is_cancel ? OB_CANCELED : OB_SUCCESS;
596
      if (OB_FAIL(RS_JOB_COMPLETE(job_id, tmp_ret, trans))) {
597
        LOG_WARN("fail to all_rootservice_job" , KR(ret), K(server));
598
      }
599
    } else {
600
      LOG_WARN("failed to find job", KR(ret), K(server));
601
      if (OB_ENTRY_NOT_EXIST == ret) {
602
        ret = OB_SUCCESS;
603
      }
604
    }
605
  }
606
  return ret;
607
}
608
int ObServerZoneOpService::start_or_stop_server_(
609
    const common::ObAddr &server,
610
    const ObZone &zone,
611
    const obrpc::ObAdminServerArg::AdminServerOp &op)
612
{
613
  int ret = OB_SUCCESS;
614
  const int64_t now = ObTimeUtility::current_time();
615
  ObServerInfoInTable server_info;
616
  ObMySQLTransaction trans;
617
  bool is_start = (ObAdminServerArg::START == op);
618
  if (OB_UNLIKELY(!is_inited_)) {
619
    ret = OB_NOT_INIT;
620
    LOG_WARN("not init", KR(ret), K(is_inited_));
621
  } else if (OB_UNLIKELY(!server.is_valid())) {
622
    ret = OB_INVALID_ARGUMENT;
623
    LOG_WARN("invalid argument", KR(ret), K(server));
624
  } else if (OB_ISNULL(sql_proxy_)) {
625
    ret = OB_ERR_UNEXPECTED;
626
    LOG_WARN("sql_proxy_ is null", KR(ret), KP(sql_proxy_));
627
  } else if (OB_FAIL(trans.start(sql_proxy_, OB_SYS_TENANT_ID))) {
628
    LOG_WARN("fail to start trans", KR(ret));
629
  } else if (OB_FAIL(check_and_update_service_epoch_(trans))) {
630
    LOG_WARN("fail to check and update service epoch", KR(ret));
631
  } else if (OB_FAIL(ObServerTableOperator::get(trans, server, server_info))) {
632
    LOG_WARN("fail to get server_info", KR(ret), K(server));
633
  } else if (!zone.is_empty() && zone != server_info.get_zone()) {
634
    ret = OB_SERVER_ZONE_NOT_MATCH;
635
    LOG_WARN("zone not matches", KR(ret), K(server), K(zone), K(server_info));
636
  } else if (ObAdminServerArg::STOP == op || ObAdminServerArg::FORCE_STOP == op) {
637
    // check again, if there exists stopped servers in other zones
638
    if (ObRootUtils::have_other_stop_task(server_info.get_zone())) {
639
      ret = OB_STOP_SERVER_IN_MULTIPLE_ZONES;
640
      LOG_WARN("can not stop servers in multiple zones", KR(ret), K(server_info.get_zone()));
641
      LOG_USER_ERROR(OB_STOP_SERVER_IN_MULTIPLE_ZONES,
642
          "cannot stop server or stop zone in multiple zones");
643
    }
644
  }
645
  if (OB_SUCC(ret)) {
646
      int64_t new_stop_time = is_start ? 0 : now;
647
      int64_t old_stop_time = server_info.get_stop_time();
648
      if ((is_start && 0 != old_stop_time) || (!is_start && 0 == old_stop_time)) {
649
        if (OB_FAIL(ObServerTableOperator::update_stop_time(
650
          trans,
651
          server,
652
          old_stop_time,
653
          new_stop_time))) {
654
            LOG_WARN("fail to update stop_time", KR(ret), K(server), K(old_stop_time), K(new_stop_time));
655
        }
656
      }
657
      LOG_INFO("update stop time", KR(ret), K(server_info),
658
          K(old_stop_time), K(new_stop_time), K(op), K(is_start));
659
  }
660
  const char *op_print_str = is_start ? "start_server" : "stop_server";
661
  (void) end_trans_and_on_server_change_(ret, trans, op_print_str, server, server_info.get_zone(), now);
662
  return ret;
663
}
664

665
// Fills `rs_list_arg` from the replicas of the sys tenant's SYS_LS.
//
// master_rs_ is set to this server's own address (GCONF.self_addr_). A
// replica's server is appended to rs_list_ when it is this server, or when
// the replica is in service and has a paxos replica type. rs_list_ is only
// appended to here; it is not cleared first.
//
// @param rs_list_arg  output argument to fill
// @return OB_SUCCESS; OB_NOT_INIT; OB_ERR_UNEXPECTED (lst_operator_ is NULL);
//         or the first error from lst_operator_->get() or push_back().
int ObServerZoneOpService::construct_rs_list_arg(ObRsListArg &rs_list_arg)
{
  int ret = OB_SUCCESS;
  ObLSInfo ls_info;
  if (OB_UNLIKELY(!is_inited_)) {
    ret = OB_NOT_INIT;
    LOG_WARN("not init", KR(ret), K(is_inited_));
  } else if (OB_ISNULL(lst_operator_)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("lst operator is null", KR(ret), KP(lst_operator_));
  } else if (OB_FAIL(lst_operator_->get(
      GCONF.cluster_id,
      OB_SYS_TENANT_ID,
      SYS_LS,
      share::ObLSTable::DEFAULT_MODE,
      ls_info))) {
    LOG_WARN("fail to get ls info", KR(ret));
  } else {
    rs_list_arg.master_rs_ = GCONF.self_addr_;
    FOREACH_CNT_X(replica, ls_info.get_replicas(), OB_SUCC(ret)) {
      // This server always qualifies; any other server needs an in-service paxos replica.
      const bool is_self = (replica->get_server() == GCONF.self_addr_);
      if (is_self
          || (replica->is_in_service()
              && ObReplicaTypeCheck::is_paxos_replica_V2(replica->get_replica_type()))) {
        if (OB_FAIL(rs_list_arg.rs_list_.push_back(replica->get_server()))) {
          LOG_WARN("fail to push a server into rs list", KR(ret), K(replica->get_server()));
        }
      }
    }
  }
  return ret;
}
696
// Verifies that the sys LS on this server is leader and pushes its proposal id
// as the SERVER_ZONE_OP_SERVICE_EPOCH value through ObServiceEpochProxy.
//
// @param [in] trans  transaction handed to
//                    ObServiceEpochProxy::check_and_update_service_epoch()
// @return OB_SUCCESS on success;
//         OB_NOT_INIT if this service is not inited;
//         OB_NOT_MASTER if the role obtained for the sys LS is not LEADER;
//         OB_ERR_UNEXPECTED if the obtained proposal id is INVALID_PROPOSAL_ID;
//         otherwise the error of the failing callee.
int ObServerZoneOpService::check_and_update_service_epoch_(ObMySQLTransaction &trans)
{
  int ret = OB_SUCCESS;
  ObRole role;
  int64_t proposal_id = palf::INVALID_PROPOSAL_ID;
  // Never updated in this function; kept only because it is part of the
  // "not leader ls" log line.
  int64_t service_epoch_in_table = palf::INVALID_PROPOSAL_ID;
  if (OB_UNLIKELY(!is_inited_)) {
    ret = OB_NOT_INIT;
    LOG_WARN("not init", KR(ret), K(is_inited_));
  } else if (OB_FAIL(ObRootUtils::get_proposal_id_from_sys_ls(proposal_id, role))) {
    LOG_WARN("fail to get proposal id from sys ls", KR(ret));
  }
  if (OB_SUCC(ret)) {
    if (role != ObRole::LEADER) {
      ret = OB_NOT_MASTER;
      LOG_WARN("not leader ls", KR(ret), K(proposal_id), K(service_epoch_in_table), K(role));
    } else if (proposal_id == palf::INVALID_PROPOSAL_ID) {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("invalid proposal id", KR(ret), K(proposal_id));
    } else if (OB_FAIL(ObServiceEpochProxy::check_and_update_service_epoch(
                   trans, OB_SYS_TENANT_ID,
                   ObServiceEpochProxy::SERVER_ZONE_OP_SERVICE_EPOCH, proposal_id))) {
      LOG_WARN("fail to check and update server zone op service epoch", KR(ret), K(proposal_id));
    }
  }
  return ret;
}
722
// Obtains a new server id from ObMaxIdFetcher (OB_MAX_USED_SERVER_ID_TYPE under
// OB_SYS_TENANT_ID).
//
// @param [out] server_id  set to the fetched id on success; left untouched on
//                         failure
// @return OB_SUCCESS on success;
//         OB_NOT_INIT if this service is not inited;
//         OB_ERR_UNEXPECTED if sql_proxy_ is NULL;
//         otherwise the error from ObMaxIdFetcher::fetch_new_max_id().
int ObServerZoneOpService::fetch_new_server_id_(uint64_t &server_id)
{
  int ret = OB_SUCCESS;
  uint64_t new_max_id = OB_INVALID_ID;
  if (OB_UNLIKELY(!is_inited_)) {
    ret = OB_NOT_INIT;
    LOG_WARN("not init", KR(ret), K(is_inited_));
  } else if (OB_ISNULL(sql_proxy_)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("invalid sql proxy", KR(ret), KP(sql_proxy_));
  } else {
    ObMaxIdFetcher id_fetcher(*sql_proxy_);
    if (OB_FAIL(id_fetcher.fetch_new_max_id(OB_SYS_TENANT_ID,
                                            OB_MAX_USED_SERVER_ID_TYPE,
                                            new_max_id))) {
      LOG_WARN("fetch_new_max_id failed", KR(ret));
    }
  }
  if (OB_SUCC(ret)) {
    server_id = new_max_id;
  }
  return ret;
}
745
int ObServerZoneOpService::check_server_have_enough_resource_for_delete_server_(
746
    const ObIArray<ObAddr> &servers,
747
    const ObZone &zone)
748
{
749
  int ret = OB_SUCCESS;
750
  if (OB_UNLIKELY(!is_inited_)) {
751
    ret = OB_NOT_INIT;
752
    LOG_WARN("not init", KR(ret), K(is_inited_));
753
  } else if (OB_ISNULL(unit_manager_) || OB_ISNULL(sql_proxy_)) {
754
    ret = OB_ERR_UNEXPECTED;
755
    LOG_WARN("unit_manager_ or sql_proxy_ is null", KR(ret), KP(unit_manager_), KP(sql_proxy_));
756
  } else {
757
    ObServerInfoInTable server_info;
758
    FOREACH_CNT_X(server, servers, OB_SUCC(ret)) {
759
      server_info.reset();
760
      if (OB_FAIL(ObServerTableOperator::get(*sql_proxy_, *server, server_info))) {
761
        LOG_WARN("fail to get server_info in table", KR(ret), KP(sql_proxy_), KPC(server));
762
      } else if (!zone.is_empty() && server_info.get_zone() != zone) {
763
        ret = OB_SERVER_ZONE_NOT_MATCH;
764
        LOG_WARN("the arg zone is not the same as the server's zone in __all_server table", KR(ret),
765
            K(zone), K(server_info));
766
      } else if (OB_FAIL(unit_manager_->check_enough_resource_for_delete_server(
767
              *server, server_info.get_zone()))) {
768
        LOG_WARN("fail to check enouch resource", KR(ret), KPC(server), K(server_info));
769
      }
770
    }//end for each
771
  }
772
  return ret;
773
}
774
int ObServerZoneOpService::check_zone_and_server_(
775
    const ObIArray<share::ObServerInfoInTable> &servers_info,
776
    const ObIArray<ObAddr> &servers,
777
    bool &is_same_zone,
778
    bool &is_all_stopped)
779
{
780
  int ret = OB_SUCCESS;
781
  is_same_zone = true;
782
  is_all_stopped = true;
783
  if (OB_UNLIKELY(!is_inited_)) {
784
    ret = OB_NOT_INIT;
785
    LOG_WARN("not init", KR(ret), K(is_inited_));
786
  } else {
787
    ObServerInfoInTable server_info;
788
    ObZone zone;
789
    for (int64_t i = 0; i < servers.count() && OB_SUCC(ret) && (is_same_zone || is_all_stopped); i++) {
790
      const ObAddr &server = servers.at(i);
791
      server_info.reset();
792
      if (OB_FAIL(ObRootUtils::find_server_info(servers_info, server, server_info))) {
793
        LOG_WARN("fail to get server info", KR(ret), K(servers_info), K(server));
794
      } else if (0 == i) {
795
        if (OB_FAIL(zone.assign(server_info.get_zone()))) {
796
          LOG_WARN("fail to assign zone", KR(ret), K(server_info.get_zone()));
797
        }
798
      } else if (zone != server_info.get_zone()) {
799
        is_same_zone = false;
800
        LOG_WARN("server zone not same", K(zone), K(server_info), K(servers));
801
      }
802
      if (OB_FAIL(ret)) {
803
      } else if (!server_info.is_stopped()) {
804
        is_all_stopped = false;
805
      }
806
    }
807
  }
808
  return ret;
809
}
810
ERRSIM_POINT_DEF(ALL_SERVER_LIST_ERROR);
811
void ObServerZoneOpService::end_trans_and_on_server_change_(
812
    int &ret,
813
    common::ObMySQLTransaction &trans,
814
    const char *op_print_str,
815
    const common::ObAddr &server,
816
    const ObZone &zone,
817
    const int64_t start_time)
818
{
819
  int tmp_ret = OB_SUCCESS;
820
  LOG_INFO("start execute end_trans_and_on_server_change_", KR(ret),
821
      K(op_print_str), K(server), K(zone), K(start_time));
822
  if (OB_UNLIKELY(!trans.is_started())) {
823
    LOG_WARN("the transaction is not started");
824
  } else {
825
    if (OB_TMP_FAIL(trans.end(OB_SUCC(ret)))) {
826
      LOG_WARN("fail to commit the transaction", KR(ret), KR(tmp_ret), K(server), K(zone));
827
      ret = OB_SUCC(ret) ? tmp_ret : ret;
828
    }
829
  }
830
  if (OB_TMP_FAIL(SVR_TRACER.refresh())) {
831
    LOG_WARN("fail to refresh server tracer", KR(ret), KR(tmp_ret));
832
  }
833
  bool no_on_server_change = ALL_SERVER_LIST_ERROR ? true : false;
834
  if (OB_ISNULL(server_change_callback_)) {
835
    tmp_ret = OB_ERR_UNEXPECTED;
836
    LOG_WARN("server_change_callback_ is null", KR(ret), KR(tmp_ret), KP(server_change_callback_));
837
    ret = OB_SUCC(ret) ? tmp_ret : ret;
838
  } else if (no_on_server_change) {
839
  } else if (OB_TMP_FAIL(server_change_callback_->on_server_change())) {
840
    LOG_WARN("fail to callback on server change", KR(ret), KR(tmp_ret));
841
  }
842
  int64_t time_cost = ::oceanbase::common::ObTimeUtility::current_time() - start_time;
843
  FLOG_INFO(op_print_str, K(server),  K(zone), "time cost", time_cost, KR(ret));
844
  ROOTSERVICE_EVENT_ADD("server", op_print_str, K(server), K(ret));
845
}
846
}
847
}
848

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.