oceanbase
847 строк · 34.3 Кб
1/**
2* Copyright (c) 2022 OceanBase
3* OceanBase CE is licensed under Mulan PubL v2.
4* You can use this software according to the terms and conditions of the Mulan PubL v2.
5* You may obtain a copy of Mulan PubL v2 at:
6* http://license.coscl.org.cn/MulanPubL-2.0
7* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10* See the Mulan PubL v2 for more details.
11*/
12
13#define USING_LOG_PREFIX RS
14
15#include "ob_server_zone_op_service.h"
16
17#include "share/ob_zone_table_operation.h"
18#include "share/ob_service_epoch_proxy.h"
19#include "share/ob_max_id_fetcher.h"
20#include "lib/mysqlclient/ob_mysql_transaction.h" // ObMySQLTransaction
21#include "lib/utility/ob_tracepoint.h" // ERRSIM
22#include "rootserver/ob_root_service.h" // callback
23#include "share/ob_all_server_tracer.h"
24#include "rootserver/ob_server_manager.h"
25
26namespace oceanbase
27{
28using namespace common;
29using namespace share;
30using namespace obrpc;
31namespace rootserver
32{
33ObServerZoneOpService::ObServerZoneOpService()
34: is_inited_(false),
35server_change_callback_(NULL),
36rpc_proxy_(NULL),
37sql_proxy_(NULL),
38lst_operator_(NULL),
39unit_manager_(NULL)
40#ifdef OB_BUILD_TDE_SECURITY
41, master_key_mgr_()
42#endif
43{
44}
45ObServerZoneOpService::~ObServerZoneOpService()
46{
47}
48int ObServerZoneOpService::init(
49ObIServerChangeCallback &server_change_callback,
50ObSrvRpcProxy &rpc_proxy,
51ObLSTableOperator &lst_operator,
52ObUnitManager &unit_manager,
53ObMySQLProxy &sql_proxy
54#ifdef OB_BUILD_TDE_SECURITY
55, ObRsMasterKeyManager *master_key_mgr
56#endif
57)
58{
59int ret = OB_SUCCESS;
60if (OB_UNLIKELY(is_inited_)) {
61ret = OB_INIT_TWICE;
62LOG_WARN("server zone operation service has been inited already", KR(ret), K(is_inited_));
63#ifdef OB_BUILD_TDE_SECURITY
64} else if (OB_ISNULL(master_key_mgr)) {
65ret = OB_ERR_UNEXPECTED;
66LOG_WARN("master key mgr is null", KR(ret), KP(master_key_mgr));
67#endif
68} else if (OB_FAIL(st_operator_.init(&sql_proxy))) {
69LOG_WARN("fail to init server table operator", KR(ret));
70} else {
71server_change_callback_ = &server_change_callback;
72rpc_proxy_ = &rpc_proxy;
73sql_proxy_ = &sql_proxy;
74lst_operator_ = &lst_operator;
75unit_manager_ = &unit_manager;
76#ifdef OB_BUILD_TDE_SECURITY
77master_key_mgr_ = master_key_mgr;
78#endif
79is_inited_ = true;
80}
81return ret;
82}
83int ObServerZoneOpService::add_servers(const ObIArray<ObAddr> &servers, const ObZone &zone, bool is_bootstrap)
84{
85int ret = OB_SUCCESS;
86uint64_t sys_tenant_data_version = 0;
87ObCheckServerForAddingServerArg rpc_arg;
88ObCheckServerForAddingServerResult rpc_result;
89ObZone picked_zone;
90ObTimeoutCtx ctx;
91#ifdef OB_BUILD_TDE_SECURITY
92ObWaitMasterKeyInSyncArg wms_in_sync_arg;
93// master key mgr sync
94#endif
95if (OB_UNLIKELY(!is_inited_)) {
96ret = OB_NOT_INIT;
97LOG_WARN("not init", KR(ret), K(is_inited_));
98} else if (OB_FAIL(GET_MIN_DATA_VERSION(OB_SYS_TENANT_ID, sys_tenant_data_version))) {
99LOG_WARN("fail to get sys tenant's min data version", KR(ret));
100} else if (OB_ISNULL(rpc_proxy_)) {
101ret = OB_ERR_UNEXPECTED;
102LOG_WARN("rpc_proxy_ is null", KR(ret), KP(rpc_proxy_));
103#ifdef OB_BUILD_TDE_SECURITY
104} else if (OB_ISNULL(master_key_mgr_)) {
105ret = OB_ERR_UNEXPECTED;
106LOG_WARN("master_key_mgr_ is null", KR(ret), KP(master_key_mgr_));
107} else if (OB_FAIL(construct_rs_list_arg(wms_in_sync_arg.rs_list_arg_))) {
108LOG_WARN("fail to construct rs list arg", KR(ret));
109#endif
110} else if (OB_FAIL(rootserver::ObRootUtils::get_rs_default_timeout_ctx(ctx))) {
111LOG_WARN("fail to get timeout ctx", KR(ret), K(ctx));
112} else {
113#ifdef OB_BUILD_TDE_SECURITY
114SpinRLockGuard sync_guard(master_key_mgr_->sync());
115#endif
116for (int64_t i = 0; OB_SUCC(ret) && i < servers.count(); ++i) {
117const ObAddr &addr = servers.at(i);
118int64_t timeout = ctx.get_timeout();
119uint64_t server_id = OB_INVALID_ID;
120const int64_t ERR_MSG_BUF_LEN = OB_MAX_SERVER_ADDR_SIZE + 100;
121char non_empty_server_err_msg[ERR_MSG_BUF_LEN] = "";
122int64_t pos = 0;
123rpc_arg.reset();
124if (OB_UNLIKELY(timeout <= 0)) {
125ret = OB_TIMEOUT;
126LOG_WARN("ctx time out", KR(ret), K(timeout));
127} else if (OB_FAIL(databuff_printf(
128non_empty_server_err_msg,
129ERR_MSG_BUF_LEN,
130pos,
131"add non-empty server %s",
132to_cstring(addr)))) {
133LOG_WARN("fail to execute databuff_printf", KR(ret), K(addr));
134} else if (OB_FAIL(fetch_new_server_id_(server_id))) {
135// fetch a new server id and insert the server into __all_server table
136LOG_WARN("fail to fetch new server id", KR(ret));
137} else if (OB_UNLIKELY(!is_valid_server_id(server_id))) {
138ret = OB_INVALID_ARGUMENT;
139LOG_WARN("server id is invalid", KR(ret), K(server_id));
140} else if (OB_FAIL(rpc_arg.init(
141ObCheckServerForAddingServerArg::ADD_SERVER,
142sys_tenant_data_version,
143server_id))) {
144LOG_WARN("fail to init rpc arg", KR(ret), K(sys_tenant_data_version), K(server_id));
145} else if (OB_FAIL(rpc_proxy_->to(addr)
146.timeout(timeout)
147.check_server_for_adding_server(rpc_arg, rpc_result))) {
148LOG_WARN("fail to check whether the server is empty", KR(ret), K(addr));
149} else if (!rpc_result.get_is_server_empty()) {
150ret = OB_OP_NOT_ALLOW;
151LOG_WARN("adding non-empty server is not allowed", KR(ret));
152LOG_USER_ERROR(OB_OP_NOT_ALLOW, non_empty_server_err_msg);
153} else if (OB_FAIL(zone_checking_for_adding_server_(zone, rpc_result.get_zone(), picked_zone))) {
154LOG_WARN("zone checking for adding server is failed", KR(ret), K(zone), K(rpc_result.get_zone()));
155#ifdef OB_BUILD_TDE_SECURITY
156} else if (!is_bootstrap && OB_FAIL(master_key_checking_for_adding_server(addr, picked_zone, wms_in_sync_arg))) {
157LOG_WARN("master key checking for adding server is failed", KR(ret), K(addr), K(picked_zone));
158#endif
159} else if (OB_FAIL(add_server_(
160addr,
161server_id,
162picked_zone,
163rpc_result.get_sql_port(),
164rpc_result.get_build_version()))) {
165LOG_WARN("add_server failed", KR(ret), K(addr), K(server_id), K(picked_zone), "sql_port",
166rpc_result.get_sql_port(), "build_version", rpc_result.get_build_version());
167} else {}
168}
169}
170return ret;
171}
172int ObServerZoneOpService::delete_servers(
173const ObIArray<ObAddr> &servers,
174const ObZone &zone)
175{
176int ret = OB_SUCCESS;
177if (OB_UNLIKELY(!is_inited_)) {
178ret = OB_NOT_INIT;
179LOG_WARN("not init", KR(ret), K(is_inited_));
180} else if (OB_ISNULL(GCTX.root_service_)) {
181ret = OB_ERR_UNEXPECTED;
182LOG_WARN("root_service_ is null", KR(ret), KP(GCTX.root_service_));
183} else if (OB_UNLIKELY(servers.count() <= 0)) {
184ret = OB_INVALID_ARGUMENT;
185LOG_WARN("invalid argument", KR(ret), K(servers));
186} else if (OB_FAIL(check_server_have_enough_resource_for_delete_server_(servers, zone))) {
187LOG_WARN("not enough resource, cannot delete servers", KR(ret), K(servers), K(zone));
188} else if (OB_FAIL(GCTX.root_service_->check_all_ls_has_leader("delete server"))) {
189LOG_WARN("fail to check whether all ls has leader", KR(ret));
190} else {
191for (int64_t i = 0; OB_SUCC(ret) && i < servers.count(); ++i) {
192if (OB_FAIL(delete_server_(servers.at(i), zone))) {
193LOG_WARN("delete_server failed", "server", servers.at(i), "zone", zone, KR(ret));
194}
195}
196}
197return ret;
198}
199int ObServerZoneOpService::cancel_delete_servers(
200const ObIArray<ObAddr> &servers,
201const ObZone &zone)
202{
203int ret = OB_SUCCESS;
204if (OB_UNLIKELY(!is_inited_)) {
205ret = OB_NOT_INIT;
206LOG_WARN("not init", KR(ret), K(is_inited_));
207} else if (OB_ISNULL(unit_manager_) || OB_ISNULL(sql_proxy_)) {
208ret = OB_ERR_UNEXPECTED;
209LOG_WARN("unit_manager_ or sql_proxy_ or server_change_callback_ is null", KR(ret),
210KP(unit_manager_), KP(sql_proxy_));
211} else {
212ObServerInfoInTable server_info_in_table;
213for (int64_t i = 0; OB_SUCC(ret) && i < servers.count(); ++i) {
214const ObAddr &server = servers.at(i);
215const int64_t now = ObTimeUtility::current_time();
216ObMySQLTransaction trans;
217server_info_in_table.reset();
218if (OB_FAIL(trans.start(sql_proxy_, OB_SYS_TENANT_ID))) {
219LOG_WARN("fail to start trans", KR(ret));
220} else if (OB_FAIL(check_and_end_delete_server_(trans, server, zone, true /* is_cancel */, server_info_in_table))) {
221LOG_WARN("fail to check and end delete server", KR(ret), K(server), K(zone));
222} else if (OB_FAIL(ObServerTableOperator::update_status(
223trans,
224server,
225ObServerStatus::OB_SERVER_DELETING,
226server_info_in_table.is_alive() ? ObServerStatus::OB_SERVER_ACTIVE : ObServerStatus::OB_SERVER_INACTIVE))) {
227LOG_WARN("fail to update status in __all_server table", KR(ret),
228K(server), K(server_info_in_table));
229} else if (OB_FAIL(unit_manager_->cancel_migrate_out_units(server))) {
230LOG_WARN("unit_manager_ cancel_migrate_out_units failed", KR(ret), K(server));
231}
232(void) end_trans_and_on_server_change_(ret, trans, "cancel_delete_server", server, server_info_in_table.get_zone(), now);
233}
234}
235return ret;
236}
237int ObServerZoneOpService::finish_delete_server(
238const ObAddr &server,
239const ObZone &zone)
240{
241int ret = OB_SUCCESS;
242ObServerInfoInTable server_info_in_table;
243const int64_t now = ObTimeUtility::current_time();
244ObMySQLTransaction trans;
245if (OB_UNLIKELY(!is_inited_)) {
246ret = OB_NOT_INIT;
247LOG_WARN("not init", KR(ret), K(is_inited_));
248} else if (OB_ISNULL(sql_proxy_)) {
249ret = OB_ERR_UNEXPECTED;
250LOG_WARN("sql_proxy_ is null", KR(ret), KP(sql_proxy_));
251} else if (OB_FAIL(trans.start(sql_proxy_, OB_SYS_TENANT_ID))) {
252LOG_WARN("fail to start trans", KR(ret));
253} else if (OB_FAIL(check_and_end_delete_server_(trans, server, zone, false /* is_cancel */, server_info_in_table))) {
254LOG_WARN("fail to check and end delete server", KR(ret), K(server), K(zone));
255} else if (OB_FAIL(ObServerManager::try_delete_server_working_dir(
256server_info_in_table.get_zone(),
257server,
258server_info_in_table.get_server_id()))) {
259LOG_WARN("fail to delete server working dir", KR(ret), K(server_info_in_table));
260} else if (OB_FAIL(st_operator_.remove(server, trans))) {
261LOG_WARN("fail to remove this server from __all_server table", KR(ret), K(server));
262}
263(void) end_trans_and_on_server_change_(ret, trans, "finish_delete_server", server, server_info_in_table.get_zone(), now);
264return ret;
265}
266int ObServerZoneOpService::stop_servers(
267const ObIArray<ObAddr> &servers,
268const ObZone &zone,
269const obrpc::ObAdminServerArg::AdminServerOp &op)
270{
271int ret = OB_SUCCESS;
272if (OB_UNLIKELY(!is_inited_)) {
273ret = OB_NOT_INIT;
274LOG_WARN("not init", KR(ret), K(is_inited_));
275} else if (OB_FAIL(stop_server_precheck(servers, op))) {
276LOG_WARN("fail to precheck stop server", KR(ret), K(servers), K(zone));
277} else {
278for (int64_t i = 0; OB_SUCC(ret) && i < servers.count(); i++) {
279const ObAddr &server = servers.at(i);
280if (OB_FAIL(start_or_stop_server_(server, zone, op))) {
281LOG_WARN("fail to stop server", KR(ret), K(server), K(zone));
282}
283}
284}
285return ret;
286}
287int ObServerZoneOpService::start_servers(
288const ObIArray<ObAddr> &servers,
289const ObZone &zone)
290{
291int ret = OB_SUCCESS;
292if (OB_UNLIKELY(!is_inited_)) {
293ret = OB_NOT_INIT;
294LOG_WARN("not init", KR(ret), K(is_inited_));
295} else if (OB_UNLIKELY(servers.count() <= 0)) {
296ret = OB_INVALID_ARGUMENT;
297LOG_WARN("servers' count is zero", KR(ret), K(servers));
298} else {
299for (int64_t i = 0; OB_SUCC(ret) && i < servers.count(); ++i) {
300const ObAddr &server = servers.at(i);
301if (OB_FAIL(start_or_stop_server_(server, zone, ObAdminServerArg::START))) {
302LOG_WARN("fail to start server", KR(ret), K(server), K(zone));
303}
304}
305}
306return ret;
307}
#ifdef OB_BUILD_TDE_SECURITY
/**
 * Master key check performed while adding `server` (TDE builds only).
 *
 * Nothing is sent to the server when the master key is empty or when `zone`
 * is an encrypted zone. Otherwise the tenant master key versions and tenant
 * config versions are collected into `wms_in_sync_arg` and the server is asked,
 * via the wait_master_key_in_sync RPC, to wait until it is in sync.
 *
 * @param server           the server being added
 * @param zone             zone picked for the server; must be non-empty when
 *                         the master key is not empty
 * @param wms_in_sync_arg  in/out RPC argument; tenant_max_key_version_ and
 *                         tenant_config_version_ are filled in here
 * @return OB_SUCCESS; OB_NOT_INIT; OB_ERR_UNEXPECTED when a required pointer
 *         is null; OB_NOT_SUPPORTED when the master key is valid but no zone
 *         was specified; OB_TIMEOUT; or the error of a failing step
 */
int ObServerZoneOpService::master_key_checking_for_adding_server(
    const common::ObAddr &server,
    const ObZone &zone,
    obrpc::ObWaitMasterKeyInSyncArg &wms_in_sync_arg)
{
  int ret = OB_SUCCESS;
  if (OB_UNLIKELY(!is_inited_)) {
    ret = OB_NOT_INIT;
    LOG_WARN("not init", KR(ret), K(is_inited_));
  } else if (OB_ISNULL(master_key_mgr_)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("master_key_mgr_ is null", KR(ret), KP(master_key_mgr_));
  } else if (OB_ISNULL(sql_proxy_) || OB_ISNULL(rpc_proxy_)) {
    // Both pointers are dereferenced below, so guard them like the other
    // methods of this class do.
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("sql_proxy_ or rpc_proxy_ is null", KR(ret), KP(sql_proxy_), KP(rpc_proxy_));
  } else {
    bool master_key_empty = true;
    share::ObLeaseResponse tmp_lease_response;
    bool encryption = false;
    ObTimeoutCtx ctx;
    if (OB_FAIL(master_key_mgr_->check_master_key_empty(master_key_empty))) {
      LOG_WARN("fail to check whether master key is empty", KR(ret));
    } else if (master_key_empty) {
      LOG_INFO("empty master key, no need to sync master key info");
    } else if (zone.is_empty()) {
      // master key is known to be non-empty on this branch
      ret = OB_NOT_SUPPORTED;
      LOG_USER_ERROR(OB_NOT_SUPPORTED, "not support to add a server "
          "without a specified zone when the master key is valid");
    } else if (OB_FAIL(ObZoneTableOperation::check_encryption_zone(*sql_proxy_, zone, encryption))) {
      LOG_WARN("fail to check zone encryption", KR(ret), "zone", zone);
    } else if (encryption) {
      LOG_INFO("server in encrypted zone, no need to sync master key info", "zone", zone);
    } else if (OB_FAIL(master_key_mgr_->get_all_tenant_master_key(
        zone, wms_in_sync_arg.tenant_max_key_version_))) {
      LOG_WARN("fail to get all tenant master key", KR(ret));
    } else if (OB_FAIL(OTC_MGR.get_lease_response(tmp_lease_response))) {
      LOG_WARN("fail to get lease response", KR(ret));
    } else if (OB_FAIL(wms_in_sync_arg.tenant_config_version_.assign(
        tmp_lease_response.tenant_config_version_))) {
      LOG_WARN("fail to assign tenant config version", KR(ret));
    } else if (OB_FAIL(rootserver::ObRootUtils::get_rs_default_timeout_ctx(ctx))) {
      LOG_WARN("fail to get timeout ctx", KR(ret), K(ctx));
    } else {
      int64_t timeout = ctx.get_timeout();
      if (OB_UNLIKELY(timeout <= 0)) {
        ret = OB_TIMEOUT;
        LOG_WARN("ctx time out", KR(ret), K(timeout));
      } else if (OB_FAIL(rpc_proxy_->to(server)
          .timeout(timeout)
          .wait_master_key_in_sync(wms_in_sync_arg))) {
        LOG_WARN("fail to wait master key in sync", KR(ret), K(server));
      } else {}
    }
  }
  return ret;
}
#endif
363int ObServerZoneOpService::stop_server_precheck(
364const ObIArray<ObAddr> &servers,
365const obrpc::ObAdminServerArg::AdminServerOp &op)
366{
367int ret = OB_SUCCESS;
368ObZone zone;
369bool is_same_zone = false;
370bool is_all_stopped = false;
371ObArray<ObServerInfoInTable> all_servers_info_in_table;
372ObServerInfoInTable server_info;
373if (OB_UNLIKELY(!is_inited_)) {
374ret = OB_NOT_INIT;
375LOG_WARN("not init", KR(ret), K(is_inited_));
376} else if (OB_UNLIKELY(servers.count() <= 0)) {
377ret = OB_INVALID_ARGUMENT;
378LOG_WARN("servers' count is zero", KR(ret), K(servers));
379} else if (OB_ISNULL(GCTX.root_service_) || OB_ISNULL(sql_proxy_)) {
380ret = OB_ERR_UNEXPECTED;
381LOG_WARN("GCTX.root_service_ or sql_proxy_ is null", KR(ret), KP(GCTX.root_service_), KP(sql_proxy_));
382} else if (OB_FAIL(ObServerTableOperator::get(*sql_proxy_, all_servers_info_in_table))) {
383LOG_WARN("fail to read __all_server table", KR(ret), KP(sql_proxy_));
384} else if (OB_FAIL(check_zone_and_server_(
385all_servers_info_in_table,
386servers,
387is_same_zone,
388is_all_stopped))) {
389LOG_WARN("fail to check zone and server", KR(ret), K(all_servers_info_in_table), K(servers));
390} else if (is_all_stopped) {
391//nothing todo
392} else if (!is_same_zone) {
393ret = OB_STOP_SERVER_IN_MULTIPLE_ZONES;
394LOG_WARN("can not stop servers in multiple zones", KR(ret), K(server_info), K(servers));
395} else if (OB_FAIL((ObRootUtils::find_server_info(all_servers_info_in_table, servers.at(0), server_info)))) {
396LOG_WARN("fail to find server info", KR(ret), K(all_servers_info_in_table), K(servers.at(0)));
397} else {
398const ObZone &zone = server_info.get_zone();
399if (ObAdminServerArg::ISOLATE == op) {
400//"Isolate server" does not need to check the total number and status of replicas; it cannot be restarted later;
401if (OB_FAIL(GCTX.root_service_->check_can_stop(zone, servers, false /*is_stop_zone*/))) {
402LOG_WARN("fail to check can stop", KR(ret), K(zone), K(servers), K(op));
403if (OB_OP_NOT_ALLOW == ret) {
404LOG_USER_ERROR(OB_OP_NOT_ALLOW, "Stop all servers in primary region is");
405}
406}
407} else {
408if (ObRootUtils::have_other_stop_task(zone)) {
409ret = OB_STOP_SERVER_IN_MULTIPLE_ZONES;
410LOG_WARN("can not stop servers in multiple zones", KR(ret), K(zone), K(servers), K(op));
411LOG_USER_ERROR(OB_STOP_SERVER_IN_MULTIPLE_ZONES,
412"cannot stop server or stop zone in multiple zones");
413} else if (OB_FAIL(GCTX.root_service_->check_majority_and_log_in_sync(
414servers,
415ObAdminServerArg::FORCE_STOP == op,/*skip_log_sync_check*/
416"stop server"))) {
417LOG_WARN("fail to check majority and log in-sync", KR(ret), K(zone), K(servers), K(op));
418}
419}
420}
421return ret;
422}
423int ObServerZoneOpService::zone_checking_for_adding_server_(
424const ObZone &command_zone,
425const ObZone &rpc_zone,
426ObZone &picked_zone)
427{
428int ret = OB_SUCCESS;
429// command_zone: the zone specified in the system command ADD SERVER
430// rpc_zone: the zone specified in the server's local config and send to rs via rpc
431// picked_zone: the zone we will use in add_server
432if (OB_UNLIKELY(!is_inited_)) {
433ret = OB_NOT_INIT;
434LOG_WARN("not init", KR(ret), K(is_inited_));
435} else if (OB_UNLIKELY(rpc_zone.is_empty())) {
436ret = OB_INVALID_ARGUMENT;
437LOG_WARN("rpc_zone cannot be empty. It implies that server's local config zone is empty.",
438KR(ret), K(rpc_zone));
439} else if (!command_zone.is_empty() && command_zone != rpc_zone) {
440ret = OB_SERVER_ZONE_NOT_MATCH;
441LOG_WARN("the zone specified in the server's local config is not the same as"
442" the zone specified in the command", KR(ret), K(command_zone), K(rpc_zone));
443} else if (OB_FAIL(picked_zone.assign(rpc_zone))) {
444LOG_WARN("fail to assign picked_zone", KR(ret), K(rpc_zone));
445} else {}
446return ret;
447}
448int ObServerZoneOpService::add_server_(
449const ObAddr &server,
450const uint64_t server_id,
451const ObZone &zone,
452const int64_t sql_port,
453const ObServerInfoInTable::ObBuildVersion &build_version)
454{
455int ret = OB_SUCCESS;
456bool is_active = false;
457const int64_t now = ObTimeUtility::current_time();
458ObServerInfoInTable server_info_in_table;
459ObMySQLTransaction trans;
460if (OB_UNLIKELY(!is_inited_)) {
461ret = OB_NOT_INIT;
462LOG_WARN("not init", KR(ret), K(is_inited_));
463} else if (OB_UNLIKELY(!server.is_valid()
464|| !is_valid_server_id(server_id)
465|| zone.is_empty()
466|| sql_port <= 0
467|| build_version.is_empty())) {
468ret = OB_INVALID_ARGUMENT;
469LOG_WARN("invalid argument", KR(ret), K(server), K(server_id), K(zone), K(sql_port), K(build_version));
470} else if (OB_ISNULL(sql_proxy_) || OB_ISNULL(server_change_callback_)) {
471ret = OB_ERR_UNEXPECTED;
472LOG_WARN("sql_proxy_ or server_change_callback_ is null", KR(ret),
473KP(sql_proxy_), KP(server_change_callback_));
474} else if (OB_FAIL(trans.start(sql_proxy_, OB_SYS_TENANT_ID))) {
475LOG_WARN("fail to start trans", KR(ret));
476} else if (OB_FAIL(check_and_update_service_epoch_(trans))) {
477LOG_WARN("fail to check and update service epoch", KR(ret));
478} else if (OB_FAIL(ObZoneTableOperation::check_zone_active(trans, zone, is_active))){
479// we do not need to lock the zone info in __all_zone table
480// all server/zone operations are mutually exclusive since we locked the service epoch
481LOG_WARN("fail to check whether the zone is active", KR(ret), K(zone));
482} else if (OB_UNLIKELY(!is_active)) {
483ret = OB_ZONE_NOT_ACTIVE;
484LOG_WARN("the zone is not active", KR(ret), K(zone), K(is_active));
485} else if (OB_FAIL(ObServerTableOperator::get(trans, server, server_info_in_table))) {
486if (OB_SERVER_NOT_IN_WHITE_LIST == ret) {
487ret = OB_SUCCESS;
488} else {
489LOG_WARN("fail to get server_info in table", KR(ret), K(server));
490}
491} else {
492ret = OB_ENTRY_EXIST;
493LOG_WARN("server exists", KR(ret), K(server_info_in_table));
494}
495if (FAILEDx(server_info_in_table.init(
496server,
497server_id,
498zone,
499sql_port,
500false, /* with_rootserver */
501ObServerStatus::OB_SERVER_ACTIVE,
502build_version,
5030, /* stop_time */
5040, /* start_service_time */
5050 /* last_offline_time */))) {
506LOG_WARN("fail to init server info in table", KR(ret), K(server), K(server_id), K(zone),
507K(sql_port), K(build_version), K(now));
508} else if (OB_FAIL(ObServerTableOperator::insert(trans, server_info_in_table))) {
509LOG_WARN("fail to insert server info into __all_server table", KR(ret), K(server_info_in_table));
510}
511(void) end_trans_and_on_server_change_(ret, trans, "add_server", server, zone, now);
512return ret;
513}
514int ObServerZoneOpService::delete_server_(
515const common::ObAddr &server,
516const ObZone &zone)
517{
518int ret = OB_SUCCESS;
519ObServerInfoInTable server_info_in_table;
520const int64_t now = ObTimeUtility::current_time();
521char ip[OB_MAX_SERVER_ADDR_SIZE] = "";
522ObMySQLTransaction trans;
523int64_t job_id = 0;
524if (OB_UNLIKELY(!is_inited_)) {
525ret = OB_NOT_INIT;
526LOG_WARN("not init", KR(ret), K(is_inited_));
527} else if (OB_UNLIKELY(!server.is_valid() || !server.ip_to_string(ip, sizeof(ip)))) {
528ret = OB_INVALID_ARGUMENT;
529LOG_WARN("invalid argument", KR(ret), K(server));
530} else if (OB_ISNULL(sql_proxy_) || OB_ISNULL(server_change_callback_)) {
531ret = OB_ERR_UNEXPECTED;
532LOG_WARN("sql_proxy_ or server_change_callback_ is null", KR(ret),
533KP(sql_proxy_), KP(server_change_callback_));
534} else if (OB_FAIL(trans.start(sql_proxy_, OB_SYS_TENANT_ID))) {
535LOG_WARN("fail to start trans", KR(ret));
536} else if (OB_FAIL(check_and_update_service_epoch_(trans))) {
537LOG_WARN("fail to check and update service epoch", KR(ret));
538} else if (OB_FAIL(ObServerTableOperator::get(trans, server, server_info_in_table))) {
539LOG_WARN("fail to get server_info in table", KR(ret), K(server));
540} else if (!zone.is_empty() && zone != server_info_in_table.get_zone()) {
541ret = OB_SERVER_ZONE_NOT_MATCH;
542LOG_WARN("zone not matches", KR(ret), K(server), K(zone), K(server_info_in_table));
543} else if (OB_UNLIKELY(server_info_in_table.is_deleting())) {
544ret = OB_SERVER_ALREADY_DELETED;
545LOG_WARN("the server has been deleted", KR(ret), K(server_info_in_table));
546} else if (OB_FAIL(RS_JOB_CREATE_WITH_RET(
547job_id,
548JOB_TYPE_DELETE_SERVER,
549trans,
550"svr_ip", ip,
551"svr_port", server.get_port()))) {
552LOG_WARN("fail to create rs job DELETE_SERVER", KR(ret));
553} else if (OB_FAIL(ObServerTableOperator::update_status(
554trans,
555server,
556server_info_in_table.get_status(),
557ObServerStatus::OB_SERVER_DELETING))) {
558LOG_WARN("fail to update status", KR(ret), K(server), K(server_info_in_table));
559}
560(void) end_trans_and_on_server_change_(ret, trans, "delete_server", server, server_info_in_table.get_zone(), now);
561return ret;
562}
563int ObServerZoneOpService::check_and_end_delete_server_(
564common::ObMySQLTransaction &trans,
565const common::ObAddr &server,
566const ObZone &zone,
567const bool is_cancel,
568share::ObServerInfoInTable &server_info)
569{
570int ret = OB_SUCCESS;
571server_info.reset();
572char ip[OB_MAX_SERVER_ADDR_SIZE] = "";
573if (OB_UNLIKELY(!is_inited_)) {
574ret = OB_NOT_INIT;
575LOG_WARN("not init", KR(ret), K(is_inited_));
576} else if (OB_UNLIKELY(!server.is_valid() || !server.ip_to_string(ip, sizeof(ip)))) {
577ret = OB_INVALID_ARGUMENT;
578LOG_WARN("invalid argument", KR(ret), K(server));
579} else if (OB_FAIL(check_and_update_service_epoch_(trans))) {
580LOG_WARN("fail to check and update service epoch", KR(ret));
581} else if (OB_FAIL(ObServerTableOperator::get(trans, server, server_info))) {
582LOG_WARN("fail to get server_info in table", KR(ret), K(server));
583} else if (!zone.is_empty() && zone != server_info.get_zone()) {
584ret = OB_SERVER_ZONE_NOT_MATCH;
585LOG_WARN("zone not matches", KR(ret), K(server), K(zone), K(server_info));
586} else if (OB_UNLIKELY(!server_info.is_deleting())) {
587ret = OB_SERVER_NOT_DELETING;
588LOG_ERROR("server is not in deleting status, cannot be removed from __all_server table",
589KR(ret), K(server_info));
590} else {
591int64_t job_id = 0;
592ret = RS_JOB_FIND(DELETE_SERVER, job_id, trans,
593"svr_ip", ip, "svr_port", server.get_port());
594if (OB_SUCC(ret) && job_id > 0) {
595int tmp_ret = is_cancel ? OB_CANCELED : OB_SUCCESS;
596if (OB_FAIL(RS_JOB_COMPLETE(job_id, tmp_ret, trans))) {
597LOG_WARN("fail to all_rootservice_job" , KR(ret), K(server));
598}
599} else {
600LOG_WARN("failed to find job", KR(ret), K(server));
601if (OB_ENTRY_NOT_EXIST == ret) {
602ret = OB_SUCCESS;
603}
604}
605}
606return ret;
607}
608int ObServerZoneOpService::start_or_stop_server_(
609const common::ObAddr &server,
610const ObZone &zone,
611const obrpc::ObAdminServerArg::AdminServerOp &op)
612{
613int ret = OB_SUCCESS;
614const int64_t now = ObTimeUtility::current_time();
615ObServerInfoInTable server_info;
616ObMySQLTransaction trans;
617bool is_start = (ObAdminServerArg::START == op);
618if (OB_UNLIKELY(!is_inited_)) {
619ret = OB_NOT_INIT;
620LOG_WARN("not init", KR(ret), K(is_inited_));
621} else if (OB_UNLIKELY(!server.is_valid())) {
622ret = OB_INVALID_ARGUMENT;
623LOG_WARN("invalid argument", KR(ret), K(server));
624} else if (OB_ISNULL(sql_proxy_)) {
625ret = OB_ERR_UNEXPECTED;
626LOG_WARN("sql_proxy_ is null", KR(ret), KP(sql_proxy_));
627} else if (OB_FAIL(trans.start(sql_proxy_, OB_SYS_TENANT_ID))) {
628LOG_WARN("fail to start trans", KR(ret));
629} else if (OB_FAIL(check_and_update_service_epoch_(trans))) {
630LOG_WARN("fail to check and update service epoch", KR(ret));
631} else if (OB_FAIL(ObServerTableOperator::get(trans, server, server_info))) {
632LOG_WARN("fail to get server_info", KR(ret), K(server));
633} else if (!zone.is_empty() && zone != server_info.get_zone()) {
634ret = OB_SERVER_ZONE_NOT_MATCH;
635LOG_WARN("zone not matches", KR(ret), K(server), K(zone), K(server_info));
636} else if (ObAdminServerArg::STOP == op || ObAdminServerArg::FORCE_STOP == op) {
637// check again, if there exists stopped servers in other zones
638if (ObRootUtils::have_other_stop_task(server_info.get_zone())) {
639ret = OB_STOP_SERVER_IN_MULTIPLE_ZONES;
640LOG_WARN("can not stop servers in multiple zones", KR(ret), K(server_info.get_zone()));
641LOG_USER_ERROR(OB_STOP_SERVER_IN_MULTIPLE_ZONES,
642"cannot stop server or stop zone in multiple zones");
643}
644}
645if (OB_SUCC(ret)) {
646int64_t new_stop_time = is_start ? 0 : now;
647int64_t old_stop_time = server_info.get_stop_time();
648if ((is_start && 0 != old_stop_time) || (!is_start && 0 == old_stop_time)) {
649if (OB_FAIL(ObServerTableOperator::update_stop_time(
650trans,
651server,
652old_stop_time,
653new_stop_time))) {
654LOG_WARN("fail to update stop_time", KR(ret), K(server), K(old_stop_time), K(new_stop_time));
655}
656}
657LOG_INFO("update stop time", KR(ret), K(server_info),
658K(old_stop_time), K(new_stop_time), K(op), K(is_start));
659}
660const char *op_print_str = is_start ? "start_server" : "stop_server";
661(void) end_trans_and_on_server_change_(ret, trans, op_print_str, server, server_info.get_zone(), now);
662return ret;
663}
664
665int ObServerZoneOpService::construct_rs_list_arg(ObRsListArg &rs_list_arg)
666{
667int ret = OB_SUCCESS;
668ObLSInfo ls_info;
669if (OB_UNLIKELY(!is_inited_)) {
670ret = OB_NOT_INIT;
671LOG_WARN("not init", KR(ret), K(is_inited_));
672} else if (OB_ISNULL(lst_operator_)) {
673ret = OB_ERR_UNEXPECTED;
674LOG_WARN("lst operator is null", KR(ret), KP(lst_operator_));
675} else if (OB_FAIL(lst_operator_->get(
676GCONF.cluster_id,
677OB_SYS_TENANT_ID,
678SYS_LS,
679share::ObLSTable::DEFAULT_MODE,
680ls_info))) {
681LOG_WARN("fail to get ls info", KR(ret));
682} else {
683rs_list_arg.master_rs_ = GCONF.self_addr_;
684FOREACH_CNT_X(replica, ls_info.get_replicas(), OB_SUCC(ret)) {
685if (replica->get_server() == GCONF.self_addr_
686|| (replica->is_in_service()
687&& ObReplicaTypeCheck::is_paxos_replica_V2(replica->get_replica_type()))) {
688if (OB_FAIL(rs_list_arg.rs_list_.push_back(replica->get_server()))) {
689LOG_WARN("fail to push a server into rs list", KR(ret), K(replica->get_server()));
690}
691}
692}
693}
694return ret;
695}
696int ObServerZoneOpService::check_and_update_service_epoch_(ObMySQLTransaction &trans)
697{
698int ret = OB_SUCCESS;
699int64_t service_epoch_in_table = palf::INVALID_PROPOSAL_ID;
700int64_t proposal_id = palf::INVALID_PROPOSAL_ID;
701ObRole role;
702if (OB_UNLIKELY(!is_inited_)) {
703ret = OB_NOT_INIT;
704LOG_WARN("not init", KR(ret), K(is_inited_));
705} else if (OB_FAIL(ObRootUtils::get_proposal_id_from_sys_ls(proposal_id, role))) {
706LOG_WARN("fail to get proposal id from sys ls", KR(ret));
707} else if (ObRole::LEADER != role) {
708ret = OB_NOT_MASTER;
709LOG_WARN("not leader ls", KR(ret), K(proposal_id), K(service_epoch_in_table), K(role));
710} else if (palf::INVALID_PROPOSAL_ID == proposal_id) {
711ret = OB_ERR_UNEXPECTED;
712LOG_WARN("invalid proposal id", KR(ret), K(proposal_id));
713} else if (OB_FAIL(ObServiceEpochProxy::check_and_update_service_epoch(
714trans,
715OB_SYS_TENANT_ID,
716ObServiceEpochProxy::SERVER_ZONE_OP_SERVICE_EPOCH,
717proposal_id))) {
718LOG_WARN("fail to check and update server zone op service epoch", KR(ret), K(proposal_id));
719} else {}
720return ret;
721}
722int ObServerZoneOpService::fetch_new_server_id_(uint64_t &server_id)
723{
724int ret = OB_SUCCESS;
725if (OB_UNLIKELY(!is_inited_)) {
726ret = OB_NOT_INIT;
727LOG_WARN("not init", KR(ret), K(is_inited_));
728} else if (OB_ISNULL(sql_proxy_)) {
729ret = OB_ERR_UNEXPECTED;
730LOG_WARN("invalid sql proxy", KR(ret), KP(sql_proxy_));
731} else {
732uint64_t new_max_id = OB_INVALID_ID;
733ObMaxIdFetcher id_fetcher(*sql_proxy_);
734if (OB_FAIL(id_fetcher.fetch_new_max_id(
735OB_SYS_TENANT_ID,
736OB_MAX_USED_SERVER_ID_TYPE,
737new_max_id))) {
738LOG_WARN("fetch_new_max_id failed", KR(ret));
739} else {
740server_id = new_max_id;
741}
742}
743return ret;
744}
745int ObServerZoneOpService::check_server_have_enough_resource_for_delete_server_(
746const ObIArray<ObAddr> &servers,
747const ObZone &zone)
748{
749int ret = OB_SUCCESS;
750if (OB_UNLIKELY(!is_inited_)) {
751ret = OB_NOT_INIT;
752LOG_WARN("not init", KR(ret), K(is_inited_));
753} else if (OB_ISNULL(unit_manager_) || OB_ISNULL(sql_proxy_)) {
754ret = OB_ERR_UNEXPECTED;
755LOG_WARN("unit_manager_ or sql_proxy_ is null", KR(ret), KP(unit_manager_), KP(sql_proxy_));
756} else {
757ObServerInfoInTable server_info;
758FOREACH_CNT_X(server, servers, OB_SUCC(ret)) {
759server_info.reset();
760if (OB_FAIL(ObServerTableOperator::get(*sql_proxy_, *server, server_info))) {
761LOG_WARN("fail to get server_info in table", KR(ret), KP(sql_proxy_), KPC(server));
762} else if (!zone.is_empty() && server_info.get_zone() != zone) {
763ret = OB_SERVER_ZONE_NOT_MATCH;
764LOG_WARN("the arg zone is not the same as the server's zone in __all_server table", KR(ret),
765K(zone), K(server_info));
766} else if (OB_FAIL(unit_manager_->check_enough_resource_for_delete_server(
767*server, server_info.get_zone()))) {
768LOG_WARN("fail to check enouch resource", KR(ret), KPC(server), K(server_info));
769}
770}//end for each
771}
772return ret;
773}
774int ObServerZoneOpService::check_zone_and_server_(
775const ObIArray<share::ObServerInfoInTable> &servers_info,
776const ObIArray<ObAddr> &servers,
777bool &is_same_zone,
778bool &is_all_stopped)
779{
780int ret = OB_SUCCESS;
781is_same_zone = true;
782is_all_stopped = true;
783if (OB_UNLIKELY(!is_inited_)) {
784ret = OB_NOT_INIT;
785LOG_WARN("not init", KR(ret), K(is_inited_));
786} else {
787ObServerInfoInTable server_info;
788ObZone zone;
789for (int64_t i = 0; i < servers.count() && OB_SUCC(ret) && (is_same_zone || is_all_stopped); i++) {
790const ObAddr &server = servers.at(i);
791server_info.reset();
792if (OB_FAIL(ObRootUtils::find_server_info(servers_info, server, server_info))) {
793LOG_WARN("fail to get server info", KR(ret), K(servers_info), K(server));
794} else if (0 == i) {
795if (OB_FAIL(zone.assign(server_info.get_zone()))) {
796LOG_WARN("fail to assign zone", KR(ret), K(server_info.get_zone()));
797}
798} else if (zone != server_info.get_zone()) {
799is_same_zone = false;
800LOG_WARN("server zone not same", K(zone), K(server_info), K(servers));
801}
802if (OB_FAIL(ret)) {
803} else if (!server_info.is_stopped()) {
804is_all_stopped = false;
805}
806}
807}
808return ret;
809}
810ERRSIM_POINT_DEF(ALL_SERVER_LIST_ERROR);
811void ObServerZoneOpService::end_trans_and_on_server_change_(
812int &ret,
813common::ObMySQLTransaction &trans,
814const char *op_print_str,
815const common::ObAddr &server,
816const ObZone &zone,
817const int64_t start_time)
818{
819int tmp_ret = OB_SUCCESS;
820LOG_INFO("start execute end_trans_and_on_server_change_", KR(ret),
821K(op_print_str), K(server), K(zone), K(start_time));
822if (OB_UNLIKELY(!trans.is_started())) {
823LOG_WARN("the transaction is not started");
824} else {
825if (OB_TMP_FAIL(trans.end(OB_SUCC(ret)))) {
826LOG_WARN("fail to commit the transaction", KR(ret), KR(tmp_ret), K(server), K(zone));
827ret = OB_SUCC(ret) ? tmp_ret : ret;
828}
829}
830if (OB_TMP_FAIL(SVR_TRACER.refresh())) {
831LOG_WARN("fail to refresh server tracer", KR(ret), KR(tmp_ret));
832}
833bool no_on_server_change = ALL_SERVER_LIST_ERROR ? true : false;
834if (OB_ISNULL(server_change_callback_)) {
835tmp_ret = OB_ERR_UNEXPECTED;
836LOG_WARN("server_change_callback_ is null", KR(ret), KR(tmp_ret), KP(server_change_callback_));
837ret = OB_SUCC(ret) ? tmp_ret : ret;
838} else if (no_on_server_change) {
839} else if (OB_TMP_FAIL(server_change_callback_->on_server_change())) {
840LOG_WARN("fail to callback on server change", KR(ret), KR(tmp_ret));
841}
842int64_t time_cost = ::oceanbase::common::ObTimeUtility::current_time() - start_time;
843FLOG_INFO(op_print_str, K(server), K(zone), "time cost", time_cost, KR(ret));
844ROOTSERVICE_EVENT_ADD("server", op_print_str, K(server), K(ret));
845}
846}
847}
848