oceanbase

Форк
0
/
ob_heartbeat_handler.cpp 
193 строки · 7.2 Кб
1
/**
2
 * Copyright (c) 2022 OceanBase
3
 * OceanBase CE is licensed under Mulan PubL v2.
4
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
5
 * You may obtain a copy of Mulan PubL v2 at:
6
 *          http://license.coscl.org.cn/MulanPubL-2.0
7
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
 * See the Mulan PubL v2 for more details.
11
 */
12
#define USING_LOG_PREFIX SERVER
13
#include "observer/ob_heartbeat_handler.h"
14

15
#include "observer/ob_server.h"
16
#include "share/ob_version.h"
17
#include "observer/ob_service.h"
18

19
namespace oceanbase
20
{
21
namespace observer
22
{
23
// Printable names of ObServerHealthStatus::ObDataDiskStatus values, indexed directly by the
// enum value (data_disk_status_to_str() static-asserts that the array size equals
// DATA_DISK_STATUS_MAX). Both the characters and the pointers are const so the table
// cannot be modified at runtime.
static const char *const OB_DATA_DISK_STATUS_STR[] = {"INVALID", "NORMAL", "ERROR"};
24
// Serialization of ObServerHealthStatus: the only serialized member is data_disk_status_.
OB_SERIALIZE_MEMBER(ObServerHealthStatus, data_disk_status_)
28
// Default-constructs a status whose data disk status is DATA_DISK_STATUS_INVALID;
// is_valid() stays false until init() succeeds.
ObServerHealthStatus::ObServerHealthStatus()
  : data_disk_status_(DATA_DISK_STATUS_INVALID)
{}
32
// Nothing to release: the only member touched in this file is a plain enum value.
ObServerHealthStatus::~ObServerHealthStatus()
{}
35
// Sets the data disk status.
//
// @param data_disk_status  must lie strictly between DATA_DISK_STATUS_INVALID and
//                          DATA_DISK_STATUS_MAX
// @return OB_SUCCESS, or OB_INVALID_ARGUMENT (member left unchanged) when out of range
int ObServerHealthStatus::init(ObDataDiskStatus data_disk_status)
{
  int ret = OB_SUCCESS;
  const bool in_range = (data_disk_status > DATA_DISK_STATUS_INVALID)
                        && (data_disk_status < DATA_DISK_STATUS_MAX);
  if (in_range) {
    data_disk_status_ = data_disk_status;
  } else {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid argument", KR(ret), K(data_disk_status));
  }
  return ret;
}
46
int ObServerHealthStatus::assign(const ObServerHealthStatus server_health_status)
47
{
48
  int ret = OB_SUCCESS;
49
  data_disk_status_ = server_health_status.data_disk_status_;
50
  return ret;
51
}
52
void ObServerHealthStatus::reset()
53
{
54
  data_disk_status_ = ObDataDiskStatus::DATA_DISK_STATUS_INVALID;
55
}
56
bool ObServerHealthStatus::is_valid() const
57
{
58
  return data_disk_status_ > ObDataDiskStatus::DATA_DISK_STATUS_INVALID
59
      && data_disk_status_ < ObDataDiskStatus::DATA_DISK_STATUS_MAX;
60
}
61
bool ObServerHealthStatus::is_healthy() const
62
{
63
  return ObDataDiskStatus::DATA_DISK_STATUS_NORMAL == data_disk_status_;
64
}
65
const char *ObServerHealthStatus::data_disk_status_to_str(const ObDataDiskStatus data_disk_status)
66
{
67
  STATIC_ASSERT(ARRAYSIZEOF(OB_DATA_DISK_STATUS_STR) == DATA_DISK_STATUS_MAX, "array size mismatch");
68
  const char *str = "UNKNOWN";
69
  if (OB_UNLIKELY(data_disk_status >= ARRAYSIZEOF(OB_DATA_DISK_STATUS_STR)
70
                  || data_disk_status < DATA_DISK_STATUS_INVALID)) {
71
    LOG_ERROR_RET(OB_ERR_UNEXPECTED, "fatal error, unknown data disk status", K(data_disk_status));
72
  } else {
73
    str = OB_DATA_DISK_STATUS_STR[data_disk_status];
74
  }
75
  return str;
76
}
77
// Trivial constructor: no per-instance state is initialized here.
ObHeartbeatHandler::ObHeartbeatHandler()
{}
80
// Trivial destructor: nothing is owned by the handler instance.
ObHeartbeatHandler::~ObHeartbeatHandler()
{}
83
// Epoch of the newest RS whose heartbeat this server has accepted. Static, so it is shared
// by all handler instances. It is read with ATOMIC_LOAD and advanced with ATOMIC_CAS in
// handle_heartbeat(), and stays palf::INVALID_PROPOSAL_ID until the first heartbeat is accepted.
int64_t ObHeartbeatHandler::rs_epoch_id_{palf::INVALID_PROPOSAL_ID};
84
bool ObHeartbeatHandler::is_rs_epoch_id_valid()
85
{
86
  return palf::INVALID_PROPOSAL_ID != ATOMIC_LOAD(&rs_epoch_id_);
87
}
88
int ObHeartbeatHandler::handle_heartbeat(
89
    const share::ObHBRequest &hb_request,
90
    share::ObHBResponse &hb_response)
91
{
92
  int ret = OB_SUCCESS;
93
  hb_response.reset();
94
  int64_t rs_epoch_id = ATOMIC_LOAD(&rs_epoch_id_);
95
  if (OB_UNLIKELY(!hb_request.is_valid())) {
96
    ret = OB_INVALID_ARGUMENT;
97
    LOG_WARN("receive an invalid heartbeat request", KR(ret), K(hb_request));
98
  } else if (OB_ISNULL(GCTX.rs_mgr_)) {
99
    ret = OB_ERR_UNEXPECTED;
100
    LOG_WARN("rs manager is null", KR(ret), KP(GCTX.rs_mgr_));
101
  } else {
102
    const int64_t epoch_id = hb_request.get_epoch_id();
103
    if (rs_epoch_id < epoch_id || palf::INVALID_PROPOSAL_ID == rs_epoch_id) {
104
      LOG_INFO("receive new rs epoch", "old rs_epoch_id", rs_epoch_id, "new rs_epoch_id", epoch_id);
105
      int64_t current_epoch_id = ATOMIC_CAS(&rs_epoch_id_, rs_epoch_id, epoch_id);
106
      if (rs_epoch_id != current_epoch_id) {
107
        ret = OB_NEED_RETRY;
108
        LOG_WARN("set rs_epoch_id_failed", KR(ret), K(rs_epoch_id), K(epoch_id), K(current_epoch_id));
109
      }
110
    } else if (rs_epoch_id > epoch_id) {
111
      ret = OB_RS_NOT_MASTER;
112
      LOG_WARN("this rs is not the newest leader", KR(ret), K(rs_epoch_id), K(epoch_id));
113
    }
114
  }
115
  if (FAILEDx(GCTX.rs_mgr_->force_set_master_rs(hb_request.get_rs_addr()))) {
116
    LOG_WARN("fail to set master rs", KR(ret), K(hb_request.get_rs_addr()));
117
  } else if (OB_FAIL(init_hb_response_(hb_response))) {
118
    LOG_WARN("fail to init hb response", KR(ret));
119
  } else {
120
    // const uint64_t server_id = hb_request.get_server_id();
121
    const share::RSServerStatus rs_server_status = hb_request.get_rs_server_status();
122
    // if (GCTX.server_id_ != server_id) {
123
    //   LOG_INFO("receive new server id", "old server_id_", GCTX.server_id_, "new server_id_", server_id);
124
    //   GCTX.server_id_ = server_id;
125
    // }
126
    if (GCTX.rs_server_status_ != rs_server_status) {
127
      LOG_INFO("receive new server status recorded in rs",
128
          "old_status", GCTX.rs_server_status_,
129
          "new_status", rs_server_status);
130
      GCTX.rs_server_status_ = rs_server_status;
131
    }
132
  }
133
  return ret;
134
}
135
/**
 * Fills server_health_status with the data disk status derived from ObIOManager's device
 * health status: DEVICE_HEALTH_ERROR maps to DATA_DISK_STATUS_ERROR, anything else to
 * DATA_DISK_STATUS_NORMAL.
 *
 * Failing to query the device health is best effort: it is only logged, and the status is
 * derived from whatever dhs then holds (presumably its initial DEVICE_HEALTH_NORMAL, so the
 * disk is reported as NORMAL; confirm against get_device_health_status()).
 *
 * @param[out] server_health_status  reset, then initialized with the derived status
 * @return OB_SUCCESS, or the error returned by ObServerHealthStatus::init()
 */
int ObHeartbeatHandler::check_disk_status_(ObServerHealthStatus &server_health_status)
{
  int ret = OB_SUCCESS;
  int tmp_ret = OB_SUCCESS;
  ObDeviceHealthStatus dhs = DEVICE_HEALTH_NORMAL;
  int64_t abnormal_time = 0;
  server_health_status.reset();
  if (OB_TMP_FAIL(ObIOManager::get_instance().get_device_health_status(dhs, abnormal_time))) {
    // Deliberately not propagated (tmp_ret): the heartbeat still reports a disk status.
    LOG_WARN("fail to get device health status", KR(ret), KR(tmp_ret));
  } else if (OB_UNLIKELY(DEVICE_HEALTH_ERROR == dhs)) {
    const int64_t PRINT_LOG_INTERVAL_IN_US = 60 * 1000 * 1000; // 1min
    // Rate-limited because heartbeats arrive repeatedly while the disk stays in error.
    if (REACH_TIME_INTERVAL(PRINT_LOG_INTERVAL_IN_US)) {
      LOG_WARN("error occurs on data disk, ",
          "data_disk_health_status", device_health_status_to_str(dhs), K(abnormal_time));
    }
  }
  const bool is_data_disk_error = (DEVICE_HEALTH_ERROR == dhs);
  // init() validates its argument; check the result instead of silently ignoring it.
  if (OB_FAIL(server_health_status.init(is_data_disk_error
      ? ObServerHealthStatus::DATA_DISK_STATUS_ERROR
      : ObServerHealthStatus::DATA_DISK_STATUS_NORMAL))) {
    LOG_WARN("fail to init server health status", KR(ret), K(is_data_disk_error));
  }
  return ret;
}
159
// Error-injection point consulted by init_hb_response_(): when it evaluates to true, the
// server whose GCTX.server_id_ is 2 reports DATA_DISK_STATUS_ERROR in its heartbeat
// response. NOTE(review): presumably only triggerable in ERRSIM builds; confirm against
// the ERRSIM_POINT_DEF definition.
ERRSIM_POINT_DEF(ERRSIM_DISK_ERROR);
160
/**
 * Builds the heartbeat response describing this server: zone, self address, SQL port,
 * build version, start service time, and health status (from check_disk_status_(),
 * possibly overridden by the ERRSIM_DISK_ERROR injection point).
 *
 * @param[out] hb_response  initialized via ObHBResponse::init()
 * @return OB_SUCCESS or the first error encountered
 */
int ObHeartbeatHandler::init_hb_response_(share::ObHBResponse &hb_response)
{
  int ret = OB_SUCCESS;
  ObServerHealthStatus server_health_status;
  if (OB_FAIL(check_disk_status_(server_health_status))) {
    LOG_WARN("fail to check disk status", KR(ret));
  } else {
    const int64_t sql_port = GCONF.mysql_port;
    share::ObServerInfoInTable::ObBuildVersion build_version;
    common::ObZone zone;
    // Fault injection for tests: when ERRSIM_DISK_ERROR is triggered, the server with id
    // ERRSIM_DISK_ERROR_SERVER_ID reports a data disk error regardless of the real status.
    // The injection flag is tested directly. Comparing server_id_ against an OB_INVALID_ID
    // fallback would also match a server whose id is still OB_INVALID_ID while injection
    // is off.
    const int64_t ERRSIM_DISK_ERROR_SERVER_ID = 2;
    if (ERRSIM_DISK_ERROR && ERRSIM_DISK_ERROR_SERVER_ID == GCTX.server_id_) {
      server_health_status.reset();
      if (OB_FAIL(server_health_status.init(ObServerHealthStatus::DATA_DISK_STATUS_ERROR))) {
        LOG_WARN("fail to init server health status for errsim", KR(ret));
      }
    }
    if (OB_FAIL(ret)) {
      // Already logged above.
    } else if (OB_FAIL(zone.assign(GCONF.zone.str()))) {
      LOG_WARN("fail to assign zone", KR(ret), K(GCONF.zone.str()));
    } else if (OB_FAIL(ObService::get_build_version(build_version))) {
      LOG_WARN("fail to get build_version", KR(ret), K(build_version));
    } else if (OB_FAIL(hb_response.init(
        zone,
        GCTX.self_addr(),
        sql_port,
        build_version,
        GCTX.start_service_time_,
        server_health_status))) {
      LOG_WARN("fail to init the heartbeat response", KR(ret), K(zone), K(GCTX.self_addr()),
          K(sql_port), K(build_version), K(GCTX.start_service_time_), K(server_health_status));
    }
  }
  return ret;
}
192
} // observer
193
} // oceanbase

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.