oceanbase

Форк
0
/
ob_tenant_node_balancer.cpp 
668 строк · 25.8 Кб
1
/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */
12

13
#define USING_LOG_PREFIX SERVER_OMT
14
#include "ob_tenant_node_balancer.h"
15
#include "lib/time/ob_time_utility.h"
16
#include "lib/oblog/ob_log.h"
17
#include "lib/alloc/ob_malloc_allocator.h"
18
#include "lib/container/ob_se_array_iterator.h"
19
#include "lib/mysqlclient/ob_mysql_proxy.h"
20
#include "share/ob_tenant_mgr.h"
21
#include "share/ob_debug_sync.h"
22
#include "share/system_variable/ob_sys_var_class_type.h"
23
#include "observer/ob_inner_sql_result.h"
24
#include "ob_tenant.h"
25
#include "ob_multi_tenant.h"
26
#include "share/allocator/ob_tenant_mutil_allocator.h"
27
#include "share/allocator/ob_tenant_mutil_allocator_mgr.h"
28
#include "share/config/ob_server_config.h"
29
#include "observer/ob_server_struct.h"
30
#include "observer/omt/ob_tenant_config_mgr.h"
31
#include "storage/blocksstable/ob_block_manager.h"
32
#include "logservice/palf/palf_options.h"
33
#include "logservice/ob_server_log_block_mgr.h"
34
#include "storage/tx_storage/ob_tenant_freezer.h"
35
#include "storage/tx_storage/ob_ls_service.h"
36
#include "storage/meta_mem/ob_tenant_meta_mem_mgr.h"
37
#include "storage/slog_ckpt/ob_server_checkpoint_slog_handler.h"
38
#include "observer/ob_server_event_history_table_operator.h"
39
#ifdef OB_BUILD_TDE_SECURITY
40
#include "share/ob_master_key_getter.h"
41
#endif
42

43
using namespace oceanbase::obsys;
44
using namespace oceanbase::lib;
45
using namespace oceanbase::common;
46
using namespace oceanbase::share;
47
using namespace oceanbase::omt;
48
using namespace oceanbase::observer;
49
using namespace oceanbase::storage;
50

51
/**
 * Builds a balancer that is not yet bound to a multi-tenant manager.
 *
 * refresh_interval_ (later passed to USLEEP in run1) starts at 10 seconds
 * expressed in microseconds, and at half of that when lib::is_mini_mode()
 * reports mini mode.
 */
ObTenantNodeBalancer::ObTenantNodeBalancer()
    : omt_(nullptr),
      myaddr_(),
      unit_getter_(),
      lock_(common::ObLatchIds::CONFIG_LOCK),
      refresh_interval_(lib::is_mini_mode() ? (10L * 1000L * 1000L) / 2
                                            : (10L * 1000L * 1000L))
{
}
59

60
// Clears the cached server address and forgets the multi-tenant manager
// pointer; the pointee is not deleted here.
ObTenantNodeBalancer::~ObTenantNodeBalancer()
{
  myaddr_.reset();
  omt_ = nullptr;
}
65

66
int ObTenantNodeBalancer::init(ObMultiTenant *omt, common::ObMySQLProxy &sql_proxy,
67
    const common::ObAddr &myaddr)
68
{
69
  int ret = OB_SUCCESS;
70
  myaddr_ = myaddr;
71
  if (OB_FAIL(unit_getter_.init(sql_proxy, &GCONF))) {
72
    LOG_ERROR("init unit getter fail", K(ret));
73
  } else {
74
    omt_= omt;
75
    myaddr_ = myaddr;
76
  }
77
  return ret;
78
}
79

