oceanbase

Форк
0
/
ob_tenant_info_loader.cpp 
696 строк · 24.0 Кб
1
/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */
12

13
#define USING_LOG_PREFIX RS
14

15
#include "lib/trace/ob_trace_event.h"
16
#include "share/rc/ob_tenant_base.h"    // MTL_ID
17
#include "share/scn.h"//SCN
18
#include "share/ob_all_server_tracer.h"       // ObAllServerTracer
19
#include "observer/ob_server_struct.h"          // GCTX
20
#include "rootserver/ob_tenant_info_loader.h"
21
#include "rootserver/ob_rs_async_rpc_proxy.h"
22
#include "logservice/ob_log_service.h"          // ObLogService
23
#include "storage/tx/ob_ts_mgr.h" // OB_TS_MGR
24

25
namespace oceanbase
26
{
27
using namespace share;
28
using namespace common;
29
namespace rootserver
30
{
31
// Initializes the loader instance passed in by reference.
// @param ka  loader to initialize; must not be null.
// @return OB_ERR_UNEXPECTED when `ka` is null, otherwise the result of ka->init().
int ObTenantInfoLoader::mtl_init(ObTenantInfoLoader *&ka)
{
  int ret = OB_SUCCESS;

  if (OB_NOT_NULL(ka)) {
    if (OB_FAIL(ka->init())) {
      LOG_WARN("failed to init", KR(ret), KP(ka));
    }
  } else {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("ka is null", KR(ret));
  }

  return ret;
}
44

45
int ObTenantInfoLoader::init()
46
{
47
  int ret = OB_SUCCESS;
48
  lib::ThreadPool::set_run_wrapper(MTL_CTX());
49
  const int64_t thread_cnt = 1;
50

51
  if (IS_INIT) {
52
    ret = OB_INIT_TWICE;
53
    LOG_WARN("init twice", KR(ret));
54
  } else {
55
    sql_proxy_ = GCTX.sql_proxy_;
56
    tenant_id_ = MTL_ID();
57
    tenant_info_cache_.reset();
58
    ATOMIC_STORE(&broadcast_times_, 0);
59
    ATOMIC_STORE(&rpc_update_times_, 0);
60
    ATOMIC_STORE(&sql_update_times_, 0);
61
    ATOMIC_STORE(&last_rpc_update_time_us_, OB_INVALID_TIMESTAMP);
62

63
    if (!is_user_tenant(tenant_id_)) {
64
    } else if (OB_ISNULL(GCTX.sql_proxy_)) {
65
      ret = OB_ERR_UNEXPECTED;
66
      LOG_WARN("sql proxy is null", KR(ret));
67
    } else if (OB_FAIL(create(thread_cnt, "TenantInf"))) {
68
      LOG_WARN("failed to create tenant info loader thread", KR(ret), K(thread_cnt));
69
    }
70
  }
71
  if (OB_SUCC(ret)) {
72
    is_inited_ = true;
73
  }
74
  return ret;
75
}
76

77
void ObTenantInfoLoader::destroy()
78
{
79
  LOG_INFO("tenant info loader destory", KPC(this));
80
  stop();
81
  wait();
82
  is_inited_ = false;
83
  tenant_id_ = OB_INVALID_TENANT_ID;
84
  tenant_info_cache_.reset();
85
  sql_proxy_ = NULL;
86
  ATOMIC_STORE(&broadcast_times_, 0);
87
  ATOMIC_STORE(&rpc_update_times_, 0);
88
  ATOMIC_STORE(&sql_update_times_, 0);
89
  ATOMIC_STORE(&last_rpc_update_time_us_, OB_INVALID_TIMESTAMP);
90
}
91

92
int ObTenantInfoLoader::start()
93
{
94
  int ret = OB_SUCCESS;
95
  if (IS_NOT_INIT) {
96
    ret = OB_NOT_INIT;
97
    LOG_WARN("not init", KR(ret));
98
  } else if (!is_user_tenant(tenant_id_)) {
99
    //meta and sys tenant is primary
100
    MTL_SET_TENANT_ROLE_CACHE(ObTenantRole::PRIMARY_TENANT);
101
    LOG_INFO("not user tenant no need load", K(tenant_id_));
102
  } else if (OB_FAIL(logical_start())) {
103
    LOG_WARN("failed to start", KR(ret));
104
  } else {
105
    LOG_INFO("tenant info loader start", KPC(this));
106
  }
107
  return ret;
108
}
109

110
void ObTenantInfoLoader::stop()
111
{
112
  logical_stop();
113
}
114
void ObTenantInfoLoader::wait()
115
{
116
  logical_wait();
117
}
118

119
void ObTenantInfoLoader::wakeup()
120
{
121
  int ret = OB_SUCCESS;
122
  if (IS_NOT_INIT) {
123
    ret = OB_NOT_INIT;
124
    LOG_TRACE("not init", KR(ret));
125
  } else {
126
    ObThreadCondGuard guard(get_cond());
127
    get_cond().broadcast();
128
  }
129
}
130

131
void ObTenantInfoLoader::run2()
132
{
133
  int ret = OB_SUCCESS;
134
  LOG_INFO("tenant info loader run", KPC(this));
135
  if (IS_NOT_INIT) {
136
    ret = OB_NOT_INIT;
137
    LOG_WARN("not init", KR(ret));
138
  } else {
139
    ObThreadCondGuard guard(get_cond());
140
    int64_t last_dump_time_us = ObTimeUtility::current_time();
141

142
    while (!stop_) {
143
      const int64_t start_time_us = ObTimeUtility::current_time();
144
      share::ObAllTenantInfo tenant_info;
145
      bool content_changed = false;
146
      bool is_sys_ls_leader = is_sys_ls_leader_();
147
      const int64_t refresh_time_interval_us = act_as_standby_() && is_sys_ls_leader ?
148
                ObTenantRoleTransitionConstants::STS_TENANT_INFO_REFRESH_TIME_US :
149
                ObTenantRoleTransitionConstants::DEFAULT_TENANT_INFO_REFRESH_TIME_US;
150

151
      if (need_refresh(refresh_time_interval_us)
152
          && OB_FAIL(tenant_info_cache_.refresh_tenant_info(tenant_id_, sql_proxy_, content_changed))) {
153
        LOG_WARN("failed to update tenant info", KR(ret), K_(tenant_id), KP(sql_proxy_));
154
      }
155

156
      const int64_t now_us = ObTimeUtility::current_time();
157
      const int64_t sql_update_cost_time = now_us - start_time_us;
158
      if (OB_FAIL(ret)) {
159
      } else if (content_changed && is_sys_ls_leader) {
160
        (void)ATOMIC_AAF(&sql_update_times_, 1);
161
        (void)broadcast_tenant_info_content_();
162
      }
163
      const int64_t broadcast_cost_time = ObTimeUtility::current_time() - now_us;
164

165
      const int64_t end_time_us = ObTimeUtility::current_time();
166
      const int64_t cost_time_us = end_time_us - start_time_us;
167
      if (content_changed) {
168
        (void)dump_tenant_info_(sql_update_cost_time, is_sys_ls_leader, broadcast_cost_time, end_time_us, last_dump_time_us);
169
      }
170
      const int64_t idle_time = max(10 * 1000, refresh_time_interval_us - cost_time_us);
171
      //At least sleep 10ms, allowing the thread to release the lock
172
      if (!stop_) {
173
        get_cond().wait_us(idle_time);
174
      }
175
    }//end while
176
  }
177
}
178

179
void ObTenantInfoLoader::dump_tenant_info_(
180
      const int64_t sql_update_cost_time,
181
      const bool is_sys_ls_leader,
182
      const int64_t broadcast_cost_time,
183
      const int64_t end_time_us,
184
      int64_t &last_dump_time_us)
185
{
186
  int ret = OB_SUCCESS;
187
  if (IS_NOT_INIT) {
188
    ret = OB_NOT_INIT;
189
    LOG_WARN("not init", KR(ret));
190
  } else if (0 >= last_dump_time_us || end_time_us < last_dump_time_us) {
191
    ret = OB_INVALID_ARGUMENT;
192
    LOG_WARN("invalid argument", KR(ret), K(last_dump_time_us), K(end_time_us));
193
  } else {
194
    const int64_t dump_interval_s = (end_time_us - last_dump_time_us) / 1000000; // 1s unit;
195
    if (dump_tenant_info_cache_update_action_interval_.reach() && 1 <= dump_interval_s) {
196
      const uint64_t broadcast_times = ATOMIC_LOAD(&broadcast_times_);
197
      const uint64_t rpc_update_times = ATOMIC_LOAD(&rpc_update_times_);
198
      const uint64_t sql_update_times = ATOMIC_LOAD(&sql_update_times_);
199
      const int64_t broadcast_per_sec = broadcast_times / dump_interval_s; // per second
200
      const int64_t rpc_update_per_sec = rpc_update_times / dump_interval_s; // per second
201
      const int64_t sql_update_per_sec = sql_update_times / dump_interval_s; // per second
202
      if (OB_NOT_NULL(THE_TRACE)) {
203
        THE_TRACE->reset();
204
      }
205
      NG_TRACE_EXT(ob_tenant_info_loader, OB_ID(tenant_id), tenant_id_,
206
                   OB_ID(is_sys_ls_leader), is_sys_ls_leader,
207
                   OB_ID(broadcast_cost_time), broadcast_cost_time,
208
                   OB_ID(broadcast_times), broadcast_times_,
209
                   OB_ID(broadcast_per_sec), broadcast_per_sec,
210
                   OB_ID(rpc_update_times), rpc_update_times,
211
                   OB_ID(rpc_update_per_sec), rpc_update_per_sec,
212
                   OB_ID(last_rpc_update_time_us), last_rpc_update_time_us_,
213
                   OB_ID(sql_update_cost_time), sql_update_cost_time,
214
                   OB_ID(sql_update_times), sql_update_times,
215
                   OB_ID(tenant_info_cache), tenant_info_cache_, OB_ID(is_inited), is_inited_);
216
      FORCE_PRINT_TRACE(THE_TRACE, "[dump tenant_info_loader]");
217
      ATOMIC_STORE(&broadcast_times_, 0);
218
      ATOMIC_STORE(&rpc_update_times_, 0);
219
      ATOMIC_STORE(&sql_update_times_, 0);
220
      last_dump_time_us = ObTimeUtility::current_time();
221
    }
222
  }
223
}
224

225
bool ObTenantInfoLoader::need_refresh(const int64_t refresh_time_interval_us)
226
{
227
  int ret = OB_SUCCESS;
228
  ObAllTenantInfo tenant_info;
229
  bool need_refresh = true;
230
  const int64_t now = ObTimeUtility::current_time();
231
  int64_t last_sql_update_time = OB_INVALID_TIMESTAMP;
232
  int64_t ora_rowscn = 0;
233

234
  if (ObTenantRoleTransitionConstants::DEFAULT_TENANT_INFO_REFRESH_TIME_US < refresh_time_interval_us) {
235
    LOG_WARN("unexpected refresh_time_interval_us");
236
    need_refresh = true;
237
  } else if (OB_FAIL(tenant_info_cache_.get_tenant_info(tenant_info, last_sql_update_time, ora_rowscn))) {
238
    LOG_WARN("failed to get tenant info", KR(ret));
239
  } else if (now - last_sql_update_time < refresh_time_interval_us) {
240
    need_refresh = false;
241
  }
242

243
  return need_refresh;
244
}
245

246
bool ObTenantInfoLoader::is_sys_ls_leader_()
247
{
248
  int ret = OB_SUCCESS;
249
  bool is_sys_ls_leader = false;
250
  logservice::ObLogService *log_svr = MTL(logservice::ObLogService*);
251
  int64_t proposal_id = 0;
252
  common::ObRole role;
253

254
  if (IS_NOT_INIT) {
255
    ret = OB_NOT_INIT;
256
    LOG_WARN("not init", KR(ret));
257
  } else if (OB_ISNULL(log_svr)) {
258
    ret = OB_ERR_UNEXPECTED;
259
    LOG_WARN("mtl pointer is null", KR(ret), KP(log_svr));
260
  } else if (OB_FAIL(log_svr->get_palf_role(share::SYS_LS, role, proposal_id))) {
261
    LOG_WARN("failed to get palf role", KR(ret));
262
  } else if (is_strong_leader(role)) {
263
    is_sys_ls_leader = true;
264
  }
265

266
  return is_sys_ls_leader;
267
}
268

269
// Returns the tenant id and max LS id for the current MTL tenant.
// SYS and META tenants report their own id with ObLSID::SYS_LS_ID; user tenants
// take both values from the cached tenant info.
// @param tenant_id  out: invalid on failure.
// @param max_ls_id  out: reset on failure.
// @return OB_SUCCESS, or the error from get_tenant_info().
int ObTenantInfoLoader::get_max_ls_id(uint64_t &tenant_id, ObLSID &max_ls_id)
{
  int ret = OB_SUCCESS;
  ObAllTenantInfo cached_info;
  const uint64_t mtl_tenant_id = MTL_ID();
  max_ls_id.reset();
  tenant_id = OB_INVALID_TENANT_ID;

  const bool sys_or_meta = (OB_SYS_TENANT_ID == mtl_tenant_id || is_meta_tenant(mtl_tenant_id));
  if (sys_or_meta) {
    tenant_id = mtl_tenant_id;
    max_ls_id = ObLSID::SYS_LS_ID;
  } else if (OB_FAIL(get_tenant_info(cached_info))) {
    LOG_WARN("get_tenant_info failed", KR(ret));
  } else {
    tenant_id = cached_info.get_tenant_id();
    max_ls_id = cached_info.get_max_ls_id();
  }
  return ret;
}
287

288
bool ObTenantInfoLoader::act_as_standby_()
289
{
290
  int ret = OB_SUCCESS;
291
  bool act_as_standby = true;
292
  ObAllTenantInfo tenant_info;
293

294
  if (IS_NOT_INIT) {
295
    ret = OB_NOT_INIT;
296
    LOG_WARN("not init", KR(ret));
297
  } else if (OB_FAIL(get_tenant_info(tenant_info))) {
298
    LOG_WARN("failed to get tenant_info", KR(ret));
299
  } else if (tenant_info.is_primary() && tenant_info.is_normal_status()) {
300
    act_as_standby = false;
301
  }
302

303
  return act_as_standby;
304
}
305

306
void ObTenantInfoLoader::broadcast_tenant_info_content_()
307
{
308
  int ret = OB_SUCCESS;
309
  const int64_t DEFAULT_TIMEOUT_US = 200 * 1000;  // 200ms
310
  const int64_t begin_time = ObTimeUtility::current_time();
311
  ObArray<int> return_code_array;
312
  share::ObAllTenantInfo tenant_info;
313
  int64_t last_sql_update_time = OB_INVALID_TIMESTAMP;
314
  int64_t ora_rowscn = 0;
315

316
  if (IS_NOT_INIT) {
317
    ret = OB_NOT_INIT;
318
    LOG_WARN("not init", KR(ret));
319
  } else if (OB_ISNULL(GCTX.srv_rpc_proxy_)) {
320
    ret = OB_ERR_UNEXPECTED;
321
    LOG_WARN("pointer is null", KR(ret), KP(GCTX.srv_rpc_proxy_));
322
  } else {
323
    ObUpdateTenantInfoCacheProxy proxy(
324
      *GCTX.srv_rpc_proxy_, &obrpc::ObSrvRpcProxy::update_tenant_info_cache);
325
    int64_t rpc_count = 0;
326

327
    if (OB_FAIL(tenant_info_cache_.get_tenant_info(tenant_info, last_sql_update_time, ora_rowscn))) {
328
      LOG_WARN("failed to get tenant info", KR(ret));
329
    } else if (OB_FAIL(share::ObAllServerTracer::get_instance().for_each_server_info(
330
                  [&rpc_count, &tenant_info, &proxy, ora_rowscn](const share::ObServerInfoInTable &server_info) -> int {
331
                    int ret = OB_SUCCESS;
332
                    obrpc::ObUpdateTenantInfoCacheArg arg;
333
                    if (!server_info.is_valid()) {
334
                      LOG_WARN("skip invalid server_info", KR(ret), K(server_info));
335
                    } else if (!server_info.is_alive()) {
336
                      //not send to alive
337
                    } else if (OB_FAIL(arg.init(tenant_info.get_tenant_id(), tenant_info, ora_rowscn))) {
338
                      LOG_WARN("failed to init arg", KR(ret), K(tenant_info), K(ora_rowscn));
339
                    // use meta rpc process thread
340
                    } else if (OB_FAIL(proxy.call(server_info.get_server(), DEFAULT_TIMEOUT_US, gen_meta_tenant_id(tenant_info.get_tenant_id()), arg))) {
341
                      LOG_WARN("failed to send rpc", KR(ret), K(server_info), K(tenant_info), K(arg));
342
                    } else {
343
                      rpc_count++;
344
                    }
345

346
                    return ret;
347
                  }))) {
348
      LOG_WARN("for each server_info failed", KR(ret));
349
    }
350

351
    int tmp_ret = OB_SUCCESS;
352
    if (OB_TMP_FAIL(proxy.wait_all(return_code_array))) {
353
      LOG_WARN("wait all batch result failed", KR(ret), KR(tmp_ret));
354
      ret = OB_SUCCESS == ret ? tmp_ret : ret;
355
    } else if (OB_FAIL(ret)) {
356
    } else if (OB_FAIL(proxy.check_return_cnt(return_code_array.count()))) {
357
      LOG_WARN("fail to check return cnt", KR(ret), "return_cnt", return_code_array.count());
358
    } else {
359
      (void)ATOMIC_AAF(&broadcast_times_, 1);
360
      ObUpdateTenantInfoCacheRes res;
361
      for (int64_t i = 0; OB_SUCC(ret) && i < return_code_array.count(); ++i) {
362
        ret = return_code_array.at(i);
363
        const ObAddr &dest = proxy.get_dests().at(i);
364
        if (OB_FAIL(ret)) {
365
          LOG_WARN("send rpc is failed", KR(ret), K(i), K(dest));
366
        } else {
367
          LOG_INFO("refresh tenant info content success", KR(ret), K(i), K(dest));
368
        }
369
      }
370
    }
371
  }
372
  const int64_t cost_time_us = ObTimeUtility::current_time() - begin_time;
373
  const int64_t PRINT_INTERVAL = 3 * 1000 * 1000L;
374
  if (REACH_TIME_INTERVAL(PRINT_INTERVAL)) {
375
    LOG_INFO("broadcast_tenant_info_content_ finished", KR(ret), K_(tenant_id), K(cost_time_us),
376
           K(return_code_array), K_(broadcast_times));
377
  }
378
  return ;
379
}
380

381
int ObTenantInfoLoader::get_valid_sts_after(const int64_t specified_time_us, share::SCN &standby_scn)
382
{
383
  int ret = OB_SUCCESS;
384
  standby_scn.set_min();
385
  share::ObAllTenantInfo tenant_info;
386
  int64_t last_sql_update_time = OB_INVALID_TIMESTAMP;
387
  int64_t ora_rowscn = 0;
388

389
  if (IS_NOT_INIT) {
390
    ret = OB_NOT_INIT;
391
    LOG_WARN("not init", KR(ret));
392
  } else if (OB_INVALID_TIMESTAMP == specified_time_us) {
393
    ret = OB_INVALID_ARGUMENT;
394
    LOG_WARN("invalid argument", KR(ret), K(specified_time_us));
395
  } else if (OB_FAIL(tenant_info_cache_.get_tenant_info(tenant_info, last_sql_update_time, ora_rowscn))) {
396
    if (OB_NEED_WAIT == ret) {
397
      LOG_TRACE("tenant info cache is not refreshed, need wait", KR(ret));
398
    } else {
399
      LOG_WARN("failed to get tenant info", KR(ret));
400
    }
401
  } else if (last_sql_update_time <= specified_time_us) {
402
    ret = OB_NEED_WAIT;
403
    LOG_TRACE("tenant info cache is old, need wait", KR(ret), K(last_sql_update_time), K(specified_time_us), K(tenant_info));
404
    wakeup();
405
  } else if (!tenant_info.is_sts_ready()) {
406
    ret = OB_NEED_WAIT;
407
    LOG_TRACE("sts can not work for current tenant status", KR(ret), K(tenant_info));
408
  } else {
409
    standby_scn = tenant_info.get_standby_scn();
410
  }
411

412
  const int64_t PRINT_INTERVAL = 3 * 1000 * 1000L;
413
  if (REACH_TIME_INTERVAL(PRINT_INTERVAL)) {
414
    LOG_INFO("get_valid_sts_after", KR(ret), K(specified_time_us), K(last_sql_update_time), K(tenant_info));
415
  }
416

417
  return ret;
418
}
419

420
// Fetches the GTS of the current MTL tenant from OB_TS_MGR and returns it as
// the readable SCN.
// @param readable_scn  out: set to min on entry, filled by get_gts().
// @return the result of OB_TS_MGR.get_gts().
int ObTenantInfoLoader::get_readable_scn(share::SCN &readable_scn)
{
  int ret = OB_SUCCESS;
  readable_scn.set_min();

  if (OB_FAIL(OB_TS_MGR.get_gts(MTL_ID(), nullptr, readable_scn))) {
    LOG_WARN("failed to get gts as readable_scn", KR(ret));
  }

  return ret;
}
431

432
int ObTenantInfoLoader::check_is_standby_normal_status(bool &is_standby_normal_status)
433
{
434
  int ret = OB_SUCCESS;
435
  is_standby_normal_status = false;
436

437
  if (OB_SYS_TENANT_ID == MTL_ID() || is_meta_tenant(MTL_ID())) {
438
    is_standby_normal_status = false;
439
  } else {
440
    // user tenant
441
    share::ObAllTenantInfo tenant_info;
442
    if (OB_FAIL(tenant_info_cache_.get_tenant_info(tenant_info))) {
443
      LOG_WARN("failed to get tenant info", KR(ret));
444
    } else {
445
      is_standby_normal_status = tenant_info.is_standby() && tenant_info.is_normal_status();
446
    }
447
  }
448
  return ret;
449
}
450

451
int ObTenantInfoLoader::check_is_primary_normal_status(bool &is_primary_normal_status)
452
{
453
  int ret = OB_SUCCESS;
454
  is_primary_normal_status = false;
455

456
  if (OB_SYS_TENANT_ID == MTL_ID() || is_meta_tenant(MTL_ID())) {
457
    is_primary_normal_status = true;
458
  } else {
459
    // user tenant
460
    share::ObAllTenantInfo tenant_info;
461
    if (OB_FAIL(tenant_info_cache_.get_tenant_info(tenant_info))) {
462
      LOG_WARN("failed to get tenant info", KR(ret));
463
    } else {
464
      is_primary_normal_status = tenant_info.is_primary() && tenant_info.is_normal_status();
465
    }
466
  }
467
  return ret;
468
}
469

470
// Returns the replayable SCN stored in the cached tenant info.
// SYS and META tenants have no replayable SCN; calling this for them is an error.
// @param replayable_scn  out: set to min on entry, filled on success.
// @return OB_ERR_UNEXPECTED for SYS/META tenants, otherwise the result of the
//         cache lookup.
int ObTenantInfoLoader::get_replayable_scn(share::SCN &replayable_scn)
{
  int ret = OB_SUCCESS;
  replayable_scn.set_min();

  const bool sys_or_meta = (OB_SYS_TENANT_ID == MTL_ID() || is_meta_tenant(MTL_ID()));
  if (!sys_or_meta) {
    // user tenant
    share::ObAllTenantInfo cached_info;
    if (OB_FAIL(tenant_info_cache_.get_tenant_info(cached_info))) {
      LOG_WARN("failed to get tenant info", KR(ret));
    } else {
      replayable_scn = cached_info.get_replayable_scn();
    }
  } else {
    // there isn't replayable_scn for SYS/META tenant
    ret = OB_ERR_UNEXPECTED;
    LOG_ERROR("there isn't replayable_scn for SYS/META tenant", KR(ret));
  }
  return ret;
}
490

491
// Returns the sync SCN stored in the cached tenant info.
// @param sync_scn  out: invalidated on entry, filled on success.
// @return the result of get_tenant_info().
int ObTenantInfoLoader::get_sync_scn(share::SCN &sync_scn)
{
  int ret = OB_SUCCESS;
  share::ObAllTenantInfo cached_info;
  sync_scn.set_invalid();

  if (OB_SUCC(get_tenant_info(cached_info))) {
    sync_scn = cached_info.get_sync_scn();
  } else {
    LOG_WARN("failed to get tenant info", KR(ret));
  }
  return ret;
}
503

504
// Returns the recovery-until SCN stored in the cached tenant info.
// @param recovery_until_scn  out: invalidated on entry, filled on success.
// @return the result of get_tenant_info().
int ObTenantInfoLoader::get_recovery_until_scn(share::SCN &recovery_until_scn)
{
  int ret = OB_SUCCESS;
  share::ObAllTenantInfo cached_info;
  recovery_until_scn.set_invalid();

  if (OB_SUCC(get_tenant_info(cached_info))) {
    recovery_until_scn = cached_info.get_recovery_until_scn();
  } else {
    LOG_WARN("failed to get tenant info", KR(ret));
  }
  return ret;
}
516

517
// Copies the cached tenant info of the current user tenant into `tenant_info`.
// SYS and META tenants have no tenant info; calling this for them is an error.
// @param tenant_info  out: reset on entry, filled on success.
// @return OB_ERR_UNEXPECTED for SYS/META tenants, otherwise the result of the
//         cache lookup.
int ObTenantInfoLoader::get_tenant_info(share::ObAllTenantInfo &tenant_info)
{
  int ret = OB_SUCCESS;
  tenant_info.reset();

  const bool sys_or_meta = (OB_SYS_TENANT_ID == MTL_ID() || is_meta_tenant(MTL_ID()));
  if (!sys_or_meta) {
    // user tenant
    if (OB_FAIL(tenant_info_cache_.get_tenant_info(tenant_info))) {
      LOG_WARN("failed to get tenant info", KR(ret));
    }
  } else {
    // there isn't tenant info for SYS/META tenant
    ret = OB_ERR_UNEXPECTED;
    LOG_ERROR("there isn't tenant info for SYS/META tenant", KR(ret));
  }

  return ret;
}
535

536
int ObTenantInfoLoader::refresh_tenant_info()
537
{
538
  int ret = OB_SUCCESS;
539
  bool content_changed = false;
540
  if (IS_NOT_INIT) {
541
    ret = OB_NOT_INIT;
542
    LOG_WARN("not init", KR(ret));
543
  } else if (OB_FAIL(tenant_info_cache_.refresh_tenant_info(tenant_id_, sql_proxy_, content_changed))) {
544
    LOG_WARN("failed to refresh_tenant_info", KR(ret), K_(tenant_id), KP(sql_proxy_));
545
  }
546
  return ret;
547
}
548

549
int ObTenantInfoLoader::update_tenant_info_cache(const int64_t new_ora_rowscn, const ObAllTenantInfo &new_tenant_info)
550
{
551
  int ret = OB_SUCCESS;
552
  bool refreshed = false;
553
  if (IS_NOT_INIT) {
554
    ret = OB_NOT_INIT;
555
    LOG_WARN("not init", KR(ret));
556
  } else if (OB_FAIL(tenant_info_cache_.update_tenant_info_cache(new_ora_rowscn, new_tenant_info, refreshed))) {
557
    LOG_WARN("failed to update_tenant_info_cache", KR(ret), K(new_ora_rowscn), K(new_tenant_info));
558
  } else if (refreshed) {
559
    (void)ATOMIC_AAF(&rpc_update_times_, 1);
560
    (void)ATOMIC_STORE(&last_rpc_update_time_us_, ObTimeUtility::current_time());
561
  }
562
  return ret;
563
}
564

565
// Key/value list handed to DEFINE_TO_YSON_KV for ObAllTenantInfoCache: the
// cached tenant info, its last SQL update time and its ora_rowscn.
// NOTE(review): presumably expands to the YSON serializer used when the cache
// is passed to NG_TRACE_EXT - confirm against the macro definition.
DEFINE_TO_YSON_KV(ObAllTenantInfoCache, OB_ID(tenant_info), tenant_info_,
                  OB_ID(last_sql_update_time), last_sql_update_time_, OB_ID(ora_rowscn), ora_rowscn_);
569

570
void ObAllTenantInfoCache::reset()
571
{
572
  SpinWLockGuard guard(lock_);
573
  tenant_info_.reset();
574
  last_sql_update_time_ = OB_INVALID_TIMESTAMP;
575
  ora_rowscn_ = 0;
576
}
577

578
int ObAllTenantInfoCache::refresh_tenant_info(const uint64_t tenant_id,
579
                                              common::ObMySQLProxy *sql_proxy,
580
                                              bool &content_changed)
581
{
582
  int ret = OB_SUCCESS;
583
  ObAllTenantInfo new_tenant_info;
584
  int64_t ora_rowscn = 0;
585
  const int64_t new_refresh_time_us = ObClockGenerator::getClock();
586
  content_changed = false;
587
  if (OB_ISNULL(sql_proxy) || !is_user_tenant(tenant_id)) {
588
    ret = OB_INVALID_ARGUMENT;
589
    LOG_WARN("invalid argument", KR(ret), K(tenant_id), KP(sql_proxy));
590
  } else if (OB_FAIL(ObAllTenantInfoProxy::load_tenant_info(tenant_id,
591
             sql_proxy, false /* for_update */, ora_rowscn, new_tenant_info))) {
592
    LOG_WARN("failed to load tenant info", KR(ret), K(tenant_id));
593
  } else if (INT64_MAX == ora_rowscn || 0 == ora_rowscn) {
594
    ret = OB_ERR_UNEXPECTED;
595
    LOG_ERROR("invalid ora_rowscn", KR(ret), K(ora_rowscn), K(tenant_id), K(new_tenant_info), K(lbt()));
596
  } else {
597
    /**
598
    * Only need to refer to tenant role, no need to refer to switchover status.
599
    * tenant_role is primary in <primary, normal switchoverstatus> or <primary, prep switching_to_standby switchover_status>.
600
    * When switch to standby starts, it will change to <standby, switch to standby>.
601
    * During the switch to standby process, some LS may be in RO state. GTS & STS may not work.
602
    * This also ensures the consistency of tenant_role cache and the tenant role field in all_tenant_info
603
    */
604
    SpinWLockGuard guard(lock_);
605
    if (ora_rowscn >= ora_rowscn_) {
606
      if (ora_rowscn > ora_rowscn_) {
607
        MTL_SET_TENANT_ROLE_CACHE(new_tenant_info.get_tenant_role().value());
608
        (void)tenant_info_.assign(new_tenant_info);
609
        ora_rowscn_ = ora_rowscn;
610
        content_changed = true;
611
      }
612
      // In order to provide sts an accurate time of tenant info refresh time, it is necessary to
613
      // update last_sql_update_time_ after sql refresh
614
      last_sql_update_time_ = new_refresh_time_us;
615
    } else {
616
      ret = OB_EAGAIN;
617
      LOG_WARN("refresh tenant info conflict", KR(ret), K(new_tenant_info), K(new_refresh_time_us),
618
                                      K(tenant_id), K(tenant_info_), K(last_sql_update_time_), K(ora_rowscn_), K(ora_rowscn));
619
    }
620
  }
621

622
  if (dump_tenant_info_interval_.reach()) {
623
    LOG_INFO("refresh tenant info", KR(ret), K(new_tenant_info), K(new_refresh_time_us),
624
                                    K(tenant_id), K(tenant_info_), K(last_sql_update_time_), K(ora_rowscn_));
625
  }
626

627
  return ret;
628
}
629

630
int ObAllTenantInfoCache::update_tenant_info_cache(
631
    const int64_t new_ora_rowscn,
632
    const ObAllTenantInfo &new_tenant_info,
633
    bool &refreshed)
634
{
635
  int ret = OB_SUCCESS;
636
  refreshed = false;
637
  if (!new_tenant_info.is_valid() || 0 == new_ora_rowscn || INT64_MAX == new_ora_rowscn) {
638
    ret = OB_INVALID_ARGUMENT;
639
    LOG_WARN("invalid argument", KR(ret), K(new_tenant_info), K(new_ora_rowscn));
640
  } else {
641
    SpinWLockGuard guard(lock_);
642
    if (!tenant_info_.is_valid() || 0 == ora_rowscn_) {
643
      ret = OB_EAGAIN;
644
      LOG_WARN("my tenant_info is invalid, don't refresh", KR(ret), K_(tenant_info), K_(ora_rowscn));
645
    } else if (new_ora_rowscn > ora_rowscn_) {
646
      MTL_SET_TENANT_ROLE_CACHE(new_tenant_info.get_tenant_role().value());
647
      (void)tenant_info_.assign(new_tenant_info);
648
      ora_rowscn_ = new_ora_rowscn;
649
      refreshed = true;
650
      LOG_TRACE("refresh_tenant_info_content", K(new_tenant_info), K(new_ora_rowscn), K(tenant_info_), K(ora_rowscn_));
651
    }
652
  }
653

654
  return ret;
655
}
656

657
int ObAllTenantInfoCache::get_tenant_info(
658
    share::ObAllTenantInfo &tenant_info,
659
    int64_t &last_sql_update_time,
660
    int64_t &ora_rowscn)
661
{
662
  int ret = OB_SUCCESS;
663
  tenant_info.reset();
664
  last_sql_update_time = OB_INVALID_TIMESTAMP;
665
  ora_rowscn = 0;
666
  SpinRLockGuard guard(lock_);
667

668
  if (!tenant_info_.is_valid() || OB_INVALID_TIMESTAMP == last_sql_update_time_ || 0 == ora_rowscn_) {
669
    ret = OB_NEED_WAIT;
670
    const int64_t PRINT_INTERVAL = 1 * 1000 * 1000L;
671
    if (REACH_TIME_INTERVAL(PRINT_INTERVAL)) {
672
      LOG_WARN("tenant info is invalid, need wait", KR(ret), K(last_sql_update_time_), K(tenant_info_), K(ora_rowscn_));
673
    }
674
  } else {
675
    (void)tenant_info.assign(tenant_info_);
676
    last_sql_update_time = last_sql_update_time_;
677
    ora_rowscn = ora_rowscn_;
678
  }
679
  return ret;
680
}
681

682
// Convenience overload: copies only the cached tenant info and discards the
// update time and ora_rowscn returned by the three-argument overload.
// @param tenant_info  out: reset on entry, filled on success.
// @return the result of the three-argument overload.
int ObAllTenantInfoCache::get_tenant_info(share::ObAllTenantInfo &tenant_info)
{
  int ret = OB_SUCCESS;
  tenant_info.reset();
  int64_t ignored_update_time = OB_INVALID_TIMESTAMP;
  int64_t ignored_rowscn = 0;

  ret = get_tenant_info(tenant_info, ignored_update_time, ignored_rowscn);
  if (OB_SUCCESS != ret) {
    LOG_WARN("failed to get tenant info", KR(ret));
  }
  return ret;
}
694

695
}
696
}
697

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.