oceanbase

Форк
0
/
ob_multi_tenant.cpp 
2569 строк · 100.6 Кб
1
/**
2
 * Copyright (c) 2021 OceanBase
3
 * OceanBase CE is licensed under Mulan PubL v2.
4
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
5
 * You may obtain a copy of Mulan PubL v2 at:
6
 *          http://license.coscl.org.cn/MulanPubL-2.0
7
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
 * See the Mulan PubL v2 for more details.
11
 */
12

13
#define USING_LOG_PREFIX SERVER_OMT
14

15
#include "ob_multi_tenant.h"
16

17
#include "lib/oblog/ob_log.h"
18
#include "lib/alloc/ob_malloc_allocator.h"
19
#include "lib/ob_running_mode.h"
20
#include "lib/file/file_directory_utils.h"
21
#include "lib/objectpool/ob_server_object_pool.h"
22
#include "share/ob_tenant_mgr.h"
23
#include "observer/ob_server.h"
24
#include "observer/ob_server_struct.h"
25
#include "share/resource_manager/ob_cgroup_ctrl.h"
26
#include "ob_tenant.h"
27
#include "rpc/ob_request.h"
28
#include "rpc/obmysql/ob_sql_nio_server.h"
29
#include "storage/tx/ob_ts_mgr.h"
30
#include "storage/ob_disk_usage_reporter.h"
31
#include "storage/slog/ob_storage_log.h"
32
#include "storage/slog/ob_storage_logger.h"
33
#include "share/schema/ob_tenant_schema_service.h"
34
#include "storage/slog/ob_storage_logger_manager.h"
35
#include "observer/mysql/ob_mysql_request_manager.h"
36
#include "observer/mysql/obsm_conn_callback.h"
37
#include "sql/dtl/ob_dtl_fc_server.h"
38
#include "sql/dtl/ob_dtl_interm_result_manager.h"
39
#include "sql/das/ob_das_id_service.h"
40
#include "sql/das/ob_data_access_service.h"
41
#include "sql/engine/ob_tenant_sql_memory_manager.h"
42
#include "sql/engine/px/ob_px_admission.h"
43
#include "share/ob_get_compat_mode.h"
44
#include "storage/tx/wrs/ob_tenant_weak_read_service.h"   // ObTenantWeakReadService
45
#include "share/allocator/ob_shared_memory_allocator_mgr.h"   // ObSharedMemAllocMgr
46
#include "share/allocator/ob_tenant_mutil_allocator.h"
47
#include "share/allocator/ob_tenant_mutil_allocator_mgr.h"
48
#include "share/stat/ob_opt_stat_monitor_manager.h"
49
#include "share/ob_global_autoinc_service.h"
50
#include "lib/thread/ob_thread_name.h"
51
#include "logservice/ob_log_service.h"
52
#include "logservice/archiveservice/ob_archive_service.h"    // ObArchiveService
53
#include "logservice/data_dictionary/ob_data_dict_service.h" // ObDataDictService
54
#include "ob_tenant_mtl_helper.h"
55
#include "storage/blocksstable/ob_decode_resource_pool.h"
56
#include "storage/ddl/ob_direct_insert_sstable_ctx_new.h"
57
#include "storage/multi_data_source/runtime_utility/mds_tenant_service.h"
58
#include "storage/tx_storage/ob_ls_service.h"
59
#include "storage/tx_storage/ob_access_service.h"
60
#include "storage/tx_storage/ob_tenant_freezer.h"
61
#include "storage/concurrency_control/ob_multi_version_garbage_collector.h"
62
#include "storage/tx/ob_xa_service.h"
63
#include "storage/tx/ob_tx_loop_worker.h"
64
#include "storage/tx/ob_timestamp_service.h"
65
#include "storage/tx/ob_standby_timestamp_service.h"
66
#include "storage/tx/ob_timestamp_access.h"
67
#include "storage/tx/ob_trans_id_service.h"
68
#include "storage/tx/ob_trans_service.h"
69
#include "storage/tx/ob_unique_id_service.h"
70
#include "storage/tx/ob_trans_part_ctx.h"
71
#include "storage/compaction/ob_tenant_tablet_scheduler.h"
72
#include "storage/compaction/ob_tenant_medium_checker.h"
73
#include "storage/slog_ckpt/ob_tenant_checkpoint_slog_handler.h"
74
#include "share/scheduler/ob_tenant_dag_scheduler.h"
75
#include "storage/ob_file_system_router.h"
76
#include "storage/compaction/ob_tenant_freeze_info_mgr.h"
77
#include "storage/tx_storage/ob_checkpoint_service.h"
78
#include "storage/meta_mem/ob_tenant_meta_mem_mgr.h"
79
#include "storage/tx_storage/ob_tenant_memory_printer.h"
80
#include "storage/tx/ob_id_service.h"
81
#include "storage/compaction/ob_tenant_compaction_progress.h"
82
#include "storage/compaction/ob_server_compaction_event_history.h"
83
#include "storage/compaction/ob_compaction_tablet_diagnose.h"
84
#include "storage/compaction/ob_compaction_suggestion.h"
85
#include "storage/ob_tenant_tablet_stat_mgr.h"
86
#include "storage/compaction/ob_compaction_memory_pool.h"
87
#include "storage/memtable/ob_lock_wait_mgr.h"
88
#include "storage/slog_ckpt/ob_server_checkpoint_slog_handler.h"
89
#include "storage/tablelock/ob_table_lock_service.h"
90
#include "storage/ob_file_system_router.h"
91
#include "storage/compaction/ob_sstable_merge_info_mgr.h" // ObTenantSSTableMergeInfoMgr
92
#include "share/scheduler/ob_dag_warning_history_mgr.h"
93
#include "storage/compaction/ob_compaction_diagnose.h"
94
#include "storage/access/ob_table_scan_iterator.h"
95
#include "share/scheduler/ob_dag_warning_history_mgr.h"
96
#include "storage/compaction/ob_compaction_diagnose.h"
97
#include "share/io/ob_io_manager.h"
98
#include "share/ob_ddl_sim_point.h"
99
#include "rootserver/freeze/ob_major_freeze_service.h"
100
#include "observer/omt/ob_tenant_config_mgr.h"
101
#include "observer/omt/ob_tenant_srs.h"
102
#include "observer/report/ob_tenant_meta_checker.h"
103
#include "observer/report/ob_tablet_table_updater.h"
104
#include "storage/high_availability/ob_storage_ha_service.h"
105
#include "rootserver/ob_tenant_info_loader.h"//ObTenantInfoLoader
106
#include "rootserver/ob_tenant_balance_service.h"//ObTenantBalanceService
107
#include "rootserver/ob_ls_recovery_reportor.h"//ObLSRecoveryReportor
108
#include "rootserver/ob_standby_schema_refresh_trigger.h"//ObStandbySchemaRefreshTrigger
109
#include "rootserver/ob_tenant_info_loader.h"//ObTenantInfoLoader
110
#include "rootserver/ob_create_standby_from_net_actor.h" // ObCreateStandbyFromNetActor
111
#include "rootserver/ob_primary_ls_service.h"//ObLSService
112
#include "rootserver/ob_recovery_ls_service.h"//ObRecoveryLSService
113
#include "rootserver/ob_common_ls_service.h"//ObCommonLSService
114
#include "rootserver/restore/ob_restore_service.h" //ObRestoreService
115
#include "rootserver/ob_tenant_transfer_service.h" // ObTenantTransferService
116
#include "rootserver/ob_balance_task_execute_service.h" //ObBalanceTaskExecuteService
117
#include "rootserver/backup/ob_backup_service.h" //ObBackupDataService and ObBackupCleanService
118
#include "rootserver/backup/ob_backup_task_scheduler.h" // ObBackupTaskScheduler
119
#include "rootserver/backup/ob_archive_scheduler_service.h" // ObArchiveSchedulerService
120
#include "logservice/leader_coordinator/ob_leader_coordinator.h"
121
#include "storage/lob/ob_lob_manager.h"
122
#include "share/deadlock/ob_deadlock_detector_mgr.h"
123
#include "rootserver/tenant_snapshot/ob_tenant_snapshot_scheduler.h"
124
#include "rootserver/restore/ob_clone_scheduler.h"
125
#ifdef OB_BUILD_SPM
126
#include "sql/spm/ob_plan_baseline_mgr.h"
127
#endif
128
#ifdef OB_BUILD_ARBITRATION
129
#include "rootserver/ob_arbitration_service.h"
130
#endif
131
#ifdef OB_BUILD_DBLINK
132
#include "lib/oracleclient/ob_oci_environment.h"
133
#endif
134
#include "lib/mysqlclient/ob_tenant_oci_envs.h"
135
#include "sql/udr/ob_udr_mgr.h"
136
#include "storage/blocksstable/ob_shared_macro_block_manager.h"
137
#include "storage/tx_storage/ob_tablet_gc_service.h"
138
#include "share/ob_occam_time_guard.h"
139
#include "storage/high_availability/ob_transfer_service.h"
140
#include "storage/high_availability/ob_rebuild_service.h"
141
#include "observer/table_load/ob_table_load_service.h"
142
#include "sql/plan_cache/ob_plan_cache.h"
143
#include "sql/plan_cache/ob_ps_cache.h"
144
#include "rootserver/ob_rs_event_history_table_operator.h"
145
#include "rootserver/ob_heartbeat_service.h"
146
#include "share/detect/ob_detect_manager.h"
147
#include "storage/access/ob_empty_read_bucket.h"
148
#include "observer/table/ttl/ob_ttl_service.h"
149
#include "sql/dtl/ob_dtl_interm_result_manager.h"
150
#include "storage/tablet/ob_tablet_memtable_mgr.h"
151
#ifdef ERRSIM
152
#include "share/errsim_module/ob_tenant_errsim_module_mgr.h"
153
#include "share/errsim_module/ob_tenant_errsim_event_mgr.h"
154
#endif
155
#include "observer/table/ob_htable_lock_mgr.h"
156
#include "observer/table/ob_table_session_pool.h"
157
#include "observer/ob_server_event_history_table_operator.h"
158
#include "storage/tenant_snapshot/ob_tenant_snapshot_service.h"
159
#include "share/index_usage/ob_index_usage_info_mgr.h"
160
#include "rootserver/mview/ob_mview_maintenance_service.h"
161

162
using namespace oceanbase;
163
using namespace oceanbase::lib;
164
using namespace oceanbase::common;
165
using namespace oceanbase::omt;
166
using namespace oceanbase::rpc;
167
using namespace oceanbase::share;
168
using namespace oceanbase::storage;
169
using namespace oceanbase::storage::checkpoint;
170
using namespace oceanbase::obmysql;
171
using namespace oceanbase::sql;
172
using namespace oceanbase::sql::dtl;
173
using namespace oceanbase::concurrency_control;
174
using namespace oceanbase::transaction;
175
using namespace oceanbase::transaction::tablelock;
176
using namespace oceanbase::logservice;
177
using namespace oceanbase::archive;
178
using namespace oceanbase::observer;
179
using namespace oceanbase::rootserver;
180
using namespace oceanbase::blocksstable;
181

182
// Bucket count handed to bucket_lock_.init() in ObMultiTenant::init().
#define OB_TENANT_LOCK_BUCKET_NUM 10000L
183

184
namespace oceanbase
185
{
186
namespace share
187
{
188
// Declared in share/ob_context.h
189
// Obtain tenant_ctx according to tenant_id (obtained from omt)
190
int __attribute__ ((weak)) get_tenant_ctx_with_tenant_lock(const uint64_t tenant_id,
191
                                                           ObLDHandle &handle,
192
                                                           ObTenantSpace *&tenant_ctx)
193
{
194
  int ret = OB_SUCCESS;
195
  tenant_ctx = nullptr;
196

197
  omt::ObTenant *tenant = nullptr;
198
  if (OB_ISNULL(GCTX.omt_)) {
199
    ret = OB_ERR_UNEXPECTED;
200
    LOG_WARN("null ptr", K(ret));
201
  } else if (OB_FAIL(GCTX.omt_->get_tenant_with_tenant_lock(tenant_id, handle, tenant))) {
202
    if (REACH_TIME_INTERVAL(1000 * 1000)) {
203
      LOG_WARN("get tenant from omt failed", K(ret), K(tenant_id));
204
    }
205
  } else if (OB_ISNULL(tenant)) {
206
    ret = OB_ERR_UNEXPECTED;
207
    LOG_WARN("null ptr", K(ret), K(tenant_id));
208
  } else {
209
    tenant_ctx = &tenant->ctx();;
210
  }
211

212
  return ret;
213
}
214

215
int __attribute__ ((weak)) get_tenant_base_with_lock(
216
    uint64_t tenant_id, ObLDHandle &handle, ObTenantBase *&tenant_base, ReleaseCbFunc &release_cb)
217
{
218
  int ret = OB_SUCCESS;
219
  omt::ObTenant *tenant = nullptr;
220
  if (OB_ISNULL(GCTX.omt_)) {
221
    ret = OB_ERR_UNEXPECTED;
222
    LOG_WARN("null ptr", K(ret));
223
  } else if (OB_FAIL(GCTX.omt_->get_tenant_with_tenant_lock(tenant_id, handle, tenant))) {
224
    if (REACH_TIME_INTERVAL(1000 * 1000)) {
225
      LOG_WARN("get tenant from omt failed", K(ret), K(tenant_id));
226
    }
227
  } else if (OB_ISNULL(tenant)) {
228
    ret = OB_ERR_UNEXPECTED;
229
    LOG_WARN("null ptr", K(ret), K(tenant_id));
230
  } else {
231
    tenant_base = static_cast<ObTenantBase*>(tenant);
232
    release_cb = [tenant] (ObLDHandle &h) {
233
      return tenant->unlock(h);
234
    };
235
  }
236
  return ret;
237
}
238
} // end of namespace share
239
} // end of namespace oceanbase
240

241
bool compare_tenant(const ObTenant *lhs,
242
                    const ObTenant *rhs)
243
{
244
  return lhs->id() < rhs->id();
245
}
246

247
bool equal_tenant(const ObTenant *lhs,
248
                  const ObTenant *rhs)
249
{
250
  return lhs->id() == rhs->id();
251
}
252

253
bool compare_with_tenant_id(const ObTenant *lhs,
254
                            const uint64_t &tenant_id)
255
{
256
  return NULL != lhs ? (lhs->id() < tenant_id) : false;
257
}
258

259
bool equal_with_tenant_id(const ObTenant *lhs,
260
                          const uint64_t &tenant_id)
261
{
262
  return NULL != lhs ? (lhs->id() == tenant_id) : false;
263
}
264

265
int ObCtxMemConfigGetter::get(int64_t tenant_id, int64_t tenant_limit, common::ObIArray<ObCtxMemConfig> &configs)
266
{
267
  int64_t ret = OB_SUCCESS;
268
  if (tenant_id > OB_USER_TENANT_ID) {
269
    ObCtxMemConfig cfg;
270
    cfg.ctx_id_ = ObCtxIds::WORK_AREA;
271
    cfg.idle_size_ = 0;
272
    cfg.limit_ = 5 * tenant_limit / 100;
273
    ret = configs.push_back(cfg);
274
  }
275
  return ret;
276
}
277

278
// Process-wide default ObCtxMemConfigGetter instance; ObMultiTenant::mcg_ is
// initialized to point at it.
ObCtxMemConfigGetter g_default_mcg;
279
// Definition of the static member ObMultiTenant::mcg_, initially pointing at
// the default getter g_default_mcg.
ObICtxMemConfigGetter *ObMultiTenant::mcg_ = &g_default_mcg;
280

281
// Default constructor: leaves the object in its "not initialized" state
// (is_inited_ == false). server_slogger_, bucket_lock_ and myaddr_ receive
// their working values later in init().
ObMultiTenant::ObMultiTenant()
    : is_inited_(false), server_slogger_(nullptr),
      bucket_lock_(), lock_(ObLatchIds::MULTI_TENANT_LOCK),
      tenants_(0, nullptr, ObModIds::OMT), balancer_(nullptr),
      myaddr_(),
      cpu_dump_(false), has_synced_(false)
{}
293

294
// MTL init hook that determines the compatibility mode of the current tenant.
//
// Virtual tenants and the sys tenant are always MYSQL. For every other tenant
// the mode is read from the current ObTenant (share::ObTenantEnv::get_tenant()).
//
// @param compat_mode  [out] resulting compatibility mode; left untouched when
//                     the current tenant pointer is null
// @return OB_SUCCESS on success; OB_ERR_UNEXPECTED when the current tenant is
//         null or reports CompatMode::INVALID.
static int init_compat_mode(lib::Worker::CompatMode &compat_mode)
{
  int ret = OB_SUCCESS;

  ObTenant *tenant = static_cast<ObTenant*>(share::ObTenantEnv::get_tenant());
  const uint64_t tenant_id = MTL_ID();

  if (is_virtual_tenant_id(tenant_id) || OB_SYS_TENANT_ID == tenant_id) {
    compat_mode = lib::Worker::CompatMode::MYSQL;
  } else if (OB_ISNULL(tenant)) {
    // Guard the dereference below: report an error instead of crashing when
    // no tenant is attached to the current environment.
    ret = OB_ERR_UNEXPECTED;
    LOG_ERROR("tenant is null", K(ret), K(tenant_id));
  } else {
    compat_mode = tenant->get_compat_mode();
    if (lib::Worker::CompatMode::INVALID == compat_mode) {
      ret = OB_ERR_UNEXPECTED;
      LOG_ERROR("get compat mode failed", K(ret));
    }
  }
  LOG_INFO("finish init compatibility mode", K(tenant_id), K(compat_mode));
  return ret;
}
313