80
void ObTenantNodeBalancer::run1()
81
{
82
  int ret = OB_SUCCESS;
83
  lib::set_thread_name("OmtNodeBalancer");
84

85
  while (!has_set_stop()) {
86
    TenantUnits units;
87
    int64_t sys_unit_cnt = 0;
88
    ObCurTraceId::init(GCONF.self_addr_);
89
    if (!ObServerCheckpointSlogHandler::get_instance().is_started()) {
90
      // do nothing if not finish replaying slog
91
      LOG_INFO("server slog not finish replaying, need wait");
92
      ret = OB_NEED_RETRY;
93
    } else if (OB_FAIL(unit_getter_.get_sys_unit_count(sys_unit_cnt))) {
94
      LOG_WARN("get sys unit count fail", KR(ret));
95
    } else if (sys_unit_cnt <= 0) {
96
      // check wether sys tenant has been created, do nothing if sys tenant has not been created
97
      LOG_INFO("sys tenant has not been created, tenant node balancer can not run, need wait",
98
          K(sys_unit_cnt));
99
      ret = OB_NEED_RETRY;
100
    } else if (OB_FAIL(unit_getter_.get_server_tenant_configs(myaddr_, units))) {
101
      LOG_WARN("get server tenant units fail", K(myaddr_), K(ret));
102
    } else if (OB_FAIL(refresh_tenant(units))) {
103
      LOG_WARN("failed to refresh tenant", K(ret), K(units));
104
    } else if (FALSE_IT(periodically_check_tenant())) {
105
      // never reach here
106
    }
107

108
    FLOG_INFO("refresh tenant units", K(sys_unit_cnt), K(units), KR(ret));
109

110
    // will try to update tma whether tenant unit is changed or not,
111
    // because memstore_limit_percentage may be changed
112
    int tmp_ret = OB_SUCCESS;
113
    if (OB_SUCCESS != (tmp_ret = TMA_MGR_INSTANCE.update_tenant_mem_limit(units))) {
114
      LOG_WARN("TMA_MGR_INSTANCE.update_tenant_mem_limit failed", K(tmp_ret));
115
    }
116

117
    // check whether tenant unit is changed, try to update unit config of tenant
118
    ObSEArray<uint64_t, 10> tenants;
119
    if (!ObServerCheckpointSlogHandler::get_instance().is_started()) {
120
      // do nothing if not finish replaying slog
121
      LOG_INFO("server slog not finish replaying, need wait");
122
      ret = OB_NEED_RETRY;
123
    } else if (OB_FAIL(unit_getter_.get_tenants(tenants))) {
124
      LOG_WARN("get cluster tenants fail", K(ret));
125
    } else if (OB_FAIL(OTC_MGR.refresh_tenants(tenants))) {
126
      LOG_WARN("fail refresh tenant config", K(tenants), K(ret));
127
    }
128
    if (OB_SUCCESS != (tmp_ret = GCTX.log_block_mgr_->try_resize())) {
129
      LOG_WARN("ObServerLogBlockMgr try_resize failed", K(tmp_ret));
130
    }
131

132
    FLOG_INFO("refresh tenant config", K(tenants), K(ret));
133

134

135
    USLEEP(refresh_interval_);  // sleep 10s
136
  }
137
}
138

139
/**
 * Dispatches a unit-resource notification.
 *
 * @param arg  notification payload; arg.is_delete_ selects between dropping
 *             (try_notify_drop_tenant) and creating (notify_create_tenant)
 * @return the result of the handler that was invoked
 */
int ObTenantNodeBalancer::handle_notify_unit_resource(const obrpc::TenantServerUnitConfig &arg)
{
  int ret = OB_SUCCESS;
  if (arg.is_delete_) {
    if (OB_FAIL(try_notify_drop_tenant(arg.tenant_id_))) {
      LOG_WARN("fail to try drop tenant", KR(ret), K(arg));
    }
  } else if (OB_FAIL(notify_create_tenant(arg))) {
    LOG_WARN("failed to notify update tenant", KR(ret), K(arg));
  }
  return ret;
}
153

154
/**
 * Creates, on this server, the tenant described by a unit-resource notification.
 *
 * For a user tenant two local tenants are created: the tenant itself and its
 * meta tenant, whose unit is derived with divide_meta_tenant().
 *
 * @param unit  unit configuration carried by the notification
 * @return OB_SUCCESS on success, otherwise for example
 *         - OB_INVALID_ARGUMENT  unit.is_valid() is false
 *         - OB_SERVER_IS_INIT    slog replay has not finished
 *         - OB_OP_NOT_ALLOW      unit names a meta tenant
 *         - OB_TENANT_EXIST      the tenant or its meta tenant already exists
 *                                (mapped to OB_SUCCESS on a standby cluster
 *                                when unit.if_not_grant_ is set)
 *         - OB_TIMEOUT           the worker timeout has already expired
 *         or any error returned by the helpers called below
 */
