oceanbase

Форк
0
/
ob_tenant_config_mgr.cpp 
795 строк · 27.5 Кб
1
/**
2
 * Copyright (c) 2021 OceanBase
3
 * OceanBase CE is licensed under Mulan PubL v2.
4
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
5
 * You may obtain a copy of Mulan PubL v2 at:
6
 *          http://license.coscl.org.cn/MulanPubL-2.0
7
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
 * See the Mulan PubL v2 for more details.
11
 */
12

13
#define USING_LOG_PREFIX SERVER_OMT
14
#include "ob_tenant_config_mgr.h"
15
#include "lib/thread/thread_mgr.h"
16
#include "observer/ob_sql_client_decorator.h"
17
#include "observer/ob_server_struct.h"
18
#include "share/config/ob_common_config.h"
19
#include "ob_multi_tenant.h"
20
#include "ob_tenant.h"
21
#include "share/ob_rpc_struct.h"
22
#include "share/inner_table/ob_inner_table_schema_constants.h"
23
#include "share/schema/ob_multi_version_schema_service.h"
24

25
using namespace oceanbase::common;
26
using namespace oceanbase::share;
27

28
namespace oceanbase {
29
namespace omt {
30

31
// Builds an empty guard: delegates to the pointer constructor with nullptr,
// so the destructor has nothing to release.
ObTenantConfigGuard::ObTenantConfigGuard()
    : ObTenantConfigGuard(nullptr)
{}
34

35
// Wraps `config` (may be nullptr). This constructor does not call ref();
// the destructor calls unref() on a non-null pointer.
// NOTE(review): presumably the caller hands over a reference it already took
// (e.g. via get_tenant_config_with_lock) -- confirm against callers.
ObTenantConfigGuard::ObTenantConfigGuard(ObTenantConfig *config)
    : config_(config)
{}
39

40
// Releases the guarded config, if any, by calling unref() on it.
ObTenantConfigGuard::~ObTenantConfigGuard()
{
  if (OB_ISNULL(config_)) {
    // empty guard, nothing to release
  } else {
    config_->unref();
  }
}
46

47
// Replaces the guarded config: the previously held config (if any) is
// unref()'d first, then `config` is stored as-is without calling ref().
void ObTenantConfigGuard::set_config(ObTenantConfig *config)
{
  ObTenantConfig *previous = config_;
  if (OB_ISNULL(previous)) {
    // nothing held so far
  } else {
    previous->unref();
  }
  config_ = config;
}
54

55
int TenantConfigInfo::assign(const TenantConfigInfo &rhs)
56
{
57
  int ret = OB_SUCCESS;
58
  tenant_id_ = rhs.tenant_id_;
59
  if (OB_FAIL(name_.assign(rhs.name_))) {
60
    LOG_WARN("assign name fail", K_(name), K(ret));
61
  } else if (OB_FAIL(data_type_.assign(rhs.data_type_))) {
62
    LOG_WARN("assign data_type fail", K_(data_type), K(ret));
63
  } else if (OB_FAIL(value_.assign(rhs.value_))) {
64
    LOG_WARN("assign value fail", K_(value), K(ret));
65
  } else if (OB_FAIL(info_.assign(rhs.info_))) {
66
    LOG_WARN("assign info fail", K_(info), K(ret));
67
  } else if (OB_FAIL(section_.assign(rhs.section_))) {
68
    LOG_WARN("assign section fail", K_(section), K(ret));
69
  } else if (OB_FAIL(scope_.assign(rhs.scope_))) {
70
    LOG_WARN("assign scope fail", K_(scope), K(ret));
71
  } else if (OB_FAIL(source_.assign(rhs.source_))) {
72
    LOG_WARN("assign source fail", K_(source), K(ret));
73
  } else if (OB_FAIL(edit_level_.assign(rhs.edit_level_))) {
74
    LOG_WARN("assign edit_level fail", K_(edit_level), K(ret));
75
  }
76
  return ret;
77
}
78

79
int64_t TenantConfigInfo::to_string(char *buf, const int64_t buf_len) const
80
{
81
  int64_t pos = 0;
82
  int ret = OB_SUCCESS;
83
  if (OB_NOT_NULL(buf) && buf_len > 0) {
84
    if (OB_FAIL(databuff_printf(buf, buf_len, pos, "tenant_id: [%lu], name: [%s],"
85
                    "value: [%s], info: [%s]",
86
                     tenant_id_, name_.ptr(), value_.ptr(), info_.ptr()))) {
87
      pos = 0;
88
      LOG_WARN("to_string buff fail", K(ret));
89
    }
90
  }
91
  return pos;
92
}
93

94
// Constructs an uninitialised manager: no SQL proxy, no system config
// manager, empty maps. init() must be called to wire in the dependencies.
ObTenantConfigMgr::ObTenantConfigMgr()
    : rwlock_(ObLatchIds::CONFIG_LOCK),
      inited_(false),
      self_(),
      sql_proxy_(nullptr),
      config_map_(),
      config_version_map_(),
      sys_config_mgr_(nullptr),
      version_has_refreshed_(false),
      update_tenant_config_cb_()
{}
100

101
// Nothing to release explicitly; members clean up through their own destructors.
ObTenantConfigMgr::~ObTenantConfigMgr() = default;
104

105
// Returns the process-wide manager, held as a function-local static.
ObTenantConfigMgr &ObTenantConfigMgr::get_instance()
{
  static ObTenantConfigMgr instance;
  return instance;
}
110

111
int ObTenantConfigMgr::init(ObMySQLProxy &sql_proxy,
112
                            const ObAddr &server,
113
                            ObConfigManager *config_mgr,
114
                            const UpdateTenantConfigCb &update_tenant_config_cb)
115
{
116
  int ret = OB_SUCCESS;
117
  sql_proxy_ = &sql_proxy;
118
  self_ = server;
119
  sys_config_mgr_ = config_mgr;
120
  update_tenant_config_cb_ = update_tenant_config_cb;
121
  ret = config_version_map_.create(oceanbase::common::OB_MAX_SERVER_TENANT_CNT,
122
                oceanbase::common::ObModIds::OB_HASH_BUCKET_CONF_CONTAINER,
123
                oceanbase::common::ObModIds::OB_HASH_NODE_CONF_CONTAINER);
124
  inited_ = true;
125
  return ret;
126
}
127

128
void ObTenantConfigMgr::refresh_config_version_map(const ObIArray<uint64_t> &tenants)
129
{
130
  int ret = OB_SUCCESS;
131
  if (true == version_has_refreshed_) {
132
    // no need to refresh again
133
  } else if (OB_ISNULL(sql_proxy_)) {
134
    ret = OB_NOT_INIT;
135
    LOG_WARN("sql proxy is null", K(ret));
136
  } else {
137
    ObSQLClientRetryWeak sql_client_retry_weak(sql_proxy_);
138
    SMART_VAR(ObMySQLProxy::MySQLResult, result) {
139
      bool has_sql_error = false;
140
      for (int i = 0; OB_SUCC(ret) && i < tenants.count(); ++i) {
141
        int64_t version = 0;
142
        uint64_t tenant_id = tenants.at(i);
143
        uint64_t exec_tenant_id = gen_meta_tenant_id(tenant_id);
144
        ObSqlString sql;
145
        if (OB_FAIL(sql.assign_fmt("select max(config_version) from %s where tenant_id = '%lu'",
146
                                   OB_TENANT_PARAMETER_TNAME, tenant_id))) {
147
          LOG_WARN("fail to generate sql", KR(ret), K(tenant_id));
148
        } else if (OB_FAIL(sql_client_retry_weak.read(result, exec_tenant_id, sql.ptr()))) {
149
          has_sql_error = true;
150
          ret = OB_SUCCESS;
151
          LOG_INFO("read config from __tenant_parameter failed",
152
                   KR(ret), K(tenant_id), K(exec_tenant_id), K(sql));
153
        } else if (NULL == result.get_result()) {
154
          ret = OB_ERR_UNEXPECTED;
155
          LOG_WARN("config result is null", K(tenant_id), K(ret));
156
        } else if (OB_FAIL(result.get_result()->next())) {
157
          LOG_WARN("get result next failed", K(tenant_id), K(ret));
158
        } else if (OB_FAIL(result.get_result()->get_int(0L, version))) {
159
          if (OB_ERR_NULL_VALUE != ret) {
160
            LOG_WARN("get config_version failed", K(tenant_id), K(ret));
161
          } else {
162
            LOG_INFO("tenant has no config", K(tenant_id));
163
            ret = OB_SUCCESS;
164
          }
165
        } else if (OB_FAIL(set_tenant_config_version(tenant_id, version))) {
166
          LOG_WARN("set config version failed", K(tenant_id), K(ret));
167
        } else {
168
          LOG_INFO("update tenant config version success", K(tenant_id), K(version));
169
        }
170
      }
171
      if ((OB_SUCCESS == ret) && (false == has_sql_error)) {
172
        version_has_refreshed_ = true;
173
        LOG_INFO("update all tenant config success");
174
      }
175
    }
176
  }
177
}
178

179
// 背景:每个 server 上需要保存所有租户的 config 信息
180
// 当新建租户/删除租户时需要对应维护 config 状态。
181
//
182
// IN: 当前活跃租户
183
// ACTION: 根据 tenants 信息,决定要添加/删除哪些租户配置项
184
int ObTenantConfigMgr::refresh_tenants(const ObIArray<uint64_t> &tenants)
185
{
186
  int ret = OB_SUCCESS;
187
  ObSEArray<uint64_t, 1> new_tenants;
188
  ObSEArray<uint64_t, 1> del_tenants;
189

190
  if (tenants.count() > 0) {
191
    // 探测要加的 config
192
    {
193
      DRWLock::RDLockGuard guard(rwlock_);
194
      for (int i = 0; i < tenants.count(); ++i) {
195
        uint64_t tenant_id = tenants.at(i);
196
        ObTenantConfig *const *config = nullptr;
197
        if (NULL == (config = config_map_.get(ObTenantID(tenant_id)))) {
198
          // 如果 tenant 不存在,则添加到 config_map 中
199
          if (OB_FAIL(new_tenants.push_back(tenant_id))) {
200
            LOG_WARN("fail add tenant config", K(tenant_id), K(ret));
201
          }
202
        }
203
      }
204
    }
205
    // 探测要删的 config
206
    {
207
      DRWLock::RDLockGuard guard(rwlock_);
208
      TenantConfigMap::const_iterator it = config_map_.begin();
209
      for (; it != config_map_.end(); ++it) {
210
        uint64_t tenant_id = it->first.tenant_id_;
211
        if (OB_SYS_TENANT_ID == tenant_id) {
212
          continue;
213
        }
214
        bool need_del = true;
215
        for (int i = 0; i < tenants.count(); ++i) {
216
          if (tenant_id == tenants.at(i) || GCTX.omt_->has_tenant(tenant_id)) {
217
            need_del = false;
218
            break;
219
          }
220
        }
221
        if (need_del) {
222
          if (OB_FAIL(del_tenants.push_back(tenant_id))) {
223
            LOG_WARN("fail add tenant config", K(tenant_id), K(ret));
224
          }
225
        }
226
      }
227
    }
228
  }
229

230
  // 加 config
231
  for (int i = 0; i < new_tenants.count(); ++i) {
232
    if (OB_FAIL(add_tenant_config(new_tenants.at(i)))) {
233
      LOG_WARN("fail add tenant config", K(i), K(new_tenants.at(i)), K(ret));
234
    } else {
235
      LOG_INFO("add created tenant config succ", K(i), K(new_tenants.at(i)));
236
    }
237
  }
238
  // 删 config
239
  for (int i = 0; i < del_tenants.count(); ++i) {
240
    if (OB_FAIL(del_tenant_config(del_tenants.at(i)))) {
241
      LOG_WARN("fail del tenant config, will try later", K(i), K(del_tenants.at(i)), K(ret));
242
    } else {
243
      LOG_INFO("del dropped tenant config succ.", K(i), K(del_tenants.at(i)));
244
    }
245
  }
246

247
  refresh_config_version_map(tenants);
248

249
  return ret;
250
}
251

252
// This function will be called in the early stage in bootstrap/create tenant.
253
// Meanwhile, related tenant's tables are not readable, so it's safe to call add_extra_config().
254
int ObTenantConfigMgr::init_tenant_config(const obrpc::ObTenantConfigArg &arg)
255
{
256
  int ret = OB_SUCCESS;
257
  if (OB_FAIL(add_tenant_config(arg.tenant_id_))) {
258
    LOG_WARN("fail to add tenant config", KR(ret), K(arg));
259
  } else if (OB_FAIL(add_extra_config(arg))) {
260
    LOG_WARN("fail to add extra config", KR(ret), K(arg));
261
  } else {
262
    DRWLock::WRLockGuard guard(rwlock_);
263
    ObTenantConfig *config = nullptr;
264
    if (OB_FAIL(dump2file())) {
265
      LOG_WARN("fail to dump config to file", KR(ret), K(arg));
266
    } else if (OB_FAIL(config_map_.get_refactored(ObTenantID(arg.tenant_id_), config))) {
267
      LOG_WARN("No tenant config found", K(arg.tenant_id_), K(ret));
268
    } else if (OB_FAIL(config->publish_special_config_after_dump())) {
269
      LOG_WARN("publish special config after dump failed", K(ret));
270
    }
271
  }
272
  return ret;
273
}
274

275
int ObTenantConfigMgr::add_tenant_config(uint64_t tenant_id)
276
{
277
  int ret = OB_SUCCESS;
278
  ObTenantConfig *const *config = nullptr;
279
  DRWLock::WRLockGuard guard(rwlock_);
280
  if (is_virtual_tenant_id(tenant_id)
281
      || OB_NOT_NULL(config = config_map_.get(ObTenantID(tenant_id)))) {
282
    if (nullptr != config) {
283
      ObTenantConfig *new_config = *config;
284
      new_config->set_deleting(false);
285
    }
286
  } else {
287
    ObTenantConfig *new_config = nullptr;
288
    new_config = OB_NEW(ObTenantConfig, SET_USE_UNEXPECTED_500("TenantConfig"), tenant_id);
289
    if (OB_NOT_NULL(new_config)) {
290
      if(OB_FAIL(new_config->init(this))) {
291
        LOG_WARN("new tenant config init failed", K(ret));
292
      } else if (OB_FAIL(config_map_.set_refactored(ObTenantID(tenant_id),
293
                                                               new_config, 0))) {
294
        LOG_WARN("add new tenant config failed", K(ret));
295
      }
296
      if (OB_FAIL(ret)) {
297
        ob_delete(new_config);
298
      }
299
    } else {
300
      ret = OB_ALLOCATE_MEMORY_FAILED;
301
      LOG_WARN("alloc new tenant config failed", K(ret));
302
    }
303
  }
304
  LOG_INFO("tenant config added", K(tenant_id), K(ret));
305
  return ret;
306
}
307

308
int ObTenantConfigMgr::del_tenant_config(uint64_t tenant_id)
309
{
310
  int ret = OB_SUCCESS;
311
  ObTenantConfig *config = nullptr;
312
  DRWLock::WRLockGuard guard(rwlock_);
313
  ObTenant *tenant = NULL;
314
  bool has_dropped = false;
315
  if (is_virtual_tenant_id(tenant_id)) {
316
  } else if (OB_FAIL(config_map_.get_refactored(ObTenantID(tenant_id), config))) {
317
    LOG_WARN("get tenant config failed", K(tenant_id), K(ret));
318
  } else if (OB_FAIL(GSCHEMASERVICE.check_if_tenant_has_been_dropped(tenant_id, has_dropped))) {
319
    LOG_WARN("failed to check tenant has been dropped", K(tenant_id));
320
  } else if (!has_dropped && ObTimeUtility::current_time() - config->get_create_timestamp() < RECYCLE_LATENCY) {
321
    LOG_WARN("tenant still exist, try to delete tenant config later...", K(tenant_id));
322
  } else if (!config->is_ref_clear()) {
323
    ret = OB_EAGAIN;
324
    LOG_INFO("something hold config ref, try delete later...");
325
  } else if (OB_ISNULL(GCTX.omt_)) {
326
    ret = OB_ERR_UNEXPECTED;
327
    LOG_WARN("omt is null", KR(ret));
328
  } else if (GCTX.omt_->has_tenant(tenant_id)) {
329
    LOG_WARN("local tenant resource still exist, try to delete tenant config later", K(tenant_id));
330
  } else {
331
    config->set_deleting();
332
    if (OB_FAIL(wait(config->get_update_task()))) {
333
      LOG_WARN("wait tenant config update task failed", K(ret), K(tenant_id));
334
    } else if (OB_FAIL(config_map_.erase_refactored(ObTenantID(tenant_id)))) {
335
      LOG_WARN("delete tenant config failed", K(ret), K(tenant_id));
336
    } else {
337
      ob_delete(config);
338
      LOG_INFO("tenant config deleted", K(tenant_id), K(ret));
339
    }
340
  }
341
  return ret;
342
}
343

344
// Looks up the config of `tenant_id` and returns it with one extra reference
// taken (the caller must unref() it, e.g. through ObTenantConfigGuard).
//
// @param tenant_id           tenant whose config is wanted
// @param fallback_tenant_id  used when the lookup of `tenant_id` fails; ignored
//                            when it is 0 or OB_INVALID_ID
// @param timeout_us          when > 0, a failed lookup of `tenant_id` is retried
//                            every 10ms until this much time has passed
//                            (assumes ObTimeUtility::current_time() is in
//                            microseconds -- TODO confirm)
// @return the referenced config, or nullptr when neither lookup succeeded.
//
// The read lock is only held around each map access, never while sleeping:
// add_tenant_config() needs the write lock on the same rwlock_ to insert the
// entry this function is waiting for. The timeout is tracked with an explicit
// per-call deadline.
ObTenantConfig *ObTenantConfigMgr::get_tenant_config_with_lock(
    const uint64_t tenant_id, const uint64_t fallback_tenant_id /* = 0 */,
    const uint64_t timeout_us /* = 0 */) const
{
  int ret = OB_SUCCESS;
  ObTenantConfig *config = nullptr;
  const int64_t start_ts = ObTimeUtility::current_time();
  bool need_retry = true;
  while (need_retry) {
    need_retry = false;
    {
      DRWLock::RDLockGuard guard(rwlock_);
      if (OB_FAIL(config_map_.get_refactored(ObTenantID(tenant_id), config))) {
        config = nullptr;
      } else if (OB_NOT_NULL(config)) {
        // take the reference while the lock still protects the entry
        config->ref(); // remember to unref outside
      }
    }
    if (OB_FAIL(ret) && timeout_us > 0) {
      const int64_t elapsed_us = ObTimeUtility::current_time() - start_ts;
      // a negative elapsed time (clock moved backwards) is treated as expired
      if (elapsed_us >= 0 && static_cast<uint64_t>(elapsed_us) < timeout_us) {
        need_retry = true;
        ob_usleep(10 * 1000L);
        if (REACH_TIME_INTERVAL(1 * 1000 * 1000L)) {
          LOG_WARN("failed to get tenant config, retry for 1s", K(tenant_id), K(ret));
        }
      }
    }
  }
  if (OB_FAIL(ret)) {
    if (fallback_tenant_id > 0 && OB_INVALID_ID != fallback_tenant_id) {
      DRWLock::RDLockGuard guard(rwlock_);
      if (OB_FAIL(config_map_.get_refactored(ObTenantID(fallback_tenant_id), config))) {
        config = nullptr;
        LOG_WARN("failed to get fallback tenant config", K(fallback_tenant_id), K(ret));
      } else if (OB_NOT_NULL(config)) {
        config->ref(); // remember to unref outside
      }
    } else {
      LOG_WARN("failed to get tenant config", K(tenant_id), K(ret));
    }
  }
  return config;
}
375

376
int ObTenantConfigMgr::read_tenant_config(
377
    const uint64_t tenant_id,
378
    const uint64_t fallback_tenant_id,
379
    const SuccessFunctor &on_success,
380
    const FailureFunctor &on_failure) const
381
{
382
  int ret = OB_SUCCESS;
383
  ObTenantConfig *config = nullptr;
384
  DRWLock::RDLockGuard guard(rwlock_);
385
  if (OB_FAIL(config_map_.get_refactored(ObTenantID(tenant_id), config))) {
386
    if (fallback_tenant_id > 0 && OB_INVALID_ID != fallback_tenant_id) {
387
      if (OB_FAIL(config_map_.get_refactored(ObTenantID(fallback_tenant_id), config))) {
388
        LOG_WARN("failed to get tenant config", K(fallback_tenant_id), K(ret), K(lbt()));
389
      }
390
    } else {
391
      LOG_WARN("failed to get tenant config", K(tenant_id), K(ret));
392
    }
393
  }
394
  if (OB_SUCC(ret) && OB_NOT_NULL(config)) {
395
    on_success(*config);
396
  } else {
397
    on_failure();
398
    LOG_WARN("fail read tenant config", K(tenant_id), K(ret));
399
  }
400
  return ret;
401
}
402

403
void ObTenantConfigMgr::print() const
404
{
405
  DRWLock::RDLockGuard guard(rwlock_);
406
  TenantConfigMap::const_iterator it = config_map_.begin();
407
  for (; it != config_map_.end(); ++it) {
408
    if (OB_NOT_NULL(it->second)) {
409
      it->second->print();
410
    }
411
  } // for
412
}
413

414
int ObTenantConfigMgr::dump2file()
415
{
416
  int ret = OB_SUCCESS;
417
  if (OB_FAIL(sys_config_mgr_->dump2file())) {
418
    LOG_WARN("failed to dump2file", K(ret));
419
  } else if (OB_FAIL(sys_config_mgr_->config_backup())) {
420
    LOG_WARN("failed to dump2file backup", K(ret));
421
  }
422
  return ret;
423
}
424

425
int ObTenantConfigMgr::set_tenant_config_version(uint64_t tenant_id, int64_t version)
426
{
427
  int ret = OB_SUCCESS;
428
  int64_t cur_version = 0;
429
  DRWLock::WRLockGuard guard(rwlock_);
430
  if (OB_FAIL(config_version_map_.get_refactored(ObTenantID(tenant_id), cur_version))) {
431
    if (OB_HASH_NOT_EXIST != ret) {
432
      LOG_WARN("get tenant config version fail", K(tenant_id), K(ret));
433
    } else {
434
      ret = OB_SUCCESS;
435
    }
436
  }
437
  if (OB_SUCC(ret)) {
438
    if (version <= cur_version) {
439
      // avoid set smaller version when refresh config_version_map_ in refresh_config_version_map()
440
    } else if (OB_FAIL(config_version_map_.set_refactored(ObTenantID(tenant_id), version, 1))) {
441
      LOG_WARN("set tenant config version fail", K(tenant_id), K(version), K(ret));
442
    }
443
  }
444
  return ret;
445
}
446

447
int64_t ObTenantConfigMgr::get_tenant_config_version(uint64_t tenant_id)
448
{
449
  int64_t version = ObSystemConfig::INIT_VERSION;
450
  int ret = OB_SUCCESS;
451
  DRWLock::RDLockGuard guard(rwlock_);
452
  if (OB_FAIL(config_version_map_.get_refactored(ObTenantID(tenant_id), version))) {
453
    if (OB_HASH_NOT_EXIST != ret) {
454
      LOG_WARN("get tenant config version fail", K(tenant_id), K(ret));
455
    }
456
  }
457
  return version;
458
}
459

460
// 由 RS 调用
461
int ObTenantConfigMgr::get_lease_response(share::ObLeaseResponse &lease_response)
462
{
463
  int ret = OB_SUCCESS;
464
  std::pair<uint64_t, int64_t> pair;
465
  lease_response.tenant_config_version_.reset();
466
  lease_response.tenant_config_version_.reserve(config_version_map_.size());
467
  DRWLock::RDLockGuard guard(rwlock_);
468
  TenantConfigVersionMap::const_iterator it = config_version_map_.begin();
469
  for (; it != config_version_map_.end() && OB_SUCC(ret); ++it) {
470
    pair.first = it->first.tenant_id_;
471
    pair.second = it->second;
472
    if (OB_FAIL(lease_response.tenant_config_version_.push_back(pair))) {
473
      LOG_WARN("push back tenant config fail",
474
               "tenant_id", pair.first, "version", pair.second, K(ret));
475
    } else {
476
      LOG_DEBUG("push back tenant config response succ",
477
               "tenant", pair.first, "version", pair.second);
478
    }
479
  }
480
  if (OB_FAIL(ret)) {
481
    // 和 obs 端约定,如果 lease_response 中没有返回任何 config version,则 obs 什么都不做
482
    lease_response.tenant_config_version_.reset();
483
  } else if (0 == config_version_map_.size()) {
484
    ret = OB_ENTRY_NOT_EXIST;
485
    if (REACH_TIME_INTERVAL(10 * 1000L * 1000L)) {
486
      LOG_INFO("config version map size is 0, try get lease later");
487
    }
488
  }
489
  return ret;
490
}
491

492
// 由各个 observer 调用
493
void ObTenantConfigMgr::get_lease_request(share::ObLeaseRequest &lease_request)
494
{
495
  int ret = OB_SUCCESS;
496
  DRWLock::RDLockGuard guard(rwlock_);
497
  std::pair<uint64_t, int64_t> pair;
498
  TenantConfigMap::const_iterator it = config_map_.begin();
499
  for (; it != config_map_.end(); ++it) {
500
    if (OB_NOT_NULL(it->second)) {
501
      pair.first = it->first.tenant_id_;
502
      pair.second = it->second->get_current_version();
503
      if (OB_FAIL(lease_request.tenant_config_version_.push_back(pair))) {
504
        LOG_WARN("push back tenant config fail", K(ret));
505
      }
506
    }
507
  } // for
508
}
509

510
// for __all_virtual_tenant_parameter_info
511
int ObTenantConfigMgr::get_all_tenant_config_info(common::ObArray<TenantConfigInfo> &all_config)
512
{
513
  int ret = OB_SUCCESS;
514
  DRWLock::RDLockGuard guard(rwlock_);
515
  TenantConfigMap::const_iterator it = config_map_.begin();
516
  for (; OB_SUCC(ret) && it != config_map_.end(); ++it) {
517
    uint64_t tenant_id = it->first.tenant_id_;
518
    ObTenantConfig *tenant_config = it->second;
519
    for (ObConfigContainer::const_iterator iter = tenant_config->get_container().begin();
520
         iter != tenant_config->get_container().end(); iter++) {
521
      TenantConfigInfo config_info(tenant_id);
522
      if (0 == ObString("compatible").case_compare(iter->first.str())
523
          && !iter->second->value_updated()) {
524
        if (OB_FAIL(config_info.set_value("0.0.0.0"))) {
525
          LOG_WARN("set value fail", K(iter->second->str()), K(ret));
526
        }
527
      } else {
528
        if (OB_FAIL(config_info.set_value(iter->second->str()))) {
529
          LOG_WARN("set value fail", K(iter->second->str()), K(ret));
530
        }
531
      }
532
      if (FAILEDx(config_info.set_name(iter->first.str()))) {
533
        LOG_WARN("set name fail", K(iter->first.str()), K(ret));
534
      } else if (OB_FAIL(config_info.set_data_type(iter->second->data_type()))) {
535
        LOG_WARN("set data_type fail", K(iter->second->data_type()), K(ret));
536
      } else if (OB_FAIL(config_info.set_info(iter->second->info()))) {
537
        LOG_WARN("set info fail", K(iter->second->info()), K(ret));
538
      } else if (OB_FAIL(config_info.set_section(iter->second->section()))) {
539
        LOG_WARN("set section fail", K(iter->second->section()), K(ret));
540
      } else if (OB_FAIL(config_info.set_scope(iter->second->scope()))) {
541
        LOG_WARN("set scope fail", K(iter->second->scope()), K(ret));
542
      } else if (OB_FAIL(config_info.set_source(iter->second->source()))) {
543
        LOG_WARN("set source fail", K(iter->second->source()), K(ret));
544
      } else if (OB_FAIL(config_info.set_edit_level(iter->second->edit_level()))) {
545
        LOG_WARN("set edit_level fail", K(iter->second->edit_level()), K(ret));
546
      } else if (OB_FAIL(all_config.push_back(config_info))) {
547
        LOG_WARN("push to array fail", K(config_info), K(ret));
548
      }
549
    } // for
550
  } // for
551
  return ret;
552
}
553

554
// Forwards every (tenant_id, version) pair to got_version().
// A failure for one pair is logged and the remaining pairs are still
// processed.
// @return the result of the last processed pair (OB_SUCCESS for an empty list).
// NOTE(review): the read lock is held here while got_version() takes the same
// read lock again -- confirm DRWLock tolerates nested read acquisition.
int ObTenantConfigMgr::got_versions(const common::ObIArray<std::pair<uint64_t, int64_t>> &versions)
{
  int ret = OB_SUCCESS;
  DRWLock::RDLockGuard guard(rwlock_);
  for (int i = 0; i < versions.count(); ++i) {
    const uint64_t tenant_id = versions.at(i).first;
    const int64_t version = versions.at(i).second;
    ret = got_version(tenant_id, version);
    if (OB_SUCCESS != ret) {
      LOG_WARN("fail got version", K(tenant_id), K(version), K(ret));
    }
  }
  return ret;
}
567

568
int ObTenantConfigMgr::got_version(uint64_t tenant_id, int64_t version, const bool remove_repeat/* = true */)
569
{
570
  int ret = OB_SUCCESS;
571
  DRWLock::RDLockGuard guard(rwlock_);
572
  ObTenantConfig *config = nullptr;
573
  if (OB_FAIL(config_map_.get_refactored(ObTenantID(tenant_id), config))) {
574
    LOG_WARN("No tenant config found", K(tenant_id), K(ret));
575
  } else {
576
    ret = config->got_version(version, remove_repeat);
577
  }
578
  return ret;
579
}
580

581
int ObTenantConfigMgr::update_local(uint64_t tenant_id, int64_t expected_version)
582
{
583
  int ret = OB_SUCCESS;
584
  if (OB_ISNULL(sql_proxy_)) {
585
    ret = OB_NOT_INIT;
586
    LOG_WARN("sql proxy is null", K(ret));
587
  } else {
588
    uint64_t exec_tenant_id = gen_meta_tenant_id(tenant_id);
589
    ObTenantConfig *config = nullptr;
590
    ObSQLClientRetryWeak sql_client_retry_weak(sql_proxy_);
591
    ObSqlString sql;
592
    SMART_VAR(ObMySQLProxy::MySQLResult, result) {
593
      if (OB_FAIL(sql.assign_fmt(
594
          "select config_version, zone, svr_type, svr_ip, svr_port, name, "
595
          "data_type, value, info, section, scope, source, edit_level "
596
          "from %s where tenant_id = '%lu'", OB_TENANT_PARAMETER_TNAME, tenant_id))) {
597
      } else if (OB_FAIL(sql_client_retry_weak.read(result, exec_tenant_id, sql.ptr()))) {
598
        LOG_WARN("read config from __tenant_parameter failed",
599
                 KR(ret), K(tenant_id), K(exec_tenant_id), K(sql));
600
      } else {
601
        DRWLock::WRLockGuard guard(rwlock_);
602
        ret = config_map_.get_refactored(ObTenantID(tenant_id), config);
603
        if (OB_FAIL(ret)) {
604
          LOG_ERROR("failed to get tenant config", K(tenant_id), K(ret));
605
        } else {
606
          ret = config->update_local(expected_version, result);
607
        }
608
      }
609
    }
610
  }
611
  return ret;
612
}
613

614
void ObTenantConfigMgr::notify_tenant_config_changed(uint64_t tenant_id)
615
{
616
  update_tenant_config_cb_(tenant_id);
617
}
618

619
int ObTenantConfigMgr::add_config_to_existing_tenant(const char *config_str)
620
{
621
  int ret = OB_SUCCESS;
622
  DRWLock::WRLockGuard guard(rwlock_);
623
  if (!config_map_.empty() && nullptr != config_str) {
624
    TenantConfigMap::const_iterator it = config_map_.begin();
625
    for (; it != config_map_.end() && OB_SUCC(ret); ++it) {
626
      if (OB_NOT_NULL(it->second)) {
627
        int64_t version = ObTimeUtility::current_time();
628
        if (OB_FAIL(it->second->add_extra_config(config_str, version))) {
629
          LOG_WARN("add tenant extra config failed", "tenant_id", it->second->get_tenant_id(),
630
                   "config_str", config_str, KR(ret));
631
        }
632
      }
633
    }
634
  }
635
  return ret;
636
}
637

638
// Applies arg.config_str_ as extra config to the config of arg.tenant_id_,
// under the write lock. The final info log is emitted in every case.
// @return OB_INVALID_ARGUMENT for an invalid `arg`, the map lookup error when
//         the tenant has no config, otherwise the result of
//         ObTenantConfig::add_extra_config().
int ObTenantConfigMgr::add_extra_config(const obrpc::ObTenantConfigArg &arg)
{
  int ret = OB_SUCCESS;
  if (arg.is_valid()) {
    ObTenantConfig *tenant_config = nullptr;
    DRWLock::WRLockGuard guard(rwlock_);
    if (OB_FAIL(config_map_.get_refactored(ObTenantID(arg.tenant_id_), tenant_config))) {
      LOG_ERROR("failed to get tenant config", K(arg.tenant_id_), K(ret));
    } else {
      ret = tenant_config->add_extra_config(arg.config_str_.ptr());
    }
  } else {
    ret = OB_INVALID_ARGUMENT;
    LOG_ERROR("invalid arg", K(ret), K(arg));
  }
  FLOG_INFO("add tenant extra config", K(arg));
  return ret;
}
656

657
// Schedules `task` once (non-repeating) on the CONFIG_MGR thread group after
// `delay`.
// @return OB_SUCCESS, or OB_ERR_UNEXPECTED when scheduling failed. The error
//         code reported by the scheduler is written to the log as
//         `schedule_ret`, because callers only ever see OB_ERR_UNEXPECTED.
int ObTenantConfigMgr::schedule(ObTenantConfig::TenantConfigUpdateTask &task, const int64_t delay)
{
  int ret = OB_SUCCESS;
  bool repeat = false;
  const int schedule_ret = TG_SCHEDULE(lib::TGDefIDs::CONFIG_MGR, task, delay, repeat);
  if (OB_SUCCESS != schedule_ret) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("schedule task failed", K(ret), K(schedule_ret));
  }
  return ret;
}
667

668
// Cancels `task` on the CONFIG_MGR thread group.
// @return OB_SUCCESS, or the error reported by TG_CANCEL_R.
int ObTenantConfigMgr::cancel(const ObTenantConfig::TenantConfigUpdateTask &task)
{
  int ret = OB_SUCCESS;
  if (OB_FAIL(TG_CANCEL_R(lib::TGDefIDs::CONFIG_MGR, task))) {
    LOG_WARN("cancel tenant config update task failed", K(ret));
  }
  return ret;
}
677

678
// Waits until `task` has no running instance left.
// task.running_task_count_ is polled up to 300 times, sleeping `period`
// (10000, passed to ob_usleep) after every poll that still sees a running task.
// @return OB_SUCCESS once the count is not positive, OB_EAGAIN when the task
//         was still running after all polls.
int ObTenantConfigMgr::wait(const ObTenantConfig::TenantConfigUpdateTask &task)
{
  const int try_times = 300;
  const int64_t period = 10000;
  int ret = OB_EAGAIN;
  for (int attempt = 0; OB_EAGAIN == ret && attempt < try_times; ++attempt) {
    if (ATOMIC_LOAD(&task.running_task_count_) > 0) {
      // wait running task finish
      ob_usleep(period);
    } else {
      ret = OB_SUCCESS;
    }
  }
  if (OB_EAGAIN == ret) {
    LOG_WARN("wait running update task failed, try later", K(ret), K(ATOMIC_LOAD(&task.running_task_count_)));
  }
  return ret;
}
698

699
// Serializes every tenant config in config_map_ back to back into `buf`.
// A null map entry is an error (OB_ERR_UNEXPECTED). After a successful run the
// number of bytes written must equal get_serialize_size_(), otherwise
// OB_ERR_UNEXPECTED is returned.
OB_DEF_SERIALIZE(ObTenantConfigMgr)
{
  int ret = OB_SUCCESS;
  const int64_t expect_data_len = get_serialize_size_();
  const int64_t saved_pos = pos;
  for (TenantConfigMap::const_iterator it = config_map_.begin();
       OB_SUCC(ret) && it != config_map_.end(); ++it) {
    ret = OB_ISNULL(it->second) ? OB_ERR_UNEXPECTED
                                : it->second->serialize(buf, buf_len, pos);
  }
  if (OB_FAIL(ret)) {
  } else {
    const int64_t writen_len = pos - saved_pos;
    if (writen_len != expect_data_len) {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("unexpected data size", K(writen_len), K(expect_data_len));
    }
  }
  return ret;
}
721

722
// Deserializes a sequence of tenant config records from `buf`.
//
// For each record the code peeks at its header: two encoded int64 values
// (ignored here) followed by "[<tenant id>]\n". The tenant id selects the
// config object in config_map_ (created through add_tenant_config() when it
// does not exist yet). `pos` is then rewound to the start of the record and
// the whole record is handed to ObTenantConfig::deserialize().
//
// @return OB_SUCCESS; OB_INVALID_DATA for a malformed header or an over-long
//         tenant id; OB_INVALID_CONFIG for a non-numeric tenant id; or the
//         error of the lookup / creation / per-tenant deserialize.
OB_DEF_DESERIALIZE(ObTenantConfigMgr)
{
  int ret = OB_SUCCESS;
  if (data_len == 0 || pos >= data_len) {
    // nothing to read
  } else {
    while(OB_SUCC(ret) && pos < data_len) {
      int64_t saved_pos = pos;
      int64_t ignore_version = 0, ignore_len = 0;
      OB_UNIS_DECODE(ignore_version);
      OB_UNIS_DECODE(ignore_len);
      if (OB_FAIL(ret)) {
      } else if (pos >= data_len || '[' != *(buf + pos)) {
        // the header decode may have consumed the rest of the buffer; check
        // the bound before looking at the next byte
        ret = OB_INVALID_DATA;
        LOG_ERROR("invalid tenant config", K(ret));
      } else {
        // find the closing ']' which must be followed by '\n'
        int64_t cur = pos + 1;
        while (cur < data_len - 1 && ']' != *(buf + cur)) {
          ++cur;
        } // while
        if (cur >= data_len - 1 || '\n' != *(buf + cur + 1)) {
          ret = OB_INVALID_DATA;
          LOG_ERROR("invalid tenant config", K(ret));
        } else {
          uint64_t tenant_id = OB_INVALID_TENANT_ID;
          char tenant_str[100];
          char *p_end = nullptr;
          MEMSET(tenant_str, '\0', 100);
          if (cur - pos - 1 < 100) {
            // copy the text between the brackets; the zeroed buffer keeps it
            // NUL-terminated
            MEMCPY(tenant_str, buf + pos + 1, cur - pos - 1);
            tenant_id = strtoul(tenant_str, &p_end, 0);
          } else {
            ret = OB_INVALID_DATA;
            LOG_ERROR("invalid tenant id", K(ret));
            break;
          }
          if ('\0' != *p_end) {
            // trailing characters after the number
            ret = OB_INVALID_CONFIG;
            LOG_ERROR("invalid tenant id", K(ret));
          } else {
            pos = cur + 2;
            ObTenantConfig *config = nullptr;
            if (OB_FAIL(config_map_.get_refactored(ObTenantID(tenant_id), config))) {
              if (ret != OB_HASH_NOT_EXIST || OB_FAIL(add_tenant_config(tenant_id))) {
                LOG_ERROR("get tenant config failed", K(tenant_id),  K(ret));
                break;
              }
              ret = config_map_.get_refactored(ObTenantID(tenant_id), config);
            }
            if (OB_SUCC(ret)) {
              // the header was only peeked at: the tenant config parses the
              // record from its beginning
              pos = saved_pos;
              ret = config->deserialize(buf, data_len, pos);
            }
          }
        }
      } // else
    } // while
  }
  return ret;
}
781

782
// Returns the sum of get_serialize_size() over all non-null tenant configs.
OB_DEF_SERIALIZE_SIZE(ObTenantConfigMgr)
{
  int64_t len = 0;
  for (TenantConfigMap::const_iterator it = config_map_.begin();
       it != config_map_.end(); ++it) {
    if (OB_ISNULL(it->second)) {
      // a null entry contributes nothing
    } else {
      len += it->second->get_serialize_size();
    }
  }
  return len;
}
793

794
} // omt
795
} // oceanbase
796

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.