314
// Creates and starts a per-tenant SQL NIO server.
//
// Only the sys tenant and user tenants get a server; for any other tenant the
// function does nothing and returns OB_SUCCESS. The network thread count comes
// from the tenant config item tenant_sql_net_thread_count; when that is 0 (or
// the config is not valid) it falls back to max(unit_min_cpu, 1), or 1 if no
// tenant context is available.
//
// @param sql_nio_server  [out] receives the newly allocated server
// @return OB_SUCCESS, OB_ALLOCATE_MEMORY_FAILED when allocation fails, or the
//         error code returned by ObSqlNioServer::start().
static int start_sql_nio_server(ObSqlNioServer *&sql_nio_server)
{
  int ret = OB_SUCCESS;
  ObSrvNetworkFrame *net_frame = GCTX.net_frame_;
  const uint64_t tenant_id = MTL_ID();
  const bool need_server = is_sys_tenant(tenant_id) || is_user_tenant(tenant_id);

  if (!need_server) {
    // neither the sys tenant nor a user tenant: nothing to start
  } else {
    sql_nio_server = OB_NEW(obmysql::ObSqlNioServer, "SqlNio",
                            obmysql::global_sm_conn_callback,
                            net_frame->get_mysql_handler(), tenant_id);
    if (OB_ISNULL(sql_nio_server)) {
      ret = OB_ALLOCATE_MEMORY_FAILED;
      LOG_ERROR("fail to new sql_nio_server", K(ret));
    } else {
      // The config guard stays alive until the server has been started.
      omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id));
      int net_thread_count = 0;
      if (tenant_config.is_valid()) {
        net_thread_count = tenant_config->tenant_sql_net_thread_count;
      }
      if (0 == net_thread_count) {
        // No explicit setting: derive the count from the tenant's min cpu.
        net_thread_count = 1;
        ObTenantBase *tenant_base = MTL_CTX();
        if (nullptr != tenant_base) {
          net_thread_count = std::max(static_cast<int>(tenant_base->unit_min_cpu()), 1);
        }
      }

      sql_nio_server->get_nio()->set_run_wrapper(MTL_CTX());
      if (OB_FAIL(sql_nio_server->start(-1, &net_frame->get_deliver(),
                                        net_thread_count))) {
        LOG_WARN("sql nio server start failed", K(ret));
      } else {
        LOG_INFO("tenant sql_nio_server mtl_start success", K(ret),
                 K(tenant_id), K(net_thread_count));
      }
    }
  }
  return ret;
}
348

349
template<typename T>
350
static int server_obj_pool_mtl_new(common::ObServerObjectPool<T> *&pool)
351
{
352
  int ret = common::OB_SUCCESS;
353
  uint64_t tenant_id = MTL_ID();
354
  pool = MTL_NEW(common::ObServerObjectPool<T>, "TntSrvObjPool", tenant_id, false/*regist*/,
355
                 MTL_IS_MINI_MODE(), MTL_CPU_COUNT());
356
  if (OB_ISNULL(pool)) {
357
    ret = common::OB_ALLOCATE_MEMORY_FAILED;
358
  } else {
359
    ret = pool->init();
360
  }
361
  return ret;
362
}
363

364
template<typename T>
365
static void server_obj_pool_mtl_destroy(common::ObServerObjectPool<T> *&pool)
366
{
367
  using Pool = common::ObServerObjectPool<T>;
368
  MTL_DELETE(Pool, "TntSrvObjPool", pool);
369
  pool = nullptr;
370
}
371

372
// MTL start hook that creates and starts the per-tenant MySQL queue thread
// group.
//
// Only the sys tenant and user tenants get a queue; for any other tenant the
// function does nothing and returns OB_SUCCESS. The thread count comes from
// the tenant config item tenant_sql_login_thread_count; when that is 0 (or the
// config is not valid) it falls back to max(unit_min_cpu, 1), or 1 if no
// tenant context is available.
//
// @param qthread  [out] receives the newly allocated QueueThread
// @return OB_SUCCESS, OB_ALLOCATE_MEMORY_FAILED when allocation fails, or the
//         first error code reported by init / TG_CREATE_TENANT /
//         TG_SET_RUNNABLE / set_thread_count / TG_START.
static int start_mysql_queue(QueueThread *&qthread)
{
  int ret = OB_SUCCESS;
  const uint64_t tenant_id = MTL_ID();
  const bool need_queue = is_sys_tenant(tenant_id) || is_user_tenant(tenant_id);

  if (!need_queue) {
    // neither the sys tenant nor a user tenant: nothing to start
  } else {
    qthread = OB_NEW(QueueThread, ObMemAttr(tenant_id, ObModIds::OB_RPC), "MysqlQueueTh", tenant_id);
    if (OB_ISNULL(qthread)) {
      ret = OB_ALLOCATE_MEMORY_FAILED;
      LOG_WARN("fail to new qthread", K(ret), K(tenant_id));
    } else if (OB_FAIL(qthread->init())) {
      LOG_WARN("init qthread failed", K(tenant_id), K(ret));
    } else if (OB_FAIL(TG_CREATE_TENANT(lib::TGDefIDs::MysqlQueueTh,
                                        qthread->tg_id_))) {
      LOG_WARN("mysql queue init failed", K(ret), K(tenant_id),
               K(qthread->tg_id_));
    } else {
      // Wire the queue to the deliver's handler before any thread is started.
      qthread->queue_.set_qhandler(&GCTX.net_frame_->get_deliver().get_qhandler());

      // The config guard stays alive until the thread group has been started.
      omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id));
      int sql_thread_count = 0;
      if (tenant_config.is_valid()) {
        sql_thread_count = tenant_config->tenant_sql_login_thread_count;
      }
      if (0 == sql_thread_count) {
        // No explicit setting: derive the count from the tenant's min cpu.
        sql_thread_count = 1;
        ObTenantBase *tenant_base = MTL_CTX();
        if (nullptr != tenant_base) {
          sql_thread_count = std::max(static_cast<int>(tenant_base->unit_min_cpu()), 1);
        }
      }

      if (OB_FAIL(TG_SET_RUNNABLE(qthread->tg_id_, qthread->thread_))) {
        LOG_WARN("fail to set runnable", K(ret), K(tenant_id), K(qthread->tg_id_));
      } else if (OB_FAIL(qthread->set_thread_count(sql_thread_count))) {
        LOG_WARN("fail to set thread count", K(ret), K(tenant_id), K(qthread->tg_id_));
      } else if (OB_FAIL(TG_START(qthread->tg_id_))) {
        LOG_ERROR("fail to start qthread", K(ret), K(tenant_id), K(qthread->tg_id_));
      } else {
        LOG_INFO("tenant mysql_queue mtl_start success", K(ret),
                  K(tenant_id), K(qthread->tg_id_), K(sql_thread_count));
      }
    }
  }
  return ret;
}
414

415
int ObMultiTenant::init(ObAddr myaddr,
416
                        ObMySQLProxy *sql_proxy,
417
                        bool mtl_bind_flag)
418
{
419
  int ret = OB_SUCCESS;
420

421
  if (is_inited_) {
422
    ret = OB_INIT_TWICE;
423
    LOG_WARN("ObMultiTenant has been inited", K(ret));
424
  } else if (OB_FAIL(SLOGGERMGR.get_server_slogger(server_slogger_))) {
425
    LOG_WARN("fail to get server slogger", K(ret));
426
  } else if (OB_FAIL(bucket_lock_.init(OB_TENANT_LOCK_BUCKET_NUM))) {
427
    LOG_WARN("fail to init bucket lock", K(ret));
428
  } else {
429
    myaddr_ = myaddr;
430
    if (NULL != sql_proxy) {
431
      if (OB_FAIL(ObTenantNodeBalancer::get_instance().init(this, *sql_proxy, myaddr))) {
432

433
        LOG_WARN("failed to init tenant node balancer", K(ret));
434
      }
435
    } else {
436
      // unset sql_proxy to disable quota balance among nodes
437
    }
438
  }
439

440
  if (OB_SUCC(ret) && mtl_bind_flag) {
441
    MTL_BIND2(ObTenantIOManager::mtl_new, ObTenantIOManager::mtl_init, mtl_start_default, mtl_stop_default, nullptr, ObTenantIOManager::mtl_destroy);
442

443
    // base mtl
444
    MTL_BIND2(mtl_new_default, storage::mds::ObTenantMdsService::mtl_init, storage::mds::ObTenantMdsService::mtl_start, storage::mds::ObTenantMdsService::mtl_stop, storage::mds::ObTenantMdsService::mtl_wait, mtl_destroy_default);
445
    MTL_BIND2(mtl_new_default, ObStorageLogger::mtl_init, ObStorageLogger::mtl_start, ObStorageLogger::mtl_stop, ObStorageLogger::mtl_wait, mtl_destroy_default);
446
    MTL_BIND2(ObTenantMetaMemMgr::mtl_new, mtl_init_default, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default);
447
    MTL_BIND2(mtl_new_default, share::ObSharedMemAllocMgr::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default);
448
    MTL_BIND2(mtl_new_default, ObTransService::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default);
449
    MTL_BIND2(mtl_new_default, ObLogService::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, ObLogService::mtl_destroy);
450
    MTL_BIND2(mtl_new_default, logservice::ObGarbageCollector::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default);
451
    MTL_BIND2(mtl_new_default, ObLSService::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default);
452
    MTL_BIND2(mtl_new_default, ObTenantCheckpointSlogHandler::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default);
453

454
    // other mtl
455
    MTL_BIND2(mtl_new_default, ObArchiveService::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default);
456
    MTL_BIND2(mtl_new_default, datadict::ObDataDictService::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default);
457
    MTL_BIND2(mtl_new_default, compaction::ObTenantTabletScheduler::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default);
458
    MTL_BIND2(mtl_new_default, compaction::ObTenantMediumChecker::mtl_init, nullptr, nullptr, nullptr, mtl_destroy_default);
459
    MTL_BIND2(mtl_new_default, ObTabletTableUpdater::mtl_init, nullptr, mtl_stop_default, mtl_wait_default, mtl_destroy_default);
460
    MTL_BIND2(mtl_new_default, ObTenantDagScheduler::mtl_init, nullptr, mtl_stop_default, mtl_wait_default, mtl_destroy_default);
461
    MTL_BIND2(mtl_new_default, ObTenantFreezeInfoMgr::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default);
462
    MTL_BIND2(mtl_new_default, ObTxLoopWorker::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default); // ObTxLoopWorker
463
    MTL_BIND2(mtl_new_default, compaction::ObTenantCompactionProgressMgr::mtl_init, nullptr, nullptr, nullptr, mtl_destroy_default);
464
    MTL_BIND2(mtl_new_default, compaction::ObServerCompactionEventHistory::mtl_init, nullptr, nullptr, nullptr, mtl_destroy_default);
465
    MTL_BIND2(mtl_new_default, storage::ObTenantTabletStatMgr::mtl_init, nullptr, mtl_stop_default, mtl_wait_default, mtl_destroy_default);
466
    MTL_BIND2(mtl_new_default, storage::ObTenantCompactionMemPool::mtl_init, nullptr, mtl_stop_default, mtl_wait_default, mtl_destroy_default);
467
    MTL_BIND2(mtl_new_default, storage::ObTenantSSTableMergeInfoMgr::mtl_init, nullptr, nullptr, nullptr, mtl_destroy_default);
468
    MTL_BIND2(mtl_new_default, share::ObDagWarningHistoryManager::mtl_init, nullptr, nullptr, nullptr, mtl_destroy_default);
469
    MTL_BIND2(mtl_new_default, compaction::ObScheduleSuspectInfoMgr::mtl_init, nullptr, nullptr, nullptr, mtl_destroy_default);
470
    MTL_BIND2(mtl_new_default, compaction::ObCompactionSuggestionMgr::mtl_init, nullptr, nullptr, nullptr, mtl_destroy_default);
471
    MTL_BIND2(mtl_new_default, compaction::ObDiagnoseTabletMgr::mtl_init, nullptr, nullptr, nullptr, mtl_destroy_default);
472
    MTL_BIND2(mtl_new_default, memtable::ObLockWaitMgr::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default);
473
    MTL_BIND2(mtl_new_default, ObTableLockService::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default);
474
    MTL_BIND2(mtl_new_default, rootserver::ObPrimaryMajorFreezeService::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default);
475
    MTL_BIND2(mtl_new_default, rootserver::ObRestoreMajorFreezeService::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default);
476
    MTL_BIND2(mtl_new_default, ObTenantMetaChecker::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default);
477
    MTL_BIND2(mtl_new_default, rootserver::ObLSRecoveryReportor::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default);
478
    MTL_BIND2(mtl_new_default, rootserver::ObStandbySchemaRefreshTrigger::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default);
479
    MTL_BIND2(mtl_new_default, rootserver::ObTenantInfoLoader::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default);
480
    MTL_BIND2(mtl_new_default, rootserver::ObCreateStandbyFromNetActor::mtl_init, nullptr, rootserver::ObCreateStandbyFromNetActor::mtl_stop, rootserver::ObCreateStandbyFromNetActor::mtl_wait, mtl_destroy_default);
481
    MTL_BIND2(mtl_new_default, rootserver::ObPrimaryLSService::mtl_init, nullptr, rootserver::ObPrimaryLSService::mtl_stop, rootserver::ObPrimaryLSService::mtl_wait, mtl_destroy_default);
482
    MTL_BIND2(mtl_new_default, rootserver::ObCommonLSService::mtl_init, nullptr, rootserver::ObCommonLSService::mtl_stop, rootserver::ObCommonLSService::mtl_wait, mtl_destroy_default);
483
#ifdef OB_BUILD_ARBITRATION
484
    MTL_BIND2(mtl_new_default, rootserver::ObArbitrationService::mtl_init, mtl_start_default, rootserver::ObArbitrationService::mtl_stop, rootserver::ObArbitrationService::mtl_wait, mtl_destroy_default);
485
#endif
486
    MTL_BIND2(mtl_new_default, rootserver::ObBalanceTaskExecuteService::mtl_init, nullptr, rootserver::ObBalanceTaskExecuteService::mtl_stop, rootserver::ObBalanceTaskExecuteService::mtl_wait, mtl_destroy_default);
487
    MTL_BIND2(mtl_new_default, rootserver::ObTenantBalanceService::mtl_init, nullptr, rootserver::ObTenantBalanceService::mtl_stop, rootserver::ObTenantBalanceService::mtl_wait, mtl_destroy_default);
488
    MTL_BIND2(mtl_new_default, rootserver::ObRecoveryLSService::mtl_init, nullptr, rootserver::ObRecoveryLSService::mtl_stop, rootserver::ObRecoveryLSService::mtl_wait, mtl_destroy_default);
489
    MTL_BIND2(mtl_new_default, rootserver::ObRestoreService::mtl_init, nullptr, rootserver::ObRestoreService::mtl_stop, rootserver::ObRestoreService::mtl_wait, mtl_destroy_default);
490
    MTL_BIND2(mtl_new_default, coordinator::ObLeaderCoordinator::mtl_init, coordinator::ObLeaderCoordinator::mtl_start, coordinator::ObLeaderCoordinator::mtl_stop, coordinator::ObLeaderCoordinator::mtl_wait, mtl_destroy_default);
491
    MTL_BIND2(mtl_new_default, coordinator::ObFailureDetector::mtl_init, coordinator::ObFailureDetector::mtl_start, coordinator::ObFailureDetector::mtl_stop, coordinator::ObFailureDetector::mtl_wait, mtl_destroy_default);
492
    MTL_BIND2(ObLobManager::mtl_new, mtl_init_default, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default);
493
    MTL_BIND2(mtl_new_default, ObStorageHAService::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default);
494
    MTL_BIND2(mtl_new_default, rootserver::ObBackupTaskScheduler::mtl_init, nullptr, rootserver::ObBackupTaskScheduler::mtl_stop, rootserver::ObBackupTaskScheduler::mtl_wait, mtl_destroy_default);
495
    MTL_BIND2(mtl_new_default, rootserver::ObBackupDataService::mtl_init, nullptr, rootserver::ObBackupDataService::mtl_stop, rootserver::ObBackupDataService::mtl_wait, mtl_destroy_default);
496
    MTL_BIND2(mtl_new_default, rootserver::ObBackupCleanService::mtl_init, nullptr, rootserver::ObBackupCleanService::mtl_stop, rootserver::ObBackupCleanService::mtl_wait, mtl_destroy_default);
497
    MTL_BIND2(mtl_new_default, rootserver::ObArchiveSchedulerService::mtl_init, nullptr, rootserver::ObArchiveSchedulerService::mtl_stop, rootserver::ObArchiveSchedulerService::mtl_wait, mtl_destroy_default);
498
    MTL_BIND2(mtl_new_default, ObGlobalAutoIncService::mtl_init, nullptr, nullptr, nullptr, mtl_destroy_default);
499
    MTL_BIND2(mtl_new_default, share::detector::ObDeadLockDetectorMgr::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default);
500
    MTL_BIND2(mtl_new_default, rootserver::ObTenantSnapshotScheduler::mtl_init, nullptr, rootserver::ObTenantSnapshotScheduler::mtl_stop, rootserver::ObTenantSnapshotScheduler::mtl_wait, mtl_destroy_default);
501
    MTL_BIND2(mtl_new_default, rootserver::ObCloneScheduler::mtl_init, nullptr, rootserver::ObCloneScheduler::mtl_stop, rootserver::ObCloneScheduler::mtl_wait, mtl_destroy_default);
502
#ifdef OB_BUILD_ARBITRATION
503
    MTL_BIND2(mtl_new_default, ObPlanBaselineMgr::mtl_init, nullptr, ObPlanBaselineMgr::mtl_stop, ObPlanBaselineMgr::mtl_wait, mtl_destroy_default);
504
#endif
505
    MTL_BIND2(mtl_new_default, ObTenantSchemaService::mtl_init, nullptr, nullptr, nullptr, mtl_destroy_default);
506
    MTL_BIND2(mtl_new_default, ObTimestampService::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default);
507
    MTL_BIND2(mtl_new_default, ObStandbyTimestampService::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default);
508
    MTL_BIND2(mtl_new_default, ObTimestampAccess::mtl_init, nullptr, nullptr, nullptr, mtl_destroy_default);
509
    MTL_BIND2(mtl_new_default, ObTransIDService::mtl_init, nullptr, nullptr, nullptr, mtl_destroy_default);
510
    MTL_BIND2(mtl_new_default, ObUniqueIDService::mtl_init, nullptr, nullptr, nullptr, mtl_destroy_default);
511
    MTL_BIND2(mtl_new_default, ObXAService::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default);
512
    MTL_BIND2(mtl_new_default, ObTabletGCService::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default);
513
    MTL_BIND2(mtl_new_default, ObTenantFreezer::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default);
514
    MTL_BIND2(mtl_new_default, ObDataAccessService::mtl_init, nullptr, nullptr, nullptr, ObDataAccessService::mtl_destroy);
515
    MTL_BIND2(mtl_new_default, ObDASIDService::mtl_init, nullptr, nullptr, nullptr, mtl_destroy_default);
516
    MTL_BIND2(mtl_new_default, ObAccessService::mtl_init, nullptr, mtl_stop_default, nullptr, mtl_destroy_default);
517
    MTL_BIND2(mtl_new_default, ObCheckPointService::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default);
518
    MTL_BIND2(mtl_new_default, ObTransferService::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default);
519
    MTL_BIND2(mtl_new_default, rootserver::ObTenantTransferService::mtl_init, nullptr, rootserver::ObTenantTransferService::mtl_stop, rootserver::ObTenantTransferService::mtl_wait, mtl_destroy_default);
520
    MTL_BIND2(mtl_new_default, ObRebuildService::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default);
521
    MTL_BIND2(mtl_new_default, ObMultiVersionGarbageCollector::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default);
522
    MTL_BIND2(mtl_new_default, ObUDRMgr::mtl_init, nullptr, ObUDRMgr::mtl_stop, nullptr, mtl_destroy_default);
523
    MTL_BIND2(mtl_new_default, ObTenantCGReadInfoMgr::mtl_init, nullptr, nullptr, nullptr, mtl_destroy_default);
524
    MTL_BIND2(mtl_new_default, ObDecodeResourcePool::mtl_init, nullptr, nullptr, nullptr, mtl_destroy_default);
525
    MTL_BIND2(mtl_new_default, ObPxPools::mtl_init, nullptr, ObPxPools::mtl_stop, nullptr, ObPxPools::mtl_destroy);
526
    MTL_BIND2(ObTenantDfc::mtl_new, ObTenantDfc::mtl_init, nullptr, nullptr, nullptr, ObTenantDfc::mtl_destroy);
527
    MTL_BIND2(nullptr, init_compat_mode, nullptr, nullptr, nullptr, nullptr);
528
    MTL_BIND2(ObMySQLRequestManager::mtl_new, ObMySQLRequestManager::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, ObMySQLRequestManager::mtl_destroy);
529
    MTL_BIND2(mtl_new_default, ObTenantWeakReadService::mtl_init, mtl_start_default,
530
              mtl_stop_default,
531
              mtl_wait_default,
532
              mtl_destroy_default);
533
    //MTL_BIND2(ObTransAuditRecordMgr::mtl_init, ObTransAuditRecordMgr::mtl_destroy);
534
    MTL_BIND2(ObTenantSqlMemoryManager::mtl_new, ObTenantSqlMemoryManager::mtl_init, nullptr, nullptr, nullptr, ObTenantSqlMemoryManager::mtl_destroy);
535
    MTL_BIND2(mtl_new_default, ObPlanMonitorNodeList::mtl_init, nullptr, nullptr, nullptr, ObPlanMonitorNodeList::mtl_destroy);
536
    MTL_BIND2(mtl_new_default, ObTableLoadService::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default);
537
    MTL_BIND2(mtl_new_default, ObSharedMacroBlockMgr::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default);
538
    MTL_BIND2(mtl_new_default, ObFLTSpanMgr::mtl_init, nullptr, nullptr, nullptr, ObFLTSpanMgr::mtl_destroy);
539
    MTL_BIND2(common::sqlclient::ObTenantOciEnvs::mtl_new, common::sqlclient::ObTenantOciEnvs::mtl_init,
540
        nullptr, nullptr, nullptr, common::sqlclient::ObTenantOciEnvs::mtl_destroy);
541
    MTL_BIND2(mtl_new_default, ObPlanCache::mtl_init, nullptr, ObPlanCache::mtl_stop, nullptr, mtl_destroy_default);
542
    MTL_BIND2(mtl_new_default, ObPsCache::mtl_init, nullptr, ObPsCache::mtl_stop, nullptr, mtl_destroy_default);
543
    MTL_BIND2(server_obj_pool_mtl_new<ObPartTransCtx>, nullptr, nullptr, nullptr, nullptr, server_obj_pool_mtl_destroy<ObPartTransCtx>);
544
    MTL_BIND2(server_obj_pool_mtl_new<ObTableScanIterator>, nullptr, nullptr, nullptr, nullptr, server_obj_pool_mtl_destroy<ObTableScanIterator>);
545
    MTL_BIND2(mtl_new_default, ObTenantDirectLoadMgr::mtl_init, nullptr, nullptr, nullptr, mtl_destroy_default);
546
    MTL_BIND2(ObDetectManager::mtl_new, ObDetectManager::mtl_init, nullptr, nullptr, nullptr, ObDetectManager::mtl_destroy);
547
    MTL_BIND2(ObTenantSQLSessionMgr::mtl_new, ObTenantSQLSessionMgr::mtl_init, nullptr, nullptr, nullptr, ObTenantSQLSessionMgr::mtl_destroy);
548
    MTL_BIND2(mtl_new_default, ObDTLIntermResultManager::mtl_init, ObDTLIntermResultManager::mtl_start,
549
    ObDTLIntermResultManager::mtl_stop, ObDTLIntermResultManager::mtl_wait, ObDTLIntermResultManager::mtl_destroy);
550
    if (GCONF._enable_new_sql_nio && GCONF._enable_tenant_sql_net_thread) {
551
      MTL_BIND2(nullptr, nullptr, start_mysql_queue, mtl_stop_default,
552
                mtl_wait_default, mtl_destroy_default);
553
      // MTL_BIND2(nullptr, nullptr, start_sql_nio_server, mtl_stop_default,
554
      //           mtl_wait_default, mtl_destroy_default);
555
    }
556
    MTL_BIND2(mtl_new_default, rootserver::ObHeartbeatService::mtl_init, nullptr, rootserver::ObHeartbeatService::mtl_stop, rootserver::ObHeartbeatService::mtl_wait, mtl_destroy_default);
557
    MTL_BIND2(mtl_new_default, table::ObTTLService::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default);
558
    MTL_BIND2(mtl_new_default, ObEmptyReadBucket::mtl_init, nullptr, nullptr, nullptr, ObEmptyReadBucket::mtl_destroy);
559

560
#ifdef ERRSIM
561
    MTL_BIND2(mtl_new_default, ObTenantErrsimModuleMgr::mtl_init, nullptr, nullptr, nullptr, mtl_destroy_default);
562
    MTL_BIND2(mtl_new_default, ObTenantErrsimEventMgr::mtl_init, nullptr, nullptr, nullptr, mtl_destroy_default);
563
#endif
564

565
    MTL_BIND2(mtl_new_default, table::ObHTableLockMgr::mtl_init, nullptr, nullptr, nullptr, table::ObHTableLockMgr::mtl_destroy);
566
    MTL_BIND2(mtl_new_default, ObSharedTimer::mtl_init, ObSharedTimer::mtl_start, ObSharedTimer::mtl_stop, ObSharedTimer::mtl_wait, mtl_destroy_default);
567
    MTL_BIND2(mtl_new_default, ObOptStatMonitorManager::mtl_init, ObOptStatMonitorManager::mtl_start, ObOptStatMonitorManager::mtl_stop, ObOptStatMonitorManager::mtl_wait, mtl_destroy_default);
568
    MTL_BIND2(mtl_new_default, ObTenantSrs::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default);
569
    MTL_BIND2(mtl_new_default, table::ObTableApiSessPoolMgr::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default);
570
    MTL_BIND2(mtl_new_default, ObTenantSnapshotService::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default);
571
    MTL_BIND2(mtl_new_default, ObIndexUsageInfoMgr::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default);
572
    MTL_BIND2(mtl_new_default, storage::ObTabletMemtableMgrPool::mtl_init, nullptr, nullptr, nullptr, mtl_destroy_default);
573
    MTL_BIND2(mtl_new_default, rootserver::ObMViewMaintenanceService::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default);
574
  }
575

576
  if (OB_SUCC(ret)) {
577
    is_inited_ = true;
578
    LOG_INFO("succ to init multi tenant");
579
  }
580
  return ret;
581
}
582