int ObTenantNodeBalancer::notify_create_tenant(const obrpc::TenantServerUnitConfig &unit)
{
  LOG_INFO("succ to receive notify of creating tenant", K(unit));
  int ret = OB_SUCCESS;
  bool is_hidden_sys = false;
  // stays false while the check_if_unit_id_exist() call below is disabled
  bool unit_id_exist = false;

  if (!unit.is_valid()) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid argument", KR(ret), K(unit));
  } else if (!ObServerCheckpointSlogHandler::get_instance().is_started()) {
    ret = OB_SERVER_IS_INIT;
    LOG_WARN("slog replay not finish", KR(ret),K(unit));
  } else if (is_meta_tenant(unit.tenant_id_)) {
    ret = OB_OP_NOT_ALLOW;
    LOG_WARN("can not create meta tenant", K(ret), K(unit));
  } else if (OB_FAIL(omt_->check_if_hidden_sys(unit.tenant_id_, is_hidden_sys))) {
    LOG_WARN("fail to check_if_hidden_sys", KR(ret), K(unit));
  } else if (omt_->has_tenant(unit.tenant_id_) && !is_hidden_sys) {
    ret = OB_TENANT_EXIST;
    LOG_WARN("tenant has exist", KR(ret), K(unit));
  } else if (is_user_tenant(unit.tenant_id_) && omt_->has_tenant(gen_meta_tenant_id(unit.tenant_id_))) {
    ret = OB_TENANT_EXIST;
    LOG_WARN("meta tenant has exist", KR(ret), K(unit));
  // TODO(fenggu.yh) temporarily commented out to keep tenant creation from failing
  //} else if (OB_FAIL(omt_->check_if_unit_id_exist(unit.unit_id_, unit_id_exist))) {
  //  LOG_WARN("fail to check_if_unit_id_exist", KR(ret), K(unit));
  } else if (unit_id_exist) { // the unit may be wait_gc status
    ret = OB_ENTRY_EXIST;
    LOG_WARN("unit_id exist", KR(ret), K(unit));
  } else {
    const uint64_t tenant_id = unit.tenant_id_;
    ObUnitInfoGetter::ObTenantConfig basic_tenant_unit;
    ObUnitInfoGetter::ObTenantConfig meta_tenant_unit;
    const bool has_memstore = (REPLICA_TYPE_LOGONLY != unit.replica_type_);
    const int64_t create_timestamp = ObTimeUtility::current_time();
    basic_tenant_unit.unit_status_ = ObUnitInfoGetter::ObUnitStatus::UNIT_NORMAL;
    const int64_t create_tenant_timeout_ts = THIS_WORKER.get_timeout_ts();

    // step 1: the tenant named by the notification
    if (create_tenant_timeout_ts < create_timestamp) {
      ret = OB_TIMEOUT;
      LOG_WARN("notify_create_tenant has timeout", K(ret), K(create_timestamp), K(create_tenant_timeout_ts));
    } else if (OB_FAIL(basic_tenant_unit.init(tenant_id,
                                              unit.unit_id_,
                                              ObUnitInfoGetter::ObUnitStatus::UNIT_NORMAL,
                                              unit.unit_config_,
                                              unit.compat_mode_,
                                              create_timestamp,
                                              has_memstore,
                                              false /*is_removed*/))) {
      LOG_WARN("fail to init user tenant config", KR(ret), K(unit));
    } else if (is_user_tenant(tenant_id)
        && OB_FAIL(basic_tenant_unit.divide_meta_tenant(meta_tenant_unit))) {
      LOG_WARN("divide meta tenant failed", KR(ret), K(unit), K(basic_tenant_unit));
    } else if (OB_FAIL(check_new_tenant(basic_tenant_unit, false /*check_data_version*/, create_tenant_timeout_ts))) {
      LOG_WARN("failed to create new tenant", KR(ret), K(basic_tenant_unit), K(create_tenant_timeout_ts));
    } else {
      // ret is OB_SUCCESS here: every earlier branch of the chain passed
      LOG_INFO("succ to create new user tenant", KR(ret), K(unit), K(basic_tenant_unit), K(create_tenant_timeout_ts));
    }
#ifdef OB_BUILD_TDE_SECURITY
    // step 2 (TDE builds only): fetch or install the tenant's root key
    if (OB_SUCC(ret)) {
      if (!unit.with_root_key_) {
        ObRootKey root_key;
        if (OB_FAIL(ObMasterKeyGetter::instance().get_root_key(tenant_id, root_key, true))) {
          LOG_WARN("failed to get root key", KR(ret));
        }
      } else {
        const obrpc::ObRootKeyResult &root_key = unit.root_key_;
        if (obrpc::RootKeyType::INVALID == root_key.key_type_) {
          // nothing to install
          LOG_INFO("root_key got from RS is INVALID, won't set now", KR(ret));
        } else if (OB_FAIL(ObMasterKeyGetter::instance().set_root_key(
                            tenant_id, root_key.key_type_, root_key.root_key_))) {
          LOG_WARN("failed to set root_key", KR(ret));
        }
      }
    }
#endif
    // step 3: the companion meta tenant of a user tenant
    if (OB_SUCC(ret) && is_user_tenant(tenant_id)) {
      if (OB_FAIL(check_new_tenant(meta_tenant_unit, false /*check_data_version*/, create_tenant_timeout_ts))) {
        LOG_WARN("failed to create meta tenant", KR(ret), K(meta_tenant_unit), K(create_tenant_timeout_ts));
      } else {
        LOG_INFO("succ to create meta tenant", KR(ret), K(meta_tenant_unit), K(create_tenant_timeout_ts));
      }
    }
  }

  // A standby cluster may receive the same create request more than once;
  // with if_not_grant_ set, an already existing tenant is not an error there.
  if (OB_TENANT_EXIST == ret && unit.if_not_grant_ && GCTX.is_standby_cluster()) {
    ret = OB_SUCCESS;
  }

  return ret;
}
254

