2
* Copyright (c) 2021 OceanBase
3
* OceanBase CE is licensed under Mulan PubL v2.
4
* You can use this software according to the terms and conditions of the Mulan PubL v2.
5
* You may obtain a copy of Mulan PubL v2 at:
6
* http://license.coscl.org.cn/MulanPubL-2.0
7
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
* See the Mulan PubL v2 for more details.
13
#define USING_LOG_PREFIX SERVER_OMT
14
#include "ob_tenant_config_mgr.h"
15
#include "lib/thread/thread_mgr.h"
16
#include "observer/ob_sql_client_decorator.h"
17
#include "observer/ob_server_struct.h"
18
#include "share/config/ob_common_config.h"
19
#include "ob_multi_tenant.h"
21
#include "share/ob_rpc_struct.h"
22
#include "share/inner_table/ob_inner_table_schema_constants.h"
23
#include "share/schema/ob_multi_version_schema_service.h"
25
using namespace oceanbase::common;
26
using namespace oceanbase::share;
31
// Default constructor: delegates to ObTenantConfigGuard(ObTenantConfig *) with a null config.
ObTenantConfigGuard::ObTenantConfigGuard() : ObTenantConfigGuard(nullptr)
35
// Constructs a guard around the given tenant config pointer.
// NOTE(review): the initializer list and body are not visible in this excerpt.
ObTenantConfigGuard::ObTenantConfigGuard(ObTenantConfig *config)
40
// Destructor. Acts only when the guard currently holds a non-null config_.
// NOTE(review): the statement inside the branch is not visible in this excerpt;
// presumably it drops the reference held on the config — TODO confirm.
ObTenantConfigGuard::~ObTenantConfigGuard()
42
if (OB_NOT_NULL(config_)) {
47
// Sets the config tracked by this guard.
// A previously held non-null config_ is handled first.
// NOTE(review): the handling of the old config and the assignment of the new one are
// on lines not visible in this excerpt.
void ObTenantConfigGuard::set_config(ObTenantConfig *config)
49
if (OB_NOT_NULL(config_)) {
55
// Copies every member of rhs into this object.
// tenant_id_ is copied directly; the remaining members go through their own assign()
// in a single else-if chain, so the chain stops at the first failure and logs it.
// NOTE(review): the declaration of ret and the return statement are on lines not
// visible in this excerpt.
int TenantConfigInfo::assign(const TenantConfigInfo &rhs)
58
tenant_id_ = rhs.tenant_id_;
59
if (OB_FAIL(name_.assign(rhs.name_))) {
60
LOG_WARN("assign name fail", K_(name), K(ret));
61
} else if (OB_FAIL(data_type_.assign(rhs.data_type_))) {
62
LOG_WARN("assign data_type fail", K_(data_type), K(ret));
63
} else if (OB_FAIL(value_.assign(rhs.value_))) {
64
LOG_WARN("assign value fail", K_(value), K(ret));
65
} else if (OB_FAIL(info_.assign(rhs.info_))) {
66
LOG_WARN("assign info fail", K_(info), K(ret));
67
} else if (OB_FAIL(section_.assign(rhs.section_))) {
68
LOG_WARN("assign section fail", K_(section), K(ret));
69
} else if (OB_FAIL(scope_.assign(rhs.scope_))) {
70
LOG_WARN("assign scope fail", K_(scope), K(ret));
71
} else if (OB_FAIL(source_.assign(rhs.source_))) {
72
LOG_WARN("assign source fail", K_(source), K(ret));
73
} else if (OB_FAIL(edit_level_.assign(rhs.edit_level_))) {
74
LOG_WARN("assign edit_level fail", K_(edit_level), K(ret));
79
// Prints tenant_id_, name_, value_ and info_ into buf with databuff_printf.
// Nothing is printed when buf is null or buf_len is not positive.
// NOTE(review): the declarations of ret and pos and the return statement are on lines
// not visible in this excerpt.
// NOTE(review): the adjacent format literals concatenate to "name: [%s],value: [%s]",
// i.e. without a space after that comma.
int64_t TenantConfigInfo::to_string(char *buf, const int64_t buf_len) const
83
if (OB_NOT_NULL(buf) && buf_len > 0) {
84
if (OB_FAIL(databuff_printf(buf, buf_len, pos, "tenant_id: [%lu], name: [%s],"
85
"value: [%s], info: [%s]",
86
tenant_id_, name_.ptr(), value_.ptr(), info_.ptr()))) {
88
LOG_WARN("to_string buff fail", K(ret));
94
// Constructs the manager in the "not inited" state: rwlock_ is created with
// ObLatchIds::CONFIG_LOCK, sql_proxy_ and sys_config_mgr_ start as nullptr,
// version_has_refreshed_ is false, and the maps and callback are default-constructed.
ObTenantConfigMgr::ObTenantConfigMgr()
95
: rwlock_(ObLatchIds::CONFIG_LOCK), inited_(false), self_(), sql_proxy_(nullptr),
96
config_map_(), config_version_map_(), sys_config_mgr_(nullptr),
97
version_has_refreshed_(false), update_tenant_config_cb_()
101
// Destructor.
// NOTE(review): the body is not visible in this excerpt.
ObTenantConfigMgr::~ObTenantConfigMgr()
105
// Returns a reference to the single manager object, which is held as a
// function-local static and therefore constructed on first call.
ObTenantConfigMgr &ObTenantConfigMgr::get_instance()
107
static ObTenantConfigMgr ob_tenant_config_mgr;
108
return ob_tenant_config_mgr;
111
// Wires the manager to its collaborators and creates config_version_map_.
//
// sql_proxy               stored by address in sql_proxy_
// server                  not used on any line visible here (original line 118 is elided)
// config_mgr              stored as sys_config_mgr_
// update_tenant_config_cb stored and later invoked by notify_tenant_config_changed()
//
// NOTE(review): nothing visible here checks config_mgr for nullptr, and the handling of
// ret after create() (including the return) is on lines not visible in this excerpt.
int ObTenantConfigMgr::init(ObMySQLProxy &sql_proxy,
112
const ObAddr &server,
113
ObConfigManager *config_mgr,
114
const UpdateTenantConfigCb &update_tenant_config_cb)
116
int ret = OB_SUCCESS;
117
sql_proxy_ = &sql_proxy;
119
sys_config_mgr_ = config_mgr;
120
update_tenant_config_cb_ = update_tenant_config_cb;
121
ret = config_version_map_.create(oceanbase::common::OB_MAX_SERVER_TENANT_CNT,
122
oceanbase::common::ObModIds::OB_HASH_BUCKET_CONF_CONTAINER,
123
oceanbase::common::ObModIds::OB_HASH_NODE_CONF_CONTAINER);
128
// For every tenant in `tenants`, reads max(config_version) from the tenant parameter
// table and records it through set_tenant_config_version().
// The work is skipped once version_has_refreshed_ is true; that flag is only set when
// the loop finished with OB_SUCCESS and no SQL read failed.
// NOTE(review): the declarations of `sql` and `version`, and how ret is treated after a
// failed read, are on lines not visible in this excerpt.
void ObTenantConfigMgr::refresh_config_version_map(const ObIArray<uint64_t> &tenants)
130
int ret = OB_SUCCESS;
131
if (true == version_has_refreshed_) {
132
// no need to refresh again
133
} else if (OB_ISNULL(sql_proxy_)) {
135
LOG_WARN("sql proxy is null", K(ret));
137
ObSQLClientRetryWeak sql_client_retry_weak(sql_proxy_);
138
SMART_VAR(ObMySQLProxy::MySQLResult, result) {
139
bool has_sql_error = false;
140
for (int i = 0; OB_SUCC(ret) && i < tenants.count(); ++i) {
142
uint64_t tenant_id = tenants.at(i);
143
// The query is executed under the id returned by gen_meta_tenant_id(tenant_id).
uint64_t exec_tenant_id = gen_meta_tenant_id(tenant_id);
145
if (OB_FAIL(sql.assign_fmt("select max(config_version) from %s where tenant_id = '%lu'",
146
OB_TENANT_PARAMETER_TNAME, tenant_id))) {
147
LOG_WARN("fail to generate sql", KR(ret), K(tenant_id));
148
} else if (OB_FAIL(sql_client_retry_weak.read(result, exec_tenant_id, sql.ptr()))) {
149
// Remember the failure so that version_has_refreshed_ is not set below.
has_sql_error = true;
151
LOG_INFO("read config from __tenant_parameter failed",
152
KR(ret), K(tenant_id), K(exec_tenant_id), K(sql));
153
} else if (NULL == result.get_result()) {
154
ret = OB_ERR_UNEXPECTED;
155
LOG_WARN("config result is null", K(tenant_id), K(ret));
156
} else if (OB_FAIL(result.get_result()->next())) {
157
LOG_WARN("get result next failed", K(tenant_id), K(ret));
158
} else if (OB_FAIL(result.get_result()->get_int(0L, version))) {
159
// OB_ERR_NULL_VALUE is told apart from other read errors here.
if (OB_ERR_NULL_VALUE != ret) {
160
LOG_WARN("get config_version failed", K(tenant_id), K(ret));
162
LOG_INFO("tenant has no config", K(tenant_id));
165
} else if (OB_FAIL(set_tenant_config_version(tenant_id, version))) {
166
LOG_WARN("set config version failed", K(tenant_id), K(ret));
168
LOG_INFO("update tenant config version success", K(tenant_id), K(version));
171
if ((OB_SUCCESS == ret) && (false == has_sql_error)) {
172
version_has_refreshed_ = true;
173
LOG_INFO("update all tenant config success");
179
// Background: every server needs to keep the config information of all tenants.
180
// When a tenant is created or dropped, the config state has to be maintained accordingly.
183
// ACTION: based on `tenants`, decide which tenant configs to add or delete.
//
// Phase 1 (read lock): collect tenants that are in `tenants` but not in config_map_.
// Phase 2 (read lock): collect tenants in config_map_ that are candidates for deletion.
// Phase 3 (no lock held here): call add_tenant_config() / del_tenant_config() for the
// collected ids, then refresh_config_version_map().
// NOTE(review): GCTX.omt_ is dereferenced below without a null check, whereas
// del_tenant_config() checks OB_ISNULL(GCTX.omt_) first. has_tenant(tenant_id) also does
// not depend on the inner loop index.
int ObTenantConfigMgr::refresh_tenants(const ObIArray<uint64_t> &tenants)
186
int ret = OB_SUCCESS;
187
ObSEArray<uint64_t, 1> new_tenants;
188
ObSEArray<uint64_t, 1> del_tenants;
190
if (tenants.count() > 0) {
193
DRWLock::RDLockGuard guard(rwlock_);
194
for (int i = 0; i < tenants.count(); ++i) {
195
uint64_t tenant_id = tenants.at(i);
196
ObTenantConfig *const *config = nullptr;
197
if (NULL == (config = config_map_.get(ObTenantID(tenant_id)))) {
198
// If the tenant does not exist yet, add it to config_map.
199
if (OB_FAIL(new_tenants.push_back(tenant_id))) {
200
LOG_WARN("fail add tenant config", K(tenant_id), K(ret));
207
DRWLock::RDLockGuard guard(rwlock_);
208
TenantConfigMap::const_iterator it = config_map_.begin();
209
for (; it != config_map_.end(); ++it) {
210
uint64_t tenant_id = it->first.tenant_id_;
211
// The sys tenant is special-cased (the branch body is not visible in this excerpt).
if (OB_SYS_TENANT_ID == tenant_id) {
214
bool need_del = true;
215
for (int i = 0; i < tenants.count(); ++i) {
216
if (tenant_id == tenants.at(i) || GCTX.omt_->has_tenant(tenant_id)) {
222
if (OB_FAIL(del_tenants.push_back(tenant_id))) {
223
LOG_WARN("fail add tenant config", K(tenant_id), K(ret));
231
for (int i = 0; i < new_tenants.count(); ++i) {
232
if (OB_FAIL(add_tenant_config(new_tenants.at(i)))) {
233
LOG_WARN("fail add tenant config", K(i), K(new_tenants.at(i)), K(ret));
235
LOG_INFO("add created tenant config succ", K(i), K(new_tenants.at(i)));
239
for (int i = 0; i < del_tenants.count(); ++i) {
240
if (OB_FAIL(del_tenant_config(del_tenants.at(i)))) {
241
LOG_WARN("fail del tenant config, will try later", K(i), K(del_tenants.at(i)), K(ret));
243
LOG_INFO("del dropped tenant config succ.", K(i), K(del_tenants.at(i)));
247
refresh_config_version_map(tenants);
252
// This function will be called in the early stage in bootstrap/create tenant.
253
// Meanwhile, related tenant's tables are not readable, so it's safe to call add_extra_config().
//
// Steps: add_tenant_config(arg.tenant_id_), add_extra_config(arg), then under the write
// lock dump2file(), look the tenant config up again and call
// publish_special_config_after_dump() on it.
// add_tenant_config() and add_extra_config() each take the write lock themselves; this
// function takes it only for the dump/publish step.
int ObTenantConfigMgr::init_tenant_config(const obrpc::ObTenantConfigArg &arg)
256
int ret = OB_SUCCESS;
257
if (OB_FAIL(add_tenant_config(arg.tenant_id_))) {
258
LOG_WARN("fail to add tenant config", KR(ret), K(arg));
259
} else if (OB_FAIL(add_extra_config(arg))) {
260
LOG_WARN("fail to add extra config", KR(ret), K(arg));
262
DRWLock::WRLockGuard guard(rwlock_);
263
ObTenantConfig *config = nullptr;
264
if (OB_FAIL(dump2file())) {
265
LOG_WARN("fail to dump config to file", KR(ret), K(arg));
266
} else if (OB_FAIL(config_map_.get_refactored(ObTenantID(arg.tenant_id_), config))) {
267
LOG_WARN("No tenant config found", K(arg.tenant_id_), K(ret));
268
} else if (OB_FAIL(config->publish_special_config_after_dump())) {
269
LOG_WARN("publish special config after dump failed", K(ret));
275
// Ensures config_map_ has an entry for tenant_id, under the write lock.
// - Virtual tenant ids, and tenants that already have an entry, create nothing; an
//   existing entry only has its deleting flag cleared.
// - Otherwise a new ObTenantConfig is allocated, initialised with this manager and
//   inserted; an allocation failure is reported as OB_ALLOCATE_MEMORY_FAILED.
// NOTE(review): the exact conditions under which ob_delete(new_config) runs are on
// lines not visible in this excerpt.
int ObTenantConfigMgr::add_tenant_config(uint64_t tenant_id)
277
int ret = OB_SUCCESS;
278
ObTenantConfig *const *config = nullptr;
279
DRWLock::WRLockGuard guard(rwlock_);
280
if (is_virtual_tenant_id(tenant_id)
281
|| OB_NOT_NULL(config = config_map_.get(ObTenantID(tenant_id)))) {
282
// config is still nullptr here when the virtual-tenant test short-circuited the lookup.
if (nullptr != config) {
283
ObTenantConfig *new_config = *config;
284
new_config->set_deleting(false);
287
ObTenantConfig *new_config = nullptr;
288
new_config = OB_NEW(ObTenantConfig, SET_USE_UNEXPECTED_500("TenantConfig"), tenant_id);
289
if (OB_NOT_NULL(new_config)) {
290
if(OB_FAIL(new_config->init(this))) {
291
LOG_WARN("new tenant config init failed", K(ret));
292
} else if (OB_FAIL(config_map_.set_refactored(ObTenantID(tenant_id),
294
LOG_WARN("add new tenant config failed", K(ret));
297
ob_delete(new_config);
300
ret = OB_ALLOCATE_MEMORY_FAILED;
301
LOG_WARN("alloc new tenant config failed", K(ret));
304
LOG_INFO("tenant config added", K(tenant_id), K(ret));
308
// Tries to remove the config entry of tenant_id, under the write lock.
// The entry is only erased when all of the following hold:
//   - tenant_id is not a virtual tenant id and has an entry in config_map_;
//   - the schema service reports the tenant as dropped, or the config is at least
//     RECYCLE_LATENCY old;
//   - nothing holds a reference on the config (is_ref_clear());
//   - GCTX.omt_ is set and no longer has the tenant locally.
// The config is then marked deleting, its update task is waited for, and the map entry
// is erased.
// NOTE(review): whether the "try later" branches set an error code, and what happens to
// the config object after erase, is on lines not visible in this excerpt.
// NOTE(review): the local `tenant` is not used on any visible line.
int ObTenantConfigMgr::del_tenant_config(uint64_t tenant_id)
310
int ret = OB_SUCCESS;
311
ObTenantConfig *config = nullptr;
312
DRWLock::WRLockGuard guard(rwlock_);
313
ObTenant *tenant = NULL;
314
bool has_dropped = false;
315
if (is_virtual_tenant_id(tenant_id)) {
316
} else if (OB_FAIL(config_map_.get_refactored(ObTenantID(tenant_id), config))) {
317
LOG_WARN("get tenant config failed", K(tenant_id), K(ret));
318
} else if (OB_FAIL(GSCHEMASERVICE.check_if_tenant_has_been_dropped(tenant_id, has_dropped))) {
319
LOG_WARN("failed to check tenant has been dropped", K(tenant_id));
320
} else if (!has_dropped && ObTimeUtility::current_time() - config->get_create_timestamp() < RECYCLE_LATENCY) {
321
LOG_WARN("tenant still exist, try to delete tenant config later...", K(tenant_id));
322
} else if (!config->is_ref_clear()) {
324
LOG_INFO("something hold config ref, try delete later...");
325
} else if (OB_ISNULL(GCTX.omt_)) {
326
ret = OB_ERR_UNEXPECTED;
327
LOG_WARN("omt is null", KR(ret));
328
} else if (GCTX.omt_->has_tenant(tenant_id)) {
329
LOG_WARN("local tenant resource still exist, try to delete tenant config later", K(tenant_id));
331
config->set_deleting();
332
if (OB_FAIL(wait(config->get_update_task()))) {
333
LOG_WARN("wait tenant config update task failed", K(ret), K(tenant_id));
334
} else if (OB_FAIL(config_map_.erase_refactored(ObTenantID(tenant_id)))) {
335
LOG_WARN("delete tenant config failed", K(ret), K(tenant_id));
338
LOG_INFO("tenant config deleted", K(tenant_id), K(ret));
344
// Looks up the config of tenant_id under the read lock and, on success, takes a
// reference on it with ref(); the caller has to release that reference.
//
// tenant_id           tenant whose config is wanted
// fallback_tenant_id  when > 0 and not OB_INVALID_ID, looked up if tenant_id has no entry
// timeout_us          when > 0, a failed lookup sleeps 10 ms and is retried in a
//                     do/while loop bounded by REACH_TIME_INTERVAL(timeout_us)
//
// NOTE(review): the opening of the do/while loop and the return statement are on lines
// not visible in this excerpt.
ObTenantConfig *ObTenantConfigMgr::get_tenant_config_with_lock(
345
const uint64_t tenant_id, const uint64_t fallback_tenant_id /* = 0 */,
346
const uint64_t timeout_us /* = 0 */) const
348
int ret = OB_SUCCESS;
349
ObTenantConfig *config = nullptr;
350
DRWLock::RDLockGuard guard(rwlock_);
352
if (OB_FAIL(config_map_.get_refactored(ObTenantID(tenant_id), config)) && timeout_us > 0) {
353
ob_usleep(10 * 1000L);
354
// Rate-limit the retry warning to one per second.
if (REACH_TIME_INTERVAL(1 * 1000 * 1000L)) {
355
LOG_WARN("failed to get tenant config, retry for 1s", K(tenant_id), K(ret));
360
} while (!REACH_TIME_INTERVAL(timeout_us));
362
if (fallback_tenant_id > 0 && OB_INVALID_ID != fallback_tenant_id) {
363
if (OB_FAIL(config_map_.get_refactored(ObTenantID(fallback_tenant_id), config))) {
364
LOG_WARN("failed to get fallback tenant config", K(fallback_tenant_id), K(ret));
367
LOG_WARN("failed to get tenant config", K(tenant_id), K(ret));
370
if (OB_SUCC(ret) && OB_NOT_NULL(config)) {
371
config->ref(); // remember to unref outside
376
// Looks up the config of tenant_id (or of fallback_tenant_id when that is > 0 and not
// OB_INVALID_ID and the first lookup fails) while holding the read lock.
// NOTE(review): the lines that invoke on_success / on_failure are not visible in this
// excerpt; presumably on_success receives the config when the lookup succeeded and
// on_failure runs otherwise — TODO confirm.
int ObTenantConfigMgr::read_tenant_config(
377
const uint64_t tenant_id,
378
const uint64_t fallback_tenant_id,
379
const SuccessFunctor &on_success,
380
const FailureFunctor &on_failure) const
382
int ret = OB_SUCCESS;
383
ObTenantConfig *config = nullptr;
384
DRWLock::RDLockGuard guard(rwlock_);
385
if (OB_FAIL(config_map_.get_refactored(ObTenantID(tenant_id), config))) {
386
if (fallback_tenant_id > 0 && OB_INVALID_ID != fallback_tenant_id) {
387
if (OB_FAIL(config_map_.get_refactored(ObTenantID(fallback_tenant_id), config))) {
388
LOG_WARN("failed to get tenant config", K(fallback_tenant_id), K(ret), K(lbt()));
391
LOG_WARN("failed to get tenant config", K(tenant_id), K(ret));
394
if (OB_SUCC(ret) && OB_NOT_NULL(config)) {
398
LOG_WARN("fail read tenant config", K(tenant_id), K(ret));
403
// Walks every entry of config_map_ under the read lock and acts on the non-null ones.
// NOTE(review): the per-entry action is on a line not visible in this excerpt.
void ObTenantConfigMgr::print() const
405
DRWLock::RDLockGuard guard(rwlock_);
406
TenantConfigMap::const_iterator it = config_map_.begin();
407
for (; it != config_map_.end(); ++it) {
408
if (OB_NOT_NULL(it->second)) {
414
int ObTenantConfigMgr::dump2file()
416
int ret = OB_SUCCESS;
417
if (OB_FAIL(sys_config_mgr_->dump2file())) {
418
LOG_WARN("failed to dump2file", K(ret));
419
} else if (OB_FAIL(sys_config_mgr_->config_backup())) {
420
LOG_WARN("failed to dump2file backup", K(ret));
425
// Records `version` for tenant_id in config_version_map_, under the write lock.
// The stored value is only overwritten when `version` is greater than the current one
// (cur_version starts at 0 for the comparison).
// NOTE(review): how OB_HASH_NOT_EXIST from the lookup is handled, and the return
// statement, are on lines not visible in this excerpt.
int ObTenantConfigMgr::set_tenant_config_version(uint64_t tenant_id, int64_t version)
427
int ret = OB_SUCCESS;
428
int64_t cur_version = 0;
429
DRWLock::WRLockGuard guard(rwlock_);
430
if (OB_FAIL(config_version_map_.get_refactored(ObTenantID(tenant_id), cur_version))) {
431
if (OB_HASH_NOT_EXIST != ret) {
432
LOG_WARN("get tenant config version fail", K(tenant_id), K(ret));
438
if (version <= cur_version) {
439
// avoid set smaller version when refresh config_version_map_ in refresh_config_version_map()
440
} else if (OB_FAIL(config_version_map_.set_refactored(ObTenantID(tenant_id), version, 1))) {
441
LOG_WARN("set tenant config version fail", K(tenant_id), K(version), K(ret));
447
// Reads the recorded config version of tenant_id from config_version_map_ under the
// read lock. `version` starts as ObSystemConfig::INIT_VERSION.
// Lookup errors other than OB_HASH_NOT_EXIST are logged.
// NOTE(review): the return statement is on a line not visible in this excerpt.
int64_t ObTenantConfigMgr::get_tenant_config_version(uint64_t tenant_id)
449
int64_t version = ObSystemConfig::INIT_VERSION;
450
int ret = OB_SUCCESS;
451
DRWLock::RDLockGuard guard(rwlock_);
452
if (OB_FAIL(config_version_map_.get_refactored(ObTenantID(tenant_id), version))) {
453
if (OB_HASH_NOT_EXIST != ret) {
454
LOG_WARN("get tenant config version fail", K(tenant_id), K(ret));
461
// Fills lease_response.tenant_config_version_ with one (tenant_id, version) pair for
// every entry of config_version_map_.
//
// @param lease_response  output; its tenant_config_version_ array is reset first.
// @return OB_SUCCESS when the map is non-empty and every entry was appended;
//         OB_ENTRY_NOT_EXIST when the version map is empty;
//         otherwise the push_back() error, in which case the array is emptied again.
int ObTenantConfigMgr::get_lease_response(share::ObLeaseResponse &lease_response)
{
  int ret = OB_SUCCESS;
  // Take the read lock before touching config_version_map_ at all, so the size() used
  // for the reservation is read under the same lock as the iteration below.
  DRWLock::RDLockGuard guard(rwlock_);
  std::pair<uint64_t, int64_t> pair;
  lease_response.tenant_config_version_.reset();
  // Pre-sizing hint; the result is left unchecked, as before.
  lease_response.tenant_config_version_.reserve(config_version_map_.size());
  TenantConfigVersionMap::const_iterator it = config_version_map_.begin();
  for (; it != config_version_map_.end() && OB_SUCC(ret); ++it) {
    pair.first = it->first.tenant_id_;
    pair.second = it->second;
    if (OB_FAIL(lease_response.tenant_config_version_.push_back(pair))) {
      LOG_WARN("push back tenant config fail",
               "tenant_id", pair.first, "version", pair.second, K(ret));
    } else {
      LOG_DEBUG("push back tenant config response succ",
                "tenant", pair.first, "version", pair.second);
    }
  }
  if (OB_FAIL(ret)) {
    // Agreement with the observer side: if the lease response carries no config
    // version at all, the observer does nothing.
    lease_response.tenant_config_version_.reset();
  } else if (0 == config_version_map_.size()) {
    ret = OB_ENTRY_NOT_EXIST;
    // Rate-limit this message to once every 10 seconds.
    if (REACH_TIME_INTERVAL(10 * 1000L * 1000L)) {
      LOG_INFO("config version map size is 0, try get lease later");
    }
  }
  return ret;
}
493
// Appends one (tenant_id, current version) pair to
// lease_request.tenant_config_version_ for every non-null entry of config_map_,
// under the read lock.
// NOTE(review): a push_back() failure is logged, but the visible loop condition does not
// test ret and the function returns void, so the caller cannot see the failure.
void ObTenantConfigMgr::get_lease_request(share::ObLeaseRequest &lease_request)
495
int ret = OB_SUCCESS;
496
DRWLock::RDLockGuard guard(rwlock_);
497
std::pair<uint64_t, int64_t> pair;
498
TenantConfigMap::const_iterator it = config_map_.begin();
499
for (; it != config_map_.end(); ++it) {
500
if (OB_NOT_NULL(it->second)) {
501
pair.first = it->first.tenant_id_;
502
pair.second = it->second->get_current_version();
503
if (OB_FAIL(lease_request.tenant_config_version_.push_back(pair))) {
504
LOG_WARN("push back tenant config fail", K(ret));
510
// for __all_virtual_tenant_parameter_info
511
int ObTenantConfigMgr::get_all_tenant_config_info(common::ObArray<TenantConfigInfo> &all_config)
513
int ret = OB_SUCCESS;
514
DRWLock::RDLockGuard guard(rwlock_);
515
TenantConfigMap::const_iterator it = config_map_.begin();
516
for (; OB_SUCC(ret) && it != config_map_.end(); ++it) {
517
uint64_t tenant_id = it->first.tenant_id_;
518
ObTenantConfig *tenant_config = it->second;
519
for (ObConfigContainer::const_iterator iter = tenant_config->get_container().begin();
520
iter != tenant_config->get_container().end(); iter++) {
521
TenantConfigInfo config_info(tenant_id);
522
if (0 == ObString("compatible").case_compare(iter->first.str())
523
&& !iter->second->value_updated()) {
524
if (OB_FAIL(config_info.set_value("0.0.0.0"))) {
525
LOG_WARN("set value fail", K(iter->second->str()), K(ret));
528
if (OB_FAIL(config_info.set_value(iter->second->str()))) {
529
LOG_WARN("set value fail", K(iter->second->str()), K(ret));
532
if (FAILEDx(config_info.set_name(iter->first.str()))) {
533
LOG_WARN("set name fail", K(iter->first.str()), K(ret));
534
} else if (OB_FAIL(config_info.set_data_type(iter->second->data_type()))) {
535
LOG_WARN("set data_type fail", K(iter->second->data_type()), K(ret));
536
} else if (OB_FAIL(config_info.set_info(iter->second->info()))) {
537
LOG_WARN("set info fail", K(iter->second->info()), K(ret));
538
} else if (OB_FAIL(config_info.set_section(iter->second->section()))) {
539
LOG_WARN("set section fail", K(iter->second->section()), K(ret));
540
} else if (OB_FAIL(config_info.set_scope(iter->second->scope()))) {
541
LOG_WARN("set scope fail", K(iter->second->scope()), K(ret));
542
} else if (OB_FAIL(config_info.set_source(iter->second->source()))) {
543
LOG_WARN("set source fail", K(iter->second->source()), K(ret));
544
} else if (OB_FAIL(config_info.set_edit_level(iter->second->edit_level()))) {
545
LOG_WARN("set edit_level fail", K(iter->second->edit_level()), K(ret));
546
} else if (OB_FAIL(all_config.push_back(config_info))) {
547
LOG_WARN("push to array fail", K(config_info), K(ret));
554
// Forwards every (tenant_id, version) pair in `versions` to got_version().
// NOTE(review): this function holds the read lock while got_version() takes the read
// lock again on the same rwlock_; confirm DRWLock tolerates nested read locking.
// NOTE(review): the visible loop condition does not test ret, so a failure for one
// tenant can be overwritten by a later success.
int ObTenantConfigMgr::got_versions(const common::ObIArray<std::pair<uint64_t, int64_t>> &versions)
556
int ret = OB_SUCCESS;
557
DRWLock::RDLockGuard guard(rwlock_);
558
for (int i = 0; i < versions.count(); ++i) {
559
uint64_t tenant_id = versions.at(i).first;
560
int64_t version = versions.at(i).second;
561
if (OB_FAIL(got_version(tenant_id, version))) {
562
LOG_WARN("fail got version", K(tenant_id), K(version), K(ret));
568
// Looks up the config of tenant_id under the read lock and forwards
// (version, remove_repeat) to its got_version().
// A missing tenant is logged and the lookup error is kept in ret.
int ObTenantConfigMgr::got_version(uint64_t tenant_id, int64_t version, const bool remove_repeat/* = true */)
570
int ret = OB_SUCCESS;
571
DRWLock::RDLockGuard guard(rwlock_);
572
ObTenantConfig *config = nullptr;
573
if (OB_FAIL(config_map_.get_refactored(ObTenantID(tenant_id), config))) {
574
LOG_WARN("No tenant config found", K(tenant_id), K(ret));
576
ret = config->got_version(version, remove_repeat);
581
// Reloads the parameters of tenant_id from the tenant parameter table and hands the
// result set to the tenant config's update_local().
// The SQL read runs without the manager lock; the write lock is taken only around the
// map lookup and config->update_local().
// NOTE(review): the declaration of `sql` and the error code set for a null sql_proxy_
// are on lines not visible in this excerpt.
int ObTenantConfigMgr::update_local(uint64_t tenant_id, int64_t expected_version)
583
int ret = OB_SUCCESS;
584
if (OB_ISNULL(sql_proxy_)) {
586
LOG_WARN("sql proxy is null", K(ret));
588
// The query is executed under the id returned by gen_meta_tenant_id(tenant_id).
uint64_t exec_tenant_id = gen_meta_tenant_id(tenant_id);
589
ObTenantConfig *config = nullptr;
590
ObSQLClientRetryWeak sql_client_retry_weak(sql_proxy_);
592
SMART_VAR(ObMySQLProxy::MySQLResult, result) {
593
if (OB_FAIL(sql.assign_fmt(
594
"select config_version, zone, svr_type, svr_ip, svr_port, name, "
595
"data_type, value, info, section, scope, source, edit_level "
596
"from %s where tenant_id = '%lu'", OB_TENANT_PARAMETER_TNAME, tenant_id))) {
597
} else if (OB_FAIL(sql_client_retry_weak.read(result, exec_tenant_id, sql.ptr()))) {
598
LOG_WARN("read config from __tenant_parameter failed",
599
KR(ret), K(tenant_id), K(exec_tenant_id), K(sql));
601
DRWLock::WRLockGuard guard(rwlock_);
602
ret = config_map_.get_refactored(ObTenantID(tenant_id), config);
604
LOG_ERROR("failed to get tenant config", K(tenant_id), K(ret));
606
ret = config->update_local(expected_version, result);
614
// Invokes the callback stored by init() with the id of the tenant whose config changed.
void ObTenantConfigMgr::notify_tenant_config_changed(uint64_t tenant_id)
616
update_tenant_config_cb_(tenant_id);
619
// Applies config_str to every non-null tenant config in config_map_ via
// add_extra_config(), under the write lock. Iteration stops at the first failure.
// Does nothing when the map is empty or config_str is nullptr.
// Each tenant gets its own `version`, taken from ObTimeUtility::current_time() at the
// moment it is processed.
int ObTenantConfigMgr::add_config_to_existing_tenant(const char *config_str)
621
int ret = OB_SUCCESS;
622
DRWLock::WRLockGuard guard(rwlock_);
623
if (!config_map_.empty() && nullptr != config_str) {
624
TenantConfigMap::const_iterator it = config_map_.begin();
625
for (; it != config_map_.end() && OB_SUCC(ret); ++it) {
626
if (OB_NOT_NULL(it->second)) {
627
int64_t version = ObTimeUtility::current_time();
628
if (OB_FAIL(it->second->add_extra_config(config_str, version))) {
629
LOG_WARN("add tenant extra config failed", "tenant_id", it->second->get_tenant_id(),
630
"config_str", config_str, KR(ret));
638
// Applies arg.config_str_ to the config of arg.tenant_id_, under the write lock.
// Returns OB_INVALID_ARGUMENT when arg is not valid, the lookup error when the tenant
// has no config, otherwise the result of the tenant config's add_extra_config().
int ObTenantConfigMgr::add_extra_config(const obrpc::ObTenantConfigArg &arg)
640
int ret = OB_SUCCESS;
641
ObTenantConfig *config = nullptr;
642
if (!arg.is_valid()) {
643
ret = OB_INVALID_ARGUMENT;
644
LOG_ERROR("invalid arg", K(ret), K(arg));
646
DRWLock::WRLockGuard guard(rwlock_);
647
if (OB_FAIL(config_map_.get_refactored(ObTenantID(arg.tenant_id_), config))) {
648
LOG_ERROR("failed to get tenant config", K(arg.tenant_id_), K(ret));
650
ret = config->add_extra_config(arg.config_str_.ptr());
653
FLOG_INFO("add tenant extra config", K(arg));
657
// Schedules `task` on the CONFIG_MGR thread group after `delay`.
// NOTE(review): the declaration of `repeat` is on a line not visible in this excerpt.
// NOTE(review): on failure ret is overwritten with OB_ERR_UNEXPECTED before it is logged,
// so the original TG_SCHEDULE error code is lost; consider logging it first.
int ObTenantConfigMgr::schedule(ObTenantConfig::TenantConfigUpdateTask &task, const int64_t delay)
659
int ret = OB_SUCCESS;
661
if (OB_FAIL(TG_SCHEDULE(lib::TGDefIDs::CONFIG_MGR, task, delay, repeat))) {
662
ret = OB_ERR_UNEXPECTED;
663
LOG_WARN("schedule task failed", K(ret));
668
// Cancels `task` on the CONFIG_MGR thread group via TG_CANCEL_R and logs a failure.
int ObTenantConfigMgr::cancel(const ObTenantConfig::TenantConfigUpdateTask &task)
670
int ret = OB_SUCCESS;
672
if (OB_FAIL(TG_CANCEL_R(lib::TGDefIDs::CONFIG_MGR, task))) {
673
LOG_WARN("cancel tenant config update task failed", K(ret));
678
// Polls task.running_task_count_ up to try_times times and logs a warning when the
// result is OB_EAGAIN.
// NOTE(review): the declaration of ret, the sleep/cancel steps between polls and the
// return statement are on lines not visible in this excerpt; `period` is presumably the
// per-iteration sleep in microseconds (300 * 10000 us = 3 s in total) — TODO confirm.
int ObTenantConfigMgr::wait(const ObTenantConfig::TenantConfigUpdateTask &task)
681
const int try_times = 300;
682
const int64_t period = 10000;
684
for (int i = 0; i < try_times; ++i) {
685
if (ATOMIC_LOAD(&task.running_task_count_) > 0) {
686
// wait running task finish
693
if (OB_EAGAIN == ret) {
694
LOG_WARN("wait running update task failed, try later", K(ret), K(ATOMIC_LOAD(&task.running_task_count_)));
699
// Serializes every tenant config in config_map_ back to back into buf.
// A null map entry is reported as OB_ERR_UNEXPECTED.
// Afterwards the number of bytes written is compared with get_serialize_size_() and a
// mismatch is reported as OB_ERR_UNEXPECTED.
// NOTE(review): no lock is taken on any line visible here; presumably the caller holds
// rwlock_ — TODO confirm.
OB_DEF_SERIALIZE(ObTenantConfigMgr)
701
int ret = OB_SUCCESS;
702
int64_t expect_data_len = get_serialize_size_();
703
int64_t saved_pos = pos;
704
TenantConfigMap::const_iterator it = config_map_.begin();
705
for (; OB_SUCC(ret) && it != config_map_.end(); ++it) {
706
if (OB_ISNULL(it->second)) {
707
ret = OB_ERR_UNEXPECTED;
709
ret = it->second->serialize(buf, buf_len, pos);
713
int64_t writen_len = pos - saved_pos;
714
if (writen_len != expect_data_len) {
715
ret = OB_ERR_UNEXPECTED;
716
LOG_WARN("unexpected data size", K(writen_len), K(expect_data_len));
722
// Reads a sequence of per-tenant sections from buf until pos reaches data_len.
// Each section is: two encoded integers that are decoded and ignored, a "[<tenant id>]"
// header followed by '\n', then the tenant config payload, which is handed to that
// tenant's ObTenantConfig::deserialize(). A tenant that has no entry in config_map_ yet
// is created with add_tenant_config() first.
// NOTE(review): several guards (e.g. OB_FAIL/OB_SUCC checks between the steps) are on
// lines not visible in this excerpt.
OB_DEF_DESERIALIZE(ObTenantConfigMgr)
724
int ret = OB_SUCCESS;
725
if (data_len == 0 || pos >= data_len) {
727
while(OB_SUCC(ret) && pos < data_len) {
728
int64_t saved_pos = pos;
729
int64_t ignore_version = 0, ignore_len = 0;
730
OB_UNIS_DECODE(ignore_version);
731
OB_UNIS_DECODE(ignore_len);
733
} else if ('[' != *(buf + pos)) {
734
ret = OB_INVALID_DATA;
735
LOG_ERROR("invalid tenant config", K(ret));
737
// Scan forward for the closing ']' of the tenant header.
int64_t cur = pos + 1;
738
while (cur < data_len - 1 && ']' != *(buf + cur)) {
741
// The header must be followed directly by a newline.
if (cur >= data_len - 1 || '\n' != *(buf + cur + 1)) {
742
ret = OB_INVALID_DATA;
743
LOG_ERROR("invalid tenant config", K(ret));
745
uint64_t tenant_id = OB_INVALID_TENANT_ID;
746
char tenant_str[100];
747
char *p_end = nullptr;
748
MEMSET(tenant_str, '\0', 100);
749
// Only ids shorter than the 100-byte buffer are copied, so the text stays NUL-terminated.
if (cur - pos - 1 < 100) {
750
MEMCPY(tenant_str, buf + pos + 1, cur - pos - 1);
751
// Base 0: strtoul also accepts 0x- and 0-prefixed ids.
tenant_id = strtoul(tenant_str, &p_end, 0);
753
ret = OB_INVALID_DATA;
754
LOG_ERROR("invalid tenant id", K(ret));
757
// NOTE(review): p_end is still nullptr when the length check above failed; confirm that
// an elided guard prevents reaching this dereference in that case.
if ('\0' != *p_end) {
758
ret = OB_INVALID_CONFIG;
759
LOG_ERROR("invalid tenant id", K(ret));
762
ObTenantConfig *config = nullptr;
763
if (OB_FAIL(config_map_.get_refactored(ObTenantID(tenant_id), config))) {
764
if (ret != OB_HASH_NOT_EXIST || OB_FAIL(add_tenant_config(tenant_id))) {
765
LOG_ERROR("get tenant config failed", K(tenant_id), K(ret));
768
ret = config_map_.get_refactored(ObTenantID(tenant_id), config);
772
ret = config->deserialize(buf, data_len, pos);
782
// Serialized size of the manager: adds up get_serialize_size() of every non-null tenant
// config in config_map_.
// NOTE(review): the declaration of len and the end of this definition are outside this
// excerpt.
OB_DEF_SERIALIZE_SIZE(ObTenantConfigMgr)
785
TenantConfigMap::const_iterator it = config_map_.begin();
786
for (; it != config_map_.end(); ++it) {
787
if (OB_NOT_NULL(it->second)) {
788
len += it->second->get_serialize_size();