583
int ObMultiTenant::start()
584
{
585
  int ret = OB_SUCCESS;
586

587
  ObTenantMemoryPrinter &printer = ObTenantMemoryPrinter::get_instance();
588

589
  if (IS_NOT_INIT) {
590
    ret = OB_NOT_INIT;
591
    LOG_WARN("not init", K(ret));
592
  } else if (OB_FAIL(create_virtual_tenants())) {
593
    LOG_ERROR("create virtual tenants failed", K(ret));
594
  } else if (OB_FAIL(ObThreadPool::start())) {
595
    LOG_ERROR("start multi tenant thread fail", K(ret));
596
  } else if (OB_FAIL(ObTenantNodeBalancer::get_instance().start())) {
597
    LOG_ERROR("start tenant node balancer thread failed", K(ret));
598
  // start memstore print timer.
599
  } else if (OB_FAIL(printer.register_timer_task(lib::TGDefIDs::MemDumpTimer))) {
600
    LOG_ERROR("Fail to register timer task", K(ret));
601
  } else {
602
    LOG_INFO("succ to start multi tenant");
603
  }
604

605

606
  if (OB_FAIL(ret)) {
607
    stop();
608
  }
609
  return ret;
610
}
611

612
void ObMultiTenant::stop()
613
{
614
  // Stop balancer so that tenants' quota will fixed. It's not
615
  // necessary to put ahead, but it isn't harmful and can exclude
616
  // affection for balancer.
617
  ObTenantNodeBalancer::get_instance().stop();
618
  // Stop workers of all tenants thus no request of tenant would be
619
  // processed any more. All tenants will be removed indeed.
620
  {
621
    TenantIdList ids;
622
    ids.set_label(ObModIds::OMT);
623
    get_tenant_ids(ids);
624
    bool lock_succ = false;
625
    while (ids.size() > 0) {
626
      LOG_INFO("there're some tenants need destroy", "count", ids.size());
627

628
      for (TenantIdList::iterator it = ids.begin(); it != ids.end(); it++) {
629
        uint64_t id = *it;
630
        remove_tenant(id, lock_succ);
631
      }
632
      get_tenant_ids(ids);
633
    }
634
  }
635
  // No tenant exist right now, so we just stop the scheduler.
636
  ObThreadPool::stop();
637
}
638

639
void ObMultiTenant::wait()
640
{
641
  ObTenantNodeBalancer::get_instance().wait();
642
  ObThreadPool::wait();
643
}
644

645

646
void ObMultiTenant::destroy()
647
{
648
  {
649
    SpinWLockGuard guard(lock_);
650
    tenants_.clear();
651
    is_inited_ = false;
652
  }
653
}
654

655
// Fills `meta` with the tenant meta of the hidden sys tenant.
//
// A unit config is generated via gen_sys_tenant_unit_config(true), wrapped
// into a tenant unit (unit id 1000, UNIT_NORMAL, MySQL compat mode, with
// memstore, not removed) and combined with a super block flagged as hidden.
//
// @param meta  [out] receives the result of meta.build()
// @return OB_SUCCESS, or the error code of the first failing step
int ObMultiTenant::construct_meta_for_hidden_sys(ObTenantMeta &meta)
{
  int ret = OB_SUCCESS;

  const uint64_t tenant_id = OB_SYS_TENANT_ID;
  const bool is_hidden_sys = true;
  ObTenantSuperBlock hidden_super_block(tenant_id, true/*is_hidden*/);
  share::ObUnitInfoGetter::ObTenantConfig tenant_unit;
  share::ObUnitConfig sys_unit_config;
  const bool with_memstore = true;
  const int64_t creation_ts = ObTimeUtility::current_time();
  uint64_t hidden_unit_id = 1000;

  if (OB_FAIL(sys_unit_config.gen_sys_tenant_unit_config(is_hidden_sys))) {
    LOG_WARN("gen sys tenant unit config fail", KR(ret), K(is_hidden_sys));
  }
  if (OB_SUCC(ret)
      && OB_FAIL(tenant_unit.init(tenant_id,
                                  hidden_unit_id,
                                  share::ObUnitInfoGetter::ObUnitStatus::UNIT_NORMAL,
                                  sys_unit_config,
                                  lib::Worker::CompatMode::MYSQL,
                                  creation_ts,
                                  with_memstore,
                                  false /*is_removed*/))) {
    LOG_WARN("fail to init hidden sys tenant unit", K(ret), K(tenant_id));
  }
  if (OB_SUCC(ret) && OB_FAIL(meta.build(tenant_unit, hidden_super_block))) {
    LOG_WARN("fail to build tenant meta", K(ret), K(tenant_id));
  }

  return ret;
}
685

686
int ObMultiTenant::construct_meta_for_virtual_tenant(const uint64_t tenant_id,
687
                                                     const double min_cpu,
688
                                                     const double max_cpu,
689
                                                     const int64_t mem_limit,
690
                                                     ObTenantMeta &meta)
691
{
692
  int ret = OB_SUCCESS;
693

694
  ObTenantSuperBlock super_block(tenant_id, true/*is_hidden*/);
695
  share::ObUnitInfoGetter::ObTenantConfig unit;
696
  uint64_t unit_id = 1000;
697

698
  share::ObUnitConfig unit_config;
699
  const bool has_memstore = true;
700
  const int64_t create_timestamp = ObTimeUtility::current_time();
701
  if (OB_FAIL(unit_config.gen_virtual_tenant_unit_config(max_cpu, min_cpu, mem_limit))) {
702
    LOG_WARN("generate virtual tenant unit config fail", KR(ret), K(max_cpu), K(min_cpu),
703
        K(mem_limit));
704
  } else if (OB_FAIL(unit.init(tenant_id,
705
                        unit_id,
706
                        share::ObUnitInfoGetter::ObUnitStatus::UNIT_NORMAL,
707
                        unit_config,
708
                        lib::Worker::CompatMode::MYSQL,
709
                        create_timestamp,
710
                        has_memstore,
711
                        false /*is_removed*/))) {
712
    LOG_WARN("fail to init virtual tenant unit", K(ret), K(tenant_id));
713
  } else if (OB_FAIL(meta.build(unit, super_block))) {
714
    LOG_WARN("fail to build tenant meta", K(ret), K(tenant_id));
715
  }
716

717
  return ret;
718
}
719

720
int ObMultiTenant::create_hidden_sys_tenant()
721
{
722
  int ret = OB_SUCCESS;
723
  const uint64_t tenant_id = OB_SYS_TENANT_ID;
724
  omt::ObTenant *tenant = nullptr;
725
  ObTenantMeta meta;
726
  if (OB_FAIL(construct_meta_for_hidden_sys(meta))) {
727
    LOG_ERROR("fail to construct meta", K(ret));
728
  } else if (OB_FAIL(create_tenant(meta, true /* write_slog */))) {
729
    LOG_ERROR("create hidden sys tenant failed", K(ret));
730
  }
731
  return ret;
732
}
733

734
int ObMultiTenant::update_hidden_sys_tenant()
735
{
736
  int ret = OB_SUCCESS;
737
  const uint64_t tenant_id = OB_SYS_TENANT_ID;
738
  omt::ObTenant *tenant = nullptr;
739
  ObTenantMeta meta;
740
  if (OB_FAIL(get_tenant(tenant_id, tenant))) { // sys tenant will not be deleted
741
    LOG_WARN("failed to get sys tenant", K(ret));
742
  } else if (OB_FAIL(construct_meta_for_hidden_sys(meta))) {
743
    LOG_ERROR("fail to construct meta", K(ret));
744
  } else {
745
    int64_t bucket_lock_idx = -1;
746
    bool lock_succ = false;
747
    if (OB_FAIL(bucket_lock_.wrlock(bucket_lock_idx = get_tenant_lock_bucket_idx(tenant_id)))) {
748
      LOG_WARN("fail to try_wrlock for update tenant unit", K(ret), K(tenant_id), K(bucket_lock_idx));
749
    } else if (FALSE_IT(lock_succ = true)) {
750
    } else if (!tenant->is_hidden() || meta.unit_ == tenant->get_unit()) {
751
      // do nothing
752
    } else if (OB_FAIL(update_tenant_unit_no_lock(meta.unit_))) {
753
      LOG_WARN("fail to update tenant unit", K(ret), K(tenant_id));
754
    }
755
    if (lock_succ) {
756
      bucket_lock_.unlock(bucket_lock_idx);
757
    }
758
  }
759
  return ret;
760
}
761

762
int ObMultiTenant::create_virtual_tenants()
763
{
764
  int ret = OB_SUCCESS;
765
  const int64_t phy_cpu_cnt = sysconf(_SC_NPROCESSORS_ONLN);
766
  const double data_cpu = (phy_cpu_cnt <= 4) ? 1.0 : OB_DATA_CPU;
767
  const double dtl_cpu = (phy_cpu_cnt <= 4) ? 1.0 : OB_DTL_CPU;
768

769
  if (OB_FAIL(create_tenant_without_unit(
770
                         OB_DATA_TENANT_ID,
771
                         data_cpu,
772
                         data_cpu))) {
773
    LOG_ERROR("add data tenant fail", K(ret));
774

775
  } else if (OB_FAIL(create_tenant_without_unit(
776
                         OB_DTL_TENANT_ID,
777
                         dtl_cpu,
778
                         dtl_cpu))) {
779
    LOG_ERROR("add DTL tenant fail", K(ret));
780

781
  } else {
782
    // init allocator for OB_SERVER_TENANT_ID
783
    ObMallocAllocator *allocator = ObMallocAllocator::get_instance();
784
    if (!OB_ISNULL(allocator)) {
785
      allocator->set_tenant_limit(OB_SERVER_TENANT_ID, INT64_MAX);
786
    }
787

788
    // set tenant mem limits
789
    ObVirtualTenantManager &omti = ObVirtualTenantManager::get_instance();
790
    if (OB_FAIL(omti.add_tenant(OB_SERVER_TENANT_ID))) {
791
      LOG_ERROR("Fail to add server tenant to tenant manager, ", K(ret));
792
    } else if (OB_FAIL(omti.set_tenant_mem_limit(OB_SERVER_TENANT_ID, 0, INT64_MAX))) {
793
      LOG_ERROR("Fail to set tenant mem limit, ", K(ret));
794
    }
795
  }
796

797
  return ret;
798
}
799

800

801
int ObMultiTenant::create_tenant_without_unit(const uint64_t tenant_id,
802
                                              const double min_cpu,
803
                                              const double max_cpu)