255
// 标记删除,而不是直接删,是因为并发时,另一个线程可能刷到tenant了,但是还没有refresh tenant,
256
// 此时drop tenant将tenant删除了,另一个线程过一会refresh tenant时,又给加回来了
257
// 所以这里只做标记,删除tenant统一在refresh tenant里做
258
int ObTenantNodeBalancer::try_notify_drop_tenant(const int64_t tenant_id)
259
{
260
  LOG_INFO("[DELETE_TENANT] succ to receive notify of dropping tenant", K(tenant_id));
261
  int ret = OB_SUCCESS;
262
  int tmp_ret = OB_SUCCESS;
263
  TCWLockGuard guard(lock_);
264
  uint64_t meta_tenant_id = OB_INVALID_TENANT_ID;
265
  if (OB_UNLIKELY(is_meta_tenant(tenant_id))) {
266
    ret = OB_OP_NOT_ALLOW;
267
    LOG_WARN("meta tenant is not allowed", KR(ret), K(tenant_id));
268
  } else if (OB_ISNULL(omt_)) {
269
    ret = OB_ERR_UNEXPECTED;
270
    LOG_WARN("omt_ is null", KR(ret),KP(omt_));
271
  } else {
272
    if (OB_TMP_FAIL(omt_->mark_del_tenant(tenant_id))) {
273
      LOG_WARN("fail to mark del user_tenant", KR(ret), KR(tmp_ret), K(tenant_id));
274
      ret = OB_SUCC(ret) ? tmp_ret : ret;
275
    }
276
    meta_tenant_id = gen_meta_tenant_id(tenant_id);
277
    if (OB_TMP_FAIL(omt_->mark_del_tenant(meta_tenant_id))) {
278
      LOG_WARN("fail to mark del meta_tenant", KR(ret), KR(tmp_ret), K(meta_tenant_id));
279
      ret = OB_SUCC(ret) ? tmp_ret : ret;
280
    }
281
  }
282
  LOG_INFO("[DELETE_TENANT] mark drop tenant", KR(ret), K(tenant_id), K(meta_tenant_id));
283
  return ret;
284
}
285

286
/**
 * Sums up the resources of all tenant units known to omt_.
 *
 * @param server_resource  reset first, then filled with the accumulated
 *                         max/min CPU, memory size and log disk size
 * @return OB_SUCCESS, or the error from omt_->get_tenant_units(); on failure
 *         server_resource stays in its reset state
 */
int ObTenantNodeBalancer::get_server_allocated_resource(ServerResource &server_resource)
{
  int ret = OB_SUCCESS;
  server_resource.reset();
  TenantUnits tenant_units;

  if (OB_FAIL(omt_->get_tenant_units(tenant_units, false))) {
    LOG_WARN("failed to get tenant units", KR(ret));
  } else {
    for (int64_t i = 0; i < tenant_units.count(); i++) {
      const ObUnitInfoGetter::ObTenantConfig &tenant_unit = tenant_units.at(i);
      // META tenant and USER tenant share CPU resource, so skip META tenant
      if (!is_meta_tenant(tenant_unit.tenant_id_)) {
        server_resource.max_cpu_ += tenant_unit.config_.max_cpu();
        server_resource.min_cpu_ += tenant_unit.config_.min_cpu();
      }
      // only the sys tenant has GMEMCONF's extra memory subtracted from its limit
      const int64_t extra_memory = is_sys_tenant(tenant_unit.tenant_id_) ? GMEMCONF.get_extra_memory() : 0;
      // count whichever is larger: the allocator limit (minus extra memory)
      // or the memory size of the unit config
      server_resource.memory_size_ += max(ObMallocAllocator::get_instance()->get_tenant_limit(tenant_unit.tenant_id_) - extra_memory,
                                          tenant_unit.config_.memory_size());
      server_resource.log_disk_size_ += tenant_unit.config_.log_disk_size();
    }
  }
  return ret;
}
309

310

311
int ObTenantNodeBalancer::lock_tenant_balancer()
312
{
313
  return lock_.rdlock();
314
}
315

316
int ObTenantNodeBalancer::unlock_tenant_balancer()
317
{
318
  return lock_.unlock();
319
}
320

