13
#define USING_LOG_PREFIX SERVER
15
#include "observer/ob_lease_state_mgr.h"
16
#include "share/ob_common_rpc_proxy.h"
17
#include "share/ob_global_merge_table_operator.h"
18
#include "share/ob_zone_merge_table_operator.h"
19
#include "share/ob_zone_merge_info.h"
20
#include "share/rc/ob_tenant_base.h"
21
#include "observer/ob_server.h"
22
#include "storage/compaction/ob_tenant_tablet_scheduler.h"
23
#include "storage/tx_storage/ob_ls_service.h"
24
#ifdef OB_BUILD_TDE_SECURITY
25
#include "share/ob_master_key_getter.h"
30
using namespace common;
35
// Constructor of the schema-status refresh timer task.
// NOTE(review): the initializer list / body is not visible here — confirm it has no state to set up.
ObRefreshSchemaStatusTimerTask::ObRefreshSchemaStatusTimerTask()
38
// Teardown hook of the schema-status refresh timer task.
// NOTE(review): the body is not visible here — confirm what, if anything, it releases.
void ObRefreshSchemaStatusTimerTask::destroy()
42
// Timer callback: reloads the refresh-schema status through the schema status
// proxy published in the global context (GCTX).
// Returns void, so the outcome is only reported through the log.
void ObRefreshSchemaStatusTimerTask::runTimerTask()
45
// The proxy pointer is read from GCTX on every run rather than cached in the task.
ObSchemaStatusProxy *schema_status_proxy = GCTX.schema_status_proxy_;
46
// A null proxy is reported as OB_ERR_UNEXPECTED instead of being dereferenced.
if (OB_ISNULL(schema_status_proxy)) {
47
ret = OB_ERR_UNEXPECTED;
48
LOG_WARN("get invalid schema status proxy", KR(ret));
49
} else if (OB_FAIL(schema_status_proxy->load_refresh_schema_status())) {
50
LOG_WARN("fail to load refresh schema status", KR(ret));
52
// NOTE(review): presumably the success branch of the chain above; its opening `else` is not visible here.
LOG_INFO("refresh schema status success");
58
// Default constructor: leaves the manager not initialized and not stopped.
// All collaborator pointers (rs_mgr_, rpc_proxy_, heartbeat_process_, ob_service_)
// start as NULL, the lease/heartbeat expire times and the baseline schema version
// start at 0, and renew_timeout_ starts at RENEW_TIMEOUT.
ObLeaseStateMgr::ObLeaseStateMgr()
59
: inited_(false), stopped_(false), lease_response_(), lease_expire_time_(0),
60
hb_timer_(), cluster_info_timer_(), merge_timer_(), rs_mgr_(NULL), rpc_proxy_(NULL), heartbeat_process_(NULL),
61
hb_(), renew_timeout_(RENEW_TIMEOUT), ob_service_(NULL),
62
baseline_schema_version_(0), heartbeat_expire_time_(0)
66
// Destructor.
// NOTE(review): the body is not visible here — confirm whether it delegates to destroy().
ObLeaseStateMgr::~ObLeaseStateMgr()
71
// Tears the manager down. The statements visible here destroy the cluster-info
// and merge timers and clear the heartbeat-process pointer.
// NOTE(review): hb_timer_ and the remaining members are not handled in the visible
// lines — confirm against the full body.
void ObLeaseStateMgr::destroy()
76
cluster_info_timer_.destroy();
77
merge_timer_.destroy();
80
// Only the pointer is reset; nothing here deletes the pointed-to object.
heartbeat_process_ = NULL;
86
// Initializes the lease state manager.
//
// Parameters visible in this view:
//   rpc_proxy          RPC proxy later used to send the renew-lease request; must not be NULL.
//   rs_mgr             root-server manager; must not be NULL and must already be inited.
//   heartbeat_process  heartbeat processor stored in heartbeat_process_; must not be NULL.
//   renew_timeout      stored in renew_timeout_; must be >= 0.
// NOTE(review): `service` is used below but its declaration is not visible here —
// presumably another parameter of this function; confirm.
//
// Returns OB_INVALID_ARGUMENT for a NULL/negative argument or an rs_mgr that is not
// inited; otherwise the first failing timer/heartbeat init code is left in ret.
int ObLeaseStateMgr::init(
87
ObCommonRpcProxy *rpc_proxy, ObRsMgr *rs_mgr,
88
IHeartBeatProcess *heartbeat_process,
90
const int64_t renew_timeout)
95
LOG_WARN("init twice", K(ret));
96
// Argument validation: every pointer must be set and the timeout non-negative
// (a timeout of 0 is accepted).
} else if (NULL == rpc_proxy || NULL == rs_mgr
97
|| NULL == heartbeat_process || renew_timeout < 0) {
98
ret = OB_INVALID_ARGUMENT;
99
LOG_WARN("invalid argument", KP(rpc_proxy), KP(rs_mgr),
100
KP(heartbeat_process), K(renew_timeout), K(ret));
101
// An rs_mgr that has not been inited is also reported as an invalid argument.
} else if (!rs_mgr->is_inited()) {
102
ret = OB_INVALID_ARGUMENT;
103
LOG_WARN("rs_mgr not inited", "rs_mgr inited", rs_mgr->is_inited(), K(ret));
104
// Three timers are initialized in order; the first failure stops the chain.
} else if (OB_FAIL(hb_timer_.init("LeaseHB"))) {
105
LOG_WARN("hb_timer_ init failed", KR(ret));
106
} else if (OB_FAIL(cluster_info_timer_.init("ClusterTimer"))) {
107
LOG_WARN("cluster_info_timer_ init failed", KR(ret));
108
} else if (OB_FAIL(merge_timer_.init("MergeTimer"))) {
109
LOG_WARN("merge_timer_ init failed", KR(ret));
112
// Collaborators are stored as raw pointers; nothing here takes ownership of them.
rpc_proxy_ = rpc_proxy;
113
heartbeat_process_ = heartbeat_process;
114
renew_timeout_ = renew_timeout;
115
ob_service_ = &service;
116
// The heartbeat task receives a back-pointer to this manager.
if (OB_FAIL(hb_.init(this))) {
117
LOG_WARN("hb_.init failed", K(ret));
125
// Registers this server by attempting one lease renewal and then starting the
// periodic heartbeat. Returns an OB error code.
int ObLeaseStateMgr::register_self()
127
int ret = OB_SUCCESS;
128
LOG_INFO("begin register_self");
131
// NOTE(review): the guard for this "not init" branch is not visible here.
LOG_WARN("not init", K(ret));
133
if (OB_FAIL(do_renew_lease())) {
134
LOG_WARN("do_renew_lease failed", K(ret));
137
// NOTE(review): the log text suggests the heartbeat is started even when the
// renewal above failed — confirm against the full control flow.
LOG_INFO("start_heartbeat anyway");
139
// A failure to start the heartbeat is logged at ERROR level, unlike the renewal failure.
if (OB_FAIL(start_heartbeat())) {
140
LOG_ERROR("start_heartbeat failed", K(ret));
146
// Registers this server with retries: reports the sys log stream, renews the
// lease, and on failure sleeps and refreshes the master root server before
// trying again. On success it starts the heartbeat. Returns an OB error code.
// NOTE(review): the retry loop header and its exit conditions are not visible
// here — confirm how the loop terminates (e.g. on stop).
int ObLeaseStateMgr::register_self_busy_wait()
148
int ret = OB_SUCCESS;
150
// Sets up the current trace id from this server's own address before any logging/RPC.
ObCurTraceId::init(GCONF.self_addr_);
151
LOG_INFO("begin register_self_busy_wait");
154
// NOTE(review): the guard for this "not init" branch is not visible here.
LOG_WARN("not init", K(ret));
157
// The sys log stream is reported first; the lease is renewed only if that succeeds.
if (OB_FAIL(try_report_sys_ls())) {
158
// NOTE(review): unlike the sibling warnings, this log does not include ret.
LOG_WARN("fail to try report sys log stream");
159
} else if (OB_FAIL(do_renew_lease())) {
160
LOG_WARN("fail to do_renew_lease", KR(ret));
164
// Failure path: log, sleep REGISTER_TIME_SLEEP, then refresh the master root server.
// The logged latency divides by 1000000, i.e. treats REGISTER_TIME_SLEEP as microseconds.
LOG_WARN("register failed, will try again", KR(ret),
165
"retry latency", REGISTER_TIME_SLEEP / 1000000);
166
ob_usleep(static_cast<useconds_t>(REGISTER_TIME_SLEEP));
167
// tmp_ret keeps the recovery steps from overwriting ret.
int tmp_ret = OB_SUCCESS;
168
if (OB_SUCCESS != (tmp_ret = rs_mgr_->renew_master_rootserver())) {
169
LOG_WARN("renew_master_rootserver failed", K(tmp_ret));
170
// If the master root server cannot be renewed, fall back to refreshing the sys tenant log stream.
if (OB_SUCCESS != (tmp_ret = ob_service_->refresh_sys_tenant_ls())) {
171
LOG_WARN("fail to refresh core partition", K(tmp_ret));
174
LOG_INFO("renew_master_rootserver successfully, try register again");
177
// Success path: registration done, start the periodic heartbeat.
LOG_INFO("register self successfully!");
178
if (OB_FAIL(start_heartbeat())) {
179
LOG_ERROR("start_heartbeat failed", K(ret));
187
// NOTE(review): the condition and ret value for this final warning are not visible here.
LOG_WARN("fail to register_self_busy_wait", KR(ret));
189
LOG_INFO("end register_self_busy_wait");
193
// Reports this server's replica of the sys tenant's SYS_LS log stream, if it
// exists locally: fills an ObLSReplica, writes it through the LS table
// operator, and submits an async update task. Returns an OB error code.
int ObLeaseStateMgr::try_report_sys_ls()
195
int ret = OB_SUCCESS;
196
if (OB_UNLIKELY(!inited_)) {
198
LOG_WARN("not init", KR(ret));
199
// A stopped manager refuses to report and returns OB_SERVER_IS_STOPPING.
} else if (OB_UNLIKELY(stopped_)) {
200
ret = OB_SERVER_IS_STOPPING;
201
LOG_WARN("lease manager is stopped", KR(ret));
203
// The target is fixed: the SYS_LS log stream of the sys tenant.
const uint64_t tenant_id = OB_SYS_TENANT_ID;
204
const ObLSID ls_id = SYS_LS;
205
// Run the block below in the sys tenant's context so MTL() resolves that tenant's services.
MTL_SWITCH(tenant_id) {
206
bool ls_exist = false;
207
ObLSService *ls_svr = NULL;
208
if (OB_ISNULL(ls_svr = MTL(ObLSService*))) {
209
ret = OB_ERR_UNEXPECTED;
210
LOG_WARN("tenant storage ptr is null", KR(ret), K(tenant_id));
211
} else if (OB_FAIL(ls_svr->check_ls_exist(ls_id, ls_exist))) {
212
LOG_WARN("fail to check log stream exist", KR(ret), K(ls_id));
213
// NOTE(review): the body of the "log stream does not exist" branch is not visible
// here — presumably nothing is reported in that case; confirm.
} else if (!ls_exist) {
216
share::ObLSTableOperator *lst_operator = GCTX.lst_operator_;
217
share::ObLSReplica ls_replica;
218
// Both the service and the table operator are needed for the steps below.
if (OB_ISNULL(ob_service_) || OB_ISNULL(lst_operator)) {
219
ret = OB_ERR_UNEXPECTED;
220
LOG_WARN("ob_service or lst_operator ptr is null",
221
KR(ret), KP(ob_service_), KP(lst_operator));
222
} else if (OB_FAIL(ob_service_->fill_ls_replica(
223
tenant_id, ls_id, ls_replica))) {
224
LOG_WARN("fail to fill log stream replica", KR(ret),
225
K(tenant_id), K(ls_replica));
226
// NOTE(review): the meaning of the `false` argument to update() is not visible here.
} else if (OB_FAIL(lst_operator->update(ls_replica, false))) {
227
LOG_WARN("fail to report sys log stream", KR(ret), K(ls_replica));
228
// After the direct update, an async update task is queued for the same log stream.
} else if (OB_FAIL(ob_service_->submit_ls_update_task(tenant_id, ls_id))) {
229
LOG_WARN("fail to add async update task", KR(ret), K(tenant_id), K(ls_id));
231
LOG_INFO("try report sys log stream succeed");
235
// OB_TENANT_NOT_IN_SERVER is singled out from other tenant-switch failures.
// NOTE(review): how it is handled is not visible here — confirm.
if (OB_TENANT_NOT_IN_SERVER == ret) {
238
LOG_WARN("fail to switch tenant", KR(ret), K(tenant_id));
245
// One heartbeat round: renews the lease and, if that fails, refreshes the
// master root server and retries once. It then reschedules the heartbeat task
// and prints the trace when the round was slow or failed.
// Returns an OB error code; note that ret is reused by the final schedule call.
int ObLeaseStateMgr::renew_lease()
247
int ret = OB_SUCCESS;
248
// NOTE(review): the body of this THE_TRACE check is not visible here.
if (OB_NOT_NULL(THE_TRACE)) {
251
NG_TRACE(renew_lease_begin);
252
// Start timestamp for the slow-heartbeat check further down.
const int64_t start = ObTimeUtility::fast_current_time();
255
// NOTE(review): the guard for this "not init" branch is not visible here.
LOG_WARN("not init", K(ret));
256
} else if (stopped_) {
257
ret = OB_SERVER_IS_STOPPING;
258
LOG_WARN("lease manager is stopped", K(ret));
260
// First attempt. On failure, refresh the master root server location and retry once.
if (OB_FAIL(do_renew_lease())) {
261
LOG_WARN("do_renew_lease failed", K(ret));
262
NG_TRACE(renew_master_rs_begin);
263
if (OB_FAIL(rs_mgr_->renew_master_rootserver())) {
264
LOG_WARN("renew_master_rootserver failed", K(ret));
265
// tmp_ret keeps the fallback refresh from overwriting ret.
int tmp_ret = OB_SUCCESS;
266
if (OB_SUCCESS != (tmp_ret = ob_service_->refresh_sys_tenant_ls())) {
267
LOG_WARN("fail to refresh core partition", K(tmp_ret));
270
// NOTE(review): this emits renew_lease_end although the matching begin above is
// renew_master_rs_begin, and renew_lease_end is emitted again below — confirm the
// intended trace point.
NG_TRACE(renew_lease_end);
271
LOG_INFO("renew_master_rootserver successfully, try renew lease again");
272
if (OB_FAIL(try_report_sys_ls())) {
273
// NOTE(review): unlike the sibling warnings, this log does not include ret.
LOG_WARN("fail to try report all core table partition");
274
} else if (OB_FAIL(do_renew_lease())) {
275
// NOTE(review): "no next heartbeat" in the message below looks like a typo for "on next heartbeat".
LOG_WARN("try do_renew_lease again failed, will do it no next heartbeat", K(ret));
281
LOG_DEBUG("renew_lease successfully!");
283
NG_TRACE_EXT(renew_lease_end, OB_ID(ret), ret);
284
// Print the collected trace when the round took longer than DELAY_TIME or failed,
// provided a trace object is available.
const int64_t cost = ObTimeUtility::fast_current_time() - start;
285
if (OB_UNLIKELY(cost > DELAY_TIME || OB_FAIL(ret))
286
&& OB_NOT_NULL(THE_TRACE)) {
287
FORCE_PRINT_TRACE(THE_TRACE, "[slow heartbeat]");
289
// The heartbeat is scheduled as a one-shot task (repeat == false), so each round
// re-arms the next one here.
const bool repeat = false;
290
if (OB_FAIL(hb_timer_.schedule(hb_, DELAY_TIME, repeat))) {
291
LOG_WARN("schedule failed", LITERAL_K(DELAY_TIME), K(repeat), K(ret));
297
// Arms the heartbeat: schedules hb_ on hb_timer_ to run once after DELAY_TIME.
// Returns an OB error code (the schedule result on the visible path).
int ObLeaseStateMgr::start_heartbeat()
299
int ret = OB_SUCCESS;
302
// NOTE(review): the guard for this "not init" branch is not visible here.
LOG_WARN("not init", K(ret));
304
// One-shot schedule; renew_lease() re-schedules the task itself after each round.
const bool repeat = false;
305
if (OB_FAIL(hb_timer_.schedule(hb_, DELAY_TIME, repeat))) {
306
LOG_WARN("schedule failed", LITERAL_K(DELAY_TIME), K(repeat), K(ret));
312
#ifdef OB_BUILD_TDE_SECURITY
313
// Forwards the master-key versions carried by a lease response to
// ObMasterKeyGetter (only compiled when OB_BUILD_TDE_SECURITY is defined).
//
// lease_response: its tenant_max_key_version_ array is split into two
//   (id, version) arrays: one with max_key_version_ and one with
//   max_available_key_version_.
// Returns an OB error code; the first push_back or getter failure stops processing.
int ObLeaseStateMgr::update_master_key_info(
314
const share::ObLeaseResponse &lease_response)
316
int ret = OB_SUCCESS;
317
if (OB_UNLIKELY(!inited_)) {
319
LOG_WARN("not init", KR(ret));
321
// NOTE(review): pair.first is presumably the tenant id (going by the field name
// tenant_max_key_version_) — confirm.
const common::ObIArray<std::pair<uint64_t, ObLeaseResponse::TLRpKeyVersion> > &master_key_array
322
= lease_response.tenant_max_key_version_;
323
common::ObArray<std::pair<uint64_t, uint64_t> > max_key_version_array;
324
common::ObArray<std::pair<uint64_t, uint64_t> > max_available_key_version_array;
325
// The loop stops early as soon as ret holds an error.
for (int64_t i = 0; OB_SUCC(ret) && i < master_key_array.count(); ++i) {
326
const std::pair<uint64_t, ObLeaseResponse::TLRpKeyVersion> &key = master_key_array.at(i);
327
std::pair<uint64_t, uint64_t> max_key_version
328
= std::pair<uint64_t, uint64_t>(key.first, key.second.max_key_version_);
329
std::pair<uint64_t, uint64_t> available_key_version
330
= std::pair<uint64_t, uint64_t>(key.first, key.second.max_available_key_version_);
331
// Every entry contributes a max-version pair.
if (OB_FAIL(max_key_version_array.push_back(max_key_version))) {
332
LOG_WARN("fail to push back", KR(ret));
334
// An available version of 0 is skipped and never forwarded as an active version.
if (available_key_version.second > 0) {
335
if (OB_FAIL(max_available_key_version_array.push_back(available_key_version))) {
336
LOG_WARN("fail to push back", KR(ret));
342
// Hand both arrays to the master key getter; active versions are updated only
// if got_versions succeeds.
if (OB_FAIL(ObMasterKeyGetter::instance().got_versions(
343
max_key_version_array))) {
344
LOG_WARN("fail to update got versions", KR(ret));
345
} else if (OB_FAIL(ObMasterKeyGetter::instance().update_active_versions(
346
max_available_key_version_array))) {
347
LOG_WARN("fail to update active versions", KR(ret));
355
// Sends one renew-lease request to the master root server and applies the
// response: updates the baseline schema version, updates master-key info
// (TDE builds only), stores the lease response and runs the heartbeat event
// handler. Returns an OB error code.
// NOTE(review): `rs_addr` is used below but its declaration is not visible here.
int ObLeaseStateMgr::do_renew_lease()
357
int ret = OB_SUCCESS;
358
ObLeaseRequest lease_request;
359
ObLeaseResponse lease_response;
361
NG_TRACE(do_renew_lease_begin);
364
// NOTE(review): the guard for this "not init" branch is not visible here.
LOG_WARN("not init", K(ret));
365
// The heartbeat processor fills in the request; then the current master RS address is resolved.
} else if (OB_FAIL(heartbeat_process_->init_lease_request(lease_request))) {
366
LOG_WARN("init lease request failed", K(ret));
367
} else if (OB_FAIL(rs_mgr_->get_master_root_server(rs_addr))) {
368
LOG_WARN("get master root service failed", K(ret));
370
NG_TRACE(send_heartbeat_begin);
371
// The RPC result is assigned to ret directly and examined later.
ret = rpc_proxy_->to(rs_addr).timeout(renew_timeout_)
372
.renew_lease(lease_request, lease_response);
373
// A positive lease_expire_time_ overrides heartbeat_expire_time_. This runs right
// after the RPC, before ret is checked.
// NOTE(review): presumably compatibility with responses that only set lease_expire_time_ — confirm.
if (lease_response.lease_expire_time_ > 0) {
375
lease_response.heartbeat_expire_time_ = lease_response.lease_expire_time_;
377
NG_TRACE_EXT(send_heartbeat_end, OB_ID(ret), ret);
379
// tmp_ret tracks failures of the follow-up steps without overwriting the RPC result in ret.
int tmp_ret = OB_SUCCESS;
380
if (OB_UNLIKELY(!lease_response.is_valid())) {
381
tmp_ret = OB_INVALID_ARGUMENT;
382
LOG_WARN("invalid argument", KR(tmp_ret), K(lease_response));
385
// The baseline schema version only moves forward: an update is attempted when the
// response carries a larger version than the cached one.
if (baseline_schema_version_ < lease_response.baseline_schema_version_) {
386
// NOTE(review): GCTX.schema_service_ is dereferenced without a null check in the visible lines.
if (OB_SUCCESS != (tmp_ret = GCTX.schema_service_->update_baseline_schema_version(
387
OB_SYS_TENANT_ID, lease_response.baseline_schema_version_))) {
388
LOG_WARN("fail to update baseline schema version", KR(ret), KR(tmp_ret), K(lease_response));
390
LOG_INFO("update baseline schema version", KR(ret), "old_version", baseline_schema_version_,
391
"new_version", lease_response.baseline_schema_version_);
392
baseline_schema_version_ = lease_response.baseline_schema_version_;
395
#ifdef OB_BUILD_TDE_SECURITY
397
// A master-key update failure is recorded in tmp_ret only; ret is left untouched.
if (OB_SUCCESS != (tmp_ret = update_master_key_info(lease_response))) {
398
LOG_WARN("fail to update master key info", KR(ret), K(tmp_ret), K(lease_response));
401
// NOTE(review): this trace point records tmp_ret under the `ret` id.
NG_TRACE_EXT(update_master_key_info, OB_ID(ret), tmp_ret);
403
// The lease is accepted only if the RPC succeeded and the returned expire time is still in the future.
const int64_t now = ObTimeUtility::current_time();
404
if (OB_SUCC(ret) && lease_response.heartbeat_expire_time_ > now) {
405
LOG_DEBUG("renew_lease from master_rs successfully", K(rs_addr));
406
if (OB_FAIL(set_lease_response(lease_response))) {
407
LOG_WARN("fail to set lease response", K(ret));
408
// The handler is given the member lease_response_, not the local lease_response.
} else if (OB_FAIL(heartbeat_process_->do_heartbeat_event(lease_response_))) {
409
LOG_WARN("fail to process new lease info", K_(lease_response), K(ret));
411
NG_TRACE_EXT(do_heartbeat_event, OB_ID(ret), ret);
414
// NOTE(review): presumably the branch taken when the lease was not accepted; its
// opening `else` is not visible here.
LOG_WARN("can't get lease from rs", K(rs_addr), K(ret));
417
NG_TRACE_EXT(do_renew_lease_end, OB_ID(ret), ret);
421
// Constructs the heartbeat timer task in a not-initialized state with no owning manager.
ObLeaseStateMgr::HeartBeat::HeartBeat()
422
: inited_(false), lease_state_mgr_(NULL)
426
// Destructor of the heartbeat timer task.
// NOTE(review): the body is not visible here.
ObLeaseStateMgr::HeartBeat::~HeartBeat()
430
// Binds the heartbeat task to its lease state manager.
//
// lease_state_mgr: manager whose renew_lease() the task calls; must not be NULL.
// Returns OB_INVALID_ARGUMENT for a NULL manager.
// NOTE(review): the "init twice" guard and its error code are not visible here.
int ObLeaseStateMgr::HeartBeat::init(ObLeaseStateMgr *lease_state_mgr)
432
int ret = OB_SUCCESS;
435
LOG_WARN("init twice", K(ret));
436
} else if (NULL == lease_state_mgr) {
437
ret = OB_INVALID_ARGUMENT;
438
LOG_WARN("invalid argument", KP(lease_state_mgr), K(ret));
440
// Raw back-pointer; the task does not take ownership of the manager.
lease_state_mgr_ = lease_state_mgr;
446
// Timer callback of the heartbeat task: runs one renew_lease() round on the
// bound manager. Returns void, so a failure is only logged.
void ObLeaseStateMgr::HeartBeat::runTimerTask()
448
int ret = OB_SUCCESS;
451
// NOTE(review): the guard for this "not init" branch is not visible here.
LOG_WARN("not init", K(ret));
452
} else if (OB_FAIL(lease_state_mgr_->renew_lease())) {
453
LOG_WARN("fail to renew lease", K(ret));