oceanbase

Форк
0
/
ob_lease_state_mgr.cpp 
458 строк · 15.1 Кб
1
/**
2
 * Copyright (c) 2021 OceanBase
3
 * OceanBase CE is licensed under Mulan PubL v2.
4
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
5
 * You may obtain a copy of Mulan PubL v2 at:
6
 *          http://license.coscl.org.cn/MulanPubL-2.0
7
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
 * See the Mulan PubL v2 for more details.
11
 */
12

13
#define USING_LOG_PREFIX SERVER
14

15
#include "observer/ob_lease_state_mgr.h"
16
#include "share/ob_common_rpc_proxy.h"
17
#include "share/ob_global_merge_table_operator.h"
18
#include "share/ob_zone_merge_table_operator.h"
19
#include "share/ob_zone_merge_info.h"
20
#include "share/rc/ob_tenant_base.h"
21
#include "observer/ob_server.h"
22
#include "storage/compaction/ob_tenant_tablet_scheduler.h"
23
#include "storage/tx_storage/ob_ls_service.h"
24
#ifdef OB_BUILD_TDE_SECURITY
25
#include "share/ob_master_key_getter.h"
26
#endif
27

28
namespace oceanbase
29
{
30
using namespace common;
31
using namespace obrpc;
32
using namespace share;
33
namespace observer
34
{
35
ObRefreshSchemaStatusTimerTask::ObRefreshSchemaStatusTimerTask()
36
{}
37

38
void ObRefreshSchemaStatusTimerTask::destroy()
39
{
40
}
41

42
void ObRefreshSchemaStatusTimerTask::runTimerTask()
43
{
44
  int ret = OB_SUCCESS;
45
  ObSchemaStatusProxy *schema_status_proxy = GCTX.schema_status_proxy_;
46
  if (OB_ISNULL(schema_status_proxy)) {
47
    ret = OB_ERR_UNEXPECTED;
48
    LOG_WARN("get invalid schema status proxy", KR(ret));
49
  } else if (OB_FAIL(schema_status_proxy->load_refresh_schema_status())) {
50
    LOG_WARN("fail to load refresh schema status", KR(ret));
51
  } else {
52
    LOG_INFO("refresh schema status success");
53
  }
54
}
55

56
//////////////////////////////////////
57

58
ObLeaseStateMgr::ObLeaseStateMgr()
59
  : inited_(false), stopped_(false), lease_response_(), lease_expire_time_(0),
60
    hb_timer_(), cluster_info_timer_(), merge_timer_(), rs_mgr_(NULL), rpc_proxy_(NULL), heartbeat_process_(NULL),
61
    hb_(), renew_timeout_(RENEW_TIMEOUT), ob_service_(NULL),
62
    baseline_schema_version_(0), heartbeat_expire_time_(0)
63
{
64
}
65

66
ObLeaseStateMgr::~ObLeaseStateMgr()
67
{
68
  destroy();
69
}
70

71
void ObLeaseStateMgr::destroy()
72
{
73
  if (inited_) {
74
    stopped_ = false;
75
    hb_timer_.destroy();
76
    cluster_info_timer_.destroy();
77
    merge_timer_.destroy();
78
    rs_mgr_ = NULL;
79
    rpc_proxy_ = NULL;
80
    heartbeat_process_ = NULL;
81
    inited_ = false;
82
  }
83
}
84

85
// ObRsMgr should be inited by local config before call ObLeaseStateMgr.init
86
int ObLeaseStateMgr::init(
87
    ObCommonRpcProxy *rpc_proxy, ObRsMgr *rs_mgr,
88
    IHeartBeatProcess *heartbeat_process,
89
    ObService &service,
90
    const int64_t renew_timeout) //default RENEW_TIMEOUT = 2s
91
{
92
  int ret = OB_SUCCESS;
93
  if (inited_) {
94
    ret = OB_INIT_TWICE;
95
    LOG_WARN("init twice", K(ret));
96
  } else if (NULL == rpc_proxy || NULL == rs_mgr
97
      || NULL == heartbeat_process || renew_timeout < 0) {
98
    ret = OB_INVALID_ARGUMENT;
99
    LOG_WARN("invalid argument", KP(rpc_proxy), KP(rs_mgr),
100
        KP(heartbeat_process), K(renew_timeout), K(ret));
101
  } else if (!rs_mgr->is_inited()) {
102
    ret = OB_INVALID_ARGUMENT;
103
    LOG_WARN("rs_mgr not inited", "rs_mgr inited", rs_mgr->is_inited(), K(ret));
104
  } else if (OB_FAIL(hb_timer_.init("LeaseHB"))) {
105
    LOG_WARN("hb_timer_ init failed", KR(ret));
106
  } else if (OB_FAIL(cluster_info_timer_.init("ClusterTimer"))) {
107
    LOG_WARN("cluster_info_timer_ init failed", KR(ret));
108
  } else if (OB_FAIL(merge_timer_.init("MergeTimer"))) {
109
    LOG_WARN("merge_timer_ init failed", KR(ret));
110
  } else {
111
    rs_mgr_ = rs_mgr;
112
    rpc_proxy_ = rpc_proxy;
113
    heartbeat_process_ = heartbeat_process;
114
    renew_timeout_ = renew_timeout;
115
    ob_service_ = &service;
116
    if (OB_FAIL(hb_.init(this))) {
117
      LOG_WARN("hb_.init failed", K(ret));
118
    } else {
119
      inited_ = true;
120
    }
121
  }
122
  return ret;
123
}
124

125
int ObLeaseStateMgr::register_self()
126
{
127
  int ret = OB_SUCCESS;
128
  LOG_INFO("begin register_self");
129
  if (!inited_) {
130
    ret = OB_NOT_INIT;
131
    LOG_WARN("not init", K(ret));
132
  } else {
133
    if (OB_FAIL(do_renew_lease())) {
134
      LOG_WARN("do_renew_lease failed", K(ret));
135
    }
136

137
    LOG_INFO("start_heartbeat anyway");
138
    // ignore ret overwrite
139
    if (OB_FAIL(start_heartbeat())) {
140
      LOG_ERROR("start_heartbeat failed", K(ret));
141
    }
142
  }
143
  return ret;
144
}
145

146
int ObLeaseStateMgr::register_self_busy_wait()
147
{
148
  int ret = OB_SUCCESS;
149

150
  ObCurTraceId::init(GCONF.self_addr_);
151
  LOG_INFO("begin register_self_busy_wait");
152
  if (!inited_) {
153
    ret = OB_NOT_INIT;
154
    LOG_WARN("not init", K(ret));
155
  } else {
156
    while (!stopped_) {
157
      if (OB_FAIL(try_report_sys_ls())) {
158
        LOG_WARN("fail to try report sys log stream");
159
      } else if (OB_FAIL(do_renew_lease())) {
160
        LOG_WARN("fail to do_renew_lease", KR(ret));
161
      }
162

163
      if (OB_FAIL(ret)) {
164
        LOG_WARN("register failed, will try again", KR(ret),
165
            "retry latency", REGISTER_TIME_SLEEP / 1000000);
166
        ob_usleep(static_cast<useconds_t>(REGISTER_TIME_SLEEP));
167
        int tmp_ret = OB_SUCCESS;
168
        if (OB_SUCCESS != (tmp_ret = rs_mgr_->renew_master_rootserver())) {
169
          LOG_WARN("renew_master_rootserver failed", K(tmp_ret));
170
          if (OB_SUCCESS != (tmp_ret = ob_service_->refresh_sys_tenant_ls())) {
171
            LOG_WARN("fail to refresh core partition", K(tmp_ret));
172
          }
173
        } else {
174
          LOG_INFO("renew_master_rootserver successfully, try register again");
175
        }
176
      } else {
177
        LOG_INFO("register self successfully!");
178
        if (OB_FAIL(start_heartbeat())) {
179
          LOG_ERROR("start_heartbeat failed", K(ret));
180
        }
181
        break;
182
      }
183
    }
184
  }
185
  if (stopped_) {
186
    ret = OB_CANCELED;
187
    LOG_WARN("fail to register_self_busy_wait", KR(ret));
188
  }
189
  LOG_INFO("end register_self_busy_wait");
190
  return ret;
191
}
192

193
int ObLeaseStateMgr::try_report_sys_ls()
194
{
195
  int ret = OB_SUCCESS;
196
  if (OB_UNLIKELY(!inited_)) {
197
    ret = OB_NOT_INIT;
198
    LOG_WARN("not init", KR(ret));
199
  } else if (OB_UNLIKELY(stopped_)) {
200
    ret = OB_SERVER_IS_STOPPING;
201
    LOG_WARN("lease manager is stopped", KR(ret));
202
  } else {
203
    const uint64_t tenant_id = OB_SYS_TENANT_ID;
204
    const ObLSID ls_id = SYS_LS;
205
    MTL_SWITCH(tenant_id) {
206
      bool ls_exist = false;
207
      ObLSService *ls_svr = NULL;
208
      if (OB_ISNULL(ls_svr = MTL(ObLSService*))) {
209
        ret = OB_ERR_UNEXPECTED;
210
        LOG_WARN("tenant storage ptr is null", KR(ret), K(tenant_id));
211
      } else if (OB_FAIL(ls_svr->check_ls_exist(ls_id, ls_exist))) {
212
        LOG_WARN("fail to check log stream exist", KR(ret), K(ls_id));
213
      } else if (!ls_exist) {
214
        // core log stream not exist
215
      } else {
216
        share::ObLSTableOperator *lst_operator = GCTX.lst_operator_;
217
        share::ObLSReplica ls_replica;
218
        if (OB_ISNULL(ob_service_) || OB_ISNULL(lst_operator)) {
219
          ret = OB_ERR_UNEXPECTED;
220
          LOG_WARN("ob_service or lst_operator ptr is null",
221
                   KR(ret), KP(ob_service_), KP(lst_operator));
222
        } else if (OB_FAIL(ob_service_->fill_ls_replica(
223
                   tenant_id, ls_id, ls_replica))) {
224
          LOG_WARN("fail to fill log stream replica", KR(ret),
225
                   K(tenant_id), K(ls_replica));
226
        } else if (OB_FAIL(lst_operator->update(ls_replica, false/*inner_table_only*/))) {
227
          LOG_WARN("fail to report sys log stream", KR(ret), K(ls_replica));
228
        } else if (OB_FAIL(ob_service_->submit_ls_update_task(tenant_id, ls_id))) {
229
          LOG_WARN("fail to add async update task", KR(ret), K(tenant_id), K(ls_id));
230
        } else {
231
          LOG_INFO("try report sys log stream succeed");
232
        }
233
      }
234
    } else {
235
      if (OB_TENANT_NOT_IN_SERVER == ret) {
236
        ret = OB_SUCCESS;
237
      } else {
238
        LOG_WARN("fail to switch tenant", KR(ret), K(tenant_id));
239
      }
240
    }
241
  }
242
  return ret;
243
}
244

245
int ObLeaseStateMgr::renew_lease()
246
{
247
  int ret = OB_SUCCESS;
248
  if (OB_NOT_NULL(THE_TRACE)) {
249
    THE_TRACE->reset();
250
  }
251
  NG_TRACE(renew_lease_begin);
252
  const int64_t start = ObTimeUtility::fast_current_time();
253
  if (!inited_) {
254
    ret = OB_NOT_INIT;
255
    LOG_WARN("not init", K(ret));
256
  } else if (stopped_) {
257
    ret = OB_SERVER_IS_STOPPING;
258
    LOG_WARN("lease manager is stopped", K(ret));
259
  } else {
260
    if (OB_FAIL(do_renew_lease())) {
261
      LOG_WARN("do_renew_lease failed", K(ret));
262
      NG_TRACE(renew_master_rs_begin);
263
      if (OB_FAIL(rs_mgr_->renew_master_rootserver())) {
264
        LOG_WARN("renew_master_rootserver failed", K(ret));
265
        int tmp_ret = OB_SUCCESS;
266
        if (OB_SUCCESS != (tmp_ret = ob_service_->refresh_sys_tenant_ls())) {
267
          LOG_WARN("fail to refresh core partition", K(tmp_ret));
268
        }
269
      } else {
270
        NG_TRACE(renew_lease_end);
271
        LOG_INFO("renew_master_rootserver successfully, try renew lease again");
272
        if (OB_FAIL(try_report_sys_ls())) {
273
          LOG_WARN("fail to try report all core table partition");
274
        } else if (OB_FAIL(do_renew_lease())) {
275
          LOG_WARN("try do_renew_lease again failed, will do it no next heartbeat", K(ret));
276
        }
277
      }
278
   }
279

280
    if (OB_SUCC(ret)) {
281
      LOG_DEBUG("renew_lease successfully!");
282
    }
283
    NG_TRACE_EXT(renew_lease_end, OB_ID(ret), ret);
284
    const int64_t cost = ObTimeUtility::fast_current_time() - start;
285
    if (OB_UNLIKELY(cost > DELAY_TIME || OB_FAIL(ret))
286
        && OB_NOT_NULL(THE_TRACE)) {
287
      FORCE_PRINT_TRACE(THE_TRACE, "[slow heartbeat]");
288
    }
289
    const bool repeat = false;
290
    if (OB_FAIL(hb_timer_.schedule(hb_, DELAY_TIME, repeat))) {
291
      LOG_WARN("schedule failed", LITERAL_K(DELAY_TIME), K(repeat), K(ret));
292
    }
293
  }
294
  return ret;
295
}
296

297
int ObLeaseStateMgr::start_heartbeat()
298
{
299
  int ret = OB_SUCCESS;
300
  if (!inited_) {
301
    ret = OB_NOT_INIT;
302
    LOG_WARN("not init", K(ret));
303
  } else {
304
    const bool repeat = false;
305
    if (OB_FAIL(hb_timer_.schedule(hb_, DELAY_TIME, repeat))) {
306
      LOG_WARN("schedule failed", LITERAL_K(DELAY_TIME), K(repeat), K(ret));
307
    }
308
  }
309
  return ret;
310
}
311

312
#ifdef OB_BUILD_TDE_SECURITY
313
int ObLeaseStateMgr::update_master_key_info(
314
    const share::ObLeaseResponse &lease_response)
315
{
316
  int ret = OB_SUCCESS;
317
  if (OB_UNLIKELY(!inited_)) {
318
    ret = OB_NOT_INIT;
319
    LOG_WARN("not init", KR(ret));
320
  } else {
321
    const common::ObIArray<std::pair<uint64_t, ObLeaseResponse::TLRpKeyVersion> > &master_key_array
322
      = lease_response.tenant_max_key_version_;
323
    common::ObArray<std::pair<uint64_t, uint64_t> > max_key_version_array;
324
    common::ObArray<std::pair<uint64_t, uint64_t> > max_available_key_version_array;
325
    for (int64_t i = 0; OB_SUCC(ret) && i < master_key_array.count(); ++i) {
326
      const std::pair<uint64_t, ObLeaseResponse::TLRpKeyVersion> &key = master_key_array.at(i);
327
      std::pair<uint64_t, uint64_t> max_key_version
328
        = std::pair<uint64_t, uint64_t>(key.first, key.second.max_key_version_);
329
      std::pair<uint64_t, uint64_t> available_key_version
330
        = std::pair<uint64_t, uint64_t>(key.first, key.second.max_available_key_version_);
331
      if (OB_FAIL(max_key_version_array.push_back(max_key_version))) {
332
        LOG_WARN("fail to push back", KR(ret));
333
      } else {
334
        if (available_key_version.second > 0) {
335
          if (OB_FAIL(max_available_key_version_array.push_back(available_key_version))) {
336
            LOG_WARN("fail to push back", KR(ret));
337
          }
338
        }
339
      }
340
    }
341
    if (OB_SUCC(ret)) {
342
      if (OB_FAIL(ObMasterKeyGetter::instance().got_versions(
343
              max_key_version_array))) {
344
        LOG_WARN("fail to update got versions", KR(ret));
345
      } else if (OB_FAIL(ObMasterKeyGetter::instance().update_active_versions(
346
              max_available_key_version_array))) {
347
        LOG_WARN("fail to update active versions", KR(ret));
348
      }
349
    }
350
  }
351
  return ret;
352
}
353
#endif
354

355
int ObLeaseStateMgr::do_renew_lease()
356
{
357
  int ret = OB_SUCCESS;
358
  ObLeaseRequest lease_request;
359
  ObLeaseResponse lease_response;
360
  ObAddr rs_addr;
361
  NG_TRACE(do_renew_lease_begin);
362
  if (!inited_) {
363
    ret = OB_NOT_INIT;
364
    LOG_WARN("not init", K(ret));
365
  } else if (OB_FAIL(heartbeat_process_->init_lease_request(lease_request))) {
366
    LOG_WARN("init lease request failed", K(ret));
367
  } else if (OB_FAIL(rs_mgr_->get_master_root_server(rs_addr))) {
368
    LOG_WARN("get master root service failed", K(ret));
369
  } else {
370
    NG_TRACE(send_heartbeat_begin);
371
    ret = rpc_proxy_->to(rs_addr).timeout(renew_timeout_)
372
        .renew_lease(lease_request, lease_response);
373
    if (lease_response.lease_expire_time_ > 0) {
374
      // for compatible with old version
375
      lease_response.heartbeat_expire_time_ = lease_response.lease_expire_time_;
376
    }
377
    NG_TRACE_EXT(send_heartbeat_end, OB_ID(ret), ret);
378
    if (OB_SUCC(ret)) {
379
      int tmp_ret = OB_SUCCESS;
380
      if (OB_UNLIKELY(!lease_response.is_valid())) {
381
        tmp_ret = OB_INVALID_ARGUMENT;
382
        LOG_WARN("invalid argument", KR(tmp_ret), K(lease_response));
383
      }
384

385
      if (baseline_schema_version_ < lease_response.baseline_schema_version_) {
386
        if (OB_SUCCESS != (tmp_ret = GCTX.schema_service_->update_baseline_schema_version(
387
            OB_SYS_TENANT_ID, lease_response.baseline_schema_version_))) {
388
          LOG_WARN("fail to update baseline schema version", KR(ret), KR(tmp_ret), K(lease_response));
389
        } else {
390
          LOG_INFO("update baseline schema version", KR(ret), "old_version", baseline_schema_version_,
391
                   "new_version", lease_response.baseline_schema_version_);
392
          baseline_schema_version_ = lease_response.baseline_schema_version_;
393
        }
394
      }
395
#ifdef OB_BUILD_TDE_SECURITY
396
      if (OB_SUCC(ret)) {
397
        if (OB_SUCCESS != (tmp_ret = update_master_key_info(lease_response))) {
398
          LOG_WARN("fail to update master key info", KR(ret), K(tmp_ret), K(lease_response));
399
        }
400
      }
401
      NG_TRACE_EXT(update_master_key_info, OB_ID(ret), tmp_ret);
402
#endif
403
      const int64_t now = ObTimeUtility::current_time();
404
      if (OB_SUCC(ret) && lease_response.heartbeat_expire_time_ > now) {
405
        LOG_DEBUG("renew_lease from  master_rs successfully", K(rs_addr));
406
        if (OB_FAIL(set_lease_response(lease_response))) {
407
          LOG_WARN("fail to set lease response", K(ret));
408
        } else if (OB_FAIL(heartbeat_process_->do_heartbeat_event(lease_response_))) {
409
          LOG_WARN("fail to process new lease info", K_(lease_response), K(ret));
410
        }
411
        NG_TRACE_EXT(do_heartbeat_event, OB_ID(ret), ret);
412
      }
413
    } else {
414
      LOG_WARN("can't get lease from rs", K(rs_addr), K(ret));
415
    }
416
  }
417
  NG_TRACE_EXT(do_renew_lease_end, OB_ID(ret), ret);
418
  return ret;
419
}
420

421
ObLeaseStateMgr::HeartBeat::HeartBeat()
422
  : inited_(false), lease_state_mgr_(NULL)
423
{
424
}
425

426
ObLeaseStateMgr::HeartBeat::~HeartBeat()
427
{
428
}
429

430
int ObLeaseStateMgr::HeartBeat::init(ObLeaseStateMgr *lease_state_mgr)
431
{
432
  int ret = OB_SUCCESS;
433
  if (inited_) {
434
    ret = OB_INIT_TWICE;
435
    LOG_WARN("init twice", K(ret));
436
  } else if (NULL == lease_state_mgr) {
437
    ret = OB_INVALID_ARGUMENT;
438
    LOG_WARN("invalid argument", KP(lease_state_mgr), K(ret));
439
  } else {
440
    lease_state_mgr_ = lease_state_mgr;
441
    inited_ = true;
442
  }
443
  return ret;
444
}
445

446
void ObLeaseStateMgr::HeartBeat::runTimerTask()
447
{
448
  int ret = OB_SUCCESS;
449
  if (!inited_) {
450
    ret = OB_NOT_INIT;
451
    LOG_WARN("not init", K(ret));
452
  } else if (OB_FAIL(lease_state_mgr_->renew_lease())) {
453
    LOG_WARN("fail to renew lease", K(ret));
454
  }
455
}
456

457
}//end namespace observer
458
}//end namespace oceanbase
459

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.