13
#define USING_LOG_PREFIX SERVER_OMT
15
#include "ob_tenant_config.h"
16
#include "common/ob_common_utility.h"
17
#include "lib/net/ob_net_util.h"
18
#include "lib/oblog/ob_log.h"
19
#include "share/config/ob_server_config.h"
20
#include "share/schema/ob_schema_getter_guard.h"
21
#include "share/schema/ob_multi_version_schema_service.h"
22
#include "observer/ob_server_struct.h"
23
#include "observer/omt/ob_tenant_config.h"
24
#include "observer/omt/ob_tenant_config_mgr.h"
25
#include "sql/monitor/flt/ob_flt_control_info_mgr.h"
26
#include "share/errsim_module/ob_errsim_module_interface_imp.h"
28
using namespace oceanbase::common;
33
ObTenantConfig::ObTenantConfig() : ObTenantConfig(OB_INVALID_TENANT_ID)
37
ObTenantConfig::ObTenantConfig(uint64_t tenant_id)
38
: tenant_id_(tenant_id), current_version_(INITIAL_TENANT_CONF_VERSION),
40
update_task_(), system_config_(), config_mgr_(nullptr),
41
ref_(0L), is_deleting_(false), create_timestamp_(0L)
45
int ObTenantConfig::init(ObTenantConfigMgr *config_mgr)
48
config_mgr_ = config_mgr;
49
create_timestamp_ = ObTimeUtility::current_time();
50
if (OB_FAIL(system_config_.init())) {
51
LOG_ERROR("init system config failed", K(ret));
52
} else if (OB_FAIL(update_task_.init(config_mgr, this))) {
53
LOG_ERROR("init tenant config updata task failed", K_(tenant_id), K(ret));
58
void ObTenantConfig::print() const
60
OB_LOG(INFO, "===================== * begin tenant config report * =====================", K(tenant_id_));
61
ObConfigContainer::const_iterator it = container_.begin();
62
for (; it != container_.end(); ++it) {
63
if (OB_ISNULL(it->second)) {
64
OB_LOG_RET(WARN, OB_ERR_UNEXPECTED, "config item is null", "name", it->first.str());
66
_OB_LOG(INFO, "| %-36s = %s", it->first.str(), it->second->str());
69
OB_LOG(INFO, "===================== * stop tenant config report * =======================", K(tenant_id_));
72
int ObTenantConfig::read_config()
75
ObSystemConfigKey key;
77
char local_ip[OB_MAX_SERVER_ADDR_SIZE] = "";
78
server = GCTX.self_addr();
79
if (OB_UNLIKELY(true != server.ip_to_string(local_ip, sizeof(local_ip)))) {
80
ret = OB_CONVERT_ERROR;
82
key.set_varchar(ObString::make_string("svr_type"), print_server_role(get_server_type()));
83
key.set_int(ObString::make_string("svr_port"), GCONF.rpc_port);
84
key.set_varchar(ObString::make_string("svr_ip"), local_ip);
85
key.set_varchar(ObString::make_string("zone"), GCONF.zone);
86
ObConfigContainer::const_iterator it = container_.begin();
87
for (; OB_SUCC(ret) && it != container_.end(); ++it) {
88
key.set_name(it->first.str());
89
if (OB_ISNULL(it->second)) {
90
ret = OB_ERR_UNEXPECTED;
91
OB_LOG(ERROR, "config item is null", "name", it->first.str(), K(ret));
93
key.set_version(it->second->version());
94
int temp_ret = system_config_.read_config(get_tenant_id(), key, *(it->second));
95
if (OB_SUCCESS != temp_ret) {
96
OB_LOG(DEBUG, "Read config error", "name", it->first.str(), K(temp_ret));
104
void ObTenantConfig::TenantConfigUpdateTask::runTimerTask()
106
int ret = OB_SUCCESS;
107
if (OB_ISNULL(config_mgr_)) {
109
LOG_WARN("invalid argument", K_(config_mgr), K(ret));
110
} else if (OB_ISNULL(tenant_config_)){
112
LOG_WARN("invalid argument", K_(tenant_config), K(ret));
114
const int64_t saved_current_version = tenant_config_->current_version_;
115
const int64_t version = version_;
116
THIS_WORKER.set_timeout_ts(INT64_MAX);
117
if (tenant_config_->current_version_ >= version) {
118
ret = OB_ALREADY_DONE;
119
} else if (update_local_) {
120
tenant_config_->current_version_ = version;
121
if (OB_FAIL(tenant_config_->system_config_.clear())) {
122
LOG_WARN("Clear system config map failed", K(ret));
123
} else if (OB_FAIL(config_mgr_->update_local(tenant_config_->tenant_id_, version))) {
124
LOG_WARN("ObTenantConfigMgr update_local failed", K(ret), K(tenant_config_));
126
config_mgr_->notify_tenant_config_changed(tenant_config_->tenant_id_);
129
sql::ObFLTControlInfoManager mgr(tenant_config_->tenant_id_);
132
} else if (OB_FAIL(mgr.init())) {
133
LOG_WARN("fail to init", KR(ret));
134
} else if (OB_FAIL(mgr.apply_control_info())) {
135
LOG_WARN("fail to apply control info", KR(ret));
137
LOG_TRACE("apply control info", K(tenant_config_->tenant_id_));
141
int tmp_ret = OB_SUCCESS;
142
uint64_t tenant_id = tenant_config_->tenant_id_;
143
tenant_config_->current_version_ = saved_current_version;
144
share::schema::ObSchemaGetterGuard schema_guard;
145
share::schema::ObMultiVersionSchemaService *schema_service = GCTX.schema_service_;
146
bool tenant_dropped = false;
148
if (OB_ISNULL(schema_service)) {
149
tmp_ret = OB_ERR_UNEXPECTED;
150
LOG_WARN("schema_service is null", K(ret), K(tmp_ret));
151
} else if (OB_SUCCESS != (tmp_ret = schema_service->get_tenant_schema_guard(OB_SYS_TENANT_ID, schema_guard))) {
152
LOG_WARN("fail to get schema guard", K(ret), K(tmp_ret), K(tenant_id));
153
} else if (OB_SUCCESS != (tmp_ret = schema_guard.check_if_tenant_has_been_dropped(tenant_id, tenant_dropped))) {
154
LOG_WARN("fail to check if tenant has been dropped", K(ret), K(tmp_ret), K(tenant_id));
156
if (tenant_dropped) {
157
LOG_INFO("tenant has dropped", K(tenant_id));
158
} else if ((ATOMIC_FAA(&running_task_count_, 1) < 2)) {
159
if (OB_FAIL(config_mgr_->schedule(*this, 0))) {
160
LOG_WARN("schedule task failed", K(tenant_id), K(ret));
161
ATOMIC_DEC(&running_task_count_);
162
} else if (tenant_config_->is_deleting_) {
163
LOG_INFO("tenant under deleting, cancel task", K(tenant_id));
164
if (OB_FAIL(config_mgr_->cancel(*this))) {
165
LOG_WARN("cancel task failed", K(tenant_id), K(ret));
167
ATOMIC_DEC(&running_task_count_);
170
LOG_INFO("Schedule a retry task!", K(tenant_id));
173
ATOMIC_DEC(&running_task_count_);
174
LOG_INFO("already 2 running task, temporory no need more", K(tenant_id));
178
const int64_t read_version = tenant_config_->system_config_.get_version();
179
LOG_INFO("loaded new tenant config",
180
"tenant_id", tenant_config_->tenant_id_,
181
"read_version", read_version,
182
"old_version", saved_current_version,
183
"current_version", tenant_config_->current_version_,
184
"expected_version", version);
185
tenant_config_->print();
190
ATOMIC_DEC(&running_task_count_);
197
int ObTenantConfig::got_version(int64_t version, const bool remove_repeat)
199
UNUSED(remove_repeat);
200
int ret = OB_SUCCESS;
201
bool schedule_task = false;
203
update_task_.update_local_ = false;
204
schedule_task = true;
205
} else if (0 == version) {
206
LOG_DEBUG("root server restarting");
207
} else if (current_version_ == version) {
208
} else if (version < current_version_) {
209
LOG_WARN("Local tenant config is newer than rs, weird", K_(current_version), K(version));
210
} else if (version > current_version_) {
211
if (!mutex_.trylock()) {
212
LOG_DEBUG("Processed by other thread!");
214
LOG_INFO("Got new tenant config version", K_(tenant_id),
215
K_(current_version), K(version));
216
update_task_.update_local_ = true;
217
update_task_.version_ = version;
218
update_task_.scheduled_time_ = ObTimeUtility::current_monotonic_raw_time();
219
schedule_task = true;
224
bool schedule = true;
225
if (!config_mgr_->inited()) {
228
LOG_WARN("Couldn't update config because timer is NULL", K(ret));
230
if (schedule && !is_deleting_) {
262
if ((ATOMIC_FAA(&update_task_.running_task_count_, 1) < 2)) {
263
if (OB_FAIL(config_mgr_->schedule(update_task_, 0))) {
264
LOG_WARN("schedule task failed", K_(tenant_id), K(ret));
266
ATOMIC_DEC(&update_task_.running_task_count_);
268
LOG_INFO("Schedule update tenant config task successfully!", K_(tenant_id));
271
ATOMIC_DEC(&update_task_.running_task_count_);
272
LOG_INFO("already 2 running task, temporory no need more", K_(tenant_id));
279
int ObTenantConfig::update_local(int64_t expected_version, ObMySQLProxy::MySQLResult &result,
282
int ret = OB_SUCCESS;
283
if (OB_FAIL(system_config_.update(result))) {
284
LOG_WARN("failed to load system config", K(ret));
285
} else if (expected_version != ObSystemConfig::INIT_VERSION && (system_config_.get_version() < current_version_
286
|| system_config_.get_version() < expected_version)) {
288
LOG_WARN("__tenant_parameter is older than the expected version", K(ret),
289
"read_version", system_config_.get_version(),
290
"current_version", current_version_,
291
"expected_version", expected_version);
293
LOG_INFO("read tenant config from __tenant_parameter succeed", K_(tenant_id));
297
if (OB_FAIL(read_config())) {
298
LOG_ERROR("Read tenant config failed", K_(tenant_id), K(ret));
299
} else if (save2file && OB_FAIL(config_mgr_->dump2file())) {
300
LOG_WARN("Dump to file failed", K(ret));
301
} else if (OB_FAIL(publish_special_config_after_dump())) {
302
LOG_WARN("publish special config after dump failed", K(tenant_id_), K(ret));
305
else if (OB_FAIL(build_errsim_module_())) {
306
LOG_WARN("failed to build errsim module", K(ret), K(tenant_id_));
311
LOG_WARN("Read tenant config from inner table error", K_(tenant_id), K(ret));
316
int ObTenantConfig::publish_special_config_after_dump()
318
int ret = OB_SUCCESS;
319
ObConfigItem *const *pp_item = NULL;
320
if (OB_ISNULL(pp_item = container_.get(ObConfigStringKey(COMPATIBLE)))) {
321
ret = OB_INVALID_CONFIG;
322
LOG_WARN("Invalid config string", K(tenant_id_), K(ret));
323
} else if (!(*pp_item)->dump_value_updated()) {
324
LOG_INFO("config dump value is not set, no need read", K(tenant_id_), K((*pp_item)->spfile_str()));
325
} else if (!(*pp_item)->set_value((*pp_item)->spfile_str())) {
326
ret = OB_INVALID_CONFIG;
327
LOG_WARN("Invalid config value", K(tenant_id_), K((*pp_item)->spfile_str()), K(ret));
329
FLOG_INFO("[COMPATIBLE] read data_version after dump",
330
KR(ret), K_(tenant_id),
331
"version", (*pp_item)->version(),
332
"value", (*pp_item)->str(),
333
"value_updated", (*pp_item)->value_updated(),
334
"dump_version", (*pp_item)->dumped_version(),
335
"dump_value", (*pp_item)->spfile_str(),
336
"dump_value_updated", (*pp_item)->dump_value_updated());
341
int ObTenantConfig::add_extra_config(const char *config_str,
345
int ret = OB_SUCCESS;
346
const int64_t MAX_OPTS_LENGTH = sysconf(_SC_ARG_MAX);
347
int64_t config_str_length = 0;
349
char *saveptr = NULL;
351
if (OB_ISNULL(config_str)) {
352
ret = OB_ERR_UNEXPECTED;
353
LOG_WARN("config str is null", K(ret));
354
} else if ((config_str_length = static_cast<int64_t>(STRLEN(config_str))) >= MAX_OPTS_LENGTH) {
355
ret = OB_BUF_NOT_ENOUGH;
356
LOG_ERROR("Extra config is too long", K(ret));
357
} else if (OB_ISNULL(buf = new (std::nothrow) char[config_str_length + 1])) {
358
ret = OB_ALLOCATE_MEMORY_FAILED;
359
LOG_ERROR("ob tc malloc memory for buf fail", K(ret));
361
MEMCPY(buf, config_str, config_str_length);
362
buf[config_str_length] = '\0';
363
token = STRTOK_R(buf, ",\n", &saveptr);
364
const ObString compatible_cfg(COMPATIBLE);
365
while (OB_SUCC(ret) && OB_LIKELY(NULL != token)) {
366
char *saveptr_one = NULL;
368
const char *value = NULL;
369
ObConfigItem *const *pp_item = NULL;
370
if (OB_ISNULL(name = STRTOK_R(token, "=", &saveptr_one))) {
371
ret = OB_INVALID_CONFIG;
372
LOG_ERROR("Invalid config string", K(token), K(ret));
373
} else if (OB_ISNULL(saveptr_one) || OB_UNLIKELY('\0' == *(value = saveptr_one))) {
374
LOG_INFO("Empty config string", K(token), K(name));
379
if (OB_ISNULL(name)) {
382
char curr_tid_str[32] = {'\0'};
383
snprintf(curr_tid_str, sizeof(curr_tid_str), "%lu", tenant_id_);
384
char *tid_in_arg = NULL;
385
name = STRTOK_R(name, "@", &tid_in_arg);
386
if (OB_ISNULL(name) || OB_UNLIKELY('\0' == *name)) {
388
} else if (OB_NOT_NULL(tid_in_arg) && ('\0' != *tid_in_arg) &&
389
0 != strcmp(tid_in_arg, curr_tid_str)) {
392
const int value_len = strlen(value);
395
const int external_info_val_len = value_len / 2 + 1 + 1;
396
char *external_info_val = (char*)ob_malloc(external_info_val_len, "temp");
397
DEFER(if (external_info_val != nullptr) ob_free(external_info_val););
398
if (OB_ISNULL(external_info_val)) {
399
ret = OB_ALLOCATE_MEMORY_FAILED;
400
LOG_WARN("failed to alloc", K(ret));
401
} else if (FALSE_IT(external_info_val[0] = '\0')) {
402
} else if (OB_ISNULL(pp_item = container_.get(ObConfigStringKey(name)))) {
404
LOG_WARN("Invalid config string, no such config item", K(name), K(value), K(ret));
406
if (OB_FAIL(ret) || OB_ISNULL(pp_item)) {
407
} else if (compatible_cfg.case_compare(name) == 0) {
408
if (!(*pp_item)->set_dump_value(value)) {
409
ret = OB_INVALID_CONFIG;
410
LOG_WARN("Invalid config value", K(name), K(value), K(ret));
412
(*pp_item)->set_dump_value_updated();
413
(*pp_item)->set_version(version);
414
FLOG_INFO("[COMPATIBLE] init data_version before dump",
415
KR(ret), K_(tenant_id),
416
"version", (*pp_item)->version(),
417
"value", (*pp_item)->str(),
418
"value_updated", (*pp_item)->value_updated(),
419
"dump_version", (*pp_item)->dumped_version(),
420
"dump_value", (*pp_item)->spfile_str(),
421
"dump_value_updated", (*pp_item)->dump_value_updated());
423
} else if (!(*pp_item)->set_value(value)) {
424
ret = OB_INVALID_CONFIG;
425
LOG_WARN("Invalid config value", K(name), K(value), K(ret));
426
} else if (check_config && (!(*pp_item)->check_unit(value) || !(*pp_item)->check())) {
427
ret = OB_INVALID_CONFIG;
428
const char* range = (*pp_item)->range();
429
if (OB_ISNULL(range) || strlen(range) == 0) {
430
LOG_ERROR("Invalid config, value out of range", K(name), K(value), K(ret));
432
_LOG_ERROR("Invalid config, value out of %s (for reference only). name=%s, value=%s, ret=%d", range, name, value, ret);
435
(*pp_item)->set_version(version);
436
LOG_INFO("Load tenant config succ", K(name), K(value));
440
token = STRTOK_R(NULL, ",\n", &saveptr);
451
OB_DEF_SERIALIZE(ObTenantConfig)
453
int ret = OB_SUCCESS;
454
int64_t expect_data_len = get_serialize_size_();
455
int64_t saved_pos = pos;
456
if (OB_FAIL(databuff_printf(buf, buf_len, pos, "[%lu]\n", tenant_id_))) {
458
ret = ObCommonConfig::serialize(buf, buf_len, pos);
461
int64_t writen_len = pos - saved_pos;
462
if (writen_len != expect_data_len) {
463
ret = OB_ERR_UNEXPECTED;
464
LOG_WARN("unexpected data size", K(writen_len), K(expect_data_len));
470
OB_DEF_DESERIALIZE(ObTenantConfig)
472
int ret = OB_SUCCESS;
473
if ('[' != *(buf + pos)) {
474
ret = OB_INVALID_DATA;
475
LOG_ERROR("invalid tenant config", K(ret));
477
int64_t cur = pos + 1;
478
while (cur < data_len - 1 && ']' != *(buf + cur)) {
481
if (cur >= data_len - 1 || '\n' != *(buf + cur + 1)) {
482
ret = OB_INVALID_DATA;
483
LOG_ERROR("invalid tenant config", K(ret));
485
uint64_t tenant_id = OB_INVALID_TENANT_ID;
486
char tenant_str[100];
487
char *p_end = nullptr;
488
MEMSET(tenant_str, '\0', 100);
489
if (cur - pos - 1 < 100) {
490
MEMCPY(tenant_str, buf + pos + 1, cur - pos - 1);
491
tenant_id = strtoul(tenant_str, &p_end, 0);
493
if ('\0' != *p_end) {
494
ret = OB_INVALID_CONFIG;
495
LOG_ERROR("invalid tenant id", K(ret));
496
} else if (tenant_id != tenant_id_) {
497
LOG_ERROR("wrong tenant id", K(ret));
499
ret = ObCommonConfig::deserialize(buf, data_len, pos);
502
ret = OB_INVALID_DATA;
503
LOG_ERROR("invalid tenant id", K(ret));
510
OB_DEF_SERIALIZE_SIZE(ObTenantConfig)
512
int64_t len = 0, tmp_pos = 0;
513
int ret = OB_SUCCESS;
514
char tenant_str[100] = {'\0'};
515
if (OB_FAIL(databuff_printf(tenant_str, 100, tmp_pos, "[%lu]\n", tenant_id_))) {
516
LOG_WARN("write data buff failed", K(ret));
520
len += ObCommonConfig::get_serialize_size();
525
int ObTenantConfig::build_errsim_module_()
527
int ret = OB_SUCCESS;
528
char buf[ObErrsimModuleTypeHelper::MAX_TYPE_NAME_LENGTH] = "";
529
ObTenantErrsimModuleMgr::ModuleArray module_array;
530
ObTenantErrsimModuleMgr::ErrsimModuleString string;
532
for (int64_t i = 0; OB_SUCC(ret) && i < this->errsim_module_types.size(); ++i) {
533
if (OB_FAIL(this->errsim_module_types.get(
534
static_cast<int>(i), buf, sizeof(buf)))) {
535
LOG_WARN("get rs failed", K(ret), K(i));
536
} else if (OB_FAIL(string.assign(buf))) {
537
LOG_WARN("failed to assign buffer", K(ret));
538
} else if (OB_FAIL(module_array.push_back(string))) {
539
LOG_WARN("failed to push string into array", K(ret), K(string));
544
const int64_t percentage = this->errsim_module_error_percentage;
546
if (OB_FAIL(build_tenant_errsim_moulde(tenant_id_, current_version_, module_array, percentage))) {
547
LOG_WARN("failed to build tenant module", K(ret), K(tenant_id_));