oceanbase

Форк
0
/
ob_heartbeat_service.cpp 
1110 строк · 45.5 Кб
1
/**
 * Copyright (c) 2022 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */
12

13
#define USING_LOG_PREFIX RS
14
#include "ob_heartbeat_service.h"
15

16
#include "share/ob_define.h"
17
#include "share/ob_service_epoch_proxy.h"
18
#include "share/ob_version.h"
19
#include "share/ob_zone_table_operation.h"
20
#include "lib/thread/threads.h"               // set_run_wrapper
21
#include "lib/mysqlclient/ob_mysql_transaction.h"  // ObMySQLTransaction
22
#include "lib/utility/ob_unify_serialize.h"
23
#include "lib/time/ob_time_utility.h"
24
#include "observer/ob_server_struct.h"              // GCTX
25
#include "logservice/ob_log_base_header.h"          // ObLogBaseHeader
26
#include "logservice/ob_log_handler.h"
27
#include "storage/tx_storage/ob_ls_service.h"
28
#include "storage/tx_storage/ob_ls_handle.h"
29
#include "rootserver/ob_root_utils.h"            // get_proposal_id_from_sys_ls
30
#include "rootserver/ob_rs_event_history_table_operator.h" // ROOTSERVICE_EVENT_ADD
31
#include "rootserver/ob_root_service.h"
32
#include "lib/utility/utility.h"
33

