/**
 * Copyright (c) 2022 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 * http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */
12
#define USING_LOG_PREFIX SERVER
13
#include "observer/ob_heartbeat_handler.h"
15
#include "observer/ob_server.h"
16
#include "share/ob_version.h"
17
#include "observer/ob_service.h"
// Printable names of the data-disk states, indexed by the ObDataDiskStatus value
// (data_disk_status_to_str() indexes it directly and static-asserts that its size
// equals DATA_DISK_STATUS_MAX). Both the characters and the pointers are const so
// this file-local lookup table cannot be modified at runtime.
static const char *const OB_DATA_DISK_STATUS_STR[] = {"INVALID", "NORMAL", "ERROR"};
28
ObServerHealthStatus::ObServerHealthStatus()
29
: data_disk_status_(ObDataDiskStatus::DATA_DISK_STATUS_INVALID)
32
ObServerHealthStatus::~ObServerHealthStatus()
35
int ObServerHealthStatus::init(ObDataDiskStatus data_disk_status)
38
if (data_disk_status <= DATA_DISK_STATUS_INVALID || data_disk_status >= DATA_DISK_STATUS_MAX) {
39
ret = OB_INVALID_ARGUMENT;
40
LOG_WARN("invalid argument", KR(ret), K(data_disk_status));
42
data_disk_status_ = data_disk_status;
46
int ObServerHealthStatus::assign(const ObServerHealthStatus server_health_status)
49
data_disk_status_ = server_health_status.data_disk_status_;
52
void ObServerHealthStatus::reset()
54
data_disk_status_ = ObDataDiskStatus::DATA_DISK_STATUS_INVALID;
56
bool ObServerHealthStatus::is_valid() const
58
return data_disk_status_ > ObDataDiskStatus::DATA_DISK_STATUS_INVALID
59
&& data_disk_status_ < ObDataDiskStatus::DATA_DISK_STATUS_MAX;
61
bool ObServerHealthStatus::is_healthy() const
63
return ObDataDiskStatus::DATA_DISK_STATUS_NORMAL == data_disk_status_;
65
const char *ObServerHealthStatus::data_disk_status_to_str(const ObDataDiskStatus data_disk_status)
67
STATIC_ASSERT(ARRAYSIZEOF(OB_DATA_DISK_STATUS_STR) == DATA_DISK_STATUS_MAX, "array size mismatch");
68
const char *str = "UNKNOWN";
69
if (OB_UNLIKELY(data_disk_status >= ARRAYSIZEOF(OB_DATA_DISK_STATUS_STR)
70
|| data_disk_status < DATA_DISK_STATUS_INVALID)) {
71
LOG_ERROR_RET(OB_ERR_UNEXPECTED, "fatal error, unknown data disk status", K(data_disk_status));
73
str = OB_DATA_DISK_STATUS_STR[data_disk_status];
77
ObHeartbeatHandler::ObHeartbeatHandler()
80
ObHeartbeatHandler::~ObHeartbeatHandler()
83
int64_t ObHeartbeatHandler::rs_epoch_id_ = palf::INVALID_PROPOSAL_ID;
84
bool ObHeartbeatHandler::is_rs_epoch_id_valid()
86
return palf::INVALID_PROPOSAL_ID != ATOMIC_LOAD(&rs_epoch_id_);
// Handles one heartbeat request from RS and fills hb_response.
//
// Flow visible in this chunk:
//   1. Reject an invalid request (OB_INVALID_ARGUMENT) or a null GCTX.rs_mgr_
//      (OB_ERR_UNEXPECTED).
//   2. Compare the request's epoch with the static rs_epoch_id_. A newer epoch,
//      or any epoch while none is recorded yet, is published with ATOMIC_CAS.
//      An older epoch is rejected with OB_RS_NOT_MASTER.
//   3. Record hb_request.get_rs_addr() as the master RS, build the response via
//      init_hb_response_(), and copy the RS-side server status into
//      GCTX.rs_server_status_ when it differs.
// NOTE(review): several original lines of this function are missing from this
// chunk: the opening brace, the declaration of ret, the statement inside the
// CAS-failure branch, and the closing braces / return.
88
int ObHeartbeatHandler::handle_heartbeat(
89
const share::ObHBRequest &hb_request,
90
share::ObHBResponse &hb_response)
// Snapshot of the recorded epoch. It is also the expected value of the CAS below.
94
int64_t rs_epoch_id = ATOMIC_LOAD(&rs_epoch_id_);
95
if (OB_UNLIKELY(!hb_request.is_valid())) {
96
ret = OB_INVALID_ARGUMENT;
97
LOG_WARN("receive an invalid heartbeat request", KR(ret), K(hb_request));
98
} else if (OB_ISNULL(GCTX.rs_mgr_)) {
99
ret = OB_ERR_UNEXPECTED;
100
LOG_WARN("rs manager is null", KR(ret), KP(GCTX.rs_mgr_));
102
const int64_t epoch_id = hb_request.get_epoch_id();
// The request carries a newer epoch, or no epoch has been recorded yet: try to publish it.
103
if (rs_epoch_id < epoch_id || palf::INVALID_PROPOSAL_ID == rs_epoch_id) {
104
LOG_INFO("receive new rs epoch", "old rs_epoch_id", rs_epoch_id, "new rs_epoch_id", epoch_id);
// The CAS result is compared with the snapshot. A mismatch means rs_epoch_id_ was
// changed concurrently after the load above. This presumes ATOMIC_CAS returns the
// previous value; confirm against its definition.
105
int64_t current_epoch_id = ATOMIC_CAS(&rs_epoch_id_, rs_epoch_id, epoch_id);
106
if (rs_epoch_id != current_epoch_id) {
108
LOG_WARN("set rs_epoch_id_failed", KR(ret), K(rs_epoch_id), K(epoch_id), K(current_epoch_id));
// The request comes from an RS whose epoch is older than the one already recorded.
110
} else if (rs_epoch_id > epoch_id) {
111
ret = OB_RS_NOT_MASTER;
112
LOG_WARN("this rs is not the newest leader", KR(ret), K(rs_epoch_id), K(epoch_id));
// FAILEDx presumably skips the call when ret already holds an error (for example
// OB_RS_NOT_MASTER set above); confirm against the macro definition.
115
if (FAILEDx(GCTX.rs_mgr_->force_set_master_rs(hb_request.get_rs_addr()))) {
116
LOG_WARN("fail to set master rs", KR(ret), K(hb_request.get_rs_addr()));
117
} else if (OB_FAIL(init_hb_response_(hb_response))) {
118
LOG_WARN("fail to init hb response", KR(ret));
// Synchronizing GCTX.server_id_ from the request is currently commented out.
120
// const uint64_t server_id = hb_request.get_server_id();
121
const share::RSServerStatus rs_server_status = hb_request.get_rs_server_status();
122
// if (GCTX.server_id_ != server_id) {
123
// LOG_INFO("receive new server id", "old server_id_", GCTX.server_id_, "new server_id_", server_id);
124
// GCTX.server_id_ = server_id;
// Mirror the server status recorded by RS into the global context when it changed.
// NOTE(review): GCTX.rs_server_status_ is read and written here without any visible
// synchronization. Confirm heartbeats are processed serially.
126
if (GCTX.rs_server_status_ != rs_server_status) {
127
LOG_INFO("receive new server status recorded in rs",
128
"old_status", GCTX.rs_server_status_,
129
"new_status", rs_server_status);
130
GCTX.rs_server_status_ = rs_server_status;
135
int ObHeartbeatHandler::check_disk_status_(ObServerHealthStatus &server_health_status)
137
int ret = OB_SUCCESS;
138
int tmp_ret = OB_SUCCESS;
139
ObDeviceHealthStatus dhs = DEVICE_HEALTH_NORMAL;
140
int64_t abnormal_time = 0;
141
server_health_status.reset();
142
if (OB_TMP_FAIL(ObIOManager::get_instance().get_device_health_status(dhs, abnormal_time))) {
143
LOG_WARN("fail to get device health status", KR(ret), KR(tmp_ret));
144
} else if (OB_UNLIKELY(DEVICE_HEALTH_ERROR == dhs)) {
145
const int64_t PRINT_LOG_INTERVAL_IN_US = 60 * 1000 * 1000; // 1min
146
if (REACH_TIME_INTERVAL(PRINT_LOG_INTERVAL_IN_US)) {
147
LOG_WARN("error occurs on data disk, ",
148
"data_disk_health_status", device_health_status_to_str(dhs), K(abnormal_time));
151
const bool is_data_disk_error = (DEVICE_HEALTH_ERROR == dhs);
152
if (is_data_disk_error) {
153
server_health_status.init(ObServerHealthStatus::DATA_DISK_STATUS_ERROR);
155
server_health_status.init(ObServerHealthStatus::DATA_DISK_STATUS_NORMAL);
// Error-simulation point. init_hb_response_() checks ERRSIM_DISK_ERROR to make one
// server report a data-disk error.
159
ERRSIM_POINT_DEF(ERRSIM_DISK_ERROR);
// Builds hb_response from local state:
//   - data-disk health from check_disk_status_();
//   - GCONF.mysql_port and GCONF.zone;
//   - the build version from ObService::get_build_version();
//   - GCTX.start_service_time_.
// NOTE(review): this chunk ends inside the function, and some of its original lines
// are not visible here, including the declaration of zone and the leading arguments
// passed to hb_response.init().
160
int ObHeartbeatHandler::init_hb_response_(share::ObHBResponse &hb_response)
162
int ret = OB_SUCCESS;
163
ObServerHealthStatus server_health_status;
164
if (OB_FAIL(check_disk_status_(server_health_status))) {
165
LOG_WARN("fail to check disk status", KR(ret));
167
int64_t sql_port = GCONF.mysql_port;
168
share::ObServerInfoInTable::ObBuildVersion build_version;
// Error injection: when ERRSIM_DISK_ERROR evaluates non-zero, the server whose
// GCTX.server_id_ equals 2 reports a data-disk error. Otherwise test_id is
// OB_INVALID_ID, which is not expected to match any real server id.
// NOTE(review): the target server id 2 is hard-coded, and the return value of init()
// below is ignored.
170
int64_t test_id = ERRSIM_DISK_ERROR ? 2 : OB_INVALID_ID;
171
if (test_id == GCTX.server_id_) {
172
server_health_status.reset();
173
server_health_status.init(ObServerHealthStatus::DATA_DISK_STATUS_ERROR);
175
if (OB_FAIL(zone.assign(GCONF.zone.str()))) {
176
LOG_WARN("fail to assign zone", KR(ret), K(GCONF.zone.str()));
177
} else if (OB_FAIL(ObService::get_build_version(build_version))) {
178
LOG_WARN("fail to get build_version", KR(ret), K(build_version));
179
} else if (OB_FAIL(hb_response.init(
184
GCTX.start_service_time_,
185
server_health_status))) {
186
LOG_WARN("fail to init the heartbeat response", KR(ret), K(zone), K(GCTX.self_addr()),
187
K(sql_port), K(build_version), K(GCTX.start_service_time_), K(server_health_status));