804
{
805
  int ret = OB_SUCCESS;
806
  ObTenantMeta meta;
807
  uint64_t mem_limit = 0;
808

809
  if (OB_SERVER_TENANT_ID == tenant_id) {
810
    mem_limit = INT64_MAX;
811
  } else {
812
    static const int64_t VIRTUAL_TENANT_MEMORY_LIMTI = 1L << 30;
813
    mem_limit = VIRTUAL_TENANT_MEMORY_LIMTI;
814
  }
815
  if (OB_FAIL(construct_meta_for_virtual_tenant(tenant_id, min_cpu, max_cpu, mem_limit, meta))) {
816
    LOG_WARN("fail to construct_meta_for_virtual_tenant", K(ret), K(tenant_id));
817
  } else if (OB_FAIL(create_tenant(meta, false))) {
818
    LOG_WARN("fail to create virtual tenant", K(ret), K(tenant_id));
819
  }
820
  if (OB_SUCC(ret) && is_virtual_tenant_id(tenant_id)) {
821
    ObVirtualTenantManager &omti = ObVirtualTenantManager::get_instance();
822
    if (OB_FAIL(omti.add_tenant(tenant_id))) {
823
      LOG_ERROR("Fail to add virtual tenant to tenant manager, ", K(ret));
824
    } else if (OB_FAIL(omti.set_tenant_mem_limit(tenant_id, 0, mem_limit))) {
825
      LOG_ERROR("Fail to set virtual tenant mem limit, ", K(ret));
826
    }
827
  }
828
  return ret;
829
}
830

831
// Turns the hidden sys tenant into a real one.
//
// Under the tenant's bucket write lock: applies `unit` via
// update_tenant_unit_no_lock(), writes a super-block slog with is_hidden_
// cleared, and only then installs that super block on the in-memory tenant.
//
// @param unit            unit config to apply; unit.tenant_id_ selects the tenant
// @param abs_timeout_us  timeout for the operation; the lock is requested with
//                        a deadline 3s earlier to leave time for the conversion
// @return OB_SUCCESS; OB_NOT_INIT if the module is not initialized;
//         OB_ERR_UNEXPECTED if the tenant is null or is not hidden; otherwise
//         the error code of the failing step
int ObMultiTenant::convert_hidden_to_real_sys_tenant(const ObUnitInfoGetter::ObTenantConfig &unit,
                                                     const int64_t abs_timeout_us)
{
  int ret = OB_SUCCESS;

  ObTenant *tenant = nullptr;
  const uint64_t tenant_id = unit.tenant_id_;
  ObTenantSuperBlock new_super_block;
  bool lock_succ = false;
  int64_t bucket_lock_idx = -1;
  int64_t lock_timeout_ts = abs_timeout_us - 3000000; // reserve 3s for converting tenant

  if (IS_NOT_INIT) {
    ret = OB_NOT_INIT;
    LOG_WARN("not init", K(ret));
  } else if (OB_FAIL(bucket_lock_.wrlock(bucket_lock_idx = get_tenant_lock_bucket_idx(tenant_id), lock_timeout_ts))) {
    LOG_WARN("fail to wrlock for convert_hidden_to_real_sys_tenant", K(ret), K(bucket_lock_idx), K(lock_timeout_ts));
  } else if (FALSE_IT(lock_succ = true)) {
  } else if (OB_FAIL(get_tenant(tenant_id, tenant))) {
    LOG_WARN("fail to get sys tenant", K(tenant_id), K(ret));
  } else if (OB_ISNULL(tenant)) {
    // same defensive check as update_tenant_unit_no_lock()
    ret = OB_ERR_UNEXPECTED;
    LOG_ERROR("tenant is nullptr", K(ret), K(tenant_id));
  } else if (!tenant->is_hidden()) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("must be hidden sys tenant", K(ret));
  } else if (FALSE_IT(new_super_block = tenant->get_super_block())) {
  } else if (FALSE_IT(new_super_block.is_hidden_ = false)) {
  } else if (OB_FAIL(update_tenant_unit_no_lock(unit))) {
    LOG_WARN("fail to update_tenant_unit_no_lock", K(ret), K(unit));
  } else if (OB_FAIL(ObServerCheckpointSlogHandler::get_instance()
      .write_tenant_super_block_slog(new_super_block))) {
    LOG_WARN("fail to write_tenant_super_block_slog", K(ret), K(new_super_block));
  } else {
    // the slog write succeeded, so the in-memory state may follow
    tenant->set_tenant_super_block(new_super_block);
  }

  if (lock_succ) {
    bucket_lock_.unlock(bucket_lock_idx);
  }

  FLOG_INFO("finish convert_hidden_to_real_sys_tenant", K(ret), K(new_super_block), K(bucket_lock_idx));

  return ret;
}
876

877
int ObMultiTenant::create_tenant(const ObTenantMeta &meta, bool write_slog, const int64_t abs_timeout_us)
878
{
879
  int ret = OB_SUCCESS;
880
  int tmp_ret = OB_SUCCESS;
881

882
  const double min_cpu = static_cast<double>(meta.unit_.config_.min_cpu());
883
  const double max_cpu = static_cast<double>(meta.unit_.config_.max_cpu());
884
  const uint64_t tenant_id = meta.unit_.tenant_id_;
885
  ObTenant *tenant = nullptr;
886
  int64_t allowed_mem_limit = 0;
887
  ObMallocAllocator *malloc_allocator = ObMallocAllocator::get_instance();
888
  ObTenantCreateStep create_step = ObTenantCreateStep::STEP_BEGIN;  // step0
889
  bool lock_succ = false;
890
  int64_t bucket_lock_idx = -1;
891
  const int64_t log_disk_size = meta.unit_.config_.log_disk_size();
892
  int64_t lock_timeout_ts = abs_timeout_us - 5000000; // reserve 5s for creating tenant
893

894
  if (IS_NOT_INIT) {
895
    ret = OB_NOT_INIT;
896
    LOG_ERROR("not init", K(ret));
897
  } else if (!meta.is_valid()) {
898
    ret = OB_INVALID_ARGUMENT;
899
    LOG_ERROR("invalid argument", K(ret), K(meta));
900
  } else if (OB_ISNULL(malloc_allocator)) {
901
    ret = OB_ERR_UNEXPECTED;
902
    LOG_ERROR("malloc allocator is NULL", K(ret));
903
  } else if (OB_FAIL(bucket_lock_.wrlock(bucket_lock_idx = get_tenant_lock_bucket_idx(tenant_id), lock_timeout_ts))) {
904
    LOG_WARN("fail to wrlock for create tenant", K(ret), K(tenant_id), K(bucket_lock_idx), K(lock_timeout_ts));
905
  } else if (FALSE_IT(lock_succ = true)) {
906
  } else if (OB_SUCC(get_tenant(tenant_id, tenant))) {
907
    ret = OB_TENANT_EXIST;
908
    LOG_WARN("tenant exist", K(ret), K(tenant_id));
909
  } else {
910
    ret = OB_SUCCESS;
911
  }
912

913
  tenant = nullptr;
914

915
  bool tenant_allocator_created = false;
916
  if (OB_SUCC(ret)) {
917
    if (OB_FAIL(malloc_allocator->create_and_add_tenant_allocator(tenant_id))) {
918
      LOG_ERROR("create and add tenant allocator failed", K(ret), K(tenant_id));
919
    } else {
920
      tenant_allocator_created = true;
921
    }
922
    if (OB_SUCC(ret)) {
923
      int64_t memory_size = meta.unit_.config_.memory_size();
924
      if (is_sys_tenant(tenant_id) && !meta.super_block_.is_hidden_) {
925
        memory_size += GMEMCONF.get_extra_memory();
926
      }
927
      if (OB_FAIL(update_tenant_memory(tenant_id, memory_size, allowed_mem_limit))) {
928
        LOG_WARN("fail to update tenant memory", K(ret), K(tenant_id));
929
      }
930
    }
931
  }
932
  if (OB_SUCC(ret)) {
933
    ObSEArray<ObCtxMemConfig, ObCtxIds::MAX_CTX_ID> configs;
934
    if (OB_FAIL(mcg_->get(tenant_id, allowed_mem_limit, configs))) {
935
      LOG_ERROR("get ctx mem config failed", K(ret));
936
    }
937
    for (int64_t i = 0; OB_SUCC(ret) && i < configs.count(); i++) {
938
      const uint64_t ctx_id = configs.at(i).ctx_id_;
939
      const int64_t idle_size = configs.at(i).idle_size_;
940
      const int64_t limit = configs.at(i).limit_;
941
      const bool reserve = true;
942
      if (OB_FAIL(malloc_allocator->set_tenant_ctx_idle(tenant_id, ctx_id, idle_size, reserve))) {
943
        LOG_ERROR("set tenant ctx idle failed", K(ret));
944
      } else if (OB_FAIL(set_ctx_limit(tenant_id, ctx_id, limit))) {
945
        LOG_ERROR("set tenant ctx limit failed", K(ret), K(limit));
946
      }
947
      LOG_INFO("init ctx memory finish", K(ret), K(tenant_id), K(i), K(configs.at(i)));
948
    }
949
    if (OB_SUCC(ret)) {
950
      create_step = ObTenantCreateStep::STEP_CTX_MEM_CONFIG_SETTED; // step1
951
    }
952
  }
953
  if (OB_SUCC(ret)) {
954
    if (!is_virtual_tenant_id(tenant_id)
955
        && OB_FAIL(GCTX.log_block_mgr_->create_tenant(log_disk_size))) {
956
      LOG_ERROR("create_tenant in ObServerLogBlockMgr failed", KR(ret));
957
    }
958
    // if create_tenant in ObServerLogBlockMGR success, the log disk size need by this tenant has been pinned,
959
    // otherwise, the assigned log disk size of ObServerLogBlockMGR is origin.
960
    if (OB_SUCC(ret)) {
961
      create_step = ObTenantCreateStep::STEP_LOG_DISK_SIZE_PINNED;  // step2
962
    }
963
  }
964

965
  if (OB_FAIL(ret)) {
966
    // do nothing
967
  } else if (OB_ISNULL(GCTX.cgroup_ctrl_)) {
968
    ret = OB_NOT_INIT;
969
    LOG_WARN("group ctrl not init", K(ret));
970
  } else if (OB_ISNULL(tenant = OB_NEW(
971
    ObTenant, ObModIds::OMT, tenant_id, GCONF.workers_per_cpu_quota.get_value(), *GCTX.cgroup_ctrl_))) {
972
    ret = OB_ALLOCATE_MEMORY_FAILED;
973
    LOG_WARN("new tenant fail", K(ret));
974
  } else if (FALSE_IT(create_step = ObTenantCreateStep::STEP_TENANT_NEWED)) { //step3
975

976
  } else if (OB_FAIL(tenant->init_ctx())) {
977
    LOG_WARN("init ctx fail", K(tenant_id), K(ret));
978
  } else if (write_slog) {
979
    if (OB_FAIL(write_create_tenant_prepare_slog(meta))) {
980
      LOG_ERROR("fail to write create tenant prepare slog", K(ret));
981
    } else {
982
      create_step = ObTenantCreateStep::STEP_WRITE_PREPARE_SLOG; // step4
983
    }
984
  }
985

986
  if (OB_SUCC(ret)) {
987
    if (OB_FAIL(OTC_MGR.add_tenant_config(tenant_id))) {
988
      LOG_ERROR("add tenant config fail", K(tenant_id), K(ret));
989
    }
990
  }
991

992
  if (OB_SUCC(ret)) {
993
    CREATE_WITH_TEMP_ENTITY(RESOURCE_OWNER, tenant->id()) {
994
      WITH_ENTITY(&tenant->ctx()) {
995
        if (OB_FAIL(tenant->init(meta))) {
996
          LOG_ERROR("init tenant fail", K(tenant_id), K(ret));
997
        }
998
      }
999
    }
1000
  }
1001

1002
  if (OB_FAIL(ret)) {
1003
    // do nothing
1004
#ifdef OMT_UNITTEST
1005
   } else if (!is_virtual_tenant_id(tenant_id) &&
1006
       OB_FAIL(OTC_MGR.got_version(tenant_id, common::ObSystemConfig::INIT_VERSION))) {
1007
     LOG_ERROR("failed to got version", K(tenant_id), K(ret));
1008
#endif
1009
  } else if (!is_virtual_tenant_id(tenant_id)) {
1010
    ObTenantSwitchGuard guard(tenant);
1011
    if (OB_FAIL(MTL(ObTenantFreezer *)->set_tenant_mem_limit(meta.unit_.config_.memory_size(), allowed_mem_limit))) {
1012
      LOG_WARN("fail to set_tenant_mem_limit", K(ret), K(tenant_id));
1013
    }
1014
  }
1015
  if (OB_SUCC(ret)) {
1016
    if (write_slog && OB_FAIL(write_create_tenant_commit_slog(tenant_id))) {
1017
      LOG_ERROR("fail to write create tenant commit slog", K(ret), K(tenant_id));
1018
    } else {
1019
      tenant->set_create_status(ObTenantCreateStatus::CREATE_COMMIT);
1020
      create_step = ObTenantCreateStep::STEP_FINISH; // step5
1021
    }
1022
  }
1023

1024
  if (OB_SUCC(ret)) {
1025
    SpinWLockGuard guard(lock_);
1026
    ObTenant *tmp_tenant = NULL;
1027
    TenantIterator iter;
1028
    if (OB_SUCC(get_tenant_unsafe(tenant_id, tmp_tenant))) {
1029
      ret = OB_TENANT_EXIST;
1030
      LOG_ERROR("tenant exist", K(ret), K(tenant_id));
1031
    } else if (OB_FAIL(tenants_.insert(tenant, iter, compare_tenant))) {
1032
      LOG_ERROR("fail to insert tenant", K(ret), K(tenant_id));
1033
    }
1034
  }
1035
  // TODO: @lingyang 预期不能失败
1036
  if (!is_virtual_tenant_id(tenant_id) && OB_TMP_FAIL(update_tenant_config(tenant_id))) {
1037
    LOG_WARN("update tenant config fail", K(tenant_id), K(tmp_ret));
1038
  }
1039

1040
  if (OB_FAIL(ret)) {
1041
    do {
1042
      tmp_ret = OB_SUCCESS;
1043
      if (create_step >= ObTenantCreateStep::STEP_TENANT_NEWED) {
1044
        if (OB_NOT_NULL(tenant)) {
1045
          tenant->stop();
1046
          while (OB_SUCCESS != tenant->try_wait()) {
1047
            ob_usleep(100 * 1000);
1048
          }
1049
          tenant->destroy();
1050
          ob_delete(tenant);
1051
          tenant = nullptr;
1052
        }
1053
        // no need rollback when replaying slog and creating a virtual tenant,
1054
        // in which two case the write_slog flag is set to false
1055
        if (write_slog && OB_SUCCESS != (tmp_ret = clear_persistent_data(tenant_id))) {
1056
          LOG_ERROR("fail to clear persistent data", K(tenant_id), K(tmp_ret));
1057
          SLEEP(1);
1058
        }
1059
      }
1060
    } while (OB_SUCCESS != tmp_ret);
1061

1062
    do {
1063
      tmp_ret = OB_SUCCESS;
1064
      if (create_step >= ObTenantCreateStep::STEP_CTX_MEM_CONFIG_SETTED) {
1065
        for (uint64_t ctx_id = 0; ctx_id < ObCtxIds::MAX_CTX_ID; ctx_id++) {
1066
          if (NULL == malloc_allocator->get_tenant_ctx_allocator(tenant_id, ctx_id)) {
1067
            // do-nothing
1068
          } else if (OB_SUCCESS != (tmp_ret = malloc_allocator->set_tenant_ctx_idle(tenant_id, ctx_id, 0))) {
1069
            LOG_ERROR("fail to cleanup ctx mem config", K(tmp_ret), K(tenant_id), K(ctx_id));
1070
            SLEEP(1);
1071
          }
1072
        }
1073
      }
1074
    } while (OB_SUCCESS != tmp_ret);
1075

1076
    do {
1077
      tmp_ret = OB_SUCCESS;
1078
      if (create_step >= ObTenantCreateStep::STEP_LOG_DISK_SIZE_PINNED) {
1079
        if (!is_virtual_tenant_id(tenant_id)) {
1080
          GCTX.log_block_mgr_->abort_create_tenant(log_disk_size);
1081
        }
1082
      }
1083
    } while (OB_SUCCESS != tmp_ret);
1084

1085
    if (create_step >= ObTenantCreateStep::STEP_WRITE_PREPARE_SLOG) {
1086
      if (OB_SUCCESS != (tmp_ret = write_create_tenant_abort_slog(tenant_id))) {
1087
        LOG_ERROR("fail to write create tenant abort slog", K(tmp_ret));
1088
      }
1089
    }
1090
  }
1091
  if (OB_FAIL(ret) && tenant_allocator_created) {
1092
    malloc_allocator->recycle_tenant_allocator(tenant_id);
1093
  }
1094
  if (lock_succ) {
1095
    bucket_lock_.unlock(bucket_lock_idx);
1096
  }
1097

1098
  FLOG_INFO("finish create new tenant", K(ret), K(tenant_id), K(write_slog), K(create_step), K(bucket_lock_idx));
1099

1100
  return ret;
1101
}
1102

1103
// Applies a new unit config to an existing tenant. This function takes no
// bucket lock itself; the callers visible in this file acquire the tenant's
// bucket write lock first.
//
// Order of operations: update the log disk size (which yields the size that
// was actually allowed), build the allowed unit config from it, write the
// update-unit slog, update the thread count for max_cpu, and finally store
// the cpu figures and the allowed unit on the tenant object.
//
// @param unit  requested unit config; unit.tenant_id_ selects the tenant
// @return OB_SUCCESS; OB_NOT_INIT; OB_ERR_UNEXPECTED if the tenant is null;
//         or the error code of the failing step
int ObMultiTenant::update_tenant_unit_no_lock(const ObUnitInfoGetter::ObTenantConfig &unit)
{
  int ret = OB_SUCCESS;

  ObTenant *tenant = nullptr;
  const double min_cpu = static_cast<double>(unit.config_.min_cpu());
  const double max_cpu = static_cast<double>(unit.config_.max_cpu());
  const uint64_t tenant_id = unit.tenant_id_;
  ObUnitInfoGetter::ObTenantConfig allowed_new_unit;
  ObUnitInfoGetter::ObTenantConfig old_unit;
  int64_t allowed_new_log_disk_size = 0;
  if (IS_NOT_INIT) {
    ret = OB_NOT_INIT;
    LOG_WARN("not init", K(ret));
  } else if (OB_FAIL(get_tenant(tenant_id, tenant))) {
    LOG_WARN("fail to get tenant", K(tenant_id), K(ret));
  } else if (OB_ISNULL(tenant)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_ERROR("tenant is nullptr", K(ret), K(tenant_id));
  } else if (OB_FAIL(old_unit.assign(tenant->get_unit()))) {
    LOG_ERROR("fail to assign old unit failed", K(ret), K(tenant_id), K(unit));
  } else if (OB_FAIL(update_tenant_log_disk_size(tenant_id,
                                                 old_unit.config_.log_disk_size(),
                                                 unit.config_.log_disk_size(),
                                                 allowed_new_log_disk_size))) {
    LOG_WARN("fail to update tenant log disk size", K(ret), K(tenant_id));
  } else if (OB_FAIL(construct_allowed_unit_config(allowed_new_log_disk_size,
                                                   unit,
                                                   allowed_new_unit))) {
    LOG_WARN("fail to construct_allowed_unit_config", K(ret), K(allowed_new_log_disk_size),
             K(allowed_new_unit));
  } else if (OB_FAIL(write_update_tenant_unit_slog(allowed_new_unit))) {
    LOG_WARN("fail to write tenant meta slog", K(ret), K(tenant_id));
  } else if (OB_FAIL(tenant->update_thread_cnt(max_cpu))) {
    LOG_WARN("fail to update mtl module thread_cnt", K(ret), K(tenant_id));
  } else {
    // only touch the cpu setters when the value actually changes
    if (tenant->unit_min_cpu() != min_cpu) {
      tenant->set_unit_min_cpu(min_cpu);
    }
    if (tenant->unit_max_cpu() != max_cpu) {
      tenant->set_unit_max_cpu(max_cpu);
    }
    tenant->set_tenant_unit(allowed_new_unit);
    LOG_INFO("succecc to set tenant unit config", K(unit));
  }

  return ret;
}
1151

1152
// Updates the memory settings of an existing tenant from its unit config.
//
// The requested size is unit.config_.memory_size() + extra_memory. It is
// passed to the allocator-level update_tenant_memory() overload, which
// reports the limit that was actually allowed; the freezer limit and the
// throttle config are then refreshed, and the allowed limit is stored on the
// tenant as its unit memory size.
//
// @param unit          unit config; unit.tenant_id_ selects the tenant
// @param extra_memory  additional bytes added to the unit's memory size
// @return OB_SUCCESS; OB_NOT_INIT; OB_ERR_UNEXPECTED if the tenant is null;
//         or the error code of the failing step
int ObMultiTenant::update_tenant_memory(const ObUnitInfoGetter::ObTenantConfig &unit,
                                        const int64_t extra_memory /* = 0 */)
{
  int ret = OB_SUCCESS;
  ObTenant *tenant = nullptr;
  const uint64_t tenant_id = unit.tenant_id_;
  int64_t granted_limit = 0;
  int64_t requested_size = unit.config_.memory_size() + extra_memory;

  if (IS_NOT_INIT) {
    ret = OB_NOT_INIT;
    LOG_WARN("not init", K(ret));
  } else if (OB_FAIL(get_tenant(tenant_id, tenant))) {
    LOG_WARN("fail to get tenant", K(tenant_id), K(ret));
  } else if (OB_ISNULL(tenant)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_ERROR("tenant is nullptr", K(tenant_id));
  }

  if (OB_SUCC(ret)) {
    if (OB_FAIL(update_tenant_memory(tenant_id, requested_size, granted_limit))) {
      LOG_WARN("fail to update tenant memory", K(ret), K(tenant_id));
    } else if (OB_FAIL(update_tenant_freezer_mem_limit(tenant_id, requested_size, granted_limit))) {
      LOG_WARN("fail to update_tenant_freezer_mem_limit", K(ret), K(tenant_id));
    } else if (OB_FAIL(update_throttle_config_(tenant_id))) {
      LOG_WARN("update throttle config failed", K(ret), K(tenant_id));
    } else {
      tenant->set_unit_memory_size(granted_limit);
    }
  }
  return ret;
}
1179

1180
int ObMultiTenant::construct_allowed_unit_config(const int64_t allowed_new_log_disk_size,
1181
                                                 const ObUnitInfoGetter::ObTenantConfig &expected_unit_config,
1182
                                                 ObUnitInfoGetter::ObTenantConfig &allowed_new_unit)
1183
{
1184
  int ret = OB_SUCCESS;
1185
  if (0 > allowed_new_log_disk_size
1186
      || !expected_unit_config.is_valid()) {
1187
    ret= OB_INVALID_ARGUMENT;
1188
  } else if (OB_FAIL(allowed_new_unit.assign(expected_unit_config))) {
1189
    LOG_ERROR("fail to assign new unit", K(allowed_new_log_disk_size), K(expected_unit_config));
1190
  } else {
1191
    // construct allowed resource.
1192
    ObUnitResource allowed_resource(
1193
        expected_unit_config.config_.max_cpu(),
1194
        expected_unit_config.config_.min_cpu(),
1195
        expected_unit_config.config_.memory_size(),
1196
        allowed_new_log_disk_size,
1197
        expected_unit_config.config_.max_iops(),
1198
        expected_unit_config.config_.min_iops(),
1199
        expected_unit_config.config_.iops_weight());
1200
    if (OB_FAIL(allowed_new_unit.config_.update_unit_resource(allowed_resource))) {
1201
      LOG_WARN("update_unit_resource failed", K(allowed_new_log_disk_size), K(allowed_new_unit),
1202
               K(allowed_resource));
1203
    }
1204
  }
1205
  return ret;
1206
}
1207

1208
// Locking wrapper around update_tenant_unit_no_lock(): takes the tenant's
// bucket write lock, applies the unit, releases the lock, then logs the
// outcome.
//
// @param unit  requested unit config; unit.tenant_id_ selects the tenant
// @return OB_SUCCESS; OB_NOT_INIT; or the error code of the lock or update
int ObMultiTenant::update_tenant_unit(const ObUnitInfoGetter::ObTenantConfig &unit)
{
  int ret = OB_SUCCESS;
  const uint64_t tenant_id = unit.tenant_id_;
  // stays -1 in the final log line when the lock was never attempted
  int64_t bucket_lock_idx = -1;

  if (IS_NOT_INIT) {
    ret = OB_NOT_INIT;
    LOG_WARN("not init", K(ret));
  } else {
    bucket_lock_idx = get_tenant_lock_bucket_idx(tenant_id);
    if (OB_FAIL(bucket_lock_.wrlock(bucket_lock_idx))) {
      LOG_WARN("fail to try_wrlock for update tenant unit", K(ret), K(tenant_id), K(bucket_lock_idx));
    } else {
      if (OB_FAIL(update_tenant_unit_no_lock(unit))) {
        LOG_WARN("fail to update_tenant_unit_no_lock", K(ret), K(unit));
      }
      bucket_lock_.unlock(bucket_lock_idx);
    }
  }

  LOG_INFO("OMT finish update tenant unit config", K(ret), K(unit), K(bucket_lock_idx));

  return ret;
}
1233

1234
int ObMultiTenant::update_tenant_memory(const uint64_t tenant_id, const int64_t mem_limit, int64_t &allowed_mem_limit)
1235
{
1236
  int ret = OB_SUCCESS;
1237

1238
  ObMallocAllocator *malloc_allocator = ObMallocAllocator::get_instance();
1239

1240
  allowed_mem_limit = mem_limit;
1241
  const int64_t pre_mem_limit = malloc_allocator->get_tenant_limit(tenant_id);
1242
  const int64_t mem_hold = malloc_allocator->get_tenant_hold(tenant_id);
1243
  const int64_t target_mem_limit = mem_limit;
1244

1245
  if (OB_SUCC(ret)) {
1246
    // make sure half reserve memory available
1247
    if (target_mem_limit < pre_mem_limit) {
1248
      allowed_mem_limit = mem_hold + static_cast<int64_t>(
1249
          static_cast<double>(target_mem_limit) * TENANT_RESERVE_MEM_RATIO / 2.0);
1250
      if (allowed_mem_limit < target_mem_limit) {
1251
        allowed_mem_limit = target_mem_limit;
1252
      }
1253
      if (allowed_mem_limit < pre_mem_limit) {
1254
        LOG_INFO("reduce memory quota", K(mem_limit), K(pre_mem_limit), K(target_mem_limit), K(mem_hold));
1255
      } else {
1256
        allowed_mem_limit = pre_mem_limit;
1257
        LOG_WARN("try to reduce memory quota, but free memory not enough",
1258
                 K(allowed_mem_limit), K(pre_mem_limit), K(target_mem_limit), K(mem_hold));
1259
      }
1260
    }
1261

1262
    if (allowed_mem_limit != pre_mem_limit) {
1263
      malloc_allocator->set_tenant_limit(tenant_id, allowed_mem_limit);
1264
    }
1265
  }
1266

1267
  return ret;
1268
}
1269

1270
int ObMultiTenant::update_tenant_log_disk_size(const uint64_t tenant_id,
1271
                                               const int64_t old_log_disk_size,
1272
                                               const int64_t new_log_disk_size,
1273
                                               int64_t &allowed_new_log_disk_size)
1274
{
1275
  int ret = OB_SUCCESS;
1276
  MAKE_TENANT_SWITCH_SCOPE_GUARD(guard);
1277
  if (OB_SUCC(guard.switch_to(tenant_id))) {
1278
    ObLogService *log_service = MTL(ObLogService *);
1279
    if (OB_ISNULL(log_service)) {
1280
      ret = OB_ERR_UNEXPECTED;
1281
    } else if (OB_FAIL(GCTX.log_block_mgr_->update_tenant(old_log_disk_size, new_log_disk_size,
1282
                                                          allowed_new_log_disk_size, log_service))) {
1283
      LOG_WARN("fail to update_tenant", K(tenant_id), K(old_log_disk_size), K(new_log_disk_size),
1284
               K(allowed_new_log_disk_size));
1285
    } else {
1286
      LOG_INFO("update_tenant_log_disk_size success", K(tenant_id), K(old_log_disk_size),
1287
               K(new_log_disk_size), K(allowed_new_log_disk_size));
1288
    }
1289
  }
1290
  return ret;
1291
}
1292

1293
// Re-apply the current tenant configuration to the per-tenant modules of `tenant_id`.
//
// Nothing is done when the tenant has no valid config. Otherwise the thread switches to
// the tenant and refreshes, one after another: palf options, dag scheduler, DDL,
// freezer and throttle settings. Each refresh step is best effort: its failure is logged
// through `tmp_ret` and neither stops the remaining steps nor changes the return value.
//
// @return OB_SUCCESS, or the error of the tenant switch.
int ObMultiTenant::update_tenant_config(uint64_t tenant_id)
{
  int ret = OB_SUCCESS;
  int tmp_ret = OB_SUCCESS;
  ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id));
  if (!tenant_config.is_valid()) {
    // no valid config for this tenant, nothing to refresh
  } else {
    MAKE_TENANT_SWITCH_SCOPE_GUARD(guard);
    if (OB_FAIL(guard.switch_to(tenant_id))) {
      LOG_WARN("switch tenant failed", K(ret), K(tenant_id));
    } else {
      if (OB_TMP_FAIL(update_palf_config())) {
        LOG_WARN("failed to update palf disk config", K(tmp_ret), K(tenant_id));
      }
      if (OB_TMP_FAIL(update_tenant_dag_scheduler_config())) {
        LOG_WARN("failed to update tenant dag scheduler config", K(tmp_ret), K(tenant_id));
      }
      if (OB_TMP_FAIL(update_tenant_ddl_config())) {
        LOG_WARN("failed to update tenant ddl config", K(tmp_ret), K(tenant_id));
      }
      if (OB_TMP_FAIL(update_tenant_freezer_config_())) {
        LOG_WARN("failed to update tenant freezer config", K(tmp_ret), K(tenant_id));
      }
      if (OB_TMP_FAIL(update_throttle_config_(tenant_id))) {
        // the error code lives in tmp_ret; logging ret here always printed OB_SUCCESS
        LOG_WARN("update throttle config failed", K(tmp_ret), K(tenant_id));
      }
    }
  }
  if (OB_SUCC(ret)) {
    // only claim success when the tenant switch did not fail
    LOG_INFO("update_tenant_config success", K(tenant_id));
  }
  return ret;
}
1322

1323
int ObMultiTenant::update_palf_config()
1324
{
1325
  int ret = OB_SUCCESS;
1326
  ObLogService *log_service = MTL(ObLogService *);
1327
  if (NULL == log_service) {
1328
    ret = OB_ERR_UNEXPECTED;
1329
  } else {
1330
    ret = log_service->update_palf_options_except_disk_usage_limit_size();
1331
  }
1332
  return ret;
1333
}
1334

1335
int ObMultiTenant::update_tenant_dag_scheduler_config()
1336
{
1337
  int ret = OB_SUCCESS;
1338
  ObTenantDagScheduler *dag_scheduler = MTL(ObTenantDagScheduler*);
1339
  if (OB_FAIL(ret)) {
1340
    // do nothing
1341
  } else if (OB_ISNULL(dag_scheduler)) {
1342
    ret = OB_ERR_UNEXPECTED;
1343
    LOG_WARN("dag scheduler should not be null", K(ret));
1344
  } else {
1345
    dag_scheduler->reload_config();
1346
  }
1347
  return ret;
1348
}
1349

1350
int ObMultiTenant::update_tenant_ddl_config()
1351
{
1352
  int ret = OB_SUCCESS;
1353
  const uint64_t tenant_id = MTL_ID();
1354
  omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id));
1355
#ifdef ERRSIM
1356
  if (tenant_config.is_valid()) {
1357
    if (OB_FAIL(ObDDLSimPointMgr::get_instance().set_tenant_param(tenant_id,
1358
                                                                  tenant_config->errsim_ddl_sim_point_random_control,
1359
                                                                  tenant_config->errsim_ddl_sim_point_fixed_list))) {
1360
      LOG_WARN("set tenant param for ddl sim point failed", K(ret),
1361
          K(tenant_id), K(tenant_config->errsim_ddl_sim_point_random_control), K(tenant_config->errsim_ddl_sim_point_fixed_list));
1362
    }
1363
  }
1364
#endif
1365
  return ret;
1366
}
1367

1368
int ObMultiTenant::update_tenant_freezer_config_()
1369
{
1370
  int ret = OB_SUCCESS;
1371
  ObTenantFreezer *freezer = MTL(ObTenantFreezer*);
1372
  if (NULL == freezer) {
1373
    ret = OB_ERR_UNEXPECTED;
1374
    LOG_ERROR("tenant freezer should not be null", K(ret));
1375
  } else if (OB_FAIL(freezer->reload_config())) {
1376
    LOG_WARN("tenant freezer config update failed", K(ret));
1377
  }
1378
  return ret;
1379
}
1380

1381
int ObMultiTenant::update_throttle_config_(const uint64_t tenant_id)
1382
{
1383
  int ret = OB_SUCCESS;
1384

1385
  MTL_SWITCH(tenant_id) {
1386
    ObSharedMemAllocMgr *share_mem_alloc_mgr = MTL(ObSharedMemAllocMgr *);
1387

1388
    if (OB_ISNULL(share_mem_alloc_mgr)) {
1389
      ret = OB_ERR_UNEXPECTED;
1390
      LOG_ERROR("share mem alloc mgr should not be null", K(ret));
1391
    } else {
1392
      (void)share_mem_alloc_mgr->update_throttle_config();
1393
    }
1394
  }
1395
  return ret;
1396
}
1397

1398
int ObMultiTenant::update_tenant_freezer_mem_limit(const uint64_t tenant_id,
1399
                                                   const int64_t tenant_min_mem,
1400
                                                   const int64_t tenant_max_mem)
1401
{
1402
  int ret = OB_SUCCESS;
1403

1404
  MAKE_TENANT_SWITCH_SCOPE_GUARD(guard);
1405
  ObTenantFreezer *freezer = nullptr;
1406
  if (tenant_id != MTL_ID() && OB_FAIL(guard.switch_to(tenant_id))) {
1407
    LOG_WARN("switch tenant failed", K(ret), K(tenant_id));
1408
  } else if (FALSE_IT(freezer = MTL(ObTenantFreezer *))) {
1409
  } else if (freezer->is_tenant_mem_changed(tenant_min_mem, tenant_max_mem)) {
1410
    if (OB_FAIL(freezer->set_tenant_mem_limit(tenant_min_mem, tenant_max_mem))) {
1411
      LOG_WARN("set tenant mem limit failed", K(ret));
1412
    }
1413
  }
1414
  return ret;
1415
}
1416

1417

1418
int ObMultiTenant::get_tenant_unit(const uint64_t tenant_id, ObUnitInfoGetter::ObTenantConfig &unit)
1419
{
1420
  int ret = OB_SUCCESS;
1421
  ObTenant *tenant = nullptr;
1422

1423
  SpinRLockGuard guard(lock_);
1424
  if (OB_FAIL(get_tenant_unsafe(tenant_id, tenant))) {
1425
    LOG_WARN("fail to get tenant", K(tenant_id), K(ret));
1426
  } else {
1427
    unit = tenant->get_unit();
1428
  }
1429

1430
  return ret;
1431
}
1432

1433
int ObMultiTenant::get_unit_id(const uint64_t tenant_id, uint64_t &unit_id)
1434
{
1435
  int ret = OB_SUCCESS;
1436
  ObTenant *tenant = nullptr;
1437

1438
  SpinRLockGuard guard(lock_);
1439
  if (OB_FAIL(get_tenant_unsafe(tenant_id, tenant))) {
1440
    LOG_WARN("fail to get tenant", K(tenant_id), K(ret));
1441
  } else {
1442
    unit_id = tenant->get_unit_id();
1443
  }
1444
  return ret;
1445
}
1446

1447
// Append the unit of every real tenant in tenants_ to `units`, under lock_ in read mode.
// Virtual tenants are always skipped; hidden tenants are skipped unless
// `include_hidden_sys` is true. Iteration stops at the first error.
int ObMultiTenant::get_tenant_units(share::TenantUnits &units, bool include_hidden_sys)
{
  int ret = OB_SUCCESS;
  SpinRLockGuard guard(lock_);
  TenantList::iterator cursor = tenants_.begin();
  while (cursor != tenants_.end() && OB_SUCC(ret)) {
    ObTenant *cur = *cursor;
    if (OB_ISNULL(cur)) {
      ret = OB_ERR_UNEXPECTED;
      LOG_ERROR("tenant is nullptr", K(ret));
    } else {
      const bool skipped = is_virtual_tenant_id(cur->id())
                           || (!include_hidden_sys && cur->is_hidden());
      if (!skipped && OB_FAIL(units.push_back(cur->get_unit()))) {
        LOG_WARN("fail to push back unit", K(ret));
      }
    }
    cursor++;
  }
  return ret;
}
1463

1464
// Append the meta of every tenant that is neither virtual nor hidden to `metas`,
// under lock_ in read mode. Iteration stops at the first error.
int ObMultiTenant::get_tenant_metas(common::ObIArray<ObTenantMeta> &metas)
{
  int ret = OB_SUCCESS;
  SpinRLockGuard guard(lock_);
  TenantList::iterator cursor = tenants_.begin();
  while (cursor != tenants_.end() && OB_SUCC(ret)) {
    ObTenant *cur = *cursor;
    if (OB_ISNULL(cur)) {
      ret = OB_ERR_UNEXPECTED;
      LOG_ERROR("tenant is nullptr", K(ret));
    } else {
      const bool skipped = is_virtual_tenant_id(cur->id()) || cur->is_hidden();
      if (!skipped && OB_FAIL(metas.push_back(cur->get_tenant_meta()))) {
        LOG_WARN("fail to push back tenant meta", K(ret));
      }
    }
    cursor++;
  }
  return ret;
}
1480

1481
// Collect the meta of every non-virtual tenant (hidden ones included) for a checkpoint.
// All buckets of bucket_lock_ are try-read-locked first so that no tenant is being
// created or deleted meanwhile; if that fails the lock error is returned and `metas`
// is left untouched.
int ObMultiTenant::get_tenant_metas_for_ckpt(common::ObIArray<ObTenantMeta> &metas)
{
  int ret = OB_SUCCESS;
  // Ensure that no tenants are being created or deleted
  ObBucketTryRLockAllGuard all_tenant_guard(bucket_lock_);
  if (OB_SUCC(all_tenant_guard.get_ret())) {
    SpinRLockGuard guard(lock_);
    TenantList::iterator cursor = tenants_.begin();
    while (cursor != tenants_.end() && OB_SUCC(ret)) {
      ObTenant *cur = *cursor;
      if (OB_ISNULL(cur)) {
        ret = OB_ERR_UNEXPECTED;
        LOG_ERROR("tenant is nullptr", K(ret));
      } else if (!is_virtual_tenant_id(cur->id())
                 && OB_FAIL(metas.push_back(cur->get_tenant_meta()))) {
        LOG_WARN("fail to push back tenant meta", K(ret));
      }
      cursor++;
    }
  } else {
    LOG_WARN("fail to try rlock all tenant for ckpt", K(ret));
  }

  return ret;
}
1504

1505

1506
//Don't call this, please call ObCompatModeGetter::get_tenant_compat_mode
1507
int ObMultiTenant::get_compat_mode(const uint64_t tenant_id, lib::Worker::CompatMode &compat_mode)
1508
{
1509
  int ret = OB_SUCCESS;
1510
  ObTenant *tenant = nullptr;
1511
  SpinRLockGuard guard(lock_);
1512
  if (IS_NOT_INIT) {
1513
    ret = OB_NOT_INIT;
1514
    LOG_WARN("not init", K(ret));
1515
  } else if (OB_FAIL(get_tenant_unsafe(tenant_id, tenant))) {
1516
    LOG_WARN("fail to get tenant", K(tenant_id), K(ret));
1517
  } else {
1518
    compat_mode = tenant->get_compat_mode();
1519
  }
1520

1521
  return ret;
1522
}
1523

1524
int ObMultiTenant::update_tenant_cpu(const uint64_t tenant_id, const double min_cpu, const double max_cpu)
1525
{
1526
  int ret = OB_SUCCESS;
1527

1528
  ObTenant *tenant = NULL;
1529
  bool do_update = false;
1530
  SpinRLockGuard guard(lock_);
1531

1532
  if (IS_NOT_INIT) {
1533
    ret = OB_NOT_INIT;
1534
    LOG_WARN("not init", K(ret));
1535
  } else if (OB_FAIL(get_tenant_unsafe(tenant_id, tenant))) {
1536
    LOG_WARN("can't modify tenant which doesn't exist", K(tenant_id), K(ret));
1537
  } else if (OB_ISNULL(tenant)) {
1538
    ret = OB_ERR_UNEXPECTED;
1539
    LOG_ERROR("unexpected condition, tenant is NULL", K(tenant));
1540
  } else {
1541
    if (tenant->unit_min_cpu() != min_cpu) {
1542
      tenant->set_unit_min_cpu(min_cpu);
1543
      do_update = true;
1544
    }
1545
    if (tenant->unit_max_cpu() != max_cpu) {
1546
      tenant->set_unit_max_cpu(max_cpu);
1547
      do_update = true;
1548
    }
1549
  }
1550

1551
  if (OB_FAIL(ret)) {
1552
    LOG_ERROR("update tenant cpu failed", K(tenant_id), K(ret));
1553
  } else if (do_update) {
1554
    LOG_INFO("update tenant cpu", K(tenant_id), K(min_cpu), K(max_cpu), K(ret));
1555
  }
1556

1557
  return ret;
1558
}
1559

1560
int ObMultiTenant::modify_tenant_io(const uint64_t tenant_id, const ObUnitConfig &unit_config)
1561
{
1562
  int ret = OB_SUCCESS;
1563
  ObTenant *tenant = NULL;
1564

1565
  if (OB_FAIL(get_tenant(tenant_id, tenant))) {
1566
    LOG_WARN("can't modify tenant which doesn't exist", K(tenant_id), K(ret));
1567
  } else if (OB_ISNULL(tenant)) {
1568
    ret = OB_ERR_UNEXPECTED;
1569
    LOG_ERROR("unexpected condition, tenant is NULL", K(tenant));
1570
  } else {
1571
    ObTenantIOConfig io_config;
1572
    io_config.memory_limit_ = unit_config.memory_size();
1573
    io_config.unit_config_.min_iops_ = unit_config.min_iops();
1574
    io_config.unit_config_.max_iops_ = unit_config.max_iops();
1575
    io_config.unit_config_.weight_ = unit_config.iops_weight();
1576
    ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id));
1577
    if (!tenant_config.is_valid()) {
1578
      ret = OB_ERR_UNEXPECTED;
1579
      LOG_WARN("tenant config is invalid", K(ret), K(tenant_id));
1580
    } else {
1581
      io_config.callback_thread_count_ = tenant_config->_io_callback_thread_count;
1582
      static const char *trace_mod_name = "io_tracer";
1583
      io_config.enable_io_tracer_ = 0 == strncasecmp(trace_mod_name, GCONF.leak_mod_to_check.get_value(), strlen(trace_mod_name));
1584
      if (OB_FAIL(OB_IO_MANAGER.refresh_tenant_io_config(tenant_id, io_config))) {
1585
        LOG_WARN("refresh tenant io config failed", K(ret), K(tenant_id), K(io_config));
1586
      }
1587
    }
1588
  }
1589
  return ret;
1590
}
1591

1592
// True when get_tenant() succeeds for `tenant_id` and yields a non-null tenant.
bool ObMultiTenant::has_tenant(uint64_t tenant_id) const
{
  ObTenant *found = NULL;
  const int lookup_ret = get_tenant(tenant_id, found);
  const bool present = (OB_SUCCESS == lookup_ret) && (NULL != found);
  return present;
}
1598

1599
// A tenant counts as available when it exists on this server, its create status is
// CREATE_COMMIT and ObUnitInfoGetter::is_valid_tenant() accepts its unit status.
// The check runs under lock_ in read mode.
bool ObMultiTenant::is_available_tenant(uint64_t tenant_id) const
{
  bool available = false;
  ObTenant *found = NULL;
  SpinRLockGuard guard(lock_);
  int ret = get_tenant_unsafe(tenant_id, found);
  if (OB_SUCCESS != ret || NULL == found) {
    // tenant is not on this server
  } else if (found->get_create_status() == ObTenantCreateStatus::CREATE_COMMIT) {
    ObUnitInfoGetter::ObUnitStatus unit_status = found->get_unit().unit_status_;
    available = share::ObUnitInfoGetter::is_valid_tenant(unit_status);
  }
  return available;
}
1613

1614
int ObMultiTenant::check_if_hidden_sys(const uint64_t tenant_id, bool &is_hidden_sys)
1615
{
1616
  int ret = OB_SUCCESS;
1617
  ObTenant *tenant = nullptr;
1618

1619
  SpinRLockGuard guard(lock_);
1620

1621
  if (IS_NOT_INIT) {
1622
    ret = OB_NOT_INIT;
1623
    LOG_WARN("not init", K(ret));
1624
  } else if (OB_SYS_TENANT_ID != tenant_id) {
1625
    is_hidden_sys = false;
1626
  } else if (OB_FAIL(get_tenant_unsafe(tenant_id, tenant))) {
1627
    LOG_WARN("fail to get tennat", K(ret), K(tenant_id));
1628
  } else {
1629
    is_hidden_sys = tenant->is_hidden();
1630
  }
1631

1632
  return ret;
1633
}
1634

1635
int ObMultiTenant::mark_del_tenant(const uint64_t tenant_id)
1636
{
1637
  int ret = OB_SUCCESS;
1638
  ObTenant *tenant = NULL;
1639

1640
  SpinRLockGuard guard(lock_);
1641

1642
  if (IS_NOT_INIT) {
1643
    ret = OB_NOT_INIT;
1644
    LOG_WARN("not init", K(ret));
1645
  } else if (OB_FAIL(get_tenant_unsafe(tenant_id, tenant))) {
1646
    if (OB_TENANT_NOT_IN_SERVER == ret) {
1647
      ret = OB_SUCCESS;
1648
      LOG_INFO("tenant has already been removed, no need to mark_del", KR(ret), K(tenant_id));
1649
    } else {
1650
      LOG_WARN("fail to get tenant", K(ret), K(tenant_id));
1651
    }
1652
  } else {
1653
    tenant->mark_tenant_is_removed();
1654
    // only write slog when del tenant, no need to write here
1655
  }
1656

1657
  return ret;
1658
}
1659

1660
// 确保remove_tenant函数可以重复调用, 因为在删除租户时失败会不断重试,
1661
// 这里只是删除内存结构,持久化的数据还在。
1662
int ObMultiTenant::remove_tenant(const uint64_t tenant_id, bool &remove_tenant_succ)
1663
{
1664
  int ret = OB_SUCCESS;
1665
  int tmp_ret = OB_SUCCESS;
1666
  ObTenant *removed_tenant = nullptr;
1667
  remove_tenant_succ = false;
1668

1669
  if (IS_NOT_INIT) {
1670
    ret = OB_NOT_INIT;
1671
    LOG_WARN("not init", K(ret));
1672
  } else if (OB_FAIL(get_tenant(tenant_id, removed_tenant))) {
1673
    if (OB_TENANT_NOT_IN_SERVER == ret) {
1674
      LOG_WARN("tenant has been removed", K(tenant_id), K(ret));
1675
      removed_tenant = nullptr;
1676
      remove_tenant_succ = true;
1677
      ret = OB_SUCCESS;
1678
    } else {
1679
      LOG_WARN("remove tenant failed", K(tenant_id), K(ret));
1680
    }
1681
  } else if (OB_ISNULL(GCTX.session_mgr_)) {
1682
    ret = OB_ERR_UNEXPECTED;
1683
    LOG_ERROR("unexpected condition", K(ret));
1684
  } else {
1685
    LOG_INFO("removed_tenant begin to stop", K(tenant_id));
1686
    {
1687
      SpinWLockGuard guard(lock_); //add a lock when set tenant stop, omt will check tenant has stop before calling timeup()
1688
      removed_tenant->stop();
1689
    }
1690
    if (!is_virtual_tenant_id(tenant_id)) {
1691
      LOG_INFO("removed_tenant begin to kill tenant session", K(tenant_id));
1692
      if (OB_FAIL(GCTX.session_mgr_->kill_tenant(tenant_id))) {
1693
        LOG_ERROR("fail to kill tenant session", K(ret), K(tenant_id));
1694
        {
1695
          SpinWLockGuard guard(lock_);
1696
          removed_tenant->start();
1697
        }
1698
      }
1699
    }
1700
  }
1701

1702
  if (OB_SUCC(ret) && OB_NOT_NULL(removed_tenant)) {
1703
    ObLDHandle handle;
1704
    if (OB_FAIL(removed_tenant->try_wait())) {
1705
      LOG_WARN("remove tenant try_wait failed", K(ret), K(tenant_id));
1706
    } else if (OB_FAIL(removed_tenant->try_wrlock(handle))) {
1707
      LOG_WARN("can't get tenant wlock to remove tenant", K(ret), K(tenant_id),
1708
          KP(removed_tenant), K(removed_tenant->lock_));
1709
      removed_tenant->lock_.ld_.print();
1710
    } else {
1711
      ObTenant *removed_tenant_tmp = nullptr;
1712
      SpinWLockGuard guard(lock_);
1713
      // This locking should be held after tenant->wait
1714
      // because there maybe locking during tenant thread stopping.
1715

1716
      if (OB_FAIL(tenants_.remove_if(tenant_id, compare_with_tenant_id, equal_with_tenant_id, removed_tenant_tmp))) {
1717
        LOG_WARN("fail to remove tenant", K(tenant_id), K(ret));
1718
      } else if (removed_tenant_tmp != removed_tenant) {
1719
        ret = OB_ERR_UNEXPECTED;
1720
        LOG_WARN("must be same tenant", K(tenant_id), K(ret));
1721
      } else {
1722
        remove_tenant_succ = true;
1723
      }
1724
    }
1725

1726
    if (OB_SUCC(ret) && OB_NOT_NULL(GCTX.dblink_proxy_)) {
1727
      if (OB_FAIL(GCTX.dblink_proxy_->clean_dblink_connection(tenant_id))) {
1728
        LOG_WARN("failed to clean dblink connection", K(ret), K(tenant_id));
1729
      }
1730
    }
1731

1732
    if (OB_SUCC(ret)) {
1733
      const share::ObUnitInfoGetter::ObTenantConfig &config = removed_tenant->get_unit();
1734
      const int64_t log_disk_size = config.config_.log_disk_size();
1735
      if (!is_virtual_tenant_id(tenant_id)) {
1736
        GCTX.log_block_mgr_->remove_tenant(log_disk_size);
1737
      }
1738
      removed_tenant->destroy();
1739
      ob_delete(removed_tenant);
1740
      LOG_INFO("remove tenant success", K(tenant_id));
1741
    }
1742
  }
1743

1744
  if (OB_SUCC(ret)) {
1745
    ObMallocAllocator *malloc_allocator = ObMallocAllocator::get_instance();
1746
    if (OB_ISNULL(malloc_allocator)) {
1747
      ret = OB_INVALID_ARGUMENT;
1748
      LOG_ERROR("malloc allocator is NULL", K(ret));
1749
    } else {
1750
      auto& cache_washer = ObKVGlobalCache::get_instance();
1751
      if (OB_FAIL(cache_washer.sync_flush_tenant(tenant_id))) {
1752
        LOG_WARN("Fail to sync flush tenant cache", K(ret));
1753
      }
1754
    }
1755
  }
1756

1757
  if (OB_SUCC(ret)) {
1758
    if (is_virtual_tenant_id(tenant_id) &&
1759
        OB_FAIL(ObVirtualTenantManager::get_instance().del_tenant(tenant_id))) {
1760
      if (OB_ENTRY_NOT_EXIST == ret) {
1761
        ret = OB_SUCCESS;
1762
      } else {
1763
        LOG_WARN("virtual tenant manager delete tenant failed", K(ret), K(tenant_id));
1764
      }
1765
    }
1766
  }
1767
  if (OB_SUCC(ret)) {
1768
    if (OB_ISNULL(GCTX.disk_reporter_)) {
1769
      ret = OB_ERR_UNEXPECTED;
1770
      LOG_ERROR("disk reporter is null", K(ret));
1771
    } else if (OB_FAIL(GCTX.disk_reporter_->delete_tenant_usage_stat(tenant_id))) {
1772
      LOG_WARN("failed to delete_tenant_usage_stat", K(ret), K(tenant_id));
1773
    }
1774
  }
1775

1776
  if (OB_SUCC(ret)) {
1777
    // only report event when ret = success
1778
    ROOTSERVICE_EVENT_ADD("remove_tenant", "remove_tenant",
1779
        "tenant_id", tenant_id,
1780
        "addr", GCTX.self_addr(),
1781
        "result", ret);
1782
  }
1783

1784
  if (OB_SUCC(ret) && OB_NOT_NULL(GCTX.dblink_proxy_)) {
1785
    if (OB_FAIL(GCTX.dblink_proxy_->clean_dblink_connection(tenant_id))) {
1786
      LOG_WARN("failed to clean dblink connection", K(ret), K(tenant_id));
1787
    }
1788
  }
1789
  return ret;
1790
}
1791

1792
int ObMultiTenant::clear_persistent_data(const uint64_t tenant_id)
1793
{
1794
  int ret = OB_SUCCESS;
1795
  char tenant_clog_dir[MAX_PATH_SIZE] = {0};
1796
  char tenant_slog_dir[MAX_PATH_SIZE] = {0};
1797
  bool exist = true;
1798

1799
  if (OB_FAIL(OB_FILE_SYSTEM_ROUTER.get_tenant_clog_dir(tenant_id, tenant_clog_dir))) {
1800
    LOG_WARN("fail to get tenant clog dir", K(ret));
1801
  } else if (OB_FAIL(FileDirectoryUtils::is_exists(tenant_clog_dir, exist))) {
1802
    LOG_WARN("fail to check exist", K(ret));
1803
  } else if (exist) {
1804
    // defense code begin
1805
    int tmp_ret = OB_SUCCESS;
1806
    bool directory_empty = true;
1807
    if (OB_TMP_FAIL(FileDirectoryUtils::is_empty_directory(tenant_clog_dir, directory_empty))) {
1808
      LOG_WARN("fail to check directory whether is empty", KR(tmp_ret), K(tenant_clog_dir));
1809
    }
1810
    if (!directory_empty) {
1811
      LOG_DBA_ERROR(OB_ERR_UNEXPECTED, "msg", "clog directory must be empty when delete tenant", K(tenant_clog_dir));
1812
    }
1813
    // defense code end
1814
    if (OB_FAIL(FileDirectoryUtils::delete_directory_rec(tenant_clog_dir))) {
1815
      LOG_WARN("fail to delete clog dir", K(ret), K(tenant_clog_dir));
1816
    }
1817
  }
1818

1819
  if (OB_FAIL(ret)) {
1820
    // do nothing
1821
  } else if (OB_FAIL(SLOGGERMGR.get_tenant_slog_dir(tenant_id, tenant_slog_dir))) {
1822
    LOG_WARN("fail to get tenant slog dir", K(ret));
1823
  } else if (OB_FAIL(FileDirectoryUtils::is_exists(tenant_slog_dir, exist))) {
1824
    LOG_WARN("fail to check exist", K(ret));
1825
  } else if (exist) {
1826
    if (OB_FAIL(FileDirectoryUtils::delete_directory_rec(tenant_slog_dir))) {
1827
      LOG_WARN("fail to delete slog dir", K(ret), K(tenant_slog_dir));
1828
    }
1829
  }
1830

1831
  return ret;
1832
}
1833

1834
int ObMultiTenant::del_tenant(const uint64_t tenant_id)
1835
{
1836
  int ret = OB_SUCCESS;
1837
  LOG_INFO("[DELETE_TENANT] OMT begin to delete tenant", K(tenant_id));
1838

1839
  ObTenant *tenant = nullptr;
1840
  bool lock_succ = false;
1841
  int64_t bucket_lock_idx = -1;
1842
  TIMEGUARD_INIT(SERVER_OMT, 60_s, 120_s); // report hung cost more than 120s
1843

1844
  if (IS_NOT_INIT) {
1845
    ret = OB_NOT_INIT;
1846
    LOG_WARN("not init", K(ret));
1847
  } else if (OB_FAIL(bucket_lock_.try_wrlock(bucket_lock_idx = get_tenant_lock_bucket_idx(tenant_id)))) {
1848
    LOG_WARN("fail to try_wrlock for delete tenant", K(ret), K(tenant_id), K(bucket_lock_idx));
1849
  } else if (FALSE_IT(lock_succ = true)) {
1850
  } else if (OB_FAIL(get_tenant(tenant_id, tenant))) {
1851
    LOG_WARN("fail to get tenant", K(ret), K(tenant_id));
1852
  } else if (tenant->is_hidden()) {
1853
    ret = OB_ERR_UNEXPECTED;
1854
    LOG_WARN("hidden tenant can't be deleted", K(ret), K(tenant_id));
1855
  } else  {
1856
    const ObUnitInfoGetter::ObTenantConfig local_unit = tenant->get_unit();
1857
    const ObUnitInfoGetter::ObUnitStatus local_unit_status = local_unit.unit_status_;
1858
    // add a event when try to gc for the first time
1859
    if (local_unit_status != ObUnitInfoGetter::ObUnitStatus::UNIT_WAIT_GC_IN_OBSERVER &&
1860
        local_unit_status != ObUnitInfoGetter::ObUnitStatus::UNIT_DELETING_IN_OBSERVER) {
1861
      SERVER_EVENT_ADD("unit", "start unit gc", "tenant_id", tenant_id,
1862
          "unit_id", local_unit.unit_id_, "unit_status", "DELETING");
1863
    }
1864

1865
    // Ensure to write delete_tenant_prepare_slog only once
1866
    if (local_unit_status != ObUnitInfoGetter::UNIT_DELETING_IN_OBSERVER) {
1867
      tenant->set_unit_status(ObUnitInfoGetter::UNIT_DELETING_IN_OBSERVER);
1868
      tenant->set_create_status(ObTenantCreateStatus::DELETING);
1869
      if (OB_FAIL(write_delete_tenant_prepare_slog(tenant_id))) {
1870
        LOG_WARN("fail to write delete tenant slog", K(ret), K(tenant_id), K(local_unit_status));
1871
        tenant->set_unit_status(local_unit_status);
1872
      }
1873
    }
1874

1875
    if (OB_SUCC(ret)) {
1876
      do {
1877
        // 保证remove_tenant, clear_persistent_data可以幂等重试,
1878
        // 如果失败会但不是加锁失败会一直无限重试, 保证如果prepare log写成功一定会有commit日志,
1879
        // 即使这个过程中宕机重启, 重启回放日志时会继续删除并且补一条delete commit log
1880
        bool remove_tenant_succ = false;
1881
        if (OB_FAIL(remove_tenant(tenant_id, remove_tenant_succ))) {
1882
          LOG_WARN("fail to remove tenant", K(ret), K(tenant_id));
1883
          // If lock failed, the tenant is not removed from tenants_list,
1884
          // Here can break and leave ObTenantNodeBalancer::check_del_tenant to retry again,
1885
          // in this case, the deletion of other tenants does not get stuck.
1886
          // Otherwise it will have to retry indefinitely here, because the tenant cannot be obtained
1887
          if (false == remove_tenant_succ) {
1888
            break;
1889
          } else {
1890
            SLEEP(1);
1891
          }
1892
        } else if (OB_FAIL(clear_persistent_data(tenant_id))) {
1893
          LOG_ERROR("fail to clear persistent_data", K(ret), K(tenant_id));
1894
          SLEEP(1);
1895
        } else if (OB_FAIL(write_delete_tenant_commit_slog(tenant_id))) {
1896
          LOG_WARN("fail to write delete tenant commit slog", K(ret), K(tenant_id));
1897
        }
1898
      } while (OB_FAIL(ret));
1899

1900
      if (OB_SUCC(ret)) {
1901
        lib::ObMallocAllocator::get_instance()->recycle_tenant_allocator(tenant_id);
1902
        // add a event when finish gc unit
1903
        SERVER_EVENT_ADD("unit", "finish unit gc", "tenant_id", tenant_id,
1904
            "unit_id", local_unit.unit_id_, "unit_status", "DELETED");
1905
      }
1906
    }
1907
  }
1908

1909
  if (lock_succ) {
1910
    bucket_lock_.unlock(bucket_lock_idx);
1911
  }
1912

1913
  LOG_INFO("[DELETE_TENANT] OMT finish delete tenant", KR(ret), K(tenant_id), K(bucket_lock_idx));
1914

1915
  return ret;
1916
}
1917

1918
int ObMultiTenant::convert_real_to_hidden_sys_tenant()
1919
{
1920
  int ret = OB_SUCCESS;
1921
  const uint64_t tenant_id = OB_SYS_TENANT_ID;
1922
  ObTenant *tenant = nullptr;
1923
  int64_t bucket_lock_idx = -1;
1924
  bool lock_succ = false;
1925
  ObTenantMeta tenant_meta;
1926

1927
  if (IS_NOT_INIT) {
1928
    ret = OB_NOT_INIT;
1929
    LOG_WARN("not init", K(ret));
1930
  } else if (OB_FAIL(get_tenant(tenant_id, tenant))) {
1931
    LOG_WARN("fail to get tenant", K(ret), K(tenant_id));
1932
  } else if (tenant->is_hidden()) {
1933
    ret = OB_ERR_UNEXPECTED;
1934
    LOG_WARN("has been hidden sys", K(ret));
1935
  } else if (OB_FAIL(construct_meta_for_hidden_sys(tenant_meta))) {
1936
    LOG_WARN("fail to construct_meta_for_hidden_sys", K(ret));
1937
  } else if (OB_FAIL(bucket_lock_.try_wrlock(bucket_lock_idx = get_tenant_lock_bucket_idx(tenant_id)))) {
1938
    LOG_WARN("fail to try_wrlock for delete tenant", K(ret), K(tenant_id), K(bucket_lock_idx));
1939
  } else if (FALSE_IT(lock_succ = true)) {
1940
  } else if (OB_FAIL(update_tenant_unit_no_lock(tenant_meta.unit_))) {
1941
    LOG_WARN("fail to update_tenant_unit_no_lock", K(ret), K(tenant_meta));
1942
  } else {
1943
    ObTenantSwitchGuard guard(tenant);
1944
    if (OB_FAIL(MTL(ObStorageLogger *)->get_active_cursor(tenant_meta.super_block_.replay_start_point_))) {
1945
      LOG_WARN("get slog current cursor fail", K(ret));
1946
    } else if (OB_UNLIKELY(!tenant_meta.super_block_.replay_start_point_.is_valid())) {
1947
      ret = OB_ERR_UNEXPECTED;
1948
      LOG_WARN("cur_cursor is invalid", K(ret), K(tenant_meta));
1949
    }
1950
  }
1951

1952
  if (OB_FAIL(ret)) {
1953
  } else if (OB_FAIL(ObServerCheckpointSlogHandler::get_instance()
1954
      .write_tenant_super_block_slog(tenant_meta.super_block_))) {
1955
    LOG_WARN("fail to write_tenant_super_block_slog", K(ret), K(tenant_meta));
1956
  } else {
1957
    tenant->set_tenant_super_block(tenant_meta.super_block_);
1958
  }
1959

1960
  if (lock_succ) {
1961
    bucket_lock_.unlock(bucket_lock_idx);
1962
  }
1963

1964
  LOG_INFO("[DELETE_TENANT] OMT finish convert_real_to_hidden_sys_tenant", K(ret), K(tenant_meta), K(bucket_lock_idx));
1965

1966
  return ret;
1967
}
1968

1969
// Run `func` on the ObTenant object of `tenant_id` while lock_ is held in read mode.
// Returns OB_NOT_INIT, the lookup error, or whatever `func` returns.
int ObMultiTenant::update_tenant(uint64_t tenant_id, std::function<int(ObTenant&)> &&func)
{
  int ret = OB_SUCCESS;
  ObTenant *target = nullptr;
  SpinRLockGuard guard(lock_);

  if (IS_NOT_INIT) {
    ret = OB_NOT_INIT;
    LOG_WARN("not init", K(ret));
  } else if (OB_SUCC(get_tenant_unsafe(tenant_id, target))) {
    ret = func(*target);
  } else {
    LOG_WARN("get tenant by tenant id fail", K(ret));
  }
  return ret;
}
1985

1986
int ObMultiTenant::get_tenant(
1987
    const uint64_t tenant_id, ObTenant *&tenant) const
1988
{
1989
  SpinRLockGuard guard(lock_);
1990
  return get_tenant_unsafe(tenant_id, tenant);
1991
}
1992

1993
int ObMultiTenant::get_tenant_with_tenant_lock(
1994
  const uint64_t tenant_id, ObLDHandle &handle, ObTenant *&tenant) const
1995
{
1996
  SpinRLockGuard guard(lock_);
1997
  ObTenant *tenant_tmp = nullptr;
1998
  int ret = get_tenant_unsafe(tenant_id, tenant_tmp);
1999
  if (OB_SUCC(ret)) {
2000
    if (OB_FAIL(tenant_tmp->try_rdlock(handle))) {
2001
      if (tenant_tmp->has_stopped()) {
2002
        // in some cases this error code is handled specially
2003
        ret = OB_TENANT_NOT_IN_SERVER;
2004
        LOG_WARN("fail to try rdlock tenant", K(ret), K(tenant_id));
2005
      }
2006
    } else {
2007
      // assign tenant when get rdlock succ
2008
      tenant = tenant_tmp;
2009
    }
2010
    if (OB_UNLIKELY(tenant_tmp->has_stopped())) {
2011
      LOG_WARN("get rdlock when tenant has stopped", K(tenant_id), K(lbt()));
2012
    }
2013
  }
2014
  return ret;
2015
}
2016

2017
int ObMultiTenant::get_active_tenant_with_tenant_lock(
2018
  const uint64_t tenant_id, ObLDHandle &handle, ObTenant *&tenant) const
2019
{
2020
  SpinRLockGuard guard(lock_);
2021
  ObTenant *tenant_tmp = nullptr;
2022
  int ret = get_tenant_unsafe(tenant_id, tenant_tmp);
2023
  if (OB_SUCC(ret)) {
2024
    if (tenant_tmp->has_stopped()) {
2025
      ret = OB_TENANT_NOT_IN_SERVER;
2026
    } else if (OB_FAIL(tenant_tmp->try_rdlock(handle))) {
2027
      if (tenant_tmp->has_stopped()) {
2028
        // in some cases this error code is handled specially
2029
        ret = OB_TENANT_NOT_IN_SERVER;
2030
        LOG_WARN("fail to try rdlock tenant", K(ret), K(tenant_id));
2031
      }
2032
    } else {
2033
      // assign tenant when get rdlock succ
2034
      tenant = tenant_tmp;
2035
    }
2036
    if (OB_UNLIKELY(tenant_tmp->has_stopped())) {
2037
      LOG_WARN("get rdlock when tenant has stopped", K(tenant_id), K(lbt()));
2038
    }
2039
  }
2040
  return ret;
2041
}
2042

2043
int ObMultiTenant::get_tenant_unsafe( const uint64_t tenant_id, ObTenant *&tenant) const
2044
{
2045
  int ret = OB_SUCCESS;
2046

2047
  tenant = NULL;
2048
  for (TenantList::iterator it = tenants_.begin();
2049
       it != tenants_.end() && NULL == tenant;
2050
       it++) {
2051
    if (OB_ISNULL(*it)) {
2052
      // process the remains anyway
2053
      LOG_ERROR("unexpected condition");
2054
    } else if ((*it)->id() == tenant_id) {
2055
      tenant = *it;
2056
    }
2057
  }
2058

2059
  if (NULL == tenant) {
2060
    ret = OB_TENANT_NOT_IN_SERVER;
2061
  }
2062

2063
  return ret;
2064
}
2065

2066
int ObMultiTenant::write_create_tenant_prepare_slog(const ObTenantMeta &meta)
2067
{
2068
  int ret = OB_SUCCESS;
2069
  ObStorageLogParam log_param;
2070
  int32_t cmd = ObIRedoModule::gen_cmd(ObRedoLogMainType::OB_REDO_LOG_SERVER_TENANT,
2071
      ObRedoLogSubType::OB_REDO_LOG_CREATE_TENANT_PREPARE);
2072
  ObCreateTenantPrepareLog log_entry(*const_cast<ObTenantMeta*>(&meta));
2073
  log_param.data_ = &log_entry;
2074
  log_param.cmd_ = cmd;
2075
  if (OB_FAIL(server_slogger_->write_log(log_param))) {
2076
    LOG_WARN("failed to write put tenant slog", K(ret), K(log_param));
2077
  }
2078

2079
  return ret;
2080
}
2081
// Write a CREATE_TENANT_COMMIT redo record for `tenant_id` to the server slog.
// Returns the result of server_slogger_->write_log().
int ObMultiTenant::write_create_tenant_commit_slog(uint64_t tenant_id)
{
  int ret = OB_SUCCESS;
  const int32_t redo_cmd = ObIRedoModule::gen_cmd(ObRedoLogMainType::OB_REDO_LOG_SERVER_TENANT,
      ObRedoLogSubType::OB_REDO_LOG_CREATE_TENANT_COMMIT);
  ObCreateTenantCommitLog commit_log(tenant_id);
  ObStorageLogParam log_param;
  log_param.cmd_ = redo_cmd;
  log_param.data_ = &commit_log;
  if (OB_FAIL(server_slogger_->write_log(log_param))) {
    LOG_WARN("failed to write slog", K(ret), K(log_param));
  }

  return ret;
}
2096
// Write a CREATE_TENANT_ABORT redo record for `tenant_id` to the server slog.
// Returns the result of server_slogger_->write_log().
int ObMultiTenant::write_create_tenant_abort_slog(uint64_t tenant_id)
{
  int ret = OB_SUCCESS;
  const int32_t redo_cmd = ObIRedoModule::gen_cmd(ObRedoLogMainType::OB_REDO_LOG_SERVER_TENANT,
      ObRedoLogSubType::OB_REDO_LOG_CREATE_TENANT_ABORT);
  ObCreateTenantAbortLog abort_log(tenant_id);
  ObStorageLogParam log_param;
  log_param.cmd_ = redo_cmd;
  log_param.data_ = &abort_log;
  if (OB_FAIL(server_slogger_->write_log(log_param))) {
    LOG_WARN("failed to write slog", K(ret), K(log_param));
  }

  return ret;
}
2111

2112
int ObMultiTenant::write_delete_tenant_prepare_slog(uint64_t tenant_id)
2113
{
2114
  int ret = OB_SUCCESS;
2115
  ObStorageLogParam log_param;
2116
  int32_t cmd = ObIRedoModule::gen_cmd(ObRedoLogMainType::OB_REDO_LOG_SERVER_TENANT,
2117
      ObRedoLogSubType::OB_REDO_LOG_DELETE_TENANT_PREPARE);
2118
  ObDeleteTenantPrepareLog log_entry(tenant_id);
2119
  log_param.data_ = &log_entry;
2120
  log_param.cmd_ = cmd;
2121
  if (OB_FAIL(server_slogger_->write_log(log_param))) {
2122
    LOG_WARN("failed to write slog", K(ret), K(log_param));
2123
  }
2124

2125
  return ret;
2126
}
2127
int ObMultiTenant::write_delete_tenant_commit_slog(uint64_t tenant_id)
2128
{
2129
  int ret = OB_SUCCESS;
2130
  ObStorageLogParam log_param;
2131
  int32_t cmd = ObIRedoModule::gen_cmd(ObRedoLogMainType::OB_REDO_LOG_SERVER_TENANT,
2132
      ObRedoLogSubType::OB_REDO_LOG_DELETE_TENANT_COMMIT);
2133
  ObDeleteTenantCommitLog log_entry(tenant_id);
2134
  log_param.data_ = &log_entry;
2135
  log_param.cmd_ = cmd;
2136
  if (OB_FAIL(server_slogger_->write_log(log_param))) {
2137
    LOG_WARN("failed to write slog", K(ret), K(log_param));
2138
  }
2139

2140
  return ret;
2141
}
2142

2143
// Writes the "update tenant unit" redo record carrying `unit` through
// server_slogger_. Returns the result of write_log(); a failure is logged.
int ObMultiTenant::write_update_tenant_unit_slog(const ObUnitInfoGetter::ObTenantConfig &unit)
{
  int ret = OB_SUCCESS;
  ObStorageLogParam log_param;
  const int32_t redo_cmd = ObIRedoModule::gen_cmd(
      ObRedoLogMainType::OB_REDO_LOG_SERVER_TENANT,
      ObRedoLogSubType::OB_REDO_LOG_UPDATE_TENANT_UNIT);
  // The log entry is handed a non-const reference, so constness is cast away here.
  ObUpdateTenantUnitLog unit_log(const_cast<ObUnitInfoGetter::ObTenantConfig &>(unit));
  log_param.cmd_ = redo_cmd;
  log_param.data_ = &unit_log;

  ret = server_slogger_->write_log(log_param);
  if (OB_SUCCESS != ret) {
    LOG_WARN("failed to write tenant unit slog", K(ret), K(log_param));
  }
  return ret;
}
2158

2159

2160
int ObMultiTenant::recv_request(const uint64_t tenant_id, ObRequest &req)
2161
{
2162
  int ret = OB_SUCCESS;
2163
  ObTenant *tenant = NULL;
2164
  SpinRLockGuard guard(lock_);
2165
  if (IS_NOT_INIT) {
2166
    ret = OB_NOT_INIT;
2167
    LOG_WARN("not init", K(ret));
2168
  } else if (!is_valid_tenant_id(tenant_id)) {
2169
    ret = OB_INVALID_ARGUMENT;
2170
    LOG_ERROR("invalid argument", K(ret), K(tenant_id));
2171
  } else if (OB_FAIL(get_tenant_unsafe(tenant_id, tenant))) {
2172
    LOG_ERROR("get tenant failed", K(ret), K(tenant_id));
2173
  } else if (NULL == tenant) {
2174
    ret = OB_ERR_UNEXPECTED;
2175
    LOG_ERROR("tenant is null", K(ret), K(tenant_id));
2176
  } else if (OB_FAIL(tenant->recv_request(req))) {
2177
    LOG_ERROR("recv request failed", K(ret), K(tenant_id));
2178
  } else {
2179
    // do nothing
2180
  }
2181
  return ret;
2182
}
2183

2184
// Replaces the contents of `id_list` with the ids of all tenants currently in
// tenants_, under the read lock.
// A null entry or a failed push_back is only logged: the walk never stops
// early, so the error code lives no longer than a single iteration.
void ObMultiTenant::get_tenant_ids(TenantIdList &id_list)
{
  SpinRLockGuard guard(lock_);
  id_list.clear();
  for (TenantList::iterator it = tenants_.begin(); it != tenants_.end(); it++) {
    int ret = OB_SUCCESS;  // per-entry status, process anyway
    if (OB_ISNULL(*it)) {
      ret = OB_ERR_UNEXPECTED;
      LOG_ERROR("unexpected condition", K(ret), K(*it));
    } else if (OB_FAIL(id_list.push_back((*it)->id()))) {
      LOG_ERROR("push tenant id to id list fail", K(ret));
    }
  }
}
2201

2202
// Appends the id of every non-virtual tenant in tenants_ to `tenant_ids`
// (the array is not cleared first), under the read lock.
// A null entry is logged and skipped without touching ret; the walk stops at
// the first push_back failure, whose error code is returned.
int ObMultiTenant::get_mtl_tenant_ids(ObIArray<uint64_t> &tenant_ids)
{
  int ret = OB_SUCCESS;
  SpinRLockGuard guard(lock_);
  TenantList::iterator it = tenants_.begin();
  while (it != tenants_.end() && OB_SUCC(ret)) {
    if (OB_ISNULL(*it)) {
      LOG_ERROR("unexpected condition", K(*it));
    } else if (!is_virtual_tenant_id((*it)->id())) {
      if (OB_FAIL(tenant_ids.push_back((*it)->id()))) {
        LOG_ERROR("push tenant id to tenant_ids fail", K(ret));
      }
    }
    it++;
  }
  return ret;
}
2219

2220
// Invokes `func` on each tenant in tenants_ while holding the read lock.
// Stops at the first null entry (OB_ERR_UNEXPECTED) or the first non-success
// result from `func`, and returns that code.
int ObMultiTenant::for_each(std::function<int(ObTenant &)> func)
{
  int ret = OB_SUCCESS;
  SpinRLockGuard guard(lock_);
  TenantList::iterator it = tenants_.begin();
  while (it != tenants_.end() && OB_SUCCESS == ret) {
    if (OB_ISNULL(*it)) {
      ret = OB_ERR_UNEXPECTED;
      LOG_ERROR("unexpected condition", K(ret), K(*it));
    } else {
      ret = func(**it);
      if (OB_SUCCESS != ret) {
        LOG_ERROR("invoke func failed", K(ret), K(**it));
      }
    }
    it++;
  }
  return ret;
}
2236

2237
int ObMultiTenant::operate_in_each_tenant(const std::function<int()> &func, bool skip_virtual_tenant)
2238
{
2239
  int ret = OB_SUCCESS;
2240
  TenantIdList id_list;
2241
  get_tenant_ids(id_list);
2242
  MAKE_TENANT_SWITCH_SCOPE_GUARD(guard);
2243
  for (int64_t i = 0; i < id_list.size(); i++) {
2244
    auto id = id_list[i];
2245
    int tmp_ret = OB_SUCCESS;
2246
    if (skip_virtual_tenant && is_virtual_tenant_id(id)) {
2247
      continue;
2248
    }
2249
    if (OB_SUCCESS != (tmp_ret = guard.switch_to(id))) {
2250
      LOG_WARN("switch to tenant failed", K(tmp_ret), K(id));
2251
    } else if (OB_FAIL(func())) {
2252
      LOG_WARN("execute func failed", K(ret), K(id));
2253
    } else {
2254
    }
2255
  }
2256
  return ret;
2257
}
2258

2259
int ObMultiTenant::operate_each_tenant_for_sys_or_self(const std::function<int()> &func, bool skip_virtual_tenant)
2260
{
2261
  int ret = OB_SUCCESS;
2262
  if (MTL_ID() == OB_SYS_TENANT_ID) {
2263
    ret = operate_in_each_tenant(func, skip_virtual_tenant);
2264
  } else {
2265
    int id = MTL_ID();
2266
    if (skip_virtual_tenant && is_virtual_tenant_id(id)) {
2267
    } else if (OB_FAIL(func())) {
2268
      LOG_WARN("execute func failed", K(ret), K(id));
2269
    }
2270
  }
2271
  return ret;
2272
}
2273

2274
int ObMultiTenant::get_tenant_cpu_usage(const uint64_t tenant_id, double &usage) const
2275
{
2276
  int ret = OB_SUCCESS;
2277
  ObTenant *tenant = nullptr;
2278
  usage = 0.;
2279
  if (!lock_.try_rdlock()) {
2280
    ret = OB_EAGAIN;
2281
  } else {
2282
    if (OB_FAIL(get_tenant_unsafe(tenant_id, tenant))) {
2283
    } else {
2284
      usage = tenant->get_token_usage() * tenant->unit_min_cpu();
2285
    }
2286
    lock_.unlock();
2287
  }
2288

2289
  return ret;
2290
}
2291

2292
int ObMultiTenant::get_tenant_worker_time(const uint64_t tenant_id, int64_t &worker_time) const
2293
{
2294
  int ret = OB_SUCCESS;
2295
  ObTenant *tenant = nullptr;
2296
  worker_time = 0.;
2297
  if (!lock_.try_rdlock()) {
2298
    ret = OB_EAGAIN;
2299
  } else {
2300
    if (OB_FAIL(get_tenant_unsafe(tenant_id, tenant))) {
2301
    } else {
2302
      worker_time = tenant->get_worker_time();
2303
    }
2304
    lock_.unlock();
2305
  }
2306

2307
  return ret;
2308
}
2309

2310
int ObMultiTenant::get_tenant_cpu_time(const uint64_t tenant_id, int64_t &cpu_time) const
2311
{
2312
  int ret = OB_SUCCESS;
2313
  ObTenant *tenant = nullptr;
2314
  cpu_time = 0;
2315
  if (OB_NOT_NULL(GCTX.cgroup_ctrl_) && GCTX.cgroup_ctrl_->is_valid()) {
2316
    ret = GCTX.cgroup_ctrl_->get_cpu_time(tenant_id, cpu_time);
2317
  } else {
2318
    if (!lock_.try_rdlock()) {
2319
      ret = OB_EAGAIN;
2320
    } else {
2321
      if (OB_FAIL(get_tenant_unsafe(tenant_id, tenant))) {
2322
      } else {
2323
        cpu_time = tenant->get_cpu_time();
2324
      }
2325
      lock_.unlock();
2326
    }
2327
  }
2328
  return ret;
2329
}
2330

2331

2332
int ObMultiTenant::get_tenant_cpu(
2333
    const uint64_t tenant_id, double &min_cpu, double &max_cpu) const
2334
{
2335
  int ret = OB_SUCCESS;
2336
  ObTenant *tenant = NULL;
2337

2338
  if (!lock_.try_rdlock()) {
2339
    ret = OB_EAGAIN;
2340
  } else {
2341
    if (OB_FAIL(get_tenant_unsafe(tenant_id, tenant))) {
2342
    } else if (NULL != tenant) {
2343
      min_cpu = tenant->unit_min_cpu();
2344
      max_cpu = tenant->unit_max_cpu();
2345
    }
2346
    lock_.unlock();
2347
  }
2348

2349
  return ret;
2350
}
2351

2352
void ObMultiTenant::run1()
2353
{
2354
  lib::set_thread_name("MultiTenant");
2355
  while (!has_set_stop()) {
2356
    {
2357
      SpinRLockGuard guard(lock_);
2358
      for (TenantList::iterator it = tenants_.begin(); it != tenants_.end(); it++) {
2359
        if (OB_ISNULL(*it)) {
2360
          LOG_ERROR_RET(OB_ERR_UNEXPECTED, "unexpected condition");
2361
        } else if ((*it)->has_stopped()) {
2362
          // skip stopped tenant
2363
        } else {
2364
          (*it)->timeup();
2365
        }
2366
      }
2367
    }
2368
    ob_usleep(TIME_SLICE_PERIOD);
2369

2370
    if (REACH_TIME_INTERVAL(10000000L)) {  // every 10s
2371
      SpinRLockGuard guard(lock_);
2372
      for (TenantList::iterator it = tenants_.begin(); it != tenants_.end(); it++) {
2373
        if (!OB_ISNULL(*it)) {
2374
          ObTaskController::get().allow_next_syslog();
2375
          LOG_INFO("dump tenant info", "tenant", **it);
2376
        }
2377
      }
2378
    }
2379
  }
2380
  LOG_INFO("OMT quit");
2381
}
2382

2383
uint32_t ObMultiTenant::get_tenant_lock_bucket_idx(const uint64_t tenant_id)
2384
{
2385
  uint64_t hash_tenant_id = tenant_id * 13;
2386
  return common::murmurhash(&hash_tenant_id, sizeof(uint64_t), 0) % OB_TENANT_LOCK_BUCKET_NUM;
2387
}
2388
int ObMultiTenant::check_if_unit_id_exist(const uint64_t unit_id, bool &exist)
2389
{
2390
  int ret = OB_SUCCESS;
2391
  exist = false;
2392
  SpinRLockGuard guard(lock_);
2393
  for (TenantList::iterator it = tenants_.begin(); it != tenants_.end() && OB_SUCCESS == ret; it++) {
2394
    ObTenant *tenant = *it;
2395
    if (OB_ISNULL(tenant)) {
2396
      ret = OB_ERR_UNEXPECTED;
2397
      LOG_ERROR("unexpected condition", K(ret), K(tenant));
2398
    } else if (tenant->is_hidden() || is_virtual_tenant_id(tenant->id())) {
2399
      // do nothing
2400
    } else if (tenant->get_unit_id() == unit_id) {
2401
      exist = true;
2402
      break;
2403
    }
2404
  }
2405
  return ret;
2406
}
2407

2408
int obmysql::sql_nio_add_cgroup(const uint64_t tenant_id)
2409
{
2410
  int ret = OB_SUCCESS;
2411
  if (GCONF._enable_new_sql_nio && GCONF._enable_tenant_sql_net_thread &&
2412
      nullptr != GCTX.cgroup_ctrl_ &&
2413
      OB_LIKELY(GCTX.cgroup_ctrl_->is_valid())) {
2414
    ret = GCTX.cgroup_ctrl_->add_self_to_cgroup(tenant_id, OBCG_SQL_NIO);
2415
  }
2416
  return ret;
2417
}
2418

2419
int ObSrvNetworkFrame::reload_tenant_sql_thread_config(const uint64_t tenant_id)
2420
{
2421
  int ret = OB_SUCCESS;
2422
  omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id));
2423

2424
  // reload tenant_sql_login_thread_count
2425
  int sql_login_thread_count = 0;
2426
  if (tenant_config.is_valid()) {
2427
    sql_login_thread_count = tenant_config->tenant_sql_login_thread_count;
2428
  }
2429

2430
  ObTenantBase *tenant = NULL;
2431
  MTL_SWITCH(tenant_id) {
2432
    if (0 == sql_login_thread_count) {
2433
      tenant = MTL_CTX();
2434
      sql_login_thread_count = tenant ? std::max((int)tenant->unit_min_cpu(), 1) : 1;
2435
    }
2436

2437
    QueueThread *mysql_queue = MTL(QueueThread *);
2438
    if (OB_NOT_NULL(mysql_queue) && mysql_queue->set_thread_count(sql_login_thread_count)) {
2439
      LOG_WARN("update tenant_sql_login_thread_count fail", K(ret));
2440
    }
2441
  }
2442

2443
  // // reload tenant_sql_net_thread_count
2444
  // int sql_net_thread_count = 0;
2445
  // if (tenant_config.is_valid()) {
2446
  //   sql_net_thread_count = tenant_config->tenant_sql_net_thread_count;
2447

2448
  //   MTL_SWITCH(tenant_id) {
2449
  //   if (0 == sql_net_thread_count) {
2450
  //     sql_net_thread_count =
2451
  //         NULL == tenant ? 1 : std::max((int)tenant->unit_min_cpu(), 1);
2452
  //   }
2453
  //     ObSqlNioServer *sql_nio_server = MTL(ObSqlNioServer *);
2454
  //     int cur_sql_net_thread_count =
2455
  //         sql_nio_server->get_nio()->get_thread_count();
2456
  //     if (sql_net_thread_count < cur_sql_net_thread_count) {
2457
  //       LOG_WARN("decrease tenant_sql_net_thread_count not allowed", K(ret),
2458
  //                K(sql_net_thread_count), K(cur_sql_net_thread_count));
2459
  //       tenant_config->tenant_sql_net_thread_count = cur_sql_net_thread_count;
2460
  //     } else if (OB_FAIL(
2461
  //                    sql_nio_server->set_thread_count(sql_net_thread_count))) {
2462
  //       LOG_WARN("update tenant_sql_net_thread_count fail", K(ret),
2463
  //                K(sql_net_thread_count));
2464
  //     }
2465
  //   }
2466

2467
  return ret;
2468
}
2469

2470
int ObSrvNetworkFrame::reload_sql_thread_config()
2471
{
2472
  int ret = OB_SUCCESS;
2473
  int cnt = deliver_.get_mysql_login_thread_count_to_set(
2474
      GCONF.sql_login_thread_count);
2475
  if (OB_FAIL(deliver_.set_mysql_login_thread_count(cnt))) {
2476
    LOG_WARN("update sql_login_thread_count error", K(ret));
2477
  }
2478

2479
  int sql_net_thread_count = (int)GCONF.sql_net_thread_count;
2480
  if (sql_net_thread_count == 0) {
2481
    if (GCONF.net_thread_count == 0) {
2482
      sql_net_thread_count = get_default_net_thread_count();
2483
    } else {
2484
      sql_net_thread_count = GCONF.net_thread_count;
2485
    }
2486
  }
2487

2488
  if (OB_NOT_NULL(obmysql::global_sql_nio_server)) {
2489
    int cur_sql_net_thread_count =
2490
        obmysql::global_sql_nio_server->get_nio()->get_thread_count();
2491
    if (sql_net_thread_count < cur_sql_net_thread_count) {
2492
      LOG_WARN("decrease sql_net_thread_count not allowed", K(ret),
2493
               K(sql_net_thread_count), K(cur_sql_net_thread_count));
2494
      GCONF.sql_net_thread_count = cur_sql_net_thread_count;
2495
    } else if (OB_FAIL(obmysql::global_sql_nio_server->set_thread_count(
2496
                   sql_net_thread_count))) {
2497
      LOG_WARN("update sql_net_thread_count error", K(ret));
2498
    }
2499
  }
2500

2501
  if (GCONF._enable_new_sql_nio && GCONF._enable_tenant_sql_net_thread) {
2502
    omt::TenantIdList ids;
2503
    if (OB_ISNULL(GCTX.omt_)) {
2504
      ret = OB_ERR_UNEXPECTED;
2505
      LOG_WARN("null ptr", K(ret));
2506
    } else {
2507
      GCTX.omt_->get_tenant_ids(ids);
2508
      for (int64_t i = 0; i < ids.size(); i++) {
2509
        int tenant_id = ids[i];
2510
        if (is_sys_tenant(tenant_id) || is_user_tenant(tenant_id)) {
2511
          reload_tenant_sql_thread_config(tenant_id);
2512
        }
2513
      }
2514
    }
2515
  }
2516
  return ret;
2517
}
2518

2519
// Creates the tenant-level TntSharedTimer thread group and stores its id in
// st->tg_id_. A null `st` is accepted and yields OB_SUCCESS.
int ObSharedTimer::mtl_init(ObSharedTimer *&st)
{
  int ret = common::OB_SUCCESS;
  if (NULL == st) {
    // nothing to initialize
  } else {
    int &timer_tg_id = st->tg_id_;
    if (OB_FAIL(TG_CREATE_TENANT(lib::TGDefIDs::TntSharedTimer, timer_tg_id))) {
      LOG_WARN("init shared timer failed", K(ret));
    }
  }
  return ret;
}
2530

2531
// Starts the thread group whose id is stored in st->tg_id_.
// A null `st` is accepted and yields OB_SUCCESS.
// The failure message now says "start" instead of the "init" text copied from
// mtl_init, so the two failures can be told apart in the log.
int ObSharedTimer::mtl_start(ObSharedTimer *&st)
{
  int ret = common::OB_SUCCESS;
  if (st != NULL) {
    int &tg_id = st->tg_id_;
    if (OB_FAIL(TG_START(tg_id))) {
      LOG_WARN("start shared timer failed", K(ret), K(tg_id));
    }
  }
  return ret;
}
2542

2543
// Stops the timer's thread group. Does nothing when `st` is null or its
// tg_id_ is not positive.
void ObSharedTimer::mtl_stop(ObSharedTimer *&st)
{
  if (NULL == st) {
    return;
  }
  int &timer_tg_id = st->tg_id_;
  if (timer_tg_id > 0) {
    TG_STOP(timer_tg_id);
  }
}
2552

2553
// Waits on the timer's thread group via TG_WAIT_ONLY. Does nothing when `st`
// is null or its tg_id_ is not positive.
void ObSharedTimer::mtl_wait(ObSharedTimer *&st)
{
  if (NULL == st) {
    return;
  }
  int &timer_tg_id = st->tg_id_;
  if (timer_tg_id > 0) {
    TG_WAIT_ONLY(timer_tg_id);
  }
}
2562

2563
void ObSharedTimer::destroy()
2564
{
2565
  if (tg_id_ > 0) {
2566
    TG_DESTROY(tg_id_);
2567
    tg_id_ = -1;
2568
  }
2569
}
2570

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.