34
namespace oceanbase
35
{
36
using namespace common;
37
using namespace share;
38
using observer::ObServerHealthStatus;
39
namespace rootserver
40
{
41
// File-local logging helpers: forward to the FLOG_* macros and prefix every
// message with "[HEARTBEAT_SERVICE] " so heartbeat-service lines are easy to
// find in the log. A call with no key/value arguments is allowed; the
// trailing comma is elided via ##__VA_ARGS__ (GNU/Clang extension).
#define HBS_LOG_INFO(fmt, ...) FLOG_INFO("[HEARTBEAT_SERVICE] " fmt, ##__VA_ARGS__)
#define HBS_LOG_WARN(fmt, ...) FLOG_WARN("[HEARTBEAT_SERVICE] " fmt, ##__VA_ARGS__)
#define HBS_LOG_ERROR(fmt, ...) FLOG_ERROR("[HEARTBEAT_SERVICE] " fmt, ##__VA_ARGS__)
44
ObHeartbeatService::ObHeartbeatService()
45
    : is_inited_(false),
46
      sql_proxy_(NULL),
47
      srv_rpc_proxy_(NULL),
48
      epoch_id_(palf::INVALID_PROPOSAL_ID),
49
      whitelist_epoch_id_(palf::INVALID_PROPOSAL_ID),
50
      hb_responses_epoch_id_(palf::INVALID_PROPOSAL_ID),
51
      hb_responses_rwlock_(ObLatchIds::HB_RESPONSES_LOCK),
52
      all_servers_info_in_table_rwlock_(ObLatchIds::ALL_SERVERS_INFO_IN_TABLE_LOCK),
53
      all_servers_hb_info_(),
54
      all_servers_info_in_table_(),
55
      inactive_zone_list_(),
56
      hb_responses_(),
57
      need_process_hb_responses_(false)
58
{
59
}
60
ObHeartbeatService::~ObHeartbeatService()
61
{
62
}
63
bool ObHeartbeatService::is_service_enabled_ = false;
64
int ObHeartbeatService::init()
65
{
66
  int ret = OB_SUCCESS;
67
  int BUCKET_NUM  = 1024; // ** FIXME: (linqiucen.lqc) temp. value
68
  sql_proxy_ = GCTX.sql_proxy_;
69
  srv_rpc_proxy_ = GCTX.srv_rpc_proxy_;
70
  lib::ObMemAttr attr(MTL_ID(), "HB_SERVICE");
71
  if (OB_UNLIKELY(is_inited_)) {
72
    ret = OB_INIT_TWICE;
73
    LOG_WARN("has already inited", KR(ret), K(is_inited_));
74
  } else if (MTL_ID() != OB_SYS_TENANT_ID) {
75
    // only create hb service threads in sys tenant
76
  } else if (OB_ISNULL(srv_rpc_proxy_)) {
77
    ret = OB_ERR_UNEXPECTED;
78
    HBS_LOG_ERROR("srv_rpc_proxy_ is null", KR(ret), KP(srv_rpc_proxy_));
79
  } else if (OB_FAIL(ObTenantThreadHelper::create(
80
      "HBService",
81
      lib::TGDefIDs::HeartbeatService,
82
      *this))) {
83
    LOG_WARN("fail to create thread", KR(ret));
84
  } else if (OB_FAIL(ObTenantThreadHelper::start())) {
85
    LOG_WARN("failed to start thread", KR(ret));
86
  } else if (OB_FAIL(all_servers_hb_info_.create(BUCKET_NUM, attr))) {
87
    LOG_WARN("fail to create all_servers_hb_info_", KR(ret));
88
  } else {
89
    {
90
      SpinWLockGuard guard_for_hb_responses(hb_responses_rwlock_);
91
      hb_responses_.reset();
92
      hb_responses_epoch_id_ = palf::INVALID_PROPOSAL_ID;
93
      need_process_hb_responses_ = false;
94
    }
95
    {
96
      SpinWLockGuard guard_for_servers_info(all_servers_info_in_table_rwlock_);
97
      all_servers_info_in_table_.reset();
98
      inactive_zone_list_.reset();
99
      whitelist_epoch_id_ = palf::INVALID_PROPOSAL_ID;
100
    }
101
    all_servers_hb_info_.clear();
102
    all_servers_info_in_table_.set_attr(attr);
103
    inactive_zone_list_.set_attr(attr);
104
    hb_responses_.set_attr(attr);
105
    set_epoch_id_(palf::INVALID_PROPOSAL_ID);
106
    is_inited_ = true;
107
    HBS_LOG_INFO("ObHeartbeatService is inited");
108
  }
109
  // we do not need the returned error code when init
110
  // only try to confirm whether the heartbeat service is enabled as early as possible,
111
  (void) check_is_service_enabled_();
112
  return ret;
113
}
114
int ObHeartbeatService::check_is_service_enabled_()
115
{
116
  int ret = OB_SUCCESS;
117
  uint64_t sys_tenant_data_version = 0;
118
  if (OB_FAIL(GET_MIN_DATA_VERSION(OB_SYS_TENANT_ID, sys_tenant_data_version))) {
119
    LOG_WARN("fail to get sys tenant's min data version", KR(ret));
120
  } else if (sys_tenant_data_version >= DATA_VERSION_4_2_0_0) {
121
    is_service_enabled_ = true;
122
    HBS_LOG_INFO("the heartbeart service is enabled now", K(sys_tenant_data_version), K(is_service_enabled_));
123
  }
124
  return ret;
125
}
126
void ObHeartbeatService::destroy()
127
{
128
  {
129
    SpinWLockGuard guard_for_hb_responses(hb_responses_rwlock_);
130
    hb_responses_.reset();
131
    hb_responses_epoch_id_ = palf::INVALID_PROPOSAL_ID;
132
    need_process_hb_responses_ = false;
133
  }
134
  {
135
    SpinWLockGuard guard_for_servers_info(all_servers_info_in_table_rwlock_);
136
    all_servers_info_in_table_.reset();
137
    inactive_zone_list_.reset();
138
    whitelist_epoch_id_ = palf::INVALID_PROPOSAL_ID;
139
  }
140
  is_inited_ = false;
141
  sql_proxy_ = NULL;
142
  srv_rpc_proxy_ = NULL;
143
  set_epoch_id_(palf::INVALID_PROPOSAL_ID);
144
  all_servers_hb_info_.destroy();
145
  HBS_LOG_INFO("ObHeartbeatService is destroyed");
146
  ObTenantThreadHelper::destroy();
147
}
148

149
int ObHeartbeatService::switch_to_leader()
150
{
151
  int ret = OB_SUCCESS;
152
  int64_t epoch_id = palf::INVALID_PROPOSAL_ID;
153
  ObRole role;
154
  if (OB_FAIL(ObRootUtils::get_proposal_id_from_sys_ls(epoch_id, role))) {
155
    LOG_WARN("fail to get proposal id from sys ls", KR(ret));
156
  } else if (ObRole::LEADER != role) {
157
    ret = OB_NOT_MASTER;
158
    HBS_LOG_WARN("not master ls", KR(ret), K(epoch_id), K(role));
159
  } else {
160
    if (OB_LIKELY((palf::INVALID_PROPOSAL_ID == epoch_id_ || epoch_id_ < epoch_id)
161
        && palf::INVALID_PROPOSAL_ID != epoch_id)) {
162
      set_epoch_id_(epoch_id);
163
    } else {
164
      ret = OB_INVALID_ARGUMENT;
165
      LOG_WARN("invalid epoch id", KR(ret), K(epoch_id), K(epoch_id_));
166
    }
167
  }
168
  if (FAILEDx(ObTenantThreadHelper::switch_to_leader())) {
169
    HBS_LOG_WARN("fail to switch to leader", KR(ret));
170
  } else {
171
    HBS_LOG_INFO("switch to leader", KR(ret), K(epoch_id_));
172
  }
173
  return ret;
174
}
175
void ObHeartbeatService::do_work()
176
{
177
  int ret = OB_SUCCESS;
178
  if (OB_UNLIKELY(!is_inited_)) {
179
    ret = OB_NOT_INIT;
180
    LOG_WARN("not init", KR(ret), K(is_inited_));
181
  } else if (OB_FAIL(check_upgrade_compat_())) {
182
    LOG_WARN("fail to check upgrade compatibility", KR(ret));
183
  } else {
184
    while (!has_set_stop()) {
185
      uint64_t thread_idx = get_thread_idx();
186
      int64_t thread_cnt = THREAD_COUNT;
187
      if (OB_UNLIKELY(thread_idx >= thread_cnt)) {
188
        ret = OB_ERR_UNEXPECTED;
189
        HBS_LOG_ERROR("unexpected thread_idx", KR(ret), K(thread_idx), K(thread_cnt));
190
      } else {
191
        if (0 == thread_idx) {
192
          ObCurTraceId::init(GCONF.self_addr_);
193
          if (OB_FAIL(send_heartbeat_())) {
194
            LOG_WARN("fail to send heartbeat", KR(ret));
195
          }
196
        } else { // 1 == thread_idx
197
          ObCurTraceId::init(GCONF.self_addr_);
198
          if (OB_FAIL(manage_heartbeat_())) {
199
            LOG_WARN("fail to manage heartbeat", KR(ret));
200
          }
201
        }
202
        if(OB_FAIL(ret)) {
203
          idle(HB_FAILED_IDLE_TIME_US);
204
        } else {
205
          idle(HB_IDLE_TIME_US);
206
        }
207
      }
208
    } // end while
209
  }
210
}
211
int ObHeartbeatService::check_upgrade_compat_()
212
{
213
  int ret = OB_SUCCESS;
214
  while (!is_service_enabled_ && !has_set_stop()) {
215
    if (OB_FAIL(check_is_service_enabled_())) {
216
      LOG_WARN("fail to check whether the heartbeat service is enabled", KR(ret));
217
    }
218
    idle(HB_IDLE_TIME_US);
219
  }
220
  if (has_set_stop()) {
221
    ret = OB_NOT_MASTER;
222
    LOG_WARN("not leader", KR(ret));
223
  }
224
  return ret;
225
}
226
int ObHeartbeatService::send_heartbeat_()
227
{
228
  int ret = OB_SUCCESS;
229
  ObHBRequestArray hb_requests;
230
  int64_t tmp_whitelist_epoch_id = palf::INVALID_PROPOSAL_ID;
231
  if (OB_UNLIKELY(!is_inited_)) {
232
    ret = OB_NOT_INIT;
233
    LOG_WARN("not init", KR(ret), K(is_inited_));
234
  } else if (OB_ISNULL(srv_rpc_proxy_)) {
235
    ret = OB_ERR_UNEXPECTED;
236
    HBS_LOG_ERROR("srv_rpc_proxy_ is null", KR(ret), KP(srv_rpc_proxy_));
237
  } else {
238
    ObTimeGuard time_guard("ObHeartbeatService::send_heartbeat_", 2 * 1000 * 1000);
239
    // step 1: prepare hb_requests based on the whitelist
240
    if (OB_FAIL(prepare_hb_requests_(hb_requests, tmp_whitelist_epoch_id))) {
241
      LOG_WARN("fail to prepare heartbeat requests", KR(ret));
242
    } else if (hb_requests.count() <= 0) {
243
      LOG_INFO("no heartbeat request needs to be sent");
244
    } else {
245
      time_guard.click("end prepare_hb_requests");
246
      ObSendHeartbeatProxy proxy(*srv_rpc_proxy_, &obrpc::ObSrvRpcProxy::handle_heartbeat);
247
      int64_t timeout = GCONF.rpc_timeout;  // default value is 2s
248
      int tmp_ret = OB_SUCCESS;
249
      ObArray<int> return_ret_array;
250
      // step 2: send hb_requests to all servers in the whitelist
251
      for (int64_t i = 0; i < hb_requests.count(); i++) {
252
        if (OB_TMP_FAIL(proxy.call(
253
            hb_requests.at(i).get_server(),
254
            timeout,
255
            GCONF.cluster_id,
256
            OB_SYS_TENANT_ID,
257
            hb_requests.at(i)))) {
258
          // error code will be ignored here.
259
          // send rpc to some offline servers will return error, however, it's acceptable
260
          LOG_WARN("fail to send heartbeat rpc",  KR(ret), KR(tmp_ret), K(hb_requests.at(i)));
261
        }
262
      }
263
      // step 3: wait hb_responses
264
      if (OB_TMP_FAIL(proxy.wait_all(return_ret_array))) {
265
        LOG_WARN("fail to wait all batch result", KR(ret), KR(tmp_ret));
266
        ret = OB_SUCC(ret) ? tmp_ret : ret;
267
      }
268
      time_guard.click("end wait_hb_responses");
269
      // step 4: save hb_responses
270
      if (FAILEDx(set_hb_responses_(tmp_whitelist_epoch_id, &proxy))) {
271
        LOG_WARN("fail to set hb_responses", KR(ret));
272
      }
273
      time_guard.click("end set_hb_responses");
274
    }
275
  }
276
  FLOG_INFO("send_heartbeat_ has finished one round", KR(ret));
277
  return ret;
278
}
279
int ObHeartbeatService::set_hb_responses_(const int64_t whitelist_epoch_id, ObSendHeartbeatProxy *proxy)
280
{
281
  int ret = OB_SUCCESS;
282
  if (OB_UNLIKELY(!is_inited_)) {
283
    ret = OB_NOT_INIT;
284
    LOG_WARN("not init", KR(ret), K(is_inited_));
285
  } else if (OB_ISNULL(proxy)) {
286
    ret = OB_ERR_UNEXPECTED;
287
    LOG_WARN("proxy is null", KR(ret), KP(proxy));
288
  } else if (OB_UNLIKELY(proxy->get_dests().count() != proxy->get_results().count())) {
289
    ret = OB_ERR_UNEXPECTED;
290
    LOG_WARN("dest addr count != result count", KR(ret), "dest addr count", proxy->get_dests().count(),
291
        "result count", proxy->get_results().count());
292
  } else {
293
    int tmp_ret = OB_SUCCESS;
294
    SpinWLockGuard guard_for_hb_responses(hb_responses_rwlock_);
295
    need_process_hb_responses_ = true;
296
    hb_responses_epoch_id_ = whitelist_epoch_id;
297
    hb_responses_.reset();
298
    // don't use arg/dest here because call() may has failue.
299
    ARRAY_FOREACH_X(proxy->get_results(), idx, cnt, OB_SUCC(ret)) {
300
      const ObHBResponse *hb_response = proxy->get_results().at(idx);
301
      const ObAddr &dest_addr = proxy->get_dests().at(idx);
302
      if (OB_ISNULL(hb_response)) {
303
        tmp_ret = OB_ERR_UNEXPECTED;
304
        LOG_WARN("hb_response is null", KR(ret), KR(tmp_ret), KP(hb_response));
305
      } else if (OB_UNLIKELY(!hb_response->is_valid())) {
306
        // if an observer does not reply the rpc, we will get an invalid hb_response.
307
        tmp_ret = OB_INVALID_ARGUMENT;
308
        LOG_WARN("There exists a server not responding to the hb service",
309
            KR(ret), KR(tmp_ret), KPC(hb_response), K(dest_addr));
310
      } else if (OB_FAIL(hb_responses_.push_back(*hb_response))) {
311
        LOG_WARN("fail to push an element into hb_responses_", KR(ret), KPC(hb_response));
312
      } else {
313
        LOG_TRACE("receive a heartbeat response", KPC(hb_response));
314
      }
315
    }
316
  }
317
  return ret;
318
}
319
int ObHeartbeatService::get_and_reset_hb_responses_(
320
    ObHBResponseArray &hb_responses,
321
    int64_t &hb_responses_epoch_id)
322
{
323
  int ret = OB_SUCCESS;
324
  // set hb_responses = hb_responses_
325
  // locking hb_responses_ too long will block send_heartbeat()
326
  // therefore we process hb_responses rather than hb_responses_
327
  hb_responses.reset();
328
  hb_responses_epoch_id = palf::INVALID_PROPOSAL_ID;
329
  SpinWLockGuard guard_for_hb_responses(hb_responses_rwlock_);
330
  if (need_process_hb_responses_) {
331
    if (OB_FAIL(hb_responses.assign(hb_responses_))) {
332
      LOG_WARN("fail to assign tmp_hb_responses", KR(ret), K(hb_responses_));
333
    } else {
334
      need_process_hb_responses_ = false;
335
      hb_responses_epoch_id = hb_responses_epoch_id_;
336
      hb_responses_epoch_id_ = palf::INVALID_PROPOSAL_ID;
337
      hb_responses_.reset();
338
    }
339
  } else {
340
    ret = OB_NEED_WAIT;
341
    LOG_WARN("currently there are no hb_responses need to be proccessed", KR(ret));
342
  }
343
  return ret;
344
}
345
// Build one heartbeat request per server in the current whitelist.
//
// The whitelist read lock is held for the whole build. The requests and the
// returned whitelist_epoch_id therefore describe the same whitelist
// snapshot.
// A server is sent RSS_IS_STOPPED when it is stopped itself or its zone is in
// inactive_zone_list_; otherwise it is sent RSS_IS_WORKING.
// @param[out] hb_requests         reset, then filled
// @param[out] whitelist_epoch_id  epoch of the whitelist used
// @return OB_ERR_UNEXPECTED on an invalid server entry, or the init/push error.
int ObHeartbeatService::prepare_hb_requests_(ObHBRequestArray &hb_requests, int64_t &whitelist_epoch_id)
{
  int ret = OB_SUCCESS;
  hb_requests.reset();
  if (OB_UNLIKELY(!is_inited_)) {
    ret = OB_NOT_INIT;
    LOG_WARN("not init", KR(ret), K(is_inited_));
  } else {
    SpinRLockGuard guard_for_servers_info(all_servers_info_in_table_rwlock_);
    ObHBRequest hb_request;
    whitelist_epoch_id = whitelist_epoch_id_;
    ARRAY_FOREACH_X(all_servers_info_in_table_, idx, cnt, OB_SUCC(ret)) {
      const ObServerInfoInTable &server_info = all_servers_info_in_table_.at(idx);
      if (OB_UNLIKELY(!server_info.is_valid())) {
        ret = OB_ERR_UNEXPECTED;
        HBS_LOG_WARN("invalid server info in table", KR(ret), K(server_info));
      } else {
        const bool is_stopped = server_info.is_stopped()
            || has_exist_in_array(inactive_zone_list_, server_info.get_zone());
        hb_request.reset();
        if (OB_FAIL(hb_request.init(
            server_info.get_server(),
            server_info.get_server_id(),
            GCTX.self_addr(),
            is_stopped ? RSS_IS_STOPPED : RSS_IS_WORKING,
            whitelist_epoch_id))) {
          LOG_WARN("fail to init hb_request", KR(ret), K(server_info), K(is_stopped),
              K(GCTX.self_addr()), K(whitelist_epoch_id));
        } else if (OB_FAIL(hb_requests.push_back(hb_request))) {
          LOG_WARN("fail to push an element into hb_requests", KR(ret), K(hb_request));
        }
      }
    }
  }
  return ret;
}
388
int ObHeartbeatService::manage_heartbeat_()
389
{
390
  int ret = OB_SUCCESS;
391
  if (OB_UNLIKELY(!is_inited_)) {
392
    ret = OB_NOT_INIT;
393
    LOG_WARN("not init", KR(ret), K(is_inited_));
394
  } else {
395
    ObTimeGuard time_guard("ObHeartbeatService::manage_heartbeat_", 2 * 1000 * 1000);
396
    int tmp_ret = OB_SUCCESS;
397
    if (OB_TMP_FAIL(prepare_whitelist_())) {
398
      ret = OB_SUCC(ret) ? tmp_ret : ret;
399
      LOG_WARN("fail to prepare whitelist", KR(ret), KR(tmp_ret));
400
    }
401
    time_guard.click("end prepare_whitelist");
402
    if (OB_TMP_FAIL(process_hb_responses_())) {
403
      ret = OB_SUCC(ret) ? tmp_ret : ret;
404
      LOG_WARN("fail to prepare heartbeat response", KR(ret), KR(tmp_ret));
405
    }
406
    time_guard.click("end process_hb_responses");
407
  }
408
  FLOG_INFO("manage_heartbeat_ has finished one round", KR(ret));
409
  return ret;
410
}
411
int ObHeartbeatService::prepare_whitelist_()
412
{
413
  int ret = OB_SUCCESS;
414
  int64_t epoch_id = get_epoch_id_();
415
  int64_t persistent_epoch_id = palf::INVALID_PROPOSAL_ID;
416
  ObServerInfoInTableArray tmp_all_servers_info_in_table;
417
  ObArray<ObZone> tmp_inactive_zone_list;
418
  if (OB_UNLIKELY(!is_inited_)) {
419
    ret = OB_NOT_INIT;
420
    LOG_WARN("not init", KR(ret), K(is_inited_));
421
  } else if (OB_ISNULL(sql_proxy_)) {
422
    ret = OB_ERR_UNEXPECTED;
423
    LOG_WARN("sql_proxy_ is null", KR(ret), KP(sql_proxy_));
424
  } else if (OB_FAIL(check_or_update_service_epoch_(epoch_id))) {
425
    LOG_WARN("fail to check or update service epoch", KR(ret), K(epoch_id));
426
  } else if (OB_FAIL(ObServerTableOperator::get(*sql_proxy_, tmp_all_servers_info_in_table))) {
427
    // It is possible that heartbeat_service_epoch is changed while we are reading __all_server table
428
    // It's acceptable, since we cannot update __all_server table when we hold the old heartbeat_service_epoch
429
    LOG_WARN("fail to read __all_server table", KR(ret), KP(sql_proxy_));
430
  } else if (OB_FAIL(ObZoneTableOperation::get_inactive_zone_list(*sql_proxy_, tmp_inactive_zone_list))) {
431
    LOG_WARN("fail to get inactive zone list", KR(ret), KP(sql_proxy_));
432
  } else {
433
    SpinWLockGuard guard_for_servers_info(all_servers_info_in_table_rwlock_);
434
    whitelist_epoch_id_ = epoch_id;
435
    if (OB_FAIL(all_servers_info_in_table_.assign(tmp_all_servers_info_in_table))) {
436
      all_servers_info_in_table_.reset();
437
      whitelist_epoch_id_ = palf::INVALID_PROPOSAL_ID;
438
      LOG_WARN("fail to assign all_servers_info_in_table_", KR(ret), K(tmp_all_servers_info_in_table));
439
    } else if (OB_FAIL(inactive_zone_list_.assign(tmp_inactive_zone_list))) {
440
      LOG_WARN("fail to assign inactive_zone_list_",KR(ret), K(tmp_inactive_zone_list));
441
    }
442
  }
443
  return ret;
444
}
445
int ObHeartbeatService::check_or_update_service_epoch_(const int64_t epoch_id)
446
{
447
  // if persistent_epoch_id == epoch_id: check ok.
448
  // if persistent_epoch_id < epoch_id: update heartbeat_service_epoch in __all_service_epoch table
449
  //                                    if the updation is successful, check ok.
450
  // if persistent_epoch_id > epoch_id: return error OB_NOT_MASTER
451
  int ret = OB_SUCCESS;
452
  int64_t persistent_epoch_id = palf::INVALID_PROPOSAL_ID;
453
  if (OB_UNLIKELY(!is_inited_)) {
454
    ret = OB_NOT_INIT;
455
    LOG_WARN("not init", KR(ret), K(is_inited_));
456
  } else if (OB_ISNULL(sql_proxy_)) {
457
    ret = OB_ERR_UNEXPECTED;
458
    LOG_WARN("sql_proxy_ is null", KR(ret), KP(sql_proxy_));
459
  } else if (OB_FAIL(ObServiceEpochProxy::get_service_epoch(
460
      *sql_proxy_,
461
      OB_SYS_TENANT_ID,
462
      ObServiceEpochProxy::HEARTBEAT_SERVICE_EPOCH,
463
      persistent_epoch_id))) {
464
    LOG_WARN("fail to get heartbeat service epoch", KR(ret),KP(sql_proxy_));
465
  } else if (palf::INVALID_PROPOSAL_ID == persistent_epoch_id || palf::INVALID_PROPOSAL_ID == epoch_id) {
466
    ret = OB_ERR_UNEXPECTED;
467
    LOG_WARN("epoch id is unexpectedly invalid", KR(ret), K(persistent_epoch_id), K(epoch_id));
468
  } else if (persistent_epoch_id > epoch_id) {
469
    ret = OB_NOT_MASTER;
470
    HBS_LOG_WARN("persistent_epoch_id is greater than epoch_id, which means this server is not leader",
471
        KR(ret), K(persistent_epoch_id), K(epoch_id));
472
  } else if (persistent_epoch_id < epoch_id) {
473
    HBS_LOG_INFO("persistent_epoch_id is smaller than epoch_id", K(persistent_epoch_id), K(epoch_id));
474
    common::ObMySQLTransaction trans;
475
    if (OB_FAIL(trans.start(sql_proxy_, OB_SYS_TENANT_ID))) {
476
      LOG_WARN("fail to start trans", KR(ret));
477
    } else if (OB_FAIL(ObServiceEpochProxy::check_and_update_service_epoch(
478
        trans,
479
        OB_SYS_TENANT_ID,
480
        ObServiceEpochProxy::HEARTBEAT_SERVICE_EPOCH,
481
        epoch_id))) {
482
      LOG_WARN("fail to check and update service epoch", KR(ret), KP(sql_proxy_), K(epoch_id));
483
    }
484
    if (OB_UNLIKELY(!trans.is_started())) {
485
      LOG_WARN("the transaction is not started");
486
    } else {
487
      int tmp_ret = OB_SUCCESS;
488
      if (OB_TMP_FAIL(trans.end(OB_SUCC(ret)))) {
489
        LOG_WARN("fail to commit the transaction", KR(ret), KR(tmp_ret));
490
        ret = OB_SUCC(ret) ? tmp_ret : ret;
491
      }
492
      if (OB_FAIL(ret)) {
493
        LOG_WARN("fail to update __all_service_epoch table", KR(ret));
494
      }
495
    }
496
    // we do not care whether the table is updated successfully
497
    // we always reset all_servers_info_in_table_ and all_servers_hb_info_
498
    SpinWLockGuard guard_for_servers_info(all_servers_info_in_table_rwlock_);
499
    all_servers_info_in_table_.reset();
500
    whitelist_epoch_id_ = palf::INVALID_PROPOSAL_ID;
501
    all_servers_hb_info_.clear();
502
  } else {} // persistent_epoch_id = epoch_id, do nothing.
503
  return ret;
504
}
505
int ObHeartbeatService::process_hb_responses_()
506
{
507
  int ret = OB_SUCCESS;
508
  ObHBResponseArray tmp_hb_responses;
509
  const int64_t now = ObTimeUtility::current_time();
510
  int64_t tmp_hb_responses_epoch_id = palf::INVALID_PROPOSAL_ID;
511
  common::ObArray<common::ObZone> zone_list;
512
  if (OB_UNLIKELY(!is_inited_)) {
513
    ret = OB_NOT_INIT;
514
    LOG_WARN("not init", KR(ret), K(is_inited_));
515
  } else if (OB_ISNULL(sql_proxy_)) {
516
    ret = OB_ERR_UNEXPECTED;
517
    HBS_LOG_ERROR("sql_proxy_ is null", KR(ret), KP(sql_proxy_));
518
  } else if (OB_FAIL(get_and_reset_hb_responses_(tmp_hb_responses, tmp_hb_responses_epoch_id))) {
519
    LOG_WARN("fail to get and reset hb_responses", KR(ret));
520
  } else if (OB_FAIL(ObZoneTableOperation::get_zone_list(*sql_proxy_, zone_list))) {
521
    LOG_WARN("fail to get zone list", KR(ret));
522
  } else {
523
    // Here we do not need to lock all_servers_info_in_table_.
524
    // There are two threads in heartbeat service.
525
    // Prepare_whitelist() will modify all_servers_info_in_table_,
526
    // But prepare_whitelist() and this func. are in the same thread.
527
    // In another thread, send_heartbeat() only reads server_ and server_id_ in all_servers_info_in_table_
528
    int tmp_ret = OB_SUCCESS;
529
    for (int64_t i = 0; i < all_servers_info_in_table_.count(); i++) {
530
      // note: we can only update __all_server table successfully when hb_responses_epoch_id is
531
      //       equal to current heartbeat_service_epoch in __all_service_epoch table.
532
      //       It means that our whitelist (all_servers_info_in_table_) is not outdated.
533
      if (OB_TMP_FAIL(check_server_(
534
          tmp_hb_responses,
535
          all_servers_info_in_table_.at(i),
536
          zone_list,
537
          now,
538
          tmp_hb_responses_epoch_id))) {
539
        LOG_WARN("fail to check server", KR(ret), KR(tmp_ret),
540
            K(all_servers_info_in_table_.at(i)), K(now), K(tmp_hb_responses_epoch_id));
541
      }
542
    }
543
    if (FAILEDx(clear_deleted_servers_in_all_servers_hb_info_())) {
544
      LOG_WARN("fail to clear deleted servers in all_servers_hb_info_", KR(ret));
545
    }
546
  }
547
  return ret;
548
}
549
int ObHeartbeatService::check_server_(
550
    const ObHBResponseArray &hb_responses,
551
    const share::ObServerInfoInTable &server_info_in_table,
552
    const common::ObArray<common::ObZone> &zone_list,
553
    const int64_t now,
554
    const int64_t hb_responses_epoch_id)
555
{
556
  int ret = OB_SUCCESS;
557
  ObServerHBInfo server_hb_info;
558
  if (OB_UNLIKELY(!is_inited_)) {
559
    ret = OB_NOT_INIT;
560
    LOG_WARN("not init", KR(ret), K(is_inited_));
561
  } else if (OB_UNLIKELY(!server_info_in_table.is_valid()
562
      || now <= 0
563
      || palf::INVALID_PROPOSAL_ID == hb_responses_epoch_id)) {
564
    ret = OB_INVALID_ARGUMENT;
565
    LOG_WARN("invalied argument", KR(ret), K(server_info_in_table), K(now), K(hb_responses_epoch_id));
566
  } else if (OB_FAIL(all_servers_hb_info_.get_refactored(server_info_in_table.get_server(), server_hb_info))) {
567
    LOG_WARN("fail to get server_hb_info, or get an old server_hb_info", KR(ret),
568
        K(server_info_in_table.get_server()), K(server_hb_info));
569
    if (OB_HASH_NOT_EXIST == ret) {
570
      if (OB_FAIL(init_server_hb_info_(now, server_info_in_table, server_hb_info))) {
571
        LOG_WARN("fail to init server_hb_info", KR(ret), K(server_info_in_table), K(now));
572
      } else if (OB_FAIL(all_servers_hb_info_.set_refactored(
573
          server_hb_info.get_server(),
574
          server_hb_info,
575
          0 /* flag:  0 shows that not cover existing object. */))) {
576
        LOG_WARN("fail to push an element into all_servers_hb_info_", KR(ret), K(server_hb_info));
577
      } else {}
578
    }
579
  }
580
  if (OB_SUCC(ret)) {
581
    // check whether the heartbeat response from server_info_in_table.get_server() is received
582
    int64_t idx = OB_INVALID_INDEX_INT64;
583
    if (!has_server_exist_in_array_(hb_responses, server_info_in_table.get_server(), idx)) {
584
      //  heartbeat response is not received
585
      if (OB_FAIL(check_server_without_hb_response_(
586
          now,
587
          server_info_in_table,
588
          hb_responses_epoch_id,
589
          server_hb_info))) {
590
        LOG_WARN("fail to check the server without heartbeat response", KR(ret),
591
            K(server_info_in_table), K(now), K(hb_responses_epoch_id));
592
      }
593
    } else if (OB_UNLIKELY(!hb_responses.at(idx).is_valid())) {
594
      ret = OB_ERR_UNEXPECTED;
595
      HBS_LOG_WARN("there exists an invalid element in hb_responses", KR(ret),
596
          K(hb_responses.at(idx)));
597
    } else if (OB_FAIL(check_server_with_hb_response_(
598
        hb_responses.at(idx),
599
        server_info_in_table,
600
        zone_list,
601
        now,
602
        hb_responses_epoch_id,
603
        server_hb_info))) { //  heartbeat response is received
604
      LOG_WARN("fail to check the server with heartbeat response", KR(ret),
605
          K(hb_responses.at(idx)), K(server_info_in_table), K(now), K(hb_responses_epoch_id));
606
    }
607
  }
608
  return ret;
609
}
610
int ObHeartbeatService::check_server_without_hb_response_(
611
    const int64_t now,
612
    const share::ObServerInfoInTable &server_info_in_table,
613
    const int64_t hb_responses_epoch_id,
614
    ObServerHBInfo &server_hb_info)
615
{
616
  int ret = OB_SUCCESS;
617
  if (OB_UNLIKELY(!is_inited_)) {
618
    ret = OB_NOT_INIT;
619
    LOG_WARN("not init", KR(ret), K(is_inited_));
620
  } else if (OB_UNLIKELY(now <= 0
621
      || !server_info_in_table.is_valid()
622
      || !server_hb_info.is_valid()
623
      || palf::INVALID_PROPOSAL_ID == hb_responses_epoch_id)) {
624
    ret = OB_INVALID_ARGUMENT;
625
    LOG_WARN("invalid argument", KR(ret),  K(now), K(server_info_in_table),
626
        K(server_hb_info), K(hb_responses_epoch_id));
627
  } else if (OB_FAIL(update_server_hb_info_(
628
      now,
629
      false, /* hb_response_exists */
630
      server_hb_info))) {
631
    LOG_WARN("fail to update server_hb_info", KR(ret), K(now), K(server_info_in_table),
632
        K(server_hb_info));
633
  } else if ((now - server_hb_info.get_last_hb_time() > GCONF.lease_time
634
          && 0 == server_info_in_table.get_last_offline_time())) {
635
    if (OB_FAIL(update_table_for_online_to_offline_server_(
636
        server_info_in_table,
637
        now,
638
        hb_responses_epoch_id))) {
639
      LOG_WARN("fail to update table for online to offline server",
640
          KR(ret), K(server_info_in_table), K(now), K(hb_responses_epoch_id));
641
    } else {
642
      const ObAddr &server = server_info_in_table.get_server();
643
      ROOTSERVICE_EVENT_ADD("server", "last_offline_time set", "server", server);
644
    }
645
  } else {}
646
  return ret;
647
}
648
int ObHeartbeatService::update_table_for_online_to_offline_server_(
649
    const share::ObServerInfoInTable &server_info_in_table,
650
    const int64_t now,
651
    const int64_t hb_responses_epoch_id)
652
{
653
  int ret = OB_SUCCESS;
654
  common::ObMySQLTransaction trans;
655
  bool is_match = false;
656
  if (OB_UNLIKELY(!is_inited_)) {
657
    ret = OB_NOT_INIT;
658
    LOG_WARN("not init", KR(ret), K(is_inited_));
659
  } else if (OB_ISNULL(sql_proxy_)) {
660
    ret = OB_ERR_UNEXPECTED;
661
    LOG_ERROR("sql_proxy_ is null", KR(ret), KP(sql_proxy_));
662
  } else if (OB_UNLIKELY(!server_info_in_table.is_valid()
663
      || now <= 0
664
      || palf::INVALID_PROPOSAL_ID == hb_responses_epoch_id)) {
665
    ret = OB_INVALID_ARGUMENT;
666
    LOG_WARN("invalid argument", KR(ret), K(server_info_in_table), K(now), K(hb_responses_epoch_id));
667
  } else if (OB_FAIL(trans.start(sql_proxy_, OB_SYS_TENANT_ID))) {
668
    LOG_WARN("fail to start trans", KR(ret));
669
  } else if (OB_FAIL(ObServiceEpochProxy::check_service_epoch_with_trans(
670
      trans,
671
      OB_SYS_TENANT_ID,
672
      ObServiceEpochProxy::HEARTBEAT_SERVICE_EPOCH,
673
      hb_responses_epoch_id,
674
      is_match))) {
675
    LOG_WARN("fail to check and update service epoch", KR(ret), K(hb_responses_epoch_id));
676
  } else if (OB_UNLIKELY(!is_match)) {
677
    ret = OB_NOT_MASTER;
678
    LOG_WARN("hb_responses_epoch_id is not the same as persistent heartbeat service epoch id", KR(ret));
679
  } else {
680
    if (OB_FAIL(ObServerTableOperator::update_table_for_online_to_offline_server(
681
        trans,
682
        server_info_in_table.get_server(),
683
        ObServerStatus::OB_SERVER_DELETING == server_info_in_table.get_status(), /* is_deleting */
684
        now /*last_offline_time */))) {
685
      LOG_WARN("fail to update __all_server table for online to offline server", KR(ret),
686
          K(server_info_in_table), K(now));
687
    }
688
  }
689
  int tmp_ret = OB_SUCCESS;
690
  if (OB_TMP_FAIL(end_trans_and_refresh_server_(server_info_in_table.get_server(),
691
          OB_SUCC(ret), trans))) {
692
    LOG_WARN("failed to end trans", KR(ret), K(tmp_ret), K(server_info_in_table));
693
    ret = OB_SUCC(ret) ? tmp_ret : ret;
694
  }
695
  return ret;
696
}
697

698
int ObHeartbeatService::end_trans_and_refresh_server_(
699
      const ObAddr &server,
700
      const bool commit,
701
      common::ObMySQLTransaction &trans)
702
{
703
  int ret = OB_SUCCESS;
704
  if (OB_UNLIKELY(!server.is_valid())) {
705
    ret = OB_INVALID_ARGUMENT;
706
    LOG_WARN("server is invalid", KR(ret), K(server));
707
  } else if (!trans.is_started()) {
708
    LOG_WARN("the transaction is not started");
709
  } else {
710
    int tmp_ret = OB_SUCCESS;
711
    if (OB_FAIL(trans.end(commit))) {
712
      HBS_LOG_WARN("fail to commit the transaction", KR(ret),
713
          K(server), K(commit));
714
    }
715
    //ignore error of refresh and on server_status_change
716
    if (OB_TMP_FAIL(SVR_TRACER.refresh())) {
717
      LOG_WARN("fail to refresh server tracer", KR(ret), KR(tmp_ret));
718
    }
719
    if (OB_ISNULL(GCTX.root_service_)) {
720
      tmp_ret = OB_ERR_UNEXPECTED;
721
      LOG_WARN("GCTX.root_service_ is null", KR(ret), KR(tmp_ret), KP(GCTX.root_service_));
722
    } else if (OB_TMP_FAIL(GCTX.root_service_->get_status_change_cb().on_server_status_change(server))) {
723
      LOG_WARN("fail to execute on_server_status_change", KR(ret), KR(tmp_ret), K(server));
724
    }
725
  }
726
  return ret;
727
}
728

729
int ObHeartbeatService::init_server_hb_info_(
730
    const int64_t now,
731
    const share::ObServerInfoInTable &server_info_in_table,
732
    ObServerHBInfo &server_hb_info)
733
{
734
  int ret = OB_SUCCESS;
735
  const ObServerStatus::DisplayStatus &display_status = server_info_in_table.get_status();
736
  const int64_t last_offline_time = server_info_in_table.get_last_offline_time();
737
  const ObAddr &server = server_info_in_table.get_server();
738
  int64_t last_hb_time = 0;
739
  ObServerStatus::HeartBeatStatus hb_status = ObServerStatus::OB_HEARTBEAT_MAX;
740

741
  server_hb_info.reset();
742
  if (OB_UNLIKELY(!is_inited_)) {
743
    ret = OB_NOT_INIT;
744
    LOG_WARN("not init", KR(ret), K(is_inited_));
745
  } else if (OB_UNLIKELY(!server_info_in_table.is_valid()
746
      || now - last_offline_time < 0)) {
747
    ret = OB_INVALID_ARGUMENT;
748
    LOG_WARN("invalid argument", KR(ret), K(now), K(server_info_in_table));
749
  } else {
750
    if (0 == last_offline_time) { // online, the status is active or deleting
751
      last_hb_time = now;
752
      hb_status = ObServerStatus::OB_HEARTBEAT_ALIVE;
753
    } else { // last_offline_time > 0, offline, the status is inactive or deleting
754
      last_hb_time = last_offline_time - GCONF.lease_time;
755
      hb_status = ObServerStatus::OB_HEARTBEAT_LEASE_EXPIRED;
756
      if (now - last_hb_time >= GCONF.server_permanent_offline_time) {
757
        hb_status = ObServerStatus::OB_HEARTBEAT_PERMANENT_OFFLINE;
758
      }
759
    }
760
    if (FAILEDx(server_hb_info.init(server, last_hb_time, hb_status))) {
761
      LOG_WARN("fail to init server_hb_info", KR(ret), K(server), K(last_hb_time), K(hb_status));
762
    } else {
763
      LOG_INFO("new server_hb_info is generated", K(server_hb_info));
764
    }
765
  }
766
  return ret;
767
}
768
int ObHeartbeatService::check_server_with_hb_response_(
769
    const ObHBResponse &hb_response,
770
    const share::ObServerInfoInTable &server_info_in_table,
771
    const common::ObArray<common::ObZone> &zone_list,
772
    const int64_t now,
773
    const int64_t hb_responses_epoch_id,
774
    ObServerHBInfo &server_hb_info)
775
{
776
  int ret = OB_SUCCESS;
777
  if (OB_UNLIKELY(!is_inited_)) {
778
    ret = OB_NOT_INIT;
779
    LOG_WARN("not init", KR(ret), K(is_inited_));
780
  } else if (OB_UNLIKELY(now <= 0
781
      || !server_hb_info.is_valid())
782
      || palf::INVALID_PROPOSAL_ID == hb_responses_epoch_id) {
783
    ret = OB_INVALID_ARGUMENT;
784
    LOG_WARN("invalid argument", KR(ret), K(server_info_in_table), K(hb_response),
785
        K(now), K(server_hb_info), K(hb_responses_epoch_id));
786
  } else if (OB_FAIL(check_if_hb_response_can_be_processed_(
787
      hb_response,
788
      server_info_in_table,
789
      zone_list))) {
790
    // the validity of hb_response and server_info_in_table is also checked here
791
    LOG_WARN("hb_response cannot be processed", KR(ret), K(hb_response),
792
        K(server_info_in_table), K(zone_list));
793
  }
794
  if (OB_SUCC(ret)) {
795
    if ((!server_info_in_table.get_with_rootserver() && hb_response.get_server() == GCTX.self_addr())
796
        || 0 != server_info_in_table.get_last_offline_time()
797
        || server_info_in_table.get_build_version() != hb_response.get_build_version()
798
        || server_info_in_table.get_start_service_time() != hb_response.get_start_service_time()) {
799
      if (OB_FAIL(update_table_for_server_with_hb_response_(
800
          hb_response,
801
          server_info_in_table,
802
          hb_responses_epoch_id))) {
803
        LOG_WARN("fail to update table for server with hb_response", KR(ret), K(hb_response),
804
            K(server_info_in_table), K(hb_responses_epoch_id));
805
      }
806
    }
807
    if (FAILEDx(check_and_execute_start_or_stop_server_(
808
        hb_response,
809
        server_hb_info,
810
        server_info_in_table))) {
811
      LOG_WARN("fail to check and execute start or stop server", KR(ret),
812
          K(hb_response), K(server_info_in_table));
813
    }
814
    if (FAILEDx(server_hb_info.set_server_health_status(hb_response.get_server_health_status()))) {
815
        LOG_WARN("fail to set server_health_status", KR(ret), K(hb_response.get_server_health_status()));
816
    } else if (OB_FAIL(update_server_hb_info_(
817
        now,
818
        true, /* hb_response_exists*/
819
        server_hb_info))) {
820
      LOG_WARN("fail to get and update server_hb_info", KR(ret), K(hb_response),
821
          K(server_info_in_table), K(now));
822
    }
823
  }
824
  return ret;
825
}
826
int ObHeartbeatService::check_if_hb_response_can_be_processed_(
827
    const ObHBResponse &hb_response,
828
    const share::ObServerInfoInTable &server_info_in_table,
829
    const common::ObArray<common::ObZone> &zone_list) const
830
{
831
  int ret = OB_SUCCESS;
832
  if (OB_UNLIKELY(!is_inited_)) {
833
    ret = OB_NOT_INIT;
834
    LOG_WARN("not init", KR(ret), K(is_inited_));
835
  } else if (OB_UNLIKELY(!server_info_in_table.is_valid()
836
      || !hb_response.is_valid()
837
      || server_info_in_table.get_server() != hb_response.get_server())) {
838
    ret = OB_INVALID_ARGUMENT;
839
    LOG_WARN("invalid argument", KR(ret), K(server_info_in_table), K(hb_response));
840
  } else if (server_info_in_table.get_zone() != hb_response.get_zone()) {
841
    ret = OB_SERVER_ZONE_NOT_MATCH;
842
    HBS_LOG_ERROR("server's zone does not match", KR(ret), K(server_info_in_table.get_zone()),
843
        K(hb_response.get_zone()));
844
  } else if (server_info_in_table.get_sql_port() != hb_response.get_sql_port()) {
845
    ret = OB_ERR_UNEXPECTED;
846
    HBS_LOG_ERROR("unexpexted error: server's sql port has changed!", KR(ret),
847
        K(server_info_in_table), K(hb_response));
848
  } else {
849
    bool zone_exists = false;
850
    for (int64_t i = 0; !zone_exists && i < zone_list.count(); i++) {
851
      if (zone_list.at(i) == hb_response.get_zone()) {
852
        zone_exists = true;
853
      }
854
    }
855
    if (OB_UNLIKELY(!zone_exists)) {
856
      ret = OB_ZONE_INFO_NOT_EXIST;
857
      HBS_LOG_ERROR("zone info not exist", KR(ret), K(hb_response.get_zone()), K(zone_list));
858
    }
859
  }
860
  return ret;
861
}
862
int ObHeartbeatService::check_and_execute_start_or_stop_server_(
863
    const ObHBResponse &hb_response,
864
    const ObServerHBInfo &server_hb_info,
865
    const share::ObServerInfoInTable &server_info_in_table)
866
{
867
  int ret = OB_SUCCESS;
868
  char ip[OB_MAX_SERVER_ADDR_SIZE] = "";
869
  const ObAddr &server = hb_response.get_server();
870
  if (OB_UNLIKELY(!is_inited_)) {
871
    ret = OB_NOT_INIT;
872
    LOG_WARN("not init", KR(ret), K(is_inited_));
873
  } else if (OB_ISNULL(sql_proxy_)) {
874
    ret = OB_ERR_UNEXPECTED;
875
    HBS_LOG_ERROR("sql_proxy_ is null", KR(ret), KP(sql_proxy_));
876
  } else if (OB_UNLIKELY(!hb_response.is_valid()
877
      || !server_info_in_table.is_valid()
878
      || hb_response.get_server() != server_info_in_table.get_server()
879
      || !server.is_valid()
880
      || !server.ip_to_string(ip, sizeof(ip)))) {
881
    ret = OB_INVALID_ARGUMENT;
882
    LOG_WARN("invalid argument", KR(ret), K(server), K(hb_response), K(server_info_in_table));
883
  } else {
884
    const ObServerHealthStatus &health_status = hb_response.get_server_health_status();
885
    bool need_start_or_stop_server = false;
886
    bool is_start = false;
887
    int64_t affected_rows = 0;
888
    ObSqlString sql;
889
    if (server_hb_info.get_server_health_status() != hb_response.get_server_health_status()) {
890
      if (0 == server_info_in_table.get_stop_time() && !health_status.is_healthy()) {
891
        is_start = false;
892
        need_start_or_stop_server = true;
893
      }
894
      if (0 != server_info_in_table.get_stop_time() && health_status.is_healthy()) {
895
        is_start = true;
896
        need_start_or_stop_server = true;
897
      }
898
    }
899
    if (OB_SUCC(ret) && need_start_or_stop_server) {
900
      if (is_start) {
901
        ROOTSERVICE_EVENT_ADD("server", "disk error repaired, start server", "server", server);
902
        HBS_LOG_INFO("disk error repaired, try to start server", K(server), K(health_status));
903
        ret = sql.assign_fmt("ALTER SYSTEM START SERVER '%s:%d'", ip, server.get_port());
904
      } else {
905
        ROOTSERVICE_EVENT_ADD("server", "disk error, stop server", "server", server);
906
        HBS_LOG_INFO("disk error, try to stop server", K(server), K(health_status));
907
        ret = sql.assign_fmt("ALTER SYSTEM STOP SERVER '%s:%d'", ip, server.get_port());
908
      }
909
      if (OB_FAIL(ret)) {
910
        LOG_WARN("fail to assign fmt", KR(ret), K(server), K(is_start));
911
      } else if (OB_FAIL(sql_proxy_->write(sql.ptr(), affected_rows))) {
912
        LOG_WARN("fail to write sql", KR(ret),K(server), K(sql));
913
      } else {
914
        HBS_LOG_INFO("start or stop server successfully", K(server), K(is_start));
915
      }
916
    }
917
  }
918
  return ret;
919
}
920
int ObHeartbeatService::update_server_hb_info_(
921
      const int64_t now,
922
      const bool hb_response_exists,
923
      ObServerHBInfo &server_hb_info)
924
{
925
  int ret = OB_SUCCESS;
926
  if (OB_UNLIKELY(!is_inited_)) {
927
    ret = OB_NOT_INIT;
928
    LOG_WARN("not init", KR(ret), K(is_inited_));
929
  } else if (OB_UNLIKELY(now <= 0
930
      || !server_hb_info.is_valid())) {
931
    ret = OB_INVALID_ARGUMENT;
932
    LOG_WARN("invalid argument", KR(ret), K(now), K(server_hb_info));
933
  } else {
934
    const ObServerStatus::HeartBeatStatus& hb_status = server_hb_info.get_hb_status();
935
    const ObAddr& server = server_hb_info.get_server();
936
    // step 1: update last_hb_time
937
    if (hb_response_exists) {
938
      server_hb_info.set_last_hb_time(now);
939
    }
940
    int64_t time_diff = now - server_hb_info.get_last_hb_time();
941
    // step 2: update hb_status
942
    if (time_diff >= GCONF.server_permanent_offline_time
943
        && ObServerStatus::OB_HEARTBEAT_PERMANENT_OFFLINE != hb_status) {
944
      server_hb_info.set_hb_status(ObServerStatus::OB_HEARTBEAT_PERMANENT_OFFLINE);
945
      ROOTSERVICE_EVENT_ADD("server", "permanent_offline", "server", server);
946
      HBS_LOG_INFO("the server becomes permanent offline", K(server), K(time_diff));
947
    } else if (time_diff >= GCONF.lease_time
948
        && ObServerStatus::OB_HEARTBEAT_LEASE_EXPIRED != hb_status
949
        && ObServerStatus::OB_HEARTBEAT_PERMANENT_OFFLINE != hb_status) {
950
      server_hb_info.set_hb_status(ObServerStatus::OB_HEARTBEAT_LEASE_EXPIRED);
951
      ROOTSERVICE_EVENT_ADD("server", "lease_expire", "server", server);
952
      HBS_LOG_INFO("the server's lease becomes expired'", K(server), K(time_diff));
953
    } else if (time_diff < GCONF.lease_time
954
        && ObServerStatus::OB_HEARTBEAT_ALIVE != hb_status) {
955
      server_hb_info.set_hb_status(ObServerStatus::OB_HEARTBEAT_ALIVE);
956
      ROOTSERVICE_EVENT_ADD("server", "online", "server", server);
957
      HBS_LOG_INFO("the server's lease becomes online'", K(server), K(time_diff));
958
    } else {}
959
    // step 3: update server_hb_info
960
    if (FAILEDx(all_servers_hb_info_.set_refactored(
961
        server_hb_info.get_server(),
962
        server_hb_info,
963
        1 /* flag:  0 shows that not cover existing object. */))) {
964
      LOG_WARN("fail to push an element into all_servers_hb_info_", KR(ret), K(server_hb_info));
965
    }
966
  }
967
  return ret;
968
}
969

970
int ObHeartbeatService::clear_deleted_servers_in_all_servers_hb_info_()
971
{
972
  int ret = OB_SUCCESS;
973
  ObAddr server;
974
  hash::ObHashMap<ObAddr, ObServerHBInfo>::iterator iter = all_servers_hb_info_.begin();
975
  if (OB_UNLIKELY(!is_inited_)) { // return false
976
    ret = OB_NOT_INIT;
977
    LOG_WARN("not init", KR(ret), K(is_inited_));
978
  } else {
979
    while (OB_SUCC(ret) && iter != all_servers_hb_info_.end()) {
980
      int64_t idx = OB_INVALID_INDEX_INT64;
981
      server.reset();
982
      server = iter->first;
983
      iter++;
984
      if (!has_server_exist_in_array_(all_servers_info_in_table_, server, idx)) {
985
        HBS_LOG_INFO("the server is deleted, it can be removed from all_servers_hb_info", K(server));
986
        if (OB_FAIL(all_servers_hb_info_.erase_refactored(server))) {
987
          LOG_WARN("fail to remove the server from all_servers_hb_info", KR(ret), K(server));
988
        }
989
      }
990
    }
991
  }
992
  return ret;
993
}
994

995
int ObHeartbeatService::update_table_for_server_with_hb_response_(
996
    const ObHBResponse &hb_response,
997
    const share::ObServerInfoInTable &server_info_in_table,
998
    const int64_t hb_responses_epoch_id)
999
{
1000
  int ret = OB_SUCCESS;
1001
  common::ObMySQLTransaction trans;
1002
  bool is_match = false;
1003
  if (OB_UNLIKELY(!is_inited_)) {
1004
    ret = OB_NOT_INIT;
1005
    LOG_WARN("not init", KR(ret), K(is_inited_));
1006
  } else if (OB_ISNULL(sql_proxy_)) {
1007
    ret = OB_ERR_UNEXPECTED;
1008
    HBS_LOG_ERROR("sql_proxy_ is null", KR(ret), KP(sql_proxy_));
1009
  } else if (OB_UNLIKELY(!hb_response.is_valid()
1010
      || !server_info_in_table.is_valid()
1011
      || hb_response.get_server() != server_info_in_table.get_server()
1012
      || palf::INVALID_PROPOSAL_ID == hb_responses_epoch_id)) {
1013
    ret = OB_INVALID_ARGUMENT;
1014
    // return false
1015
    LOG_WARN("invalid argument", KR(ret), K(hb_response), K(server_info_in_table), K(hb_responses_epoch_id));
1016
  } else if (OB_FAIL(trans.start(sql_proxy_, OB_SYS_TENANT_ID))) {
1017
    LOG_WARN("fail to start trans", KR(ret));
1018
  } else if (OB_FAIL(ObServiceEpochProxy::check_service_epoch_with_trans(
1019
      trans,
1020
      OB_SYS_TENANT_ID,
1021
      ObServiceEpochProxy::HEARTBEAT_SERVICE_EPOCH,
1022
      hb_responses_epoch_id,
1023
      is_match))) {
1024
    LOG_WARN("fail to check heartbeat service epoch", KR(ret), K(hb_responses_epoch_id));
1025
  } else if (OB_UNLIKELY(!is_match)) {
1026
    ret = OB_NOT_MASTER;
1027
    LOG_WARN("hb_responses_epoch_id is not the same as persistent heartbeat service epoch id", KR(ret));
1028
  } else {
1029
    const ObAddr &server = server_info_in_table.get_server();
1030
    // *********  check with_rootserver ********* //
1031
    if (OB_SUCC(ret)
1032
        && !server_info_in_table.get_with_rootserver() && server == GCTX.self_addr()) {
1033
      if (OB_FAIL(ObServerTableOperator::update_with_rootserver(trans, server))) {
1034
        HBS_LOG_WARN("fail to update_with_rootserver", KR(ret), K(server));
1035
      } else {
1036
        ROOTSERVICE_EVENT_ADD("server", "rootserver", "server", server);
1037
        HBS_LOG_INFO("server becomes rootserver",  K(server));
1038
      }
1039
    }
1040

1041
    // ********* check if offline to online, then update last_offline_time and status ********* //
1042
    if (OB_SUCC(ret) && 0 != server_info_in_table.get_last_offline_time()) {
1043
      if (OB_FAIL(ObServerTableOperator::update_table_for_offline_to_online_server(
1044
          trans,
1045
          ObServerStatus::OB_SERVER_DELETING == server_info_in_table.get_status(), /* is_deleting */
1046
          server))) {
1047
        HBS_LOG_WARN("fail to reset last_offline_time", KR(ret), K(server));
1048
      } else {
1049
        ROOTSERVICE_EVENT_ADD("server", "last_offline_time reset", "server", server);
1050
        HBS_LOG_INFO("server becomes online",  K(server));
1051
      }
1052
    }
1053
    // *********  check build_version ********* //
1054
    if (OB_SUCC(ret) && server_info_in_table.get_build_version() != hb_response.get_build_version()) {
1055
      if (OB_FAIL(ObServerTableOperator::update_build_version(
1056
          trans,
1057
          server,
1058
          server_info_in_table.get_build_version(), // old value
1059
          hb_response.get_build_version()))) {
1060
        HBS_LOG_WARN("fail to update build_version", KR(ret),
1061
            K(server_info_in_table), K(hb_response));
1062
      } else {
1063
        ROOTSERVICE_EVENT_ADD("server", hb_response.get_build_version().ptr(), "server", server);
1064
        HBS_LOG_INFO("build_version is updated", K(server),
1065
            K(hb_response.get_build_version()), K(hb_response.get_build_version()));
1066
      }
1067
    }
1068
    // *********  check start_service_time ********* //
1069
    if (OB_SUCC(ret) && server_info_in_table.get_start_service_time() != hb_response.get_start_service_time()) {
1070
      if (OB_FAIL(ObServerTableOperator::update_start_service_time(
1071
          trans,
1072
          server,
1073
          server_info_in_table.get_start_service_time(), // old value
1074
          hb_response.get_start_service_time()))) {
1075
        HBS_LOG_WARN("fail to update start service time", KR(ret), K(server),
1076
            K(server_info_in_table.get_start_service_time()), K(hb_response.get_start_service_time()));
1077
      } else {
1078
        ROOTSERVICE_EVENT_ADD("server", "start_service", "server", server);
1079
        HBS_LOG_INFO("start service time is updated", K(server),
1080
            K(server_info_in_table.get_start_service_time()), K(hb_response.get_start_service_time()));
1081
      }
1082
    }
1083
  }
1084
  int tmp_ret = OB_SUCCESS;
1085
  if (OB_TMP_FAIL(end_trans_and_refresh_server_(server_info_in_table.get_server(),
1086
          OB_SUCC(ret), trans))) {
1087
    LOG_WARN("failed to end trans", KR(ret), K(tmp_ret), K(server_info_in_table));
1088
    ret = OB_SUCC(ret) ? tmp_ret : ret;
1089
  }
1090
  return ret;
1091
}
1092
template <typename T>
1093
bool ObHeartbeatService::has_server_exist_in_array_(
1094
    const ObIArray<T> &array,
1095
    const common::ObAddr &server,
1096
    int64_t &idx)
1097
{
1098
  bool bret = false;
1099
  idx = OB_INVALID_INDEX_INT64;
1100
  for (int64_t i = 0; i < array.count(); i++) {
1101
    if (server == array.at(i).get_server()) {
1102
      bret = true;
1103
      idx = i;
1104
      break;
1105
    }
1106
  }
1107
  return bret;
1108
}
1109
}
1110
}
1111

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.