2
* Copyright (c) 2021 OceanBase
3
* OceanBase CE is licensed under Mulan PubL v2.
4
* You can use this software according to the terms and conditions of the Mulan PubL v2.
5
* You may obtain a copy of Mulan PubL v2 at:
6
* http://license.coscl.org.cn/MulanPubL-2.0
7
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
* See the Mulan PubL v2 for more details.
13
#define USING_LOG_PREFIX SERVER_OMT
14
#include "ob_tenant_node_balancer.h"
15
#include "lib/time/ob_time_utility.h"
16
#include "lib/oblog/ob_log.h"
17
#include "lib/alloc/ob_malloc_allocator.h"
18
#include "lib/container/ob_se_array_iterator.h"
19
#include "lib/mysqlclient/ob_mysql_proxy.h"
20
#include "share/ob_tenant_mgr.h"
21
#include "share/ob_debug_sync.h"
22
#include "share/system_variable/ob_sys_var_class_type.h"
23
#include "observer/ob_inner_sql_result.h"
25
#include "ob_multi_tenant.h"
26
#include "share/allocator/ob_tenant_mutil_allocator.h"
27
#include "share/allocator/ob_tenant_mutil_allocator_mgr.h"
28
#include "share/config/ob_server_config.h"
29
#include "observer/ob_server_struct.h"
30
#include "observer/omt/ob_tenant_config_mgr.h"
31
#include "storage/blocksstable/ob_block_manager.h"
32
#include "logservice/palf/palf_options.h"
33
#include "logservice/ob_server_log_block_mgr.h"
34
#include "storage/tx_storage/ob_tenant_freezer.h"
35
#include "storage/tx_storage/ob_ls_service.h"
36
#include "storage/meta_mem/ob_tenant_meta_mem_mgr.h"
37
#include "storage/slog_ckpt/ob_server_checkpoint_slog_handler.h"
38
#include "observer/ob_server_event_history_table_operator.h"
39
#ifdef OB_BUILD_TDE_SECURITY
40
#include "share/ob_master_key_getter.h"
43
using namespace oceanbase::obsys;
44
using namespace oceanbase::lib;
45
using namespace oceanbase::common;
46
using namespace oceanbase::share;
47
using namespace oceanbase::omt;
48
using namespace oceanbase::observer;
49
using namespace oceanbase::storage;
51
ObTenantNodeBalancer::ObTenantNodeBalancer()
52
: omt_(NULL), myaddr_(), unit_getter_(), lock_(common::ObLatchIds::CONFIG_LOCK),
53
refresh_interval_(10L * 1000L * 1000L)
55
if (lib::is_mini_mode()) {
56
refresh_interval_ /= 2;
60
ObTenantNodeBalancer::~ObTenantNodeBalancer()
66
int ObTenantNodeBalancer::init(ObMultiTenant *omt, common::ObMySQLProxy &sql_proxy,
67
const common::ObAddr &myaddr)
71
if (OB_FAIL(unit_getter_.init(sql_proxy, &GCONF))) {
72
LOG_ERROR("init unit getter fail", K(ret));
80
void ObTenantNodeBalancer::run1()
83
lib::set_thread_name("OmtNodeBalancer");
85
while (!has_set_stop()) {
87
int64_t sys_unit_cnt = 0;
88
ObCurTraceId::init(GCONF.self_addr_);
89
if (!ObServerCheckpointSlogHandler::get_instance().is_started()) {
90
// do nothing if not finish replaying slog
91
LOG_INFO("server slog not finish replaying, need wait");
93
} else if (OB_FAIL(unit_getter_.get_sys_unit_count(sys_unit_cnt))) {
94
LOG_WARN("get sys unit count fail", KR(ret));
95
} else if (sys_unit_cnt <= 0) {
96
// check wether sys tenant has been created, do nothing if sys tenant has not been created
97
LOG_INFO("sys tenant has not been created, tenant node balancer can not run, need wait",
100
} else if (OB_FAIL(unit_getter_.get_server_tenant_configs(myaddr_, units))) {
101
LOG_WARN("get server tenant units fail", K(myaddr_), K(ret));
102
} else if (OB_FAIL(refresh_tenant(units))) {
103
LOG_WARN("failed to refresh tenant", K(ret), K(units));
104
} else if (FALSE_IT(periodically_check_tenant())) {
108
FLOG_INFO("refresh tenant units", K(sys_unit_cnt), K(units), KR(ret));
110
// will try to update tma whether tenant unit is changed or not,
111
// because memstore_limit_percentage may be changed
112
int tmp_ret = OB_SUCCESS;
113
if (OB_SUCCESS != (tmp_ret = TMA_MGR_INSTANCE.update_tenant_mem_limit(units))) {
114
LOG_WARN("TMA_MGR_INSTANCE.update_tenant_mem_limit failed", K(tmp_ret));
117
// check whether tenant unit is changed, try to update unit config of tenant
118
ObSEArray<uint64_t, 10> tenants;
119
if (!ObServerCheckpointSlogHandler::get_instance().is_started()) {
120
// do nothing if not finish replaying slog
121
LOG_INFO("server slog not finish replaying, need wait");
123
} else if (OB_FAIL(unit_getter_.get_tenants(tenants))) {
124
LOG_WARN("get cluster tenants fail", K(ret));
125
} else if (OB_FAIL(OTC_MGR.refresh_tenants(tenants))) {
126
LOG_WARN("fail refresh tenant config", K(tenants), K(ret));
128
if (OB_SUCCESS != (tmp_ret = GCTX.log_block_mgr_->try_resize())) {
129
LOG_WARN("ObServerLogBlockMgr try_resize failed", K(tmp_ret));
132
FLOG_INFO("refresh tenant config", K(tenants), K(ret));
135
USLEEP(refresh_interval_); // sleep 10s
139
int ObTenantNodeBalancer::handle_notify_unit_resource(const obrpc::TenantServerUnitConfig &arg)
141
int ret = OB_SUCCESS;
142
if (!arg.is_delete_) {
143
if (OB_FAIL(notify_create_tenant(arg))) {
144
LOG_WARN("failed to notify update tenant", KR(ret), K(arg));
147
if (OB_FAIL(try_notify_drop_tenant(arg.tenant_id_))) {
148
LOG_WARN("fail to try drop tenant", KR(ret), K(arg));
154
int ObTenantNodeBalancer::notify_create_tenant(const obrpc::TenantServerUnitConfig &unit)
156
LOG_INFO("succ to receive notify of creating tenant", K(unit));
157
int ret = OB_SUCCESS;
158
bool is_hidden_sys = false;
159
bool unit_id_exist = false;
161
if (!unit.is_valid()) {
162
ret = OB_INVALID_ARGUMENT;
163
LOG_WARN("invalid argument", KR(ret), K(unit));
164
} else if (!ObServerCheckpointSlogHandler::get_instance().is_started()) {
165
ret = OB_SERVER_IS_INIT;
166
LOG_WARN("slog replay not finish", KR(ret),K(unit));
167
} else if (is_meta_tenant(unit.tenant_id_)) {
168
ret = OB_OP_NOT_ALLOW;
169
LOG_WARN("can not create meta tenant", K(ret), K(unit));
170
} else if (OB_FAIL(omt_->check_if_hidden_sys(unit.tenant_id_, is_hidden_sys))) {
171
LOG_WARN("fail to check_if_hidden_sys", KR(ret), K(unit));
172
} else if (omt_->has_tenant(unit.tenant_id_) && !is_hidden_sys) {
173
ret = OB_TENANT_EXIST;
174
LOG_WARN("tenant has exist", KR(ret), K(unit));
175
} else if (is_user_tenant(unit.tenant_id_) && omt_->has_tenant(gen_meta_tenant_id(unit.tenant_id_))) {
176
ret = OB_TENANT_EXIST;
177
LOG_WARN("meta tenant has exist", KR(ret), K(unit));
178
// TODO(fenggu.yh) 临时注释,防止创建租户失败
179
//} else if (OB_FAIL(omt_->check_if_unit_id_exist(unit.unit_id_, unit_id_exist))) {
180
// LOG_WARN("fail to check_if_unit_id_exist", KR(ret), K(unit));
181
} else if (unit_id_exist) { // the unit may be wait_gc status
182
ret = OB_ENTRY_EXIST;
183
LOG_WARN("unit_id exist", KR(ret), K(unit));
185
const uint64_t tenant_id = unit.tenant_id_;
186
ObUnitInfoGetter::ObTenantConfig basic_tenant_unit;
187
ObUnitInfoGetter::ObTenantConfig meta_tenant_unit;
188
const bool has_memstore = (unit.replica_type_ != REPLICA_TYPE_LOGONLY);
189
const int64_t create_timestamp = ObTimeUtility::current_time();
190
basic_tenant_unit.unit_status_ = ObUnitInfoGetter::ObUnitStatus::UNIT_NORMAL;
191
const int64_t create_tenant_timeout_ts = THIS_WORKER.get_timeout_ts();
193
if (create_tenant_timeout_ts < create_timestamp) {
195
LOG_WARN("notify_create_tenant has timeout", K(ret), K(create_timestamp), K(create_tenant_timeout_ts));
196
} else if (OB_FAIL(basic_tenant_unit.init(tenant_id,
198
ObUnitInfoGetter::ObUnitStatus::UNIT_NORMAL,
203
false /*is_removed*/))) {
204
LOG_WARN("fail to init user tenant config", KR(ret), K(unit));
205
} else if (is_user_tenant(tenant_id)
206
&& OB_FAIL(basic_tenant_unit.divide_meta_tenant(meta_tenant_unit))) {
207
LOG_WARN("divide meta tenant failed", KR(ret), K(unit), K(basic_tenant_unit));
208
} else if (OB_FAIL(check_new_tenant(basic_tenant_unit, false /*check_data_version*/, create_tenant_timeout_ts))) {
209
LOG_WARN("failed to create new tenant", KR(ret), K(basic_tenant_unit), K(create_tenant_timeout_ts));
212
LOG_INFO("succ to create new user tenant", KR(ret), K(unit), K(basic_tenant_unit), K(create_tenant_timeout_ts));
214
#ifdef OB_BUILD_TDE_SECURITY
215
// get and set root_key
217
if (!unit.with_root_key_) {
219
if (OB_FAIL(ObMasterKeyGetter::instance().get_root_key(tenant_id, root_key, true))) {
220
LOG_WARN("failed to get root key", KR(ret));
223
const obrpc::ObRootKeyResult &root_key = unit.root_key_;
224
if (obrpc::RootKeyType::INVALID == root_key.key_type_) {
226
LOG_INFO("root_key got from RS is INVALID, won't set now", KR(ret));
227
} else if (OB_FAIL(ObMasterKeyGetter::instance().set_root_key(
228
tenant_id, root_key.key_type_, root_key.root_key_))) {
229
LOG_WARN("failed to set root_key", KR(ret));
234
// create meta tenant
235
if (OB_SUCC(ret) && is_user_tenant(tenant_id)) {
236
if (OB_FAIL(check_new_tenant(meta_tenant_unit, false /*check_data_version*/, create_tenant_timeout_ts))) {
237
LOG_WARN("failed to create meta tenant", KR(ret), K(meta_tenant_unit), K(create_tenant_timeout_ts));
240
LOG_INFO("succ to create meta tenant", KR(ret), K(meta_tenant_unit), K(create_tenant_timeout_ts));
245
// In standby cluster, may repeat create tenant, if if_not_grant_ is true, ignore OB_TENANT_EXIST
246
if (OB_TENANT_EXIST == ret && unit.if_not_grant_) {
247
if (GCTX.is_standby_cluster()) {
255
// 标记删除,而不是直接删,是因为并发时,另一个线程可能刷到tenant了,但是还没有refresh tenant,
256
// 此时drop tenant将tenant删除了,另一个线程过一会refresh tenant时,又给加回来了
257
// 所以这里只做标记,删除tenant统一在refresh tenant里做
258
int ObTenantNodeBalancer::try_notify_drop_tenant(const int64_t tenant_id)
260
LOG_INFO("[DELETE_TENANT] succ to receive notify of dropping tenant", K(tenant_id));
261
int ret = OB_SUCCESS;
262
int tmp_ret = OB_SUCCESS;
263
TCWLockGuard guard(lock_);
264
uint64_t meta_tenant_id = OB_INVALID_TENANT_ID;
265
if (OB_UNLIKELY(is_meta_tenant(tenant_id))) {
266
ret = OB_OP_NOT_ALLOW;
267
LOG_WARN("meta tenant is not allowed", KR(ret), K(tenant_id));
268
} else if (OB_ISNULL(omt_)) {
269
ret = OB_ERR_UNEXPECTED;
270
LOG_WARN("omt_ is null", KR(ret),KP(omt_));
272
if (OB_TMP_FAIL(omt_->mark_del_tenant(tenant_id))) {
273
LOG_WARN("fail to mark del user_tenant", KR(ret), KR(tmp_ret), K(tenant_id));
274
ret = OB_SUCC(ret) ? tmp_ret : ret;
276
meta_tenant_id = gen_meta_tenant_id(tenant_id);
277
if (OB_TMP_FAIL(omt_->mark_del_tenant(meta_tenant_id))) {
278
LOG_WARN("fail to mark del meta_tenant", KR(ret), KR(tmp_ret), K(meta_tenant_id));
279
ret = OB_SUCC(ret) ? tmp_ret : ret;
282
LOG_INFO("[DELETE_TENANT] mark drop tenant", KR(ret), K(tenant_id), K(meta_tenant_id));
286
int ObTenantNodeBalancer::get_server_allocated_resource(ServerResource &server_resource)
288
int ret = OB_SUCCESS;
289
server_resource.reset();
290
TenantUnits tenant_units;
292
if (OB_FAIL(omt_->get_tenant_units(tenant_units, false))) {
293
LOG_WARN("failed to get tenant units");
295
for (int64_t i = 0; i < tenant_units.count(); i++) {
296
// META tenant and USER tenant share CPU resource, so skip META tenant
297
if (! is_meta_tenant(tenant_units.at(i).tenant_id_)) {
298
server_resource.max_cpu_ += tenant_units.at(i).config_.max_cpu();
299
server_resource.min_cpu_ += tenant_units.at(i).config_.min_cpu();
301
int64_t extra_memory = is_sys_tenant(tenant_units.at(i).tenant_id_) ? GMEMCONF.get_extra_memory() : 0;
302
server_resource.memory_size_ += max(ObMallocAllocator::get_instance()->get_tenant_limit(tenant_units.at(i).tenant_id_) - extra_memory,
303
tenant_units.at(i).config_.memory_size());
304
server_resource.log_disk_size_ += tenant_units.at(i).config_.log_disk_size();
311
int ObTenantNodeBalancer::lock_tenant_balancer()
313
return lock_.rdlock();
316
int ObTenantNodeBalancer::unlock_tenant_balancer()
318
return lock_.unlock();
321
int ObTenantNodeBalancer::check_del_tenants(const TenantUnits &local_units, TenantUnits &units)
323
int ret = OB_SUCCESS;
325
for (int64_t i = 0; i < local_units.count(); i++) {
326
bool tenant_exists = false;
327
const ObUnitInfoGetter::ObTenantConfig &local_unit = local_units.at(i);
328
for (auto punit = units.begin(); punit != units.end(); punit++) {
329
if (local_unit.tenant_id_ == punit->tenant_id_) {
330
tenant_exists = true;
334
if (!tenant_exists) {
335
LOG_INFO("[DELETE_TENANT] begin to delete tenant", K(local_unit));
336
if (OB_SYS_TENANT_ID == local_unit.tenant_id_) {
337
LOG_INFO("[DELETE_TENANT] need convert_real_to_hidden_sys_tenant");
338
if (OB_FAIL(omt_->convert_real_to_hidden_sys_tenant())) {
339
LOG_INFO("fail to convert_real_to_hidden_sys_tenant", K(ret));
341
} else if (OB_FAIL(omt_->del_tenant(local_unit.tenant_id_))) {
342
LOG_WARN("delete tenant fail", K(local_unit), K(ret));
350
int ObTenantNodeBalancer::check_new_tenants(TenantUnits &units)
352
int ret = OB_SUCCESS;
353
int tmp_ret = OB_SUCCESS;
355
DEBUG_SYNC(CHECK_NEW_TENANT);
357
const bool check_data_version = true;
358
// check all units of tenants.
359
for (TenantUnits::iterator it = units.begin(); it != units.end(); it++) {
360
if (OB_TMP_FAIL(check_new_tenant(*it, check_data_version))) {
361
LOG_WARN("failed to check new tenant", KR(tmp_ret));
362
ret = OB_SUCC(ret) ? tmp_ret : ret;
368
int ObTenantNodeBalancer::check_new_tenant(
369
const ObUnitInfoGetter::ObTenantConfig &unit,
370
const bool check_data_version,
371
const int64_t abs_timeout_us)
373
int ret = OB_SUCCESS;
375
const int64_t tenant_id = unit.tenant_id_;
376
ObTenant *tenant = nullptr;
378
if (OB_FAIL(omt_->get_tenant(tenant_id, tenant))) {
379
if (is_sys_tenant(tenant_id)) {
380
ret = OB_ERR_UNEXPECTED;
381
LOG_ERROR("real or hidden sys tenant must be exist", K(ret));
384
ObTenantMeta tenant_meta;
385
ObTenantSuperBlock super_block(tenant_id, false /*is_hidden*/); // empty super block
386
const bool should_check_data_version = check_data_version && is_user_tenant(tenant_id);
387
uint64_t data_version = 0;
388
if (should_check_data_version && OB_FAIL(GET_MIN_DATA_VERSION(tenant_id, data_version))) {
389
if (OB_ENTRY_NOT_EXIST == ret) {
391
LOG_WARN("data_version not refreshed yet, create tenant later", KR(ret), K(tenant_id));
393
LOG_WARN("fail to get data_version", KR(ret), K(tenant_id));
395
} else if (OB_FAIL(tenant_meta.build(unit, super_block))) {
396
LOG_WARN("fail to build tenant meta", K(ret));
397
} else if (OB_FAIL(omt_->create_tenant(tenant_meta, true /* write_slog */, abs_timeout_us))) {
398
LOG_WARN("fail to create new tenant", K(ret), K(tenant_id));
402
int64_t extra_memory = 0;
403
if (is_sys_tenant(tenant_id)) {
404
if (tenant->is_hidden() && OB_FAIL(omt_->convert_hidden_to_real_sys_tenant(unit, abs_timeout_us))) {
405
LOG_WARN("fail to create real sys tenant", K(unit));
407
extra_memory = GMEMCONF.get_extra_memory();
409
if (OB_SUCC(ret) && !(unit == tenant->get_unit())) {
410
if (OB_FAIL(omt_->update_tenant_unit(unit))) {
411
LOG_WARN("fail to update tenant unit", K(ret), K(tenant_id));
414
if (OB_SUCC(ret) && OB_FAIL(omt_->update_tenant_memory(unit, extra_memory))) {
415
LOG_ERROR("fail to update tenant memory", K(ret), K(tenant_id));
418
if (OB_SUCC(ret) && !is_virtual_tenant_id(tenant_id)) {
419
if (OB_FAIL(omt_->modify_tenant_io(tenant_id, unit.config_))) {
420
LOG_WARN("modify tenant io config failed", K(ret), K(tenant_id), K(unit.config_));
426
int ObTenantNodeBalancer::refresh_hidden_sys_memory()
428
int ret = OB_SUCCESS;
429
int64_t allowed_mem_limit = 0;
430
ObTenant *tenant = nullptr;
431
if (OB_FAIL(omt_->get_tenant(OB_SYS_TENANT_ID, tenant))) {
432
LOG_WARN("get sys tenant failed", K(ret));
433
} else if (OB_ISNULL(tenant) || !tenant->is_hidden()) {
435
} else if (OB_FAIL(omt_->update_tenant_memory(OB_SYS_TENANT_ID, GMEMCONF.get_hidden_sys_memory(), allowed_mem_limit))) {
436
LOG_WARN("update hidden sys tenant memory failed", K(ret));
438
LOG_INFO("update hidden sys tenant memory succeed ", K(allowed_mem_limit));
443
void ObTenantNodeBalancer::periodically_check_tenant()
445
int ret = OB_SUCCESS;
446
struct TenantHandlePair {
449
TO_STRING_KV(KP(tenant_));
451
ObSEArray<TenantHandlePair, 32> pairs;
452
omt_->lock_tenant_list();
453
TenantList &tenants = omt_->get_tenant_list();
454
ObArenaAllocator alloc("lock_diagnose");
455
for (TenantList::iterator it = tenants.begin();
459
ObLDHandle *handle = nullptr;
460
if (!OB_ISNULL(*it) && !(*it)->has_stopped()) {
461
if (OB_ISNULL(ptr = alloc.alloc(sizeof(ObLDHandle)))) {
462
ret = OB_ALLOCATE_MEMORY_FAILED;
463
LOG_WARN("failed to alloc", K(ret));
464
} else if (FALSE_IT(handle = new (ptr) ObLDHandle())) {
465
} else if (OB_FAIL((*it)->rdlock(*handle))) {
466
LOG_WARN("failed to rd lock tenant", K(ret));
468
TenantHandlePair pair;
470
pair.handle_ = handle;
471
if (OB_FAIL(pairs.push_back(pair))) {
472
LOG_WARN("failed to push back tenant", K(ret));
473
} else {/*do-nothing*/}
476
IGNORE_RETURN (*it)->unlock(*handle);
481
omt_->unlock_tenant_list();
484
for (auto it = pairs.begin();
487
(*it).tenant_->periodically_check();
488
IGNORE_RETURN (*it).tenant_->unlock(*(*it).handle_);
492
// Although unit has been deleted, the local cached unit cannot be deleted if the tenant still holds resource
493
int ObTenantNodeBalancer::fetch_effective_tenants(const TenantUnits &old_tenants, TenantUnits &new_tenants)
495
int ret = OB_SUCCESS;
497
bool is_released = false;
500
for (int64_t i = 0; OB_SUCC(ret) && i < old_tenants.count(); i++) {
502
const ObUnitInfoGetter::ObTenantConfig &tenant_config = old_tenants.at(i);
503
const ObUnitInfoGetter::ObUnitStatus local_unit_status = tenant_config.unit_status_;
504
for (int64_t j = 0; j < new_tenants.count(); j++) {
505
if (tenant_config.tenant_id_ == new_tenants.at(j).tenant_id_) {
506
new_tenants.at(j).create_timestamp_ = tenant_config.create_timestamp_;
507
new_tenants.at(j).is_removed_ = tenant_config.is_removed_;
514
ObTenant *tenant = nullptr;
515
MTL_SWITCH(tenant_config.tenant_id_) {
516
if (OB_FAIL(MTL(ObTenantMetaMemMgr*)->check_all_meta_mem_released(is_released, "[DELETE_TENANT]"))) {
517
LOG_WARN("fail to check_all_meta_mem_released", K(ret), K(tenant_config));
518
} else if (!is_released) {
519
// can not release now. dump some debug info
520
const uint64_t interval = 180 * 1000 * 1000; // 180s
521
if (!is_released && REACH_TIME_INTERVAL(interval)) {
522
MTL(ObTenantMetaMemMgr*)->dump_tablet_info();
523
MTL(ObLSService *)->dump_ls_info();
524
PRINT_OBJ_LEAK(MTL_ID(), share::LEAK_CHECK_OBJ_MAX_NUM);
527
// check ls service is empty.
528
is_released = MTL(ObLSService *)->is_empty();
531
bool is_tenant_snapshot_released = false;
532
if (is_user_tenant(tenant_config.tenant_id_)) {
533
const int64_t now_time = ObTimeUtility::current_time();
534
const int64_t life_time = now_time - tenant_config.create_timestamp_;
535
if (tenant_config.is_removed_ || life_time >= RECYCLE_LATENCY) {
536
MTL(ObTenantSnapshotService*)->notify_unit_is_deleting();
538
if (OB_FAIL(MTL(ObTenantSnapshotService*)->
539
check_all_tenant_snapshot_released(is_tenant_snapshot_released))) {
540
LOG_WARN("fail to check_all_tenant_snapshot_released", K(ret), K(tenant_config));
541
} else if (!is_tenant_snapshot_released) {
542
// can not release now. dump some debug info
543
const uint64_t interval = 180 * 1000 * 1000; // 180s
544
if (!is_tenant_snapshot_released && REACH_TIME_INTERVAL(interval)) {
545
MTL(ObTenantSnapshotService*)->dump_all_tenant_snapshot_info();
547
LOG_INFO("[DELETE_TENANT] tenant has been dropped, tenant snapshot is still waiting for gc",
551
is_released = is_released && is_tenant_snapshot_released;
559
// remove local units after RECYCLE_LATENCY to avoid removing by mistake
560
// but if marked removed, remove it directly without waiting
561
const int64_t now_time = ObTimeUtility::current_time();
562
const int64_t life_time = now_time - tenant_config.create_timestamp_;
563
if ((!tenant_config.is_removed_ && life_time < RECYCLE_LATENCY) || !is_released) {
564
if (OB_FAIL(tenants.push_back(tenant_config))) {
565
LOG_WARN("failed to push back tenant", KR(ret));
567
// update tenant unit status which need be deleted
568
// need wait gc in observer
569
// NOTE: only update unit status when can not release resource
571
tenants.at(tenants.count() - 1).unit_status_ = ObUnitInfoGetter::UNIT_WAIT_GC_IN_OBSERVER;
572
// add a event when try to gc for the first time
573
if (local_unit_status != ObUnitInfoGetter::ObUnitStatus::UNIT_WAIT_GC_IN_OBSERVER &&
574
local_unit_status != ObUnitInfoGetter::ObUnitStatus::UNIT_DELETING_IN_OBSERVER) {
575
SERVER_EVENT_ADD("unit", "start unit gc", "tenant_id", tenant_config.tenant_id_,
576
"unit_id", tenant_config.unit_id_, "unit_status", "WAIT GC");
580
LOG_INFO("[DELETE_TENANT] tenant has been dropped. can not delete tenant",
581
K(is_released), "local_unit_status", ObUnitInfoGetter::get_unit_status_str(local_unit_status),
582
"is_removed", tenant_config.is_removed_,
583
"create_timestamp", tenant_config.create_timestamp_,
584
K(life_time), K(tenant_config));
587
LOG_INFO("[DELETE_TENANT] tenant has been dropped. can delete tenant",
588
K(is_released), "local_unit_status", ObUnitInfoGetter::get_unit_status_str(local_unit_status),
589
"is_removed", tenant_config.is_removed_,
590
"create_timestamp", tenant_config.create_timestamp_,
591
K(life_time), K(tenant_config));
597
for (int64_t i = 0; OB_SUCC(ret) && i < tenants.count(); i++) {
598
if (OB_FAIL(new_tenants.push_back(tenants.at(i)))) {
599
LOG_WARN("failed to add new tenant", K(ret));
606
int ObTenantNodeBalancer::refresh_tenant(TenantUnits &units)
608
int ret = OB_SUCCESS;
610
TenantUnits local_units;
611
if (OB_FAIL(omt_->get_tenant_units(local_units, false))) {
612
LOG_WARN("failed to get local tenant units");
613
} else if (OB_FAIL(fetch_effective_tenants(local_units, units))) {
614
LOG_WARN("failed to fetch effective tenants", K(local_units));
618
if (OB_FAIL(check_new_tenants(units))) {
619
LOG_WARN("check and add new tenant fail", K(ret));
620
ret = OB_SUCCESS; // just don't affect the following process in run1().
625
if (OB_FAIL(check_del_tenants(local_units, units))) { // overwrite ret
626
LOG_WARN("check delete tenant fail", K(ret));
629
if (OB_FAIL(refresh_hidden_sys_memory())) { // overwrite ret
630
LOG_WARN("refresh hidden sys memory failed", K(ret));
637
int ObTenantNodeBalancer::update_tenant_memory(const obrpc::ObTenantMemoryArg &tenant_memory)
639
int ret = OB_SUCCESS;
640
const int64_t tenant_id = tenant_memory.tenant_id_;
641
const int64_t memory_size = tenant_memory.memory_size_;
642
const int64_t refresh_interval = tenant_memory.refresh_interval_;
644
ObUnitInfoGetter::ObTenantConfig unit;
645
int64_t allowed_mem_limit = 0;
647
TCWLockGuard guard(lock_);
649
if (!tenant_memory.is_valid()) {
650
ret = OB_INVALID_ARGUMENT;
651
LOG_WARN("invalid argument", K(ret), K(tenant_memory));
652
} else if (OB_FAIL(omt_->get_tenant_unit(tenant_id, unit))) {
653
LOG_WARN("failed to get tenant config", K(ret), K(tenant_id));
658
} else if (OB_FAIL(omt_->update_tenant_memory(tenant_id, memory_size, allowed_mem_limit))) {
659
LOG_WARN("failed to update tenant memory", K(ret), K(tenant_id), K(memory_size));
660
} else if (OB_FAIL(omt_->update_tenant_freezer_mem_limit(tenant_id, unit.config_.memory_size(), allowed_mem_limit))) {
661
LOG_WARN("set_tenant_freezer_mem_limit failed", K(ret), K(tenant_id));
663
refresh_interval_ = refresh_interval * 1000L * 1000L;
664
LOG_INFO("succ to admin update tenant memory", K(tenant_id), K(memory_size));