321
int ObTenantNodeBalancer::check_del_tenants(const TenantUnits &local_units, TenantUnits &units)
322
{
323
  int ret = OB_SUCCESS;
324

325
  for (int64_t i = 0; i < local_units.count(); i++) {
326
    bool tenant_exists = false;
327
    const ObUnitInfoGetter::ObTenantConfig &local_unit = local_units.at(i);
328
    for (auto punit = units.begin(); punit != units.end(); punit++) {
329
      if (local_unit.tenant_id_ == punit->tenant_id_) {
330
        tenant_exists = true;
331
        break;
332
      }
333
    }
334
    if (!tenant_exists) {
335
      LOG_INFO("[DELETE_TENANT] begin to delete tenant", K(local_unit));
336
      if (OB_SYS_TENANT_ID == local_unit.tenant_id_) {
337
        LOG_INFO("[DELETE_TENANT] need convert_real_to_hidden_sys_tenant");
338
        if (OB_FAIL(omt_->convert_real_to_hidden_sys_tenant())) {
339
          LOG_INFO("fail to convert_real_to_hidden_sys_tenant", K(ret));
340
        }
341
      } else if (OB_FAIL(omt_->del_tenant(local_unit.tenant_id_))) {
342
        LOG_WARN("delete tenant fail", K(local_unit), K(ret));
343
      }
344
    }
345
  }
346

347
  return ret;
348
}
349

350
/**
 * Runs check_new_tenant() with data-version checking enabled for every unit.
 *
 * A failing unit does not stop the loop.
 *
 * @param units  units this server is supposed to hold
 * @return OB_SUCCESS, or the error of the first unit that failed
 */
int ObTenantNodeBalancer::check_new_tenants(TenantUnits &units)
{
  int ret = OB_SUCCESS;
  int tmp_ret = OB_SUCCESS;
  const bool check_data_version = true;

  DEBUG_SYNC(CHECK_NEW_TENANT);

  // visit every unit, remembering only the earliest error
  for (int64_t i = 0; i < units.count(); ++i) {
    if (OB_TMP_FAIL(check_new_tenant(units.at(i), check_data_version))) {
      LOG_WARN("failed to check new tenant", KR(tmp_ret));
      if (OB_SUCC(ret)) {
        ret = tmp_ret;
      }
    }
  }
  return ret;
}
367

368
int ObTenantNodeBalancer::check_new_tenant(
369
    const ObUnitInfoGetter::ObTenantConfig &unit,
370
    const bool check_data_version,
371
    const int64_t abs_timeout_us)
372
{
373
  int ret = OB_SUCCESS;
374

375
  const int64_t tenant_id = unit.tenant_id_;
376
  ObTenant *tenant = nullptr;
377

378
  if (OB_FAIL(omt_->get_tenant(tenant_id, tenant))) {
379
    if (is_sys_tenant(tenant_id)) {
380
      ret = OB_ERR_UNEXPECTED;
381
      LOG_ERROR("real or hidden sys tenant must be exist", K(ret));
382
    } else {
383
      ret = OB_SUCCESS;
384
      ObTenantMeta tenant_meta;
385
      ObTenantSuperBlock super_block(tenant_id, false /*is_hidden*/);  // empty super block
386
      const bool should_check_data_version = check_data_version && is_user_tenant(tenant_id);
387
      uint64_t data_version = 0;
388
      if (should_check_data_version && OB_FAIL(GET_MIN_DATA_VERSION(tenant_id, data_version))) {
389
        if (OB_ENTRY_NOT_EXIST == ret) {
390
          ret = OB_EAGAIN;
391
          LOG_WARN("data_version not refreshed yet, create tenant later", KR(ret), K(tenant_id));
392
        } else {
393
          LOG_WARN("fail to get data_version", KR(ret), K(tenant_id));
394
        }
395
      } else if (OB_FAIL(tenant_meta.build(unit, super_block))) {
396
        LOG_WARN("fail to build tenant meta", K(ret));
397
      } else if (OB_FAIL(omt_->create_tenant(tenant_meta, true /* write_slog */, abs_timeout_us))) {
398
        LOG_WARN("fail to create new tenant", K(ret), K(tenant_id));
399
      }
400
    }
401
  } else {
402
    int64_t extra_memory = 0;
403
    if (is_sys_tenant(tenant_id)) {
404
      if (tenant->is_hidden() && OB_FAIL(omt_->convert_hidden_to_real_sys_tenant(unit, abs_timeout_us))) {
405
        LOG_WARN("fail to create real sys tenant", K(unit));
406
      }
407
      extra_memory = GMEMCONF.get_extra_memory();
408
    }
409
    if (OB_SUCC(ret) && !(unit == tenant->get_unit())) {
410
      if (OB_FAIL(omt_->update_tenant_unit(unit))) {
411
        LOG_WARN("fail to update tenant unit", K(ret), K(tenant_id));
412
      }
413
    }
414
    if (OB_SUCC(ret) && OB_FAIL(omt_->update_tenant_memory(unit, extra_memory))) {
415
      LOG_ERROR("fail to update tenant memory", K(ret), K(tenant_id));
416
    }
417
  }
418
  if (OB_SUCC(ret) && !is_virtual_tenant_id(tenant_id)) {
419
    if (OB_FAIL(omt_->modify_tenant_io(tenant_id, unit.config_))) {
420
      LOG_WARN("modify tenant io config failed", K(ret), K(tenant_id), K(unit.config_));
421
    }
422
  }
423
  return ret;
424
}
425

426
int ObTenantNodeBalancer::refresh_hidden_sys_memory()
427
{
428
  int ret = OB_SUCCESS;
429
  int64_t allowed_mem_limit = 0;
430
  ObTenant *tenant = nullptr;
431
  if (OB_FAIL(omt_->get_tenant(OB_SYS_TENANT_ID, tenant))) {
432
    LOG_WARN("get sys tenant failed", K(ret));
433
  } else if (OB_ISNULL(tenant) || !tenant->is_hidden()) {
434
    // do nothing
435
  } else if (OB_FAIL(omt_->update_tenant_memory(OB_SYS_TENANT_ID, GMEMCONF.get_hidden_sys_memory(), allowed_mem_limit))) {
436
    LOG_WARN("update hidden sys tenant memory failed", K(ret));
437
  } else {
438
    LOG_INFO("update hidden sys tenant memory succeed ", K(allowed_mem_limit));
439
  }
440
  return ret;
441
}
442

443
void ObTenantNodeBalancer::periodically_check_tenant()
444
{
445
  int ret = OB_SUCCESS;
446
  struct TenantHandlePair {
447
    ObTenant *tenant_;
448
    ObLDHandle *handle_;
449
    TO_STRING_KV(KP(tenant_));
450
  };
451
  ObSEArray<TenantHandlePair, 32> pairs;
452
  omt_->lock_tenant_list();
453
  TenantList &tenants = omt_->get_tenant_list();
454
  ObArenaAllocator alloc("lock_diagnose");
455
  for (TenantList::iterator it = tenants.begin();
456
       it != tenants.end();
457
       it++) {
458
    void *ptr = nullptr;
459
    ObLDHandle *handle = nullptr;
460
    if (!OB_ISNULL(*it) && !(*it)->has_stopped()) {
461
      if (OB_ISNULL(ptr = alloc.alloc(sizeof(ObLDHandle)))) {
462
        ret = OB_ALLOCATE_MEMORY_FAILED;
463
        LOG_WARN("failed to alloc", K(ret));
464
      } else if (FALSE_IT(handle = new (ptr) ObLDHandle())) {
465
      } else if (OB_FAIL((*it)->rdlock(*handle))) {
466
        LOG_WARN("failed to rd lock tenant", K(ret));
467
      } else {
468
        TenantHandlePair pair;
469
        pair.tenant_ = *it;
470
        pair.handle_ = handle;
471
        if (OB_FAIL(pairs.push_back(pair))) {
472
          LOG_WARN("failed to push back tenant", K(ret));
473
        } else {/*do-nothing*/}
474
        // cleanup
475
        if (OB_FAIL(ret)) {
476
          IGNORE_RETURN (*it)->unlock(*handle);
477
        }
478
      }
479
    }
480
  }
481
  omt_->unlock_tenant_list();
482

483
  int i = 0;
484
  for (auto it = pairs.begin();
485
       it != pairs.end();
486
       it++) {
487
    (*it).tenant_->periodically_check();
488
    IGNORE_RETURN (*it).tenant_->unlock(*(*it).handle_);
489
  }
490
}
491

492
// Although unit has been deleted, the local cached unit cannot be deleted if the tenant still holds resource
493
int ObTenantNodeBalancer::fetch_effective_tenants(const TenantUnits &old_tenants, TenantUnits &new_tenants)
494
{
495
  int ret = OB_SUCCESS;
496
  bool found = false;
497
  bool is_released = false;
498
  TenantUnits tenants;
499

500
  for (int64_t i = 0; OB_SUCC(ret) && i < old_tenants.count(); i++) {
501
    found = false;
502
    const ObUnitInfoGetter::ObTenantConfig &tenant_config = old_tenants.at(i);
503
    const ObUnitInfoGetter::ObUnitStatus local_unit_status = tenant_config.unit_status_;
504
    for (int64_t j = 0; j < new_tenants.count(); j++) {
505
      if (tenant_config.tenant_id_ == new_tenants.at(j).tenant_id_) {
506
        new_tenants.at(j).create_timestamp_ = tenant_config.create_timestamp_;
507
        new_tenants.at(j).is_removed_ = tenant_config.is_removed_;
508
        found = true;
509
        break;
510
      }
511
    }
512

513
    if (!found) {
514
      ObTenant *tenant = nullptr;
515
      MTL_SWITCH(tenant_config.tenant_id_) {
516
        if (OB_FAIL(MTL(ObTenantMetaMemMgr*)->check_all_meta_mem_released(is_released, "[DELETE_TENANT]"))) {
517
          LOG_WARN("fail to check_all_meta_mem_released", K(ret), K(tenant_config));
518
        } else if (!is_released) {
519
          // can not release now. dump some debug info
520
          const uint64_t interval = 180 * 1000 * 1000; // 180s
521
          if (!is_released && REACH_TIME_INTERVAL(interval)) {
522
            MTL(ObTenantMetaMemMgr*)->dump_tablet_info();
523
            MTL(ObLSService *)->dump_ls_info();
524
            PRINT_OBJ_LEAK(MTL_ID(), share::LEAK_CHECK_OBJ_MAX_NUM);
525
          }
526
        } else {
527
          // check ls service is empty.
528
          is_released = MTL(ObLSService *)->is_empty();
529
        }
530

531
        bool is_tenant_snapshot_released = false;
532
        if (is_user_tenant(tenant_config.tenant_id_)) {
533
          const int64_t now_time = ObTimeUtility::current_time();
534
          const int64_t life_time = now_time - tenant_config.create_timestamp_;
535
          if (tenant_config.is_removed_ || life_time >= RECYCLE_LATENCY) {
536
            MTL(ObTenantSnapshotService*)->notify_unit_is_deleting();
537
          }
538
          if (OB_FAIL(MTL(ObTenantSnapshotService*)->
539
                check_all_tenant_snapshot_released(is_tenant_snapshot_released))) {
540
            LOG_WARN("fail to check_all_tenant_snapshot_released", K(ret), K(tenant_config));
541
          } else if (!is_tenant_snapshot_released) {
542
            // can not release now. dump some debug info
543
            const uint64_t interval = 180 * 1000 * 1000; // 180s
544
            if (!is_tenant_snapshot_released && REACH_TIME_INTERVAL(interval)) {
545
              MTL(ObTenantSnapshotService*)->dump_all_tenant_snapshot_info();
546
            }
547
            LOG_INFO("[DELETE_TENANT] tenant has been dropped, tenant snapshot is still waiting for gc",
548
                K(tenant_config));
549
          }
550
          if (OB_SUCC(ret)) {
551
            is_released = is_released && is_tenant_snapshot_released;
552
          } else {
553
            is_released = false;
554
          }
555
        }
556
      }
557

558
      if (OB_SUCC(ret)) {
559
        // remove local units after RECYCLE_LATENCY to avoid removing by mistake
560
        // but if marked removed, remove it directly without waiting
561
        const int64_t now_time = ObTimeUtility::current_time();
562
        const int64_t life_time = now_time - tenant_config.create_timestamp_;
563
        if ((!tenant_config.is_removed_ && life_time < RECYCLE_LATENCY) || !is_released) {
564
          if (OB_FAIL(tenants.push_back(tenant_config))) {
565
            LOG_WARN("failed to push back tenant", KR(ret));
566
          } else {
567
            // update tenant unit status which need be deleted
568
            // need wait gc in observer
569
            // NOTE: only update unit status when can not release resource
570
            if (!is_released) {
571
              tenants.at(tenants.count() - 1).unit_status_ = ObUnitInfoGetter::UNIT_WAIT_GC_IN_OBSERVER;
572
              // add a event when try to gc for the first time
573
              if (local_unit_status != ObUnitInfoGetter::ObUnitStatus::UNIT_WAIT_GC_IN_OBSERVER &&
574
                  local_unit_status != ObUnitInfoGetter::ObUnitStatus::UNIT_DELETING_IN_OBSERVER) {
575
                SERVER_EVENT_ADD("unit", "start unit gc", "tenant_id", tenant_config.tenant_id_,
576
                    "unit_id", tenant_config.unit_id_, "unit_status", "WAIT GC");
577
              }
578
            }
579

580
            LOG_INFO("[DELETE_TENANT] tenant has been dropped. can not delete tenant",
581
                K(is_released), "local_unit_status", ObUnitInfoGetter::get_unit_status_str(local_unit_status),
582
                "is_removed", tenant_config.is_removed_,
583
                "create_timestamp", tenant_config.create_timestamp_,
584
                K(life_time), K(tenant_config));
585
          }
586
        } else {
587
            LOG_INFO("[DELETE_TENANT] tenant has been dropped. can delete tenant",
588
                K(is_released), "local_unit_status", ObUnitInfoGetter::get_unit_status_str(local_unit_status),
589
                "is_removed", tenant_config.is_removed_,
590
                "create_timestamp", tenant_config.create_timestamp_,
591
                K(life_time), K(tenant_config));
592
        }
593
      }
594
    }
595
  }
596

597
  for (int64_t i = 0; OB_SUCC(ret) && i < tenants.count(); i++) {
598
    if (OB_FAIL(new_tenants.push_back(tenants.at(i)))) {
599
      LOG_WARN("failed to add new tenant", K(ret));
600
    }
601
  }
602

603
  return ret;
604
}
605

606
/**
 * Synchronizes the local tenants with the given unit list.
 *
 * Steps: read the local units, merge them into `units` with
 * fetch_effective_tenants(), create/update tenants, delete tenants that are no
 * longer listed, and finally refresh the hidden sys tenant memory.
 *
 * @param units  in: units fetched for this server; out: the effective unit
 *               list after merging with the local units
 * @return the error of reading/merging the units if that fails; otherwise the
 *         result of refresh_hidden_sys_memory(), because failures of the
 *         create and delete steps are only logged
 */
int ObTenantNodeBalancer::refresh_tenant(TenantUnits &units)
{
  int ret = OB_SUCCESS;

  TenantUnits local_units;
  if (OB_FAIL(omt_->get_tenant_units(local_units, false))) {
    LOG_WARN("failed to get local tenant units", KR(ret));
  } else if (OB_FAIL(fetch_effective_tenants(local_units, units))) {
    LOG_WARN("failed to fetch effective tenants", KR(ret), K(local_units));
  }

  if (OB_SUCC(ret)) {
    if (OB_FAIL(check_new_tenants(units))) {
      LOG_WARN("check and add new tenant fail", K(ret));
      ret = OB_SUCCESS; // just don't affect the following process in run1().
    } else {
      // only reached when every unit was created or updated successfully
      omt_->set_synced();
    }

    if (OB_FAIL(check_del_tenants(local_units, units))) { // overwrite ret
      LOG_WARN("check delete tenant fail", K(ret));
    }

    if (OB_FAIL(refresh_hidden_sys_memory())) { // overwrite ret
      LOG_WARN("refresh hidden sys memory failed", K(ret));
    }
  }

  return ret;
}
636

637
/**
 * Applies an administrative memory change to one tenant.
 *
 * Holds lock_ through a TCWLockGuard for the whole call. On success the
 * balancer's refresh_interval_ is replaced by tenant_memory.refresh_interval_
 * multiplied by 10^6.
 *
 * @param tenant_memory  tenant id, new memory size and new refresh interval
 * @return OB_SUCCESS, OB_INVALID_ARGUMENT when the argument is not valid, or
 *         the error from reading the tenant unit or updating the limits
 */
int ObTenantNodeBalancer::update_tenant_memory(const obrpc::ObTenantMemoryArg &tenant_memory)
{
  int ret = OB_SUCCESS;
  const int64_t tenant_id = tenant_memory.tenant_id_;
  const int64_t memory_size = tenant_memory.memory_size_;
  const int64_t refresh_interval = tenant_memory.refresh_interval_;

  ObUnitInfoGetter::ObTenantConfig unit;
  int64_t allowed_mem_limit = 0;

  TCWLockGuard guard(lock_);

  // one chain: the first failing step ends the update
  if (!tenant_memory.is_valid()) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid argument", K(ret), K(tenant_memory));
  } else if (OB_FAIL(omt_->get_tenant_unit(tenant_id, unit))) {
    LOG_WARN("failed to get tenant config", K(ret), K(tenant_id));
  } else if (OB_FAIL(omt_->update_tenant_memory(tenant_id, memory_size, allowed_mem_limit))) {
    LOG_WARN("failed to update tenant memory", K(ret), K(tenant_id), K(memory_size));
  } else if (OB_FAIL(omt_->update_tenant_freezer_mem_limit(tenant_id, unit.config_.memory_size(), allowed_mem_limit))) {
    LOG_WARN("set_tenant_freezer_mem_limit failed", K(ret), K(tenant_id));
  } else {
    refresh_interval_ = refresh_interval * 1000L * 1000L;
    LOG_INFO("succ to admin update tenant memory", K(tenant_id), K(memory_size));
  }

  return ret;
}
669

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.