oceanbase
1110 строк · 45.5 Кб
1/**
2* Copyright (c) 2022 OceanBase
3* OceanBase CE is licensed under Mulan PubL v2.
4* You can use this software according to the terms and conditions of the Mulan PubL v2.
5* You may obtain a copy of Mulan PubL v2 at:
6* http://license.coscl.org.cn/MulanPubL-2.0
7* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10* See the Mulan PubL v2 for more details.
11*/
12
13#define USING_LOG_PREFIX RS14#include "ob_heartbeat_service.h"15
16#include "share/ob_define.h"17#include "share/ob_service_epoch_proxy.h"18#include "share/ob_version.h"19#include "share/ob_zone_table_operation.h"20#include "lib/thread/threads.h" // set_run_wrapper21#include "lib/mysqlclient/ob_mysql_transaction.h" // ObMySQLTransaction22#include "lib/utility/ob_unify_serialize.h"23#include "lib/time/ob_time_utility.h"24#include "observer/ob_server_struct.h" // GCTX25#include "logservice/ob_log_base_header.h" // ObLogBaseHeader26#include "logservice/ob_log_handler.h"27#include "storage/tx_storage/ob_ls_service.h"28#include "storage/tx_storage/ob_ls_handle.h"29#include "rootserver/ob_root_utils.h" // get_proposal_id_from_sys_ls30#include "rootserver/ob_rs_event_history_table_operator.h" // ROOTSERVICE_EVENT_ADD31#include "rootserver/ob_root_service.h"32#include "lib/utility/utility.h"33
34namespace oceanbase35{
36using namespace common;37using namespace share;38using observer::ObServerHealthStatus;39namespace rootserver40{
41#define HBS_LOG_INFO(fmt, args...) FLOG_INFO("[HEARTBEAT_SERVICE] " fmt, ##args)42#define HBS_LOG_WARN(fmt, args...) FLOG_WARN("[HEARTBEAT_SERVICE] " fmt, ##args)43#define HBS_LOG_ERROR(fmt, args...) FLOG_ERROR("[HEARTBEAT_SERVICE] " fmt, ##args)44ObHeartbeatService::ObHeartbeatService()45: is_inited_(false),46sql_proxy_(NULL),47srv_rpc_proxy_(NULL),48epoch_id_(palf::INVALID_PROPOSAL_ID),49whitelist_epoch_id_(palf::INVALID_PROPOSAL_ID),50hb_responses_epoch_id_(palf::INVALID_PROPOSAL_ID),51hb_responses_rwlock_(ObLatchIds::HB_RESPONSES_LOCK),52all_servers_info_in_table_rwlock_(ObLatchIds::ALL_SERVERS_INFO_IN_TABLE_LOCK),53all_servers_hb_info_(),54all_servers_info_in_table_(),55inactive_zone_list_(),56hb_responses_(),57need_process_hb_responses_(false)58{
59}
60ObHeartbeatService::~ObHeartbeatService()61{
62}
63bool ObHeartbeatService::is_service_enabled_ = false;64int ObHeartbeatService::init()65{
66int ret = OB_SUCCESS;67int BUCKET_NUM = 1024; // ** FIXME: (linqiucen.lqc) temp. value68sql_proxy_ = GCTX.sql_proxy_;69srv_rpc_proxy_ = GCTX.srv_rpc_proxy_;70lib::ObMemAttr attr(MTL_ID(), "HB_SERVICE");71if (OB_UNLIKELY(is_inited_)) {72ret = OB_INIT_TWICE;73LOG_WARN("has already inited", KR(ret), K(is_inited_));74} else if (MTL_ID() != OB_SYS_TENANT_ID) {75// only create hb service threads in sys tenant76} else if (OB_ISNULL(srv_rpc_proxy_)) {77ret = OB_ERR_UNEXPECTED;78HBS_LOG_ERROR("srv_rpc_proxy_ is null", KR(ret), KP(srv_rpc_proxy_));79} else if (OB_FAIL(ObTenantThreadHelper::create(80"HBService",81lib::TGDefIDs::HeartbeatService,82*this))) {83LOG_WARN("fail to create thread", KR(ret));84} else if (OB_FAIL(ObTenantThreadHelper::start())) {85LOG_WARN("failed to start thread", KR(ret));86} else if (OB_FAIL(all_servers_hb_info_.create(BUCKET_NUM, attr))) {87LOG_WARN("fail to create all_servers_hb_info_", KR(ret));88} else {89{90SpinWLockGuard guard_for_hb_responses(hb_responses_rwlock_);91hb_responses_.reset();92hb_responses_epoch_id_ = palf::INVALID_PROPOSAL_ID;93need_process_hb_responses_ = false;94}95{96SpinWLockGuard guard_for_servers_info(all_servers_info_in_table_rwlock_);97all_servers_info_in_table_.reset();98inactive_zone_list_.reset();99whitelist_epoch_id_ = palf::INVALID_PROPOSAL_ID;100}101all_servers_hb_info_.clear();102all_servers_info_in_table_.set_attr(attr);103inactive_zone_list_.set_attr(attr);104hb_responses_.set_attr(attr);105set_epoch_id_(palf::INVALID_PROPOSAL_ID);106is_inited_ = true;107HBS_LOG_INFO("ObHeartbeatService is inited");108}109// we do not need the returned error code when init110// only try to confirm whether the heartbeat service is enabled as early as possible,111(void) check_is_service_enabled_();112return ret;113}
114int ObHeartbeatService::check_is_service_enabled_()115{
116int ret = OB_SUCCESS;117uint64_t sys_tenant_data_version = 0;118if (OB_FAIL(GET_MIN_DATA_VERSION(OB_SYS_TENANT_ID, sys_tenant_data_version))) {119LOG_WARN("fail to get sys tenant's min data version", KR(ret));120} else if (sys_tenant_data_version >= DATA_VERSION_4_2_0_0) {121is_service_enabled_ = true;122HBS_LOG_INFO("the heartbeart service is enabled now", K(sys_tenant_data_version), K(is_service_enabled_));123}124return ret;125}
126void ObHeartbeatService::destroy()127{
128{129SpinWLockGuard guard_for_hb_responses(hb_responses_rwlock_);130hb_responses_.reset();131hb_responses_epoch_id_ = palf::INVALID_PROPOSAL_ID;132need_process_hb_responses_ = false;133}134{135SpinWLockGuard guard_for_servers_info(all_servers_info_in_table_rwlock_);136all_servers_info_in_table_.reset();137inactive_zone_list_.reset();138whitelist_epoch_id_ = palf::INVALID_PROPOSAL_ID;139}140is_inited_ = false;141sql_proxy_ = NULL;142srv_rpc_proxy_ = NULL;143set_epoch_id_(palf::INVALID_PROPOSAL_ID);144all_servers_hb_info_.destroy();145HBS_LOG_INFO("ObHeartbeatService is destroyed");146ObTenantThreadHelper::destroy();147}
148
149int ObHeartbeatService::switch_to_leader()150{
151int ret = OB_SUCCESS;152int64_t epoch_id = palf::INVALID_PROPOSAL_ID;153ObRole role;154if (OB_FAIL(ObRootUtils::get_proposal_id_from_sys_ls(epoch_id, role))) {155LOG_WARN("fail to get proposal id from sys ls", KR(ret));156} else if (ObRole::LEADER != role) {157ret = OB_NOT_MASTER;158HBS_LOG_WARN("not master ls", KR(ret), K(epoch_id), K(role));159} else {160if (OB_LIKELY((palf::INVALID_PROPOSAL_ID == epoch_id_ || epoch_id_ < epoch_id)161&& palf::INVALID_PROPOSAL_ID != epoch_id)) {162set_epoch_id_(epoch_id);163} else {164ret = OB_INVALID_ARGUMENT;165LOG_WARN("invalid epoch id", KR(ret), K(epoch_id), K(epoch_id_));166}167}168if (FAILEDx(ObTenantThreadHelper::switch_to_leader())) {169HBS_LOG_WARN("fail to switch to leader", KR(ret));170} else {171HBS_LOG_INFO("switch to leader", KR(ret), K(epoch_id_));172}173return ret;174}
175void ObHeartbeatService::do_work()176{
177int ret = OB_SUCCESS;178if (OB_UNLIKELY(!is_inited_)) {179ret = OB_NOT_INIT;180LOG_WARN("not init", KR(ret), K(is_inited_));181} else if (OB_FAIL(check_upgrade_compat_())) {182LOG_WARN("fail to check upgrade compatibility", KR(ret));183} else {184while (!has_set_stop()) {185uint64_t thread_idx = get_thread_idx();186int64_t thread_cnt = THREAD_COUNT;187if (OB_UNLIKELY(thread_idx >= thread_cnt)) {188ret = OB_ERR_UNEXPECTED;189HBS_LOG_ERROR("unexpected thread_idx", KR(ret), K(thread_idx), K(thread_cnt));190} else {191if (0 == thread_idx) {192ObCurTraceId::init(GCONF.self_addr_);193if (OB_FAIL(send_heartbeat_())) {194LOG_WARN("fail to send heartbeat", KR(ret));195}196} else { // 1 == thread_idx197ObCurTraceId::init(GCONF.self_addr_);198if (OB_FAIL(manage_heartbeat_())) {199LOG_WARN("fail to manage heartbeat", KR(ret));200}201}202if(OB_FAIL(ret)) {203idle(HB_FAILED_IDLE_TIME_US);204} else {205idle(HB_IDLE_TIME_US);206}207}208} // end while209}210}
211int ObHeartbeatService::check_upgrade_compat_()212{
213int ret = OB_SUCCESS;214while (!is_service_enabled_ && !has_set_stop()) {215if (OB_FAIL(check_is_service_enabled_())) {216LOG_WARN("fail to check whether the heartbeat service is enabled", KR(ret));217}218idle(HB_IDLE_TIME_US);219}220if (has_set_stop()) {221ret = OB_NOT_MASTER;222LOG_WARN("not leader", KR(ret));223}224return ret;225}
226int ObHeartbeatService::send_heartbeat_()227{
228int ret = OB_SUCCESS;229ObHBRequestArray hb_requests;230int64_t tmp_whitelist_epoch_id = palf::INVALID_PROPOSAL_ID;231if (OB_UNLIKELY(!is_inited_)) {232ret = OB_NOT_INIT;233LOG_WARN("not init", KR(ret), K(is_inited_));234} else if (OB_ISNULL(srv_rpc_proxy_)) {235ret = OB_ERR_UNEXPECTED;236HBS_LOG_ERROR("srv_rpc_proxy_ is null", KR(ret), KP(srv_rpc_proxy_));237} else {238ObTimeGuard time_guard("ObHeartbeatService::send_heartbeat_", 2 * 1000 * 1000);239// step 1: prepare hb_requests based on the whitelist240if (OB_FAIL(prepare_hb_requests_(hb_requests, tmp_whitelist_epoch_id))) {241LOG_WARN("fail to prepare heartbeat requests", KR(ret));242} else if (hb_requests.count() <= 0) {243LOG_INFO("no heartbeat request needs to be sent");244} else {245time_guard.click("end prepare_hb_requests");246ObSendHeartbeatProxy proxy(*srv_rpc_proxy_, &obrpc::ObSrvRpcProxy::handle_heartbeat);247int64_t timeout = GCONF.rpc_timeout; // default value is 2s248int tmp_ret = OB_SUCCESS;249ObArray<int> return_ret_array;250// step 2: send hb_requests to all servers in the whitelist251for (int64_t i = 0; i < hb_requests.count(); i++) {252if (OB_TMP_FAIL(proxy.call(253hb_requests.at(i).get_server(),254timeout,255GCONF.cluster_id,256OB_SYS_TENANT_ID,257hb_requests.at(i)))) {258// error code will be ignored here.259// send rpc to some offline servers will return error, however, it's acceptable260LOG_WARN("fail to send heartbeat rpc", KR(ret), KR(tmp_ret), K(hb_requests.at(i)));261}262}263// step 3: wait hb_responses264if (OB_TMP_FAIL(proxy.wait_all(return_ret_array))) {265LOG_WARN("fail to wait all batch result", KR(ret), KR(tmp_ret));266ret = OB_SUCC(ret) ? tmp_ret : ret;267}268time_guard.click("end wait_hb_responses");269// step 4: save hb_responses270if (FAILEDx(set_hb_responses_(tmp_whitelist_epoch_id, &proxy))) {271LOG_WARN("fail to set hb_responses", KR(ret));272}273time_guard.click("end set_hb_responses");274}275}276FLOG_INFO("send_heartbeat_ has finished one round", KR(ret));277return ret;278}
279int ObHeartbeatService::set_hb_responses_(const int64_t whitelist_epoch_id, ObSendHeartbeatProxy *proxy)280{
281int ret = OB_SUCCESS;282if (OB_UNLIKELY(!is_inited_)) {283ret = OB_NOT_INIT;284LOG_WARN("not init", KR(ret), K(is_inited_));285} else if (OB_ISNULL(proxy)) {286ret = OB_ERR_UNEXPECTED;287LOG_WARN("proxy is null", KR(ret), KP(proxy));288} else if (OB_UNLIKELY(proxy->get_dests().count() != proxy->get_results().count())) {289ret = OB_ERR_UNEXPECTED;290LOG_WARN("dest addr count != result count", KR(ret), "dest addr count", proxy->get_dests().count(),291"result count", proxy->get_results().count());292} else {293int tmp_ret = OB_SUCCESS;294SpinWLockGuard guard_for_hb_responses(hb_responses_rwlock_);295need_process_hb_responses_ = true;296hb_responses_epoch_id_ = whitelist_epoch_id;297hb_responses_.reset();298// don't use arg/dest here because call() may has failue.299ARRAY_FOREACH_X(proxy->get_results(), idx, cnt, OB_SUCC(ret)) {300const ObHBResponse *hb_response = proxy->get_results().at(idx);301const ObAddr &dest_addr = proxy->get_dests().at(idx);302if (OB_ISNULL(hb_response)) {303tmp_ret = OB_ERR_UNEXPECTED;304LOG_WARN("hb_response is null", KR(ret), KR(tmp_ret), KP(hb_response));305} else if (OB_UNLIKELY(!hb_response->is_valid())) {306// if an observer does not reply the rpc, we will get an invalid hb_response.307tmp_ret = OB_INVALID_ARGUMENT;308LOG_WARN("There exists a server not responding to the hb service",309KR(ret), KR(tmp_ret), KPC(hb_response), K(dest_addr));310} else if (OB_FAIL(hb_responses_.push_back(*hb_response))) {311LOG_WARN("fail to push an element into hb_responses_", KR(ret), KPC(hb_response));312} else {313LOG_TRACE("receive a heartbeat response", KPC(hb_response));314}315}316}317return ret;318}
319int ObHeartbeatService::get_and_reset_hb_responses_(320ObHBResponseArray &hb_responses,321int64_t &hb_responses_epoch_id)322{
323int ret = OB_SUCCESS;324// set hb_responses = hb_responses_325// locking hb_responses_ too long will block send_heartbeat()326// therefore we process hb_responses rather than hb_responses_327hb_responses.reset();328hb_responses_epoch_id = palf::INVALID_PROPOSAL_ID;329SpinWLockGuard guard_for_hb_responses(hb_responses_rwlock_);330if (need_process_hb_responses_) {331if (OB_FAIL(hb_responses.assign(hb_responses_))) {332LOG_WARN("fail to assign tmp_hb_responses", KR(ret), K(hb_responses_));333} else {334need_process_hb_responses_ = false;335hb_responses_epoch_id = hb_responses_epoch_id_;336hb_responses_epoch_id_ = palf::INVALID_PROPOSAL_ID;337hb_responses_.reset();338}339} else {340ret = OB_NEED_WAIT;341LOG_WARN("currently there are no hb_responses need to be proccessed", KR(ret));342}343return ret;344}
345int ObHeartbeatService::prepare_hb_requests_(ObHBRequestArray &hb_requests, int64_t &whitelist_epoch_id)346{
347int ret = OB_SUCCESS;348hb_requests.reset();349if (OB_UNLIKELY(!is_inited_)) {350ret = OB_NOT_INIT;351LOG_WARN("not init", KR(ret), K(is_inited_));352} else {353// ensure when we prepare hb_requests,354// we should mark these hb_requests are based on which whitelist.355// In other words, we should mark the whitelist's corresponding whitelist_epoch_id_.356SpinRLockGuard guard_for_servers_info(all_servers_info_in_table_rwlock_);357ObHBRequest hb_request;358whitelist_epoch_id = whitelist_epoch_id_;359ARRAY_FOREACH_X(all_servers_info_in_table_, idx, cnt, OB_SUCC(ret)) {360const ObServerInfoInTable &server_info = all_servers_info_in_table_.at(idx);361bool is_stopped = false;362if (OB_UNLIKELY(!server_info.is_valid())) {363ret = OB_ERR_UNEXPECTED;364HBS_LOG_WARN("invalid server info in table", KR(ret), K(server_info));365} else {366if (server_info.is_stopped() || has_exist_in_array(inactive_zone_list_, server_info.get_zone())) {367is_stopped = true;368}369}370if (OB_SUCC(ret)) {371hb_request.reset();372if (OB_FAIL(hb_request.init(373server_info.get_server(),374server_info.get_server_id(),375GCTX.self_addr(),376is_stopped ? RSS_IS_STOPPED : RSS_IS_WORKING,377whitelist_epoch_id))) {378LOG_WARN("fail to init hb_request", KR(ret), K(server_info), K(is_stopped),379K(GCTX.self_addr()), K(whitelist_epoch_id));380} else if (OB_FAIL(hb_requests.push_back(hb_request))) {381LOG_WARN("fail to push an element into hb_requests", KR(ret), K(hb_request));382} else {}383}384}385}386return ret;387}
388int ObHeartbeatService::manage_heartbeat_()389{
390int ret = OB_SUCCESS;391if (OB_UNLIKELY(!is_inited_)) {392ret = OB_NOT_INIT;393LOG_WARN("not init", KR(ret), K(is_inited_));394} else {395ObTimeGuard time_guard("ObHeartbeatService::manage_heartbeat_", 2 * 1000 * 1000);396int tmp_ret = OB_SUCCESS;397if (OB_TMP_FAIL(prepare_whitelist_())) {398ret = OB_SUCC(ret) ? tmp_ret : ret;399LOG_WARN("fail to prepare whitelist", KR(ret), KR(tmp_ret));400}401time_guard.click("end prepare_whitelist");402if (OB_TMP_FAIL(process_hb_responses_())) {403ret = OB_SUCC(ret) ? tmp_ret : ret;404LOG_WARN("fail to prepare heartbeat response", KR(ret), KR(tmp_ret));405}406time_guard.click("end process_hb_responses");407}408FLOG_INFO("manage_heartbeat_ has finished one round", KR(ret));409return ret;410}
411int ObHeartbeatService::prepare_whitelist_()412{
413int ret = OB_SUCCESS;414int64_t epoch_id = get_epoch_id_();415int64_t persistent_epoch_id = palf::INVALID_PROPOSAL_ID;416ObServerInfoInTableArray tmp_all_servers_info_in_table;417ObArray<ObZone> tmp_inactive_zone_list;418if (OB_UNLIKELY(!is_inited_)) {419ret = OB_NOT_INIT;420LOG_WARN("not init", KR(ret), K(is_inited_));421} else if (OB_ISNULL(sql_proxy_)) {422ret = OB_ERR_UNEXPECTED;423LOG_WARN("sql_proxy_ is null", KR(ret), KP(sql_proxy_));424} else if (OB_FAIL(check_or_update_service_epoch_(epoch_id))) {425LOG_WARN("fail to check or update service epoch", KR(ret), K(epoch_id));426} else if (OB_FAIL(ObServerTableOperator::get(*sql_proxy_, tmp_all_servers_info_in_table))) {427// It is possible that heartbeat_service_epoch is changed while we are reading __all_server table428// It's acceptable, since we cannot update __all_server table when we hold the old heartbeat_service_epoch429LOG_WARN("fail to read __all_server table", KR(ret), KP(sql_proxy_));430} else if (OB_FAIL(ObZoneTableOperation::get_inactive_zone_list(*sql_proxy_, tmp_inactive_zone_list))) {431LOG_WARN("fail to get inactive zone list", KR(ret), KP(sql_proxy_));432} else {433SpinWLockGuard guard_for_servers_info(all_servers_info_in_table_rwlock_);434whitelist_epoch_id_ = epoch_id;435if (OB_FAIL(all_servers_info_in_table_.assign(tmp_all_servers_info_in_table))) {436all_servers_info_in_table_.reset();437whitelist_epoch_id_ = palf::INVALID_PROPOSAL_ID;438LOG_WARN("fail to assign all_servers_info_in_table_", KR(ret), K(tmp_all_servers_info_in_table));439} else if (OB_FAIL(inactive_zone_list_.assign(tmp_inactive_zone_list))) {440LOG_WARN("fail to assign inactive_zone_list_",KR(ret), K(tmp_inactive_zone_list));441}442}443return ret;444}
445int ObHeartbeatService::check_or_update_service_epoch_(const int64_t epoch_id)446{
447// if persistent_epoch_id == epoch_id: check ok.448// if persistent_epoch_id < epoch_id: update heartbeat_service_epoch in __all_service_epoch table449// if the updation is successful, check ok.450// if persistent_epoch_id > epoch_id: return error OB_NOT_MASTER451int ret = OB_SUCCESS;452int64_t persistent_epoch_id = palf::INVALID_PROPOSAL_ID;453if (OB_UNLIKELY(!is_inited_)) {454ret = OB_NOT_INIT;455LOG_WARN("not init", KR(ret), K(is_inited_));456} else if (OB_ISNULL(sql_proxy_)) {457ret = OB_ERR_UNEXPECTED;458LOG_WARN("sql_proxy_ is null", KR(ret), KP(sql_proxy_));459} else if (OB_FAIL(ObServiceEpochProxy::get_service_epoch(460*sql_proxy_,461OB_SYS_TENANT_ID,462ObServiceEpochProxy::HEARTBEAT_SERVICE_EPOCH,463persistent_epoch_id))) {464LOG_WARN("fail to get heartbeat service epoch", KR(ret),KP(sql_proxy_));465} else if (palf::INVALID_PROPOSAL_ID == persistent_epoch_id || palf::INVALID_PROPOSAL_ID == epoch_id) {466ret = OB_ERR_UNEXPECTED;467LOG_WARN("epoch id is unexpectedly invalid", KR(ret), K(persistent_epoch_id), K(epoch_id));468} else if (persistent_epoch_id > epoch_id) {469ret = OB_NOT_MASTER;470HBS_LOG_WARN("persistent_epoch_id is greater than epoch_id, which means this server is not leader",471KR(ret), K(persistent_epoch_id), K(epoch_id));472} else if (persistent_epoch_id < epoch_id) {473HBS_LOG_INFO("persistent_epoch_id is smaller than epoch_id", K(persistent_epoch_id), K(epoch_id));474common::ObMySQLTransaction trans;475if (OB_FAIL(trans.start(sql_proxy_, OB_SYS_TENANT_ID))) {476LOG_WARN("fail to start trans", KR(ret));477} else if (OB_FAIL(ObServiceEpochProxy::check_and_update_service_epoch(478trans,479OB_SYS_TENANT_ID,480ObServiceEpochProxy::HEARTBEAT_SERVICE_EPOCH,481epoch_id))) {482LOG_WARN("fail to check and update service epoch", KR(ret), KP(sql_proxy_), K(epoch_id));483}484if (OB_UNLIKELY(!trans.is_started())) {485LOG_WARN("the transaction is not started");486} else {487int tmp_ret = OB_SUCCESS;488if (OB_TMP_FAIL(trans.end(OB_SUCC(ret)))) {489LOG_WARN("fail to commit the transaction", KR(ret), KR(tmp_ret));490ret = OB_SUCC(ret) ? tmp_ret : ret;491}492if (OB_FAIL(ret)) {493LOG_WARN("fail to update __all_service_epoch table", KR(ret));494}495}496// we do not care whether the table is updated successfully497// we always reset all_servers_info_in_table_ and all_servers_hb_info_498SpinWLockGuard guard_for_servers_info(all_servers_info_in_table_rwlock_);499all_servers_info_in_table_.reset();500whitelist_epoch_id_ = palf::INVALID_PROPOSAL_ID;501all_servers_hb_info_.clear();502} else {} // persistent_epoch_id = epoch_id, do nothing.503return ret;504}
505int ObHeartbeatService::process_hb_responses_()506{
507int ret = OB_SUCCESS;508ObHBResponseArray tmp_hb_responses;509const int64_t now = ObTimeUtility::current_time();510int64_t tmp_hb_responses_epoch_id = palf::INVALID_PROPOSAL_ID;511common::ObArray<common::ObZone> zone_list;512if (OB_UNLIKELY(!is_inited_)) {513ret = OB_NOT_INIT;514LOG_WARN("not init", KR(ret), K(is_inited_));515} else if (OB_ISNULL(sql_proxy_)) {516ret = OB_ERR_UNEXPECTED;517HBS_LOG_ERROR("sql_proxy_ is null", KR(ret), KP(sql_proxy_));518} else if (OB_FAIL(get_and_reset_hb_responses_(tmp_hb_responses, tmp_hb_responses_epoch_id))) {519LOG_WARN("fail to get and reset hb_responses", KR(ret));520} else if (OB_FAIL(ObZoneTableOperation::get_zone_list(*sql_proxy_, zone_list))) {521LOG_WARN("fail to get zone list", KR(ret));522} else {523// Here we do not need to lock all_servers_info_in_table_.524// There are two threads in heartbeat service.525// Prepare_whitelist() will modify all_servers_info_in_table_,526// But prepare_whitelist() and this func. are in the same thread.527// In another thread, send_heartbeat() only reads server_ and server_id_ in all_servers_info_in_table_528int tmp_ret = OB_SUCCESS;529for (int64_t i = 0; i < all_servers_info_in_table_.count(); i++) {530// note: we can only update __all_server table successfully when hb_responses_epoch_id is531// equal to current heartbeat_service_epoch in __all_service_epoch table.532// It means that our whitelist (all_servers_info_in_table_) is not outdated.533if (OB_TMP_FAIL(check_server_(534tmp_hb_responses,535all_servers_info_in_table_.at(i),536zone_list,537now,538tmp_hb_responses_epoch_id))) {539LOG_WARN("fail to check server", KR(ret), KR(tmp_ret),540K(all_servers_info_in_table_.at(i)), K(now), K(tmp_hb_responses_epoch_id));541}542}543if (FAILEDx(clear_deleted_servers_in_all_servers_hb_info_())) {544LOG_WARN("fail to clear deleted servers in all_servers_hb_info_", KR(ret));545}546}547return ret;548}
549int ObHeartbeatService::check_server_(550const ObHBResponseArray &hb_responses,551const share::ObServerInfoInTable &server_info_in_table,552const common::ObArray<common::ObZone> &zone_list,553const int64_t now,554const int64_t hb_responses_epoch_id)555{
556int ret = OB_SUCCESS;557ObServerHBInfo server_hb_info;558if (OB_UNLIKELY(!is_inited_)) {559ret = OB_NOT_INIT;560LOG_WARN("not init", KR(ret), K(is_inited_));561} else if (OB_UNLIKELY(!server_info_in_table.is_valid()562|| now <= 0563|| palf::INVALID_PROPOSAL_ID == hb_responses_epoch_id)) {564ret = OB_INVALID_ARGUMENT;565LOG_WARN("invalied argument", KR(ret), K(server_info_in_table), K(now), K(hb_responses_epoch_id));566} else if (OB_FAIL(all_servers_hb_info_.get_refactored(server_info_in_table.get_server(), server_hb_info))) {567LOG_WARN("fail to get server_hb_info, or get an old server_hb_info", KR(ret),568K(server_info_in_table.get_server()), K(server_hb_info));569if (OB_HASH_NOT_EXIST == ret) {570if (OB_FAIL(init_server_hb_info_(now, server_info_in_table, server_hb_info))) {571LOG_WARN("fail to init server_hb_info", KR(ret), K(server_info_in_table), K(now));572} else if (OB_FAIL(all_servers_hb_info_.set_refactored(573server_hb_info.get_server(),574server_hb_info,5750 /* flag: 0 shows that not cover existing object. */))) {576LOG_WARN("fail to push an element into all_servers_hb_info_", KR(ret), K(server_hb_info));577} else {}578}579}580if (OB_SUCC(ret)) {581// check whether the heartbeat response from server_info_in_table.get_server() is received582int64_t idx = OB_INVALID_INDEX_INT64;583if (!has_server_exist_in_array_(hb_responses, server_info_in_table.get_server(), idx)) {584// heartbeat response is not received585if (OB_FAIL(check_server_without_hb_response_(586now,587server_info_in_table,588hb_responses_epoch_id,589server_hb_info))) {590LOG_WARN("fail to check the server without heartbeat response", KR(ret),591K(server_info_in_table), K(now), K(hb_responses_epoch_id));592}593} else if (OB_UNLIKELY(!hb_responses.at(idx).is_valid())) {594ret = OB_ERR_UNEXPECTED;595HBS_LOG_WARN("there exists an invalid element in hb_responses", KR(ret),596K(hb_responses.at(idx)));597} else if (OB_FAIL(check_server_with_hb_response_(598hb_responses.at(idx),599server_info_in_table,600zone_list,601now,602hb_responses_epoch_id,603server_hb_info))) { // heartbeat response is received604LOG_WARN("fail to check the server with heartbeat response", KR(ret),605K(hb_responses.at(idx)), K(server_info_in_table), K(now), K(hb_responses_epoch_id));606}607}608return ret;609}
610int ObHeartbeatService::check_server_without_hb_response_(611const int64_t now,612const share::ObServerInfoInTable &server_info_in_table,613const int64_t hb_responses_epoch_id,614ObServerHBInfo &server_hb_info)615{
616int ret = OB_SUCCESS;617if (OB_UNLIKELY(!is_inited_)) {618ret = OB_NOT_INIT;619LOG_WARN("not init", KR(ret), K(is_inited_));620} else if (OB_UNLIKELY(now <= 0621|| !server_info_in_table.is_valid()622|| !server_hb_info.is_valid()623|| palf::INVALID_PROPOSAL_ID == hb_responses_epoch_id)) {624ret = OB_INVALID_ARGUMENT;625LOG_WARN("invalid argument", KR(ret), K(now), K(server_info_in_table),626K(server_hb_info), K(hb_responses_epoch_id));627} else if (OB_FAIL(update_server_hb_info_(628now,629false, /* hb_response_exists */630server_hb_info))) {631LOG_WARN("fail to update server_hb_info", KR(ret), K(now), K(server_info_in_table),632K(server_hb_info));633} else if ((now - server_hb_info.get_last_hb_time() > GCONF.lease_time634&& 0 == server_info_in_table.get_last_offline_time())) {635if (OB_FAIL(update_table_for_online_to_offline_server_(636server_info_in_table,637now,638hb_responses_epoch_id))) {639LOG_WARN("fail to update table for online to offline server",640KR(ret), K(server_info_in_table), K(now), K(hb_responses_epoch_id));641} else {642const ObAddr &server = server_info_in_table.get_server();643ROOTSERVICE_EVENT_ADD("server", "last_offline_time set", "server", server);644}645} else {}646return ret;647}
648int ObHeartbeatService::update_table_for_online_to_offline_server_(649const share::ObServerInfoInTable &server_info_in_table,650const int64_t now,651const int64_t hb_responses_epoch_id)652{
653int ret = OB_SUCCESS;654common::ObMySQLTransaction trans;655bool is_match = false;656if (OB_UNLIKELY(!is_inited_)) {657ret = OB_NOT_INIT;658LOG_WARN("not init", KR(ret), K(is_inited_));659} else if (OB_ISNULL(sql_proxy_)) {660ret = OB_ERR_UNEXPECTED;661LOG_ERROR("sql_proxy_ is null", KR(ret), KP(sql_proxy_));662} else if (OB_UNLIKELY(!server_info_in_table.is_valid()663|| now <= 0664|| palf::INVALID_PROPOSAL_ID == hb_responses_epoch_id)) {665ret = OB_INVALID_ARGUMENT;666LOG_WARN("invalid argument", KR(ret), K(server_info_in_table), K(now), K(hb_responses_epoch_id));667} else if (OB_FAIL(trans.start(sql_proxy_, OB_SYS_TENANT_ID))) {668LOG_WARN("fail to start trans", KR(ret));669} else if (OB_FAIL(ObServiceEpochProxy::check_service_epoch_with_trans(670trans,671OB_SYS_TENANT_ID,672ObServiceEpochProxy::HEARTBEAT_SERVICE_EPOCH,673hb_responses_epoch_id,674is_match))) {675LOG_WARN("fail to check and update service epoch", KR(ret), K(hb_responses_epoch_id));676} else if (OB_UNLIKELY(!is_match)) {677ret = OB_NOT_MASTER;678LOG_WARN("hb_responses_epoch_id is not the same as persistent heartbeat service epoch id", KR(ret));679} else {680if (OB_FAIL(ObServerTableOperator::update_table_for_online_to_offline_server(681trans,682server_info_in_table.get_server(),683ObServerStatus::OB_SERVER_DELETING == server_info_in_table.get_status(), /* is_deleting */684now /*last_offline_time */))) {685LOG_WARN("fail to update __all_server table for online to offline server", KR(ret),686K(server_info_in_table), K(now));687}688}689int tmp_ret = OB_SUCCESS;690if (OB_TMP_FAIL(end_trans_and_refresh_server_(server_info_in_table.get_server(),691OB_SUCC(ret), trans))) {692LOG_WARN("failed to end trans", KR(ret), K(tmp_ret), K(server_info_in_table));693ret = OB_SUCC(ret) ? tmp_ret : ret;694}695return ret;696}
697
698int ObHeartbeatService::end_trans_and_refresh_server_(699const ObAddr &server,700const bool commit,701common::ObMySQLTransaction &trans)702{
703int ret = OB_SUCCESS;704if (OB_UNLIKELY(!server.is_valid())) {705ret = OB_INVALID_ARGUMENT;706LOG_WARN("server is invalid", KR(ret), K(server));707} else if (!trans.is_started()) {708LOG_WARN("the transaction is not started");709} else {710int tmp_ret = OB_SUCCESS;711if (OB_FAIL(trans.end(commit))) {712HBS_LOG_WARN("fail to commit the transaction", KR(ret),713K(server), K(commit));714}715//ignore error of refresh and on server_status_change716if (OB_TMP_FAIL(SVR_TRACER.refresh())) {717LOG_WARN("fail to refresh server tracer", KR(ret), KR(tmp_ret));718}719if (OB_ISNULL(GCTX.root_service_)) {720tmp_ret = OB_ERR_UNEXPECTED;721LOG_WARN("GCTX.root_service_ is null", KR(ret), KR(tmp_ret), KP(GCTX.root_service_));722} else if (OB_TMP_FAIL(GCTX.root_service_->get_status_change_cb().on_server_status_change(server))) {723LOG_WARN("fail to execute on_server_status_change", KR(ret), KR(tmp_ret), K(server));724}725}726return ret;727}
728
729int ObHeartbeatService::init_server_hb_info_(730const int64_t now,731const share::ObServerInfoInTable &server_info_in_table,732ObServerHBInfo &server_hb_info)733{
734int ret = OB_SUCCESS;735const ObServerStatus::DisplayStatus &display_status = server_info_in_table.get_status();736const int64_t last_offline_time = server_info_in_table.get_last_offline_time();737const ObAddr &server = server_info_in_table.get_server();738int64_t last_hb_time = 0;739ObServerStatus::HeartBeatStatus hb_status = ObServerStatus::OB_HEARTBEAT_MAX;740
741server_hb_info.reset();742if (OB_UNLIKELY(!is_inited_)) {743ret = OB_NOT_INIT;744LOG_WARN("not init", KR(ret), K(is_inited_));745} else if (OB_UNLIKELY(!server_info_in_table.is_valid()746|| now - last_offline_time < 0)) {747ret = OB_INVALID_ARGUMENT;748LOG_WARN("invalid argument", KR(ret), K(now), K(server_info_in_table));749} else {750if (0 == last_offline_time) { // online, the status is active or deleting751last_hb_time = now;752hb_status = ObServerStatus::OB_HEARTBEAT_ALIVE;753} else { // last_offline_time > 0, offline, the status is inactive or deleting754last_hb_time = last_offline_time - GCONF.lease_time;755hb_status = ObServerStatus::OB_HEARTBEAT_LEASE_EXPIRED;756if (now - last_hb_time >= GCONF.server_permanent_offline_time) {757hb_status = ObServerStatus::OB_HEARTBEAT_PERMANENT_OFFLINE;758}759}760if (FAILEDx(server_hb_info.init(server, last_hb_time, hb_status))) {761LOG_WARN("fail to init server_hb_info", KR(ret), K(server), K(last_hb_time), K(hb_status));762} else {763LOG_INFO("new server_hb_info is generated", K(server_hb_info));764}765}766return ret;767}
768int ObHeartbeatService::check_server_with_hb_response_(769const ObHBResponse &hb_response,770const share::ObServerInfoInTable &server_info_in_table,771const common::ObArray<common::ObZone> &zone_list,772const int64_t now,773const int64_t hb_responses_epoch_id,774ObServerHBInfo &server_hb_info)775{
776int ret = OB_SUCCESS;777if (OB_UNLIKELY(!is_inited_)) {778ret = OB_NOT_INIT;779LOG_WARN("not init", KR(ret), K(is_inited_));780} else if (OB_UNLIKELY(now <= 0781|| !server_hb_info.is_valid())782|| palf::INVALID_PROPOSAL_ID == hb_responses_epoch_id) {783ret = OB_INVALID_ARGUMENT;784LOG_WARN("invalid argument", KR(ret), K(server_info_in_table), K(hb_response),785K(now), K(server_hb_info), K(hb_responses_epoch_id));786} else if (OB_FAIL(check_if_hb_response_can_be_processed_(787hb_response,788server_info_in_table,789zone_list))) {790// the validity of hb_response and server_info_in_table is also checked here791LOG_WARN("hb_response cannot be processed", KR(ret), K(hb_response),792K(server_info_in_table), K(zone_list));793}794if (OB_SUCC(ret)) {795if ((!server_info_in_table.get_with_rootserver() && hb_response.get_server() == GCTX.self_addr())796|| 0 != server_info_in_table.get_last_offline_time()797|| server_info_in_table.get_build_version() != hb_response.get_build_version()798|| server_info_in_table.get_start_service_time() != hb_response.get_start_service_time()) {799if (OB_FAIL(update_table_for_server_with_hb_response_(800hb_response,801server_info_in_table,802hb_responses_epoch_id))) {803LOG_WARN("fail to update table for server with hb_response", KR(ret), K(hb_response),804K(server_info_in_table), K(hb_responses_epoch_id));805}806}807if (FAILEDx(check_and_execute_start_or_stop_server_(808hb_response,809server_hb_info,810server_info_in_table))) {811LOG_WARN("fail to check and execute start or stop server", KR(ret),812K(hb_response), K(server_info_in_table));813}814if (FAILEDx(server_hb_info.set_server_health_status(hb_response.get_server_health_status()))) {815LOG_WARN("fail to set server_health_status", KR(ret), K(hb_response.get_server_health_status()));816} else if (OB_FAIL(update_server_hb_info_(817now,818true, /* hb_response_exists*/819server_hb_info))) {820LOG_WARN("fail to get and update server_hb_info", KR(ret), K(hb_response),821K(server_info_in_table), K(now));822}823}824return ret;825}
826int ObHeartbeatService::check_if_hb_response_can_be_processed_(827const ObHBResponse &hb_response,828const share::ObServerInfoInTable &server_info_in_table,829const common::ObArray<common::ObZone> &zone_list) const830{
831int ret = OB_SUCCESS;832if (OB_UNLIKELY(!is_inited_)) {833ret = OB_NOT_INIT;834LOG_WARN("not init", KR(ret), K(is_inited_));835} else if (OB_UNLIKELY(!server_info_in_table.is_valid()836|| !hb_response.is_valid()837|| server_info_in_table.get_server() != hb_response.get_server())) {838ret = OB_INVALID_ARGUMENT;839LOG_WARN("invalid argument", KR(ret), K(server_info_in_table), K(hb_response));840} else if (server_info_in_table.get_zone() != hb_response.get_zone()) {841ret = OB_SERVER_ZONE_NOT_MATCH;842HBS_LOG_ERROR("server's zone does not match", KR(ret), K(server_info_in_table.get_zone()),843K(hb_response.get_zone()));844} else if (server_info_in_table.get_sql_port() != hb_response.get_sql_port()) {845ret = OB_ERR_UNEXPECTED;846HBS_LOG_ERROR("unexpexted error: server's sql port has changed!", KR(ret),847K(server_info_in_table), K(hb_response));848} else {849bool zone_exists = false;850for (int64_t i = 0; !zone_exists && i < zone_list.count(); i++) {851if (zone_list.at(i) == hb_response.get_zone()) {852zone_exists = true;853}854}855if (OB_UNLIKELY(!zone_exists)) {856ret = OB_ZONE_INFO_NOT_EXIST;857HBS_LOG_ERROR("zone info not exist", KR(ret), K(hb_response.get_zone()), K(zone_list));858}859}860return ret;861}
862int ObHeartbeatService::check_and_execute_start_or_stop_server_(863const ObHBResponse &hb_response,864const ObServerHBInfo &server_hb_info,865const share::ObServerInfoInTable &server_info_in_table)866{
867int ret = OB_SUCCESS;868char ip[OB_MAX_SERVER_ADDR_SIZE] = "";869const ObAddr &server = hb_response.get_server();870if (OB_UNLIKELY(!is_inited_)) {871ret = OB_NOT_INIT;872LOG_WARN("not init", KR(ret), K(is_inited_));873} else if (OB_ISNULL(sql_proxy_)) {874ret = OB_ERR_UNEXPECTED;875HBS_LOG_ERROR("sql_proxy_ is null", KR(ret), KP(sql_proxy_));876} else if (OB_UNLIKELY(!hb_response.is_valid()877|| !server_info_in_table.is_valid()878|| hb_response.get_server() != server_info_in_table.get_server()879|| !server.is_valid()880|| !server.ip_to_string(ip, sizeof(ip)))) {881ret = OB_INVALID_ARGUMENT;882LOG_WARN("invalid argument", KR(ret), K(server), K(hb_response), K(server_info_in_table));883} else {884const ObServerHealthStatus &health_status = hb_response.get_server_health_status();885bool need_start_or_stop_server = false;886bool is_start = false;887int64_t affected_rows = 0;888ObSqlString sql;889if (server_hb_info.get_server_health_status() != hb_response.get_server_health_status()) {890if (0 == server_info_in_table.get_stop_time() && !health_status.is_healthy()) {891is_start = false;892need_start_or_stop_server = true;893}894if (0 != server_info_in_table.get_stop_time() && health_status.is_healthy()) {895is_start = true;896need_start_or_stop_server = true;897}898}899if (OB_SUCC(ret) && need_start_or_stop_server) {900if (is_start) {901ROOTSERVICE_EVENT_ADD("server", "disk error repaired, start server", "server", server);902HBS_LOG_INFO("disk error repaired, try to start server", K(server), K(health_status));903ret = sql.assign_fmt("ALTER SYSTEM START SERVER '%s:%d'", ip, server.get_port());904} else {905ROOTSERVICE_EVENT_ADD("server", "disk error, stop server", "server", server);906HBS_LOG_INFO("disk error, try to stop server", K(server), K(health_status));907ret = sql.assign_fmt("ALTER SYSTEM STOP SERVER '%s:%d'", ip, server.get_port());908}909if (OB_FAIL(ret)) {910LOG_WARN("fail to assign fmt", KR(ret), K(server), K(is_start));911} else if (OB_FAIL(sql_proxy_->write(sql.ptr(), affected_rows))) {912LOG_WARN("fail to write sql", KR(ret),K(server), K(sql));913} else {914HBS_LOG_INFO("start or stop server successfully", K(server), K(is_start));915}916}917}918return ret;919}
920int ObHeartbeatService::update_server_hb_info_(921const int64_t now,922const bool hb_response_exists,923ObServerHBInfo &server_hb_info)924{
925int ret = OB_SUCCESS;926if (OB_UNLIKELY(!is_inited_)) {927ret = OB_NOT_INIT;928LOG_WARN("not init", KR(ret), K(is_inited_));929} else if (OB_UNLIKELY(now <= 0930|| !server_hb_info.is_valid())) {931ret = OB_INVALID_ARGUMENT;932LOG_WARN("invalid argument", KR(ret), K(now), K(server_hb_info));933} else {934const ObServerStatus::HeartBeatStatus& hb_status = server_hb_info.get_hb_status();935const ObAddr& server = server_hb_info.get_server();936// step 1: update last_hb_time937if (hb_response_exists) {938server_hb_info.set_last_hb_time(now);939}940int64_t time_diff = now - server_hb_info.get_last_hb_time();941// step 2: update hb_status942if (time_diff >= GCONF.server_permanent_offline_time943&& ObServerStatus::OB_HEARTBEAT_PERMANENT_OFFLINE != hb_status) {944server_hb_info.set_hb_status(ObServerStatus::OB_HEARTBEAT_PERMANENT_OFFLINE);945ROOTSERVICE_EVENT_ADD("server", "permanent_offline", "server", server);946HBS_LOG_INFO("the server becomes permanent offline", K(server), K(time_diff));947} else if (time_diff >= GCONF.lease_time948&& ObServerStatus::OB_HEARTBEAT_LEASE_EXPIRED != hb_status949&& ObServerStatus::OB_HEARTBEAT_PERMANENT_OFFLINE != hb_status) {950server_hb_info.set_hb_status(ObServerStatus::OB_HEARTBEAT_LEASE_EXPIRED);951ROOTSERVICE_EVENT_ADD("server", "lease_expire", "server", server);952HBS_LOG_INFO("the server's lease becomes expired'", K(server), K(time_diff));953} else if (time_diff < GCONF.lease_time954&& ObServerStatus::OB_HEARTBEAT_ALIVE != hb_status) {955server_hb_info.set_hb_status(ObServerStatus::OB_HEARTBEAT_ALIVE);956ROOTSERVICE_EVENT_ADD("server", "online", "server", server);957HBS_LOG_INFO("the server's lease becomes online'", K(server), K(time_diff));958} else {}959// step 3: update server_hb_info960if (FAILEDx(all_servers_hb_info_.set_refactored(961server_hb_info.get_server(),962server_hb_info,9631 /* flag: 0 shows that not cover existing object. */))) {964LOG_WARN("fail to push an element into all_servers_hb_info_", KR(ret), K(server_hb_info));965}966}967return ret;968}
969
970int ObHeartbeatService::clear_deleted_servers_in_all_servers_hb_info_()971{
972int ret = OB_SUCCESS;973ObAddr server;974hash::ObHashMap<ObAddr, ObServerHBInfo>::iterator iter = all_servers_hb_info_.begin();975if (OB_UNLIKELY(!is_inited_)) { // return false976ret = OB_NOT_INIT;977LOG_WARN("not init", KR(ret), K(is_inited_));978} else {979while (OB_SUCC(ret) && iter != all_servers_hb_info_.end()) {980int64_t idx = OB_INVALID_INDEX_INT64;981server.reset();982server = iter->first;983iter++;984if (!has_server_exist_in_array_(all_servers_info_in_table_, server, idx)) {985HBS_LOG_INFO("the server is deleted, it can be removed from all_servers_hb_info", K(server));986if (OB_FAIL(all_servers_hb_info_.erase_refactored(server))) {987LOG_WARN("fail to remove the server from all_servers_hb_info", KR(ret), K(server));988}989}990}991}992return ret;993}
994
995int ObHeartbeatService::update_table_for_server_with_hb_response_(996const ObHBResponse &hb_response,997const share::ObServerInfoInTable &server_info_in_table,998const int64_t hb_responses_epoch_id)999{
1000int ret = OB_SUCCESS;1001common::ObMySQLTransaction trans;1002bool is_match = false;1003if (OB_UNLIKELY(!is_inited_)) {1004ret = OB_NOT_INIT;1005LOG_WARN("not init", KR(ret), K(is_inited_));1006} else if (OB_ISNULL(sql_proxy_)) {1007ret = OB_ERR_UNEXPECTED;1008HBS_LOG_ERROR("sql_proxy_ is null", KR(ret), KP(sql_proxy_));1009} else if (OB_UNLIKELY(!hb_response.is_valid()1010|| !server_info_in_table.is_valid()1011|| hb_response.get_server() != server_info_in_table.get_server()1012|| palf::INVALID_PROPOSAL_ID == hb_responses_epoch_id)) {1013ret = OB_INVALID_ARGUMENT;1014// return false1015LOG_WARN("invalid argument", KR(ret), K(hb_response), K(server_info_in_table), K(hb_responses_epoch_id));1016} else if (OB_FAIL(trans.start(sql_proxy_, OB_SYS_TENANT_ID))) {1017LOG_WARN("fail to start trans", KR(ret));1018} else if (OB_FAIL(ObServiceEpochProxy::check_service_epoch_with_trans(1019trans,1020OB_SYS_TENANT_ID,1021ObServiceEpochProxy::HEARTBEAT_SERVICE_EPOCH,1022hb_responses_epoch_id,1023is_match))) {1024LOG_WARN("fail to check heartbeat service epoch", KR(ret), K(hb_responses_epoch_id));1025} else if (OB_UNLIKELY(!is_match)) {1026ret = OB_NOT_MASTER;1027LOG_WARN("hb_responses_epoch_id is not the same as persistent heartbeat service epoch id", KR(ret));1028} else {1029const ObAddr &server = server_info_in_table.get_server();1030// ********* check with_rootserver ********* //1031if (OB_SUCC(ret)1032&& !server_info_in_table.get_with_rootserver() && server == GCTX.self_addr()) {1033if (OB_FAIL(ObServerTableOperator::update_with_rootserver(trans, server))) {1034HBS_LOG_WARN("fail to update_with_rootserver", KR(ret), K(server));1035} else {1036ROOTSERVICE_EVENT_ADD("server", "rootserver", "server", server);1037HBS_LOG_INFO("server becomes rootserver", K(server));1038}1039}1040
1041// ********* check if offline to online, then update last_offline_time and status ********* //1042if (OB_SUCC(ret) && 0 != server_info_in_table.get_last_offline_time()) {1043if (OB_FAIL(ObServerTableOperator::update_table_for_offline_to_online_server(1044trans,1045ObServerStatus::OB_SERVER_DELETING == server_info_in_table.get_status(), /* is_deleting */1046server))) {1047HBS_LOG_WARN("fail to reset last_offline_time", KR(ret), K(server));1048} else {1049ROOTSERVICE_EVENT_ADD("server", "last_offline_time reset", "server", server);1050HBS_LOG_INFO("server becomes online", K(server));1051}1052}1053// ********* check build_version ********* //1054if (OB_SUCC(ret) && server_info_in_table.get_build_version() != hb_response.get_build_version()) {1055if (OB_FAIL(ObServerTableOperator::update_build_version(1056trans,1057server,1058server_info_in_table.get_build_version(), // old value1059hb_response.get_build_version()))) {1060HBS_LOG_WARN("fail to update build_version", KR(ret),1061K(server_info_in_table), K(hb_response));1062} else {1063ROOTSERVICE_EVENT_ADD("server", hb_response.get_build_version().ptr(), "server", server);1064HBS_LOG_INFO("build_version is updated", K(server),1065K(hb_response.get_build_version()), K(hb_response.get_build_version()));1066}1067}1068// ********* check start_service_time ********* //1069if (OB_SUCC(ret) && server_info_in_table.get_start_service_time() != hb_response.get_start_service_time()) {1070if (OB_FAIL(ObServerTableOperator::update_start_service_time(1071trans,1072server,1073server_info_in_table.get_start_service_time(), // old value1074hb_response.get_start_service_time()))) {1075HBS_LOG_WARN("fail to update start service time", KR(ret), K(server),1076K(server_info_in_table.get_start_service_time()), K(hb_response.get_start_service_time()));1077} else {1078ROOTSERVICE_EVENT_ADD("server", "start_service", "server", server);1079HBS_LOG_INFO("start service time is updated", K(server),1080K(server_info_in_table.get_start_service_time()), K(hb_response.get_start_service_time()));1081}1082}1083}1084int tmp_ret = OB_SUCCESS;1085if (OB_TMP_FAIL(end_trans_and_refresh_server_(server_info_in_table.get_server(),1086OB_SUCC(ret), trans))) {1087LOG_WARN("failed to end trans", KR(ret), K(tmp_ret), K(server_info_in_table));1088ret = OB_SUCC(ret) ? tmp_ret : ret;1089}1090return ret;1091}
1092template <typename T>1093bool ObHeartbeatService::has_server_exist_in_array_(1094const ObIArray<T> &array,1095const common::ObAddr &server,1096int64_t &idx)1097{
1098bool bret = false;1099idx = OB_INVALID_INDEX_INT64;1100for (int64_t i = 0; i < array.count(); i++) {1101if (server == array.at(i).get_server()) {1102bret = true;1103idx = i;1104break;1105}1106}1107return bret;1108}
1109}
1110}
1111