oceanbase

Форк
0
/
ob_tenant.cpp 
1962 строки · 65.7 Кб
1
/**
2
 * Copyright (c) 2021 OceanBase
3
 * OceanBase CE is licensed under Mulan PubL v2.
4
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
5
 * You may obtain a copy of Mulan PubL v2 at:
6
 *          http://license.coscl.org.cn/MulanPubL-2.0
7
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
 * See the Mulan PubL v2 for more details.
11
 */
12

13
#define USING_LOG_PREFIX SERVER_OMT
14
#include "ob_tenant.h"
15

16
#include "share/ob_define.h"
17
#include "lib/container/ob_vector.h"
18
#include "lib/time/ob_time_utility.h"
19
#include "lib/stat/ob_diagnose_info.h"
20
#include "lib/stat/ob_session_stat.h"
21
#include "share/config/ob_server_config.h"
22
#include "sql/engine/px/ob_px_admission.h"
23
#include "share/interrupt/ob_global_interrupt_call.h"
24
#include "ob_th_worker.h"
25
#include "ob_multi_tenant.h"
26
#include "observer/ob_server_struct.h"
27
#include "share/schema/ob_schema_getter_guard.h"
28
#include "share/schema/ob_schema_struct.h"
29
#include "share/schema/ob_schema_utils.h"
30
#include "share/resource_manager/ob_resource_manager.h"
31
#include "sql/engine/px/ob_px_target_mgr.h"
32
#include "logservice/palf/palf_options.h"
33
#include "sql/dtl/ob_dtl_fc_server.h"
34
#include "observer/mysql/ob_mysql_request_manager.h"
35
#include "observer/ob_srv_deliver.h"
36
#include "observer/ob_srv_network_frame.h"
37
#include "storage/tx/wrs/ob_tenant_weak_read_service.h"
38
#include "sql/engine/ob_tenant_sql_memory_manager.h"
39
#include "storage/meta_mem/ob_tenant_meta_mem_mgr.h"
40
#include "lib/worker.h"
41
#include "ob_tenant_mtl_helper.h"
42
#include "storage/ob_file_system_router.h"
43
#include "storage/slog/ob_storage_logger.h"
44
#include "storage/slog/ob_storage_logger_manager.h"
45
#include "storage/ob_file_system_router.h"
46
#include "common/ob_smart_var.h"
47
#include "rpc/obmysql/ob_sql_nio_server.h"
48
#include "rpc/obrpc/ob_rpc_stat.h"
49
#include "rpc/obrpc/ob_rpc_packet.h"
50
#include "lib/container/ob_array.h"
51
#include "share/rc/ob_tenant_module_init_ctx.h"
52
#include "share/resource_manager/ob_cgroup_ctrl.h"
53
#include "sql/engine/px/ob_px_worker.h"
54
#include "lib/thread/protected_stack_allocator.h"
55

56
using namespace oceanbase::lib;
57
using namespace oceanbase::common;
58
using namespace oceanbase::omt;
59
using namespace oceanbase::rpc;
60
using namespace oceanbase::share;
61
using namespace oceanbase::share::schema;
62
using namespace oceanbase::storage;
63
using namespace oceanbase::sql::dtl;
64
using namespace oceanbase::obrpc;
65

66
#define EXPAND_INTERVAL (1 * 1000 * 1000)
67
#define SHRINK_INTERVAL (1 * 1000 * 1000)
68
#define SLEEP_INTERVAL (60 * 1000 * 1000)
69

70
int64_t FASTSTACK_REQ_QUEUE_SIZE_THRESHOLD = INT64_MAX;
71

72
extern "C" {
73
int ob_pthread_create(void **ptr, void *(*start_routine) (void *), void *arg);
74
int ob_pthread_tryjoin_np(void *ptr);
75
}
76
void MultiLevelReqCnt::atomic_inc(const int32_t level)
77
{
78
  if (level < 0 || level >= MAX_REQUEST_LEVEL) {
79
    LOG_WARN_RET(OB_ERR_UNEXPECTED, "unexpected level", K(level));
80
  } else {
81
    ATOMIC_INC(&cnt_[level]);
82
  }
83
}
84

85
int ObPxPools::init(uint64_t tenant_id)
86
{
87
  static int PX_POOL_COUNT = 128; // 128 groups, generally enough
88
  int ret = OB_SUCCESS;
89
  tenant_id_ = tenant_id;
90
  ObMemAttr attr(tenant_id, "PxPoolBkt");
91
  if (OB_FAIL(pool_map_.create(PX_POOL_COUNT, attr, attr))) {
92
    LOG_WARN("fail init pool map", K(ret));
93
  }
94
  return ret;
95
}
96

97
int ObPxPools::get_or_create(int64_t group_id, ObPxPool *&pool)
98
{
99
  int ret = OB_SUCCESS;
100
  if (!pool_map_.created()) {
101
    ret = OB_NOT_INIT;
102
  } else if (OB_FAIL(pool_map_.get_refactored(group_id, pool))) {
103
    if (OB_HASH_NOT_EXIST == ret) {
104
      if (OB_FAIL(create_pool(group_id, pool))) {
105
        LOG_WARN("fail create pool", K(ret), K(group_id));
106
      }
107
    } else {
108
      LOG_WARN("fail get group id from hashmap", K(ret), K(group_id));
109
    }
110
  }
111
  return ret;
112
}
113

114
int ObPxPools::create_pool(int64_t group_id, ObPxPool *&pool)
115
{
116
  static constexpr uint64_t MAX_TASKS_PER_CPU = 1;
117
  int ret = OB_SUCCESS;
118
  common::SpinWLockGuard g(lock_);
119
  if (OB_FAIL(pool_map_.get_refactored(group_id, pool))) {
120
    if (OB_HASH_NOT_EXIST == ret) {
121
      pool = OB_NEW(ObPxPool, ObMemAttr(tenant_id_, "PxPool"));
122
      if (OB_ISNULL(pool)) {
123
        ret = common::OB_ALLOCATE_MEMORY_FAILED;
124
      } else {
125
        pool->set_tenant_id(tenant_id_);
126
        pool->set_group_id(group_id);
127
        pool->set_run_wrapper(MTL_CTX());
128
        if (OB_FAIL(pool->start())) {
129
          LOG_WARN("fail startup px pool", K(group_id), K(tenant_id_), K(ret));
130
        } else if (OB_FAIL(pool_map_.set_refactored(group_id, pool))) {
131
          LOG_WARN("fail set pool to hashmap", K(group_id), K(ret));
132
        }
133
      }
134
    } else {
135
      LOG_WARN("fail get group id from hashmap", K(ret), K(group_id));
136
    }
137
  }
138
  return ret;
139
}
140

141
int ObPxPools::thread_recycle()
142
{
143
  int ret = OB_SUCCESS;
144
  common::SpinWLockGuard g(lock_);
145
  ThreadRecyclePoolFunc recycle_pool_func;
146
  if (OB_FAIL(pool_map_.foreach_refactored(recycle_pool_func))) {
147
    LOG_WARN("failed to do foreach", K(ret));
148
  }
149
  return ret;
150
}
151

152
int ObPxPools::ThreadRecyclePoolFunc::operator() (common::hash::HashMapPair<int64_t, ObPxPool*> &kv)
153
{
154
  int ret = OB_SUCCESS;
155
  int64_t &group_id = kv.first;
156
  ObPxPool *pool = kv.second;
157
  if (NULL == pool) {
158
    LOG_WARN("pool is null", K(group_id));
159
  } else {
160
    IGNORE_RETURN pool->thread_recycle();
161
  }
162
  return ret;
163
}
164

165
int ObPxPools::StopPoolFunc::operator() (common::hash::HashMapPair<int64_t, ObPxPool*> &kv)
166
{
167
  int ret = OB_SUCCESS;
168
  int64_t &group_id = kv.first;
169
  ObPxPool *pool = kv.second;
170
  if (NULL == pool) {
171
    LOG_WARN("pool is null", K(group_id));
172
  } else {
173
    pool->stop();
174
    LOG_INFO("DEL_POOL_STEP_1: mark px pool stop succ!", K(group_id));
175
  }
176
  return ret;
177
}
178

179
int ObPxPools::DeletePoolFunc::operator() (common::hash::HashMapPair<int64_t, ObPxPool*> &kv)
180
{
181
  int ret = OB_SUCCESS;
182
  int64_t &group_id = kv.first;
183
  ObPxPool *pool = kv.second;
184
  if (NULL == pool) {
185
    LOG_WARN("pool is null", K(group_id));
186
  } else {
187
    pool->wait();
188
    LOG_INFO("DEL_POOL_STEP_2: wait pool empty succ!", K(group_id));
189
    pool->destroy();
190
    LOG_INFO("DEL_POOL_STEP_3: pool destroy succ!", K(group_id), K(pool->get_queue_size()));
191
    common::ob_delete(pool);
192
  }
193
  return ret;
194
}
195

196
void ObPxPools::mtl_stop(ObPxPools *&pools)
197
{
198
  int ret = OB_SUCCESS;
199
  if (OB_ISNULL(pools)) {
200
    // pools will be null if it's creating tenant and failed.
201
    LOG_WARN("pools is null");
202
  } else {
203
    common::SpinWLockGuard g(pools->lock_);
204
    StopPoolFunc stop_pool_func;
205
    if (OB_FAIL(pools->pool_map_.foreach_refactored(stop_pool_func))) {
206
      LOG_WARN("failed to do foreach", K(ret));
207
    }
208
  }
209
}
210

211
void ObPxPools::destroy()
212
{
213
  int ret = OB_SUCCESS;
214
  common::SpinWLockGuard g(lock_);
215
  DeletePoolFunc free_pool_func;
216
  if (OB_FAIL(pool_map_.foreach_refactored(free_pool_func))) {
217
    LOG_WARN("failed to do foreach", K(ret));
218
  } else {
219
    pool_map_.destroy();
220
    tenant_id_ = OB_INVALID_ID;
221
  }
222
}
223

224
int ObPxPool::submit(const RunFuncT &func)
225
{
226
  int ret = OB_SUCCESS;
227
  if (!is_inited_) {
228
    queue_.set_limit(common::ObServerConfig::get_instance().tenant_task_queue_size);
229
    is_inited_ = true;
230
  }
231
  disable_recycle();
232
  ATOMIC_INC(&concurrency_);
233
  if (ATOMIC_LOAD(&active_threads_) < ATOMIC_LOAD(&concurrency_)) {
234
    ret = OB_SIZE_OVERFLOW;
235
  } else {
236
    Task *t = OB_NEW(Task, ObMemAttr(tenant_id_, "PxTask"), func);
237
    if (OB_ISNULL(t)) {
238
      ret = OB_ALLOCATE_MEMORY_FAILED;
239
    } else if (OB_FAIL(queue_.push(static_cast<ObLink*>(t), 0))) {
240
      LOG_ERROR("px push queue failed", K(ret));
241
    }
242
  }
243
  if (ret != OB_SUCCESS) {
244
    ATOMIC_DEC(&concurrency_);
245
  }
246
  enable_recycle();
247
  return ret;
248
}
249

250
void ObPxPool::handle(ObLink *task)
251
{
252
  Task *t  = static_cast<Task*>(task);
253
  if (t == nullptr) {
254
    LOG_ERROR_RET(OB_INVALID_ARGUMENT, "px task is invalid");
255
  } else {
256
    bool need_exec = true;
257
    t->func_(need_exec);
258
    OB_DELETE(Task, "PxTask", t);
259
  }
260
  ATOMIC_DEC(&concurrency_);
261
}
262

263
void ObPxPool::set_px_thread_name()
264
{
265
  char buf[32];
266
  snprintf(buf, 32, "PX_G%ld", group_id_);
267
  ob_get_tenant_id() = tenant_id_;
268
  lib::set_thread_name(buf);
269
}
270

271
void ObPxPool::run(int64_t idx)
272
{
273
  ATOMIC_INC(&active_threads_);
274
  set_thread_idx(idx);
275
  // Create worker for current thread.
276
  ObPxWorker worker;
277
  Worker::set_worker_to_thread_local(&worker);
278
  run1();
279
}
280

281

282
void ObPxPool::run1()
283
{
284
  int ret = OB_SUCCESS;
285
  set_px_thread_name();
286
  ObTLTaGuard ta_guard(tenant_id_);
287
  auto *pm = common::ObPageManager::thread_local_instance();
288
  if (OB_LIKELY(nullptr != pm)) {
289
    pm->set_tenant_ctx(tenant_id_, common::ObCtxIds::DEFAULT_CTX_ID);
290
  }
291
  //ObTaTLCacheGuard ta_guard(tenant_id_);
292
  CLEAR_INTERRUPTABLE();
293
  ObCgroupCtrl *cgroup_ctrl = GCTX.cgroup_ctrl_;
294
  LOG_INFO("run px pool", K(group_id_), K(tenant_id_), K_(active_threads));
295
  if (nullptr != cgroup_ctrl && OB_LIKELY(cgroup_ctrl->is_valid())) {
296
    cgroup_ctrl->add_self_to_cgroup(tenant_id_, group_id_);
297
    LOG_INFO("add thread to group succ", K(tenant_id_), K(group_id_));
298
  }
299

300
	if (!is_inited_) {
301
    queue_.set_limit(common::ObServerConfig::get_instance().tenant_task_queue_size);
302
    is_inited_ = true;
303
  }
304

305
  ObLink *task = nullptr;
306
  int64_t idle_time = 0;
307
  while (!Thread::current().has_set_stop()) {
308
	  if (!is_inited_) {
309
      ob_usleep(10 * 1000L);
310
    } else {
311
      if (OB_SUCC(queue_.pop(task, QUEUE_WAIT_TIME))) {
312
        handle(task);
313
        idle_time = 0; // reset recycle timer
314
      } else {
315
        idle_time += QUEUE_WAIT_TIME;
316
        // if idle for more than 10 min, exit thread
317
        try_recycle(idle_time);
318
      }
319
    }
320
  }
321
}
322

323
void ObPxPool::try_recycle(int64_t idle_time)
324
{
325
  // recycle thread policy:
326
  // 1. first N threads reserved for first 10 min idle period
327
  // 2. no thread reserved after 1 hour idle period
328
  //
329
  // impl. note: must ensure active_threads_ > concurrency_, otherwise may hang task
330
  const int N = 8;
331
  if ((idle_time > 10LL * 60 * 1000 * 1000 && get_thread_count() >= N)
332
      || idle_time > 60LL * 60 * 1000 * 1000) {
333
    if (OB_SUCCESS == recycle_lock_.trylock()) {
334
      if (ATOMIC_LOAD(&active_threads_) > ATOMIC_LOAD(&concurrency_)) {
335
        ATOMIC_DEC(&active_threads_);
336
        // when thread marked as stopped,
337
        // it will exit the event loop and recycled by background deamon
338
        Thread::current().stop();
339
      }
340
      recycle_lock_.unlock();
341
    }
342
  }
343
}
344

345
void ObPxPool::stop()
346
{
347
  int ret = OB_SUCCESS;
348
  Threads::stop();
349
  ObLink *task = nullptr;
350
  bool need_exec = false;
351
  while (OB_SUCC(queue_.pop(task, QUEUE_WAIT_TIME))) {
352
    Task *t  = static_cast<Task*>(task);
353
    if (OB_NOT_NULL(t)) {
354
      t->func_(need_exec);
355
      OB_DELETE(Task, "PxTask", t);
356
    }
357
    ATOMIC_DEC(&concurrency_);
358
  }
359
}
360

361
ObResourceGroup::ObResourceGroup(int32_t group_id, ObTenant* tenant, share::ObCgroupCtrl *cgroup_ctrl):
362
  ObResourceGroupNode(group_id),
363
  workers_lock_(tenant->workers_lock_),
364
  inited_(false),
365
  recv_req_cnt_(0),
366
  shrink_(false),
367
  token_change_ts_(0),
368
  nesting_worker_cnt_(0),
369
  tenant_(tenant),
370
  cgroup_ctrl_(cgroup_ctrl)
371
{
372
}
373

374
int ObResourceGroup::init()
375
{
376
  int ret = OB_SUCCESS;
377
  if (nullptr == tenant_) {
378
    ret = OB_ERR_UNEXPECTED;
379
    LOG_ERROR("group init failed");
380
  } else if (FALSE_IT(multi_level_queue_.set_limit(common::ObServerConfig::get_instance().tenant_task_queue_size))) {
381
    LOG_WARN("multi level queue set limit failed", K(ret), K(tenant_->id()), K(group_id_), K(*this));
382
  } else {
383
    req_queue_.set_limit(common::ObServerConfig::get_instance().tenant_task_queue_size);
384
    inited_ = true;
385
  }
386
  return ret;
387
}
388

389
void ObResourceGroup::update_queue_size()
390
{
391
  req_queue_.set_limit(common::ObServerConfig::get_instance().tenant_task_queue_size);
392
}
393

394
int ObResourceGroup::acquire_level_worker(int32_t level)
395
{
396
  int ret = OB_SUCCESS;
397
  ObTenantSwitchGuard guard(tenant_);
398

399
  if (level <= 0 || level > MAX_REQUEST_LEVEL) {
400
    ret = OB_ERR_UNEXPECTED;
401
    LOG_ERROR("unexpected level", K(level), K(tenant_->id()));
402
  } else {
403
    ObThWorker *w = nullptr;
404
    if (OB_FAIL(create_worker(w, tenant_, group_id_, level, true /*ignore max worker limit*/, this))) {
405
      LOG_WARN("create worker failed", K(ret));
406
    } else if (!nesting_workers_.add_last(&w->worker_node_)) {
407
      OB_ASSERT(false);
408
      ret = OB_ERR_UNEXPECTED;
409
      LOG_ERROR("add worker to list fail", K(ret));
410
    }
411
  }
412
  return ret;
413
}
414

415

416
int ObResourceGroup::acquire_more_worker(int64_t num, int64_t &succ_num, bool force)
417
{
418
  int ret = OB_SUCCESS;
419
  ObTenantSwitchGuard guard(tenant_);
420

421
  const auto need_num = num;
422
  succ_num = 0;
423

424
  while (OB_SUCC(ret) && need_num > succ_num) {
425
    ObThWorker *w = nullptr;
426
    if (OB_FAIL(create_worker(w, tenant_, group_id_, INT32_MAX, force, this))) {
427
      LOG_WARN("create worker failed", K(ret));
428
    } else if (!workers_.add_last(&w->worker_node_)) {
429
      OB_ASSERT(false);
430
      ret = OB_ERR_UNEXPECTED;
431
      LOG_ERROR("add worker to list fail", K(ret));
432
    } else {
433
      succ_num++;
434
    }
435
  }
436

437
  if (need_num != num ||  // Reach worker count bound,
438
      succ_num != need_num  // or can't allocate enough worker.
439
     ) {
440
    if (TC_REACH_TIME_INTERVAL(10000000)) {
441
      LOG_WARN("Alloc group worker less than lack", K(num), K(need_num), K(succ_num));
442
    }
443
  }
444

445
  return ret;
446
}
447

448
void ObResourceGroup::check_worker_count()
449
{
450
  int ret = OB_SUCCESS;
451
  if (OB_SUCC(workers_lock_.trylock())) {
452
    if (is_user_group(group_id_)
453
      && nesting_worker_cnt_ < (MAX_REQUEST_LEVEL - GROUP_MULTI_LEVEL_THRESHOLD)) {
454
      for (int level = GROUP_MULTI_LEVEL_THRESHOLD + nesting_worker_cnt_; OB_SUCC(ret) && level < MAX_REQUEST_LEVEL; level++) {
455
        if (OB_SUCC(acquire_level_worker(level))) {
456
          nesting_worker_cnt_ = nesting_worker_cnt_ + 1;
457
        }
458
      }
459
    }
460
    int64_t now = ObTimeUtility::current_time();
461
    bool enable_dynamic_worker = true;
462
    int64_t threshold = 3 * 1000;
463
    {
464
      ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_->id()));
465
      enable_dynamic_worker = tenant_config.is_valid() ? tenant_config->_ob_enable_dynamic_worker : true;
466
      threshold = tenant_config.is_valid() ? tenant_config->_stall_threshold_for_dynamic_worker : 3 * 1000;
467
    }
468
    int64_t blocking_cnt = 0;
469
    DLIST_FOREACH_REMOVESAFE(wnode, workers_) {
470
      const auto w = static_cast<ObThWorker*>(wnode->get_data());
471
      if (w->has_set_stop()) {
472
        workers_.remove(wnode);
473
        destroy_worker(w);
474
      } else if (w->has_req_flag()
475
                 && 0 != w->blocking_ts()
476
                 && now - w->blocking_ts() >= threshold
477
                 && enable_dynamic_worker) {
478
        ++blocking_cnt;
479
      }
480
    }
481

482
    int64_t target_min = 0;
483
    int64_t token = 0;
484
    bool is_group_critical = share::ObCgSet::instance().is_group_critical(group_id_);
485
    if (is_group_critical) {
486
      target_min = min_worker_cnt();
487
      token = 1 + blocking_cnt;
488
      token = std::min(token, max_worker_cnt());
489
      token = std::max(token, target_min);
490
    } else {
491
      target_min = std::min(req_queue_.size(), min_worker_cnt());
492
      if (blocking_cnt == 0 && req_queue_.size() == 0) {
493
        token = 0;
494
      } else {
495
        token = 1 + blocking_cnt;
496
        token = std::min(token, max_worker_cnt());
497
      }
498
    }
499

500
    int64_t succ_num = 0L;
501
    int64_t shrink_ts =
502
        (!is_group_critical && workers_.get_size() == 1 && token == 0) ? SLEEP_INTERVAL : SHRINK_INTERVAL;
503
    if (OB_UNLIKELY(workers_.get_size() < target_min)) {
504
      const int64_t diff = target_min - workers_.get_size();
505
      token_change_ts_ = now;
506
      ATOMIC_STORE(&shrink_, false);
507
      acquire_more_worker(diff, succ_num, /* force */ true);
508
      LOG_INFO("worker thread created", K(tenant_->id()), K(group_id_), K(token));
509
    } else if (OB_UNLIKELY(workers_.get_size() < token) &&
510
               OB_LIKELY(ObMallocAllocator::get_instance()->get_tenant_remain(tenant_->id()) >
511
                         ObMallocAllocator::get_instance()->get_tenant_limit(tenant_->id()) * 0.05)) {
512
      ATOMIC_STORE(&shrink_, false);
513
      if (OB_LIKELY(now - token_change_ts_ >= EXPAND_INTERVAL)) {
514
        token_change_ts_ = now;
515
        acquire_more_worker(1, succ_num);
516
        LOG_INFO("worker thread created", K(tenant_->id()), K(group_id_), K(token));
517
      }
518
    } else if (OB_UNLIKELY(workers_.get_size() > token) && OB_LIKELY(now - token_change_ts_ >= shrink_ts)) {
519
      token_change_ts_ = now;
520
      ATOMIC_STORE(&shrink_, true);
521
      LOG_INFO("worker thread began to shrink", K(tenant_->id()), K(group_id_), K(token));
522
    }
523
    IGNORE_RETURN workers_lock_.unlock();
524
  }
525
}
526

527
void ObResourceGroup::check_worker_count(ObThWorker &w)
528
{
529
  int ret = OB_SUCCESS;
530
  if (OB_UNLIKELY(ATOMIC_LOAD(&shrink_))
531
      && OB_LIKELY(ATOMIC_BCAS(&shrink_, true, false))) {
532
    w.stop();
533
    if (cgroup_ctrl_->is_valid() && OB_FAIL(cgroup_ctrl_->remove_self_from_cgroup(tenant_->id()))) {
534
      LOG_WARN("remove thread from cgroup failed", K(ret), "tenant:", tenant_->id(), K_(group_id));
535
    }
536
    LOG_INFO("worker thread exit", K(tenant_->id()), K(workers_.get_size()));
537
  }
538
}
539

540
int ObResourceGroup::clear_worker()
541
{
542
  int ret = OB_SUCCESS;
543
  ObMutexGuard guard(workers_lock_);
544
  while (req_queue_.size() > 0
545
    || (multi_level_queue_.get_total_size() > 0)) {
546
    ob_usleep(10L * 1000L);
547
  }
548
  while (nesting_workers_.get_size() > 0) {
549
    int ret = OB_SUCCESS;
550
    DLIST_FOREACH_REMOVESAFE(wnode, nesting_workers_) {
551
      ObThWorker *w = static_cast<ObThWorker*>(wnode->get_data());
552
      nesting_workers_.remove(wnode);
553
      destroy_worker(w);
554
    }
555
    if (REACH_TIME_INTERVAL(10 * 1000L * 1000L)) {
556
      LOG_INFO(
557
          "Tenant has some group nesting workers need stop",
558
          K(tenant_->id()),
559
          "group nesting workers", nesting_workers_.get_size(),
560
          "group id", get_group_id());
561
    }
562
  }
563
  while (workers_.get_size() > 0) {
564
    int ret = OB_SUCCESS;
565
    DLIST_FOREACH_REMOVESAFE(wnode, workers_) {
566
      const auto w = static_cast<ObThWorker*>(wnode->get_data());
567
      workers_.remove(wnode);
568
      destroy_worker(w);
569
    }
570
    if (REACH_TIME_INTERVAL(10 * 1000L * 1000L)) {
571
      LOG_INFO(
572
          "Tenant has some group workers need stop",
573
          K(tenant_->id()),
574
          "group workers", workers_.get_size(),
575
          "group id", get_group_id());
576
    }
577
    ob_usleep(10L * 1000L);
578
  }
579
  return ret;
580
}
581

582
int GroupMap::create_and_insert_group(int32_t group_id, ObTenant *tenant, ObCgroupCtrl *cgroup_ctrl, ObResourceGroup *&group)
583
{
584
  int ret = OB_SUCCESS;
585
  if (nullptr == tenant
586
   || nullptr == cgroup_ctrl) {
587
    ret = OB_INVALID_ARGUMENT;
588
  } else {
589
    const int64_t alloc_size = sizeof(ObResourceGroup);
590
    ObResourceGroup *buf = nullptr;
591
    if (nullptr == (buf = (ObResourceGroup*)ob_malloc(alloc_size, ObMemAttr(tenant->id(), "ResourceGroup")))) {
592
      ret = OB_ALLOCATE_MEMORY_FAILED;
593
    } else {
594
      group = new(buf)ObResourceGroup(group_id, tenant, cgroup_ctrl);
595
      if (OB_FAIL(group->init())) {
596
        LOG_ERROR("group init failed", K(ret), K(group_id));
597
      } else if (OB_FAIL(err_code_map(insert(group)))) {
598
        LOG_WARN("groupmap insert group failed", K(ret), K(group->get_group_id()), K(tenant->id()));
599
      }
600
      if (OB_SUCCESS != ret) {
601
        group->~ObResourceGroup();
602
        ob_free(group);
603
      } else {
604
        group->check_worker_count();
605
      }
606
    }
607
  }
608
  return ret;
609
}
610

611
void GroupMap::wait_group()
612
{
613
  int ret = OB_SUCCESS;
614
  ObResourceGroupNode* iter = NULL;
615
  while (nullptr != (iter = quick_next(iter))) {
616
    ObResourceGroup *group = static_cast<ObResourceGroup*>(iter);
617
    if (OB_FAIL(group->clear_worker())) {
618
      LOG_ERROR("group clear worker failed", K(ret));
619
    }
620
  }
621
}
622

623
void GroupMap::destroy_group()
624
{
625
  int ret = OB_SUCCESS;
626
  ObResourceGroupNode* iter = NULL;
627
  while (nullptr != (iter = quick_next(iter))) {
628
    ObResourceGroup *group = static_cast<ObResourceGroup*>(iter);
629
    if (OB_SUCC(err_code_map(del(iter, iter)))) {
630
      group->~ObResourceGroup();
631
      ob_free(group);
632
      iter = NULL;
633
    } else {
634
      LOG_ERROR("drop group failed", K(ret));
635
    }
636
  }
637
}
638

639
int GroupMap::err_code_map(int err)
640
{
641
  int ret = OB_SUCCESS;
642
  switch (err) {
643
    case 0:          ret = OB_SUCCESS; break;
644
    case -ENOENT:    ret = OB_ENTRY_NOT_EXIST; break;
645
    case -EAGAIN:    ret = OB_EAGAIN; break;
646
    case -ENOMEM:    ret = OB_ALLOCATE_MEMORY_FAILED; break;
647
    case -EEXIST:    ret = OB_ENTRY_EXIST; break;
648
    case -EOVERFLOW: ret = OB_SIZE_OVERFLOW; break;
649
    default:         ret = OB_ERROR;
650
  }
651
  return ret;
652
}
653

654
int64_t RpcStatInfo::to_string(char *buf, const int64_t len) const
655
{
656
  int64_t pos = 0;
657
  int ret = OB_SUCCESS;
658
  struct PcodeDcount{
659
    obrpc::ObRpcPacketCode pcode_;
660
    int64_t dcount_;
661
    bool operator <(const PcodeDcount &other) const { return dcount_ > other.dcount_; }
662
    int64_t to_string(char* buf, const int64_t len) const { UNUSED(buf); UNUSED(len); return 0L; }
663
  };
664
  SMART_VAR(ObArray<PcodeDcount>, pd_array) {
665
    ObRpcPacketSet &set = ObRpcPacketSet::instance();
666
    for (int64_t pcode_idx = 0; (OB_SUCCESS == ret) && (pcode_idx < ObRpcPacketSet::THE_PCODE_COUNT); pcode_idx++) {
667
      PcodeDcount pd_item;
668
      RpcStatItem item;
669
      if (OB_FAIL(rpc_stat_srv_.get(pcode_idx, item))) {
670
        //continue
671
      } else if (item.dcount_ != 0) {
672
        pd_item.pcode_ = set.pcode_of_idx(pcode_idx);
673
        pd_item.dcount_ = item.dcount_;
674
        if (OB_FAIL(pd_array.push_back(pd_item))) {
675
          //break
676
        }
677
      }
678
    }
679
    if (OB_SUCC(ret) && pd_array.size() > 0) {
680
      std::make_heap(pd_array.begin(), pd_array.end());
681
      std::sort_heap(pd_array.begin(), pd_array.end());
682
      for (int i = 0; i < min(5, pd_array.size()); i++) {
683
        databuff_printf(buf, len, pos, " pcode=0x%x:cnt=%ld",
684
          pd_array.at(i).pcode_, pd_array.at(i).dcount_);
685
      }
686
    }
687
  }
688
  for (int64_t pcode_idx = 0; pcode_idx < ObRpcPacketSet::THE_PCODE_COUNT; pcode_idx++) {
689
    RpcStatPiece piece;
690
    piece.reset_dcount_ = true;
691
    rpc_stat_srv_.add(pcode_idx, piece);
692
  }
693
  return pos;
694
}
695

696

697
ObTenant::ObTenant(const int64_t id,
698
                   const int64_t times_of_workers,
699
                   ObCgroupCtrl &cgroup_ctrl)
700
    : ObTenantBase(id, true),
701
      meta_lock_(),
702
      tenant_meta_(),
703
      shrink_(0),
704
      total_worker_cnt_(0),
705
      gc_thread_(nullptr),
706
      has_created_(false),
707
      stopped_(0),
708
      wait_mtl_finished_(false),
709
      req_queue_(),
710
      multi_level_queue_(nullptr),
711
      recv_hp_rpc_cnt_(0),
712
      recv_np_rpc_cnt_(0),
713
      recv_lp_rpc_cnt_(0),
714
      recv_mysql_cnt_(0),
715
      recv_task_cnt_(0),
716
      recv_sql_task_cnt_(0),
717
      recv_large_req_cnt_(0),
718
      pause_cnt_(0),
719
      resume_cnt_(0),
720
      recv_retry_on_lock_rpc_cnt_(0),
721
      recv_retry_on_lock_mysql_cnt_(0),
722
      tt_large_quries_(0),
723
      pop_normal_cnt_(0),
724
      group_map_(group_map_buf_, sizeof(group_map_buf_)),
725
      lock_(),
726
      rpc_stat_info_(nullptr),
727
      mtl_init_ctx_(nullptr),
728
      workers_lock_(common::ObLatchIds::TENANT_WORKER_LOCK),
729
      cgroup_ctrl_(cgroup_ctrl),
730
      disable_user_sched_(false),
731
      token_usage_(.0),
732
      token_usage_check_ts_(0),
733
      token_change_ts_(0),
734
      ctx_(nullptr),
735
      st_metrics_(),
736
      sql_limiter_(),
737
      worker_us_(0),
738
      cpu_time_us_(0)
739
{
740
  token_usage_check_ts_ = ObTimeUtility::current_time();
741
  lock_.set_diagnose(true);
742
}
743

744
ObTenant::~ObTenant() {}
745

746
int ObTenant::init_ctx()
747
{
748
  int ret = OB_SUCCESS;
749
  if (OB_FAIL(CREATE_ENTITY(ctx_, this))) {
750
    LOG_WARN("create tenant ctx failed", K(ret));
751
  } else if (OB_ISNULL(ctx_)) {
752
    ret = OB_ERR_UNEXPECTED;
753
    LOG_WARN("NULL ptr", K(ret));
754
  }
755
  return ret;
756
}
757

758
int ObTenant::init(const ObTenantMeta &meta)
759
{
760
  int ret = OB_SUCCESS;
761

762
  if (OB_FAIL(ObTenantBase::init(&cgroup_ctrl_))) {
763
    LOG_WARN("fail to init tenant base", K(ret));
764
  } else if (FALSE_IT(req_queue_.set_limit(GCONF.tenant_task_queue_size))) {
765
  } else if (OB_ISNULL(multi_level_queue_ = OB_NEW(ObMultiLevelQueue, ObMemAttr(id_, "MulLevelQueue")))) {
766
    ret = OB_ALLOCATE_MEMORY_FAILED;
767
    LOG_WARN("alloc ObMultiLevelQueue failed", K(ret), K(*this));
768
  } else if (FALSE_IT(multi_level_queue_->set_limit(common::ObServerConfig::get_instance().tenant_task_queue_size))) {
769
  } else if (OB_ISNULL(rpc_stat_info_ = OB_NEW(RpcStatInfo, ObMemAttr(id_, "RpcStatInfo"), id_))) {
770
    ret = OB_ALLOCATE_MEMORY_FAILED;
771
    LOG_WARN("alloc RpcStatInfo failed", K(ret), K(*this));
772
  } else if (OB_FAIL(construct_mtl_init_ctx(meta, mtl_init_ctx_))) {
773
    LOG_WARN("construct_mtl_init_ctx failed", KR(ret), K(*this));
774
  } else {
775
    ObTenantBase::mtl_init_ctx_ = mtl_init_ctx_;
776
    tenant_meta_ = meta;
777
    set_unit_min_cpu(meta.unit_.config_.min_cpu());
778
    set_unit_max_cpu(meta.unit_.config_.max_cpu());
779
    const int64_t memory_size = static_cast<double>(tenant_meta_.unit_.config_.memory_size());
780
    set_unit_memory_size(memory_size);
781
    constexpr static int64_t MINI_MEM_UPPER = 1L<<30; // 1G
782
    update_mini_mode(memory_size <= MINI_MEM_UPPER);
783

784
    if (!is_virtual_tenant_id(id_)) {
785
      if (OB_FAIL(create_tenant_module())) {
786
        // do nothing
787
      } else if (OB_FAIL(OB_PX_TARGET_MGR.add_tenant(id_))) {
788
        LOG_WARN("add tenant into px target mgr failed", K(ret), K(id_));
789
      } else if (OB_FAIL(G_RES_MGR.get_col_mapping_rule_mgr().add_tenant(id_))) {
790
        LOG_WARN("add tenant into res col maping rule mgr failed", K(ret), K(id_));
791
      }
792
    } else {
793
      disable_user_sched(); // disable_user_sched for virtual tenant
794
    }
795
  }
796

797
  if (OB_SUCC(ret)) {
798
    int64_t succ_cnt = 0L;
799
    if (OB_FAIL(acquire_more_worker(2, succ_cnt))) {
800
      LOG_WARN("create worker in init failed", K(ret), K(succ_cnt));
801
    } else {
802
      // there must be 2 workers.
803
      static_cast<ObThWorker*>(workers_.get_first()->get_data())->set_priority_limit(QQ_HIGH);
804
      static_cast<ObThWorker*>(workers_.get_last()->get_data())->set_priority_limit(QQ_NORMAL);
805
      for (int level = MULTI_LEVEL_THRESHOLD; OB_SUCC(ret) && level < MAX_REQUEST_LEVEL; level++) {
806
        if (OB_SUCC(acquire_level_worker(1, succ_cnt, level))) {
807
          succ_cnt = 0L;
808
        }
809
      }
810
      timeup();
811
    }
812
  }
813

814
  if (OB_FAIL(ret)) {
815
    LOG_ERROR("fail to create tenant module", K(ret));
816
  } else {
817
    start();
818
  }
819

820
  return ret;
821
}
822

823
int ObTenant::construct_mtl_init_ctx(const ObTenantMeta &meta, share::ObTenantModuleInitCtx *&ctx)
824
{
825
  int ret = OB_SUCCESS;
826
  if (OB_ISNULL(ctx = OB_NEW(share::ObTenantModuleInitCtx, ObMemAttr(id_, "ModuleInitCtx")))) {
827
    ret = OB_ALLOCATE_MEMORY_FAILED;
828
    LOG_WARN("alloc ObTenantModuleInitCtx failed", K(ret));
829
  } else if (OB_FAIL(OB_FILE_SYSTEM_ROUTER.get_tenant_clog_dir(id_, mtl_init_ctx_->tenant_clog_dir_))) {
830
    LOG_ERROR("get_tenant_clog_dir failed", K(ret));
831
  } else {
832
    mtl_init_ctx_->palf_options_.disk_options_.log_disk_usage_limit_size_ = meta.unit_.config_.log_disk_size();
833
    mtl_init_ctx_->palf_options_.disk_options_.log_disk_utilization_threshold_ = 80;
834
    mtl_init_ctx_->palf_options_.disk_options_.log_disk_utilization_limit_threshold_ = 95;
835
    mtl_init_ctx_->palf_options_.disk_options_.log_disk_throttling_percentage_ = 100;
836
    mtl_init_ctx_->palf_options_.disk_options_.log_disk_throttling_maximum_duration_ = 2 * 60 * 60 * 1000 * 1000L;//2h
837
    mtl_init_ctx_->palf_options_.disk_options_.log_writer_parallelism_ = 3;
838
    ObTenantConfigGuard tenant_config(TENANT_CONF(MTL_ID()));
839
    if (OB_UNLIKELY(!tenant_config.is_valid())) {
840
      ret = is_virtual_tenant_id(id_) ? OB_SUCCESS : OB_ENTRY_NOT_EXIST;
841
    } else {
842
      mtl_init_ctx_->palf_options_.disk_options_.log_writer_parallelism_ = tenant_config->_log_writer_parallelism;
843
    }
844
    LOG_INFO("construct_mtl_init_ctx success", "palf_options", mtl_init_ctx_->palf_options_.disk_options_);
845
  }
846
  return ret;
847
}
848
bool ObTenant::is_hidden()
849
{
850
  TCRLockGuard guard(meta_lock_);
851
  return tenant_meta_.super_block_.is_hidden_;
852
}
853

854
ObTenantCreateStatus ObTenant::get_create_status()
855
{
856
  TCRLockGuard guard(meta_lock_);
857
  return tenant_meta_.create_status_;
858
}
859
void ObTenant::set_create_status(const ObTenantCreateStatus status)
860
{
861
  TCWLockGuard guard(meta_lock_);
862
  LOG_INFO("set create status",
863
      "tenant_id", id_,
864
      "unit_id", tenant_meta_.unit_.unit_id_,
865
      "new_status", status,
866
      "old_status", tenant_meta_.create_status_,
867
      K_(tenant_meta));
868
  tenant_meta_.create_status_ = status;
869
}
870

871
ObTenantMeta ObTenant::get_tenant_meta()
872
{
873
  TCRLockGuard guard(meta_lock_);
874
  return tenant_meta_;
875
}
876

877
ObUnitInfoGetter::ObTenantConfig ObTenant::get_unit()
878
{
879
  TCRLockGuard guard(meta_lock_);
880
  return tenant_meta_.unit_;
881
}
882

883
uint64_t ObTenant::get_unit_id()
884
{
885
  TCRLockGuard guard(meta_lock_);
886
  return tenant_meta_.unit_.unit_id_;
887
}
888

889
ObTenantSuperBlock ObTenant::get_super_block()
890
{
891
  TCRLockGuard guard(meta_lock_);
892
  return tenant_meta_.super_block_;
893
}
894

895
void ObTenant::set_tenant_unit(const ObUnitInfoGetter::ObTenantConfig &unit)
896
{
897
  TCWLockGuard guard(meta_lock_);
898
  tenant_meta_.unit_ = unit;
899
}
900

901
void ObTenant::set_tenant_super_block(const ObTenantSuperBlock &super_block)
902
{
903
  TCWLockGuard guard(meta_lock_);
904
  tenant_meta_.super_block_ = super_block;
905
}
906

907
Worker::CompatMode ObTenant::get_compat_mode() const
908
{
909
  TCRLockGuard guard(meta_lock_);
910
  return tenant_meta_.unit_.mode_;
911
}
912

913
void ObTenant::set_unit_status(const ObUnitInfoGetter::ObUnitStatus status)
914
{
915
  TCWLockGuard guard(meta_lock_);
916
  LOG_INFO("set unit status",
917
      "tenant_id", id_,
918
      "unit_id", tenant_meta_.unit_.unit_id_,
919
      "new_status", ObUnitInfoGetter::get_unit_status_str(status),
920
      "old_status", ObUnitInfoGetter::get_unit_status_str(tenant_meta_.unit_.unit_status_),
921
      K_(tenant_meta));
922
  tenant_meta_.unit_.unit_status_ = status;
923
}
924

925
ObUnitInfoGetter::ObUnitStatus  ObTenant::get_unit_status()
926
{
927
  TCRLockGuard guard(meta_lock_);
928
  return tenant_meta_.unit_.unit_status_;
929
}
930

931
void ObTenant::mark_tenant_is_removed()
932
{
933
  TCWLockGuard guard(meta_lock_);
934
  LOG_INFO("mark tenant is removed",
935
      "tenant_id", id_,
936
      "unit_id", tenant_meta_.unit_.unit_id_,
937
      K_(tenant_meta));
938
  tenant_meta_.unit_.is_removed_ = true;
939
}
940

941
ERRSIM_POINT_DEF(CREATE_MTL_MODULE_FAIL)
942
// 初始化租户各子模块,保证初始化同步执行,因为依赖线程局部变量和栈上变量
943
int ObTenant::create_tenant_module()
944
{
945
  int ret = OB_SUCCESS;
946
  const uint64_t &tenant_id = id_;
947
  const double max_cpu = static_cast<double>(tenant_meta_.unit_.config_.max_cpu());
948
  // set tenant ctx to thread_local
949
  ObTenantSwitchGuard guard(this);
950
  // set tenant init param
951
  FLOG_INFO("begin create mtl module>>>>", K(tenant_id), K(MTL_ID()));
952

953
  bool mtl_init = false;
954
  if (OB_FAIL(ObTenantBase::create_mtl_module())) {
955
    LOG_ERROR("create mtl module failed", K(tenant_id), K(ret));
956
  } else if (CREATE_MTL_MODULE_FAIL) {
957
    ret = CREATE_MTL_MODULE_FAIL;
958
    LOG_ERROR("create_tenant_module failed because of tracepoint CREATE_MTL_MODULE_FAIL",
959
              K(tenant_id), K(ret));
960
  } else if (FALSE_IT(ObTenantEnv::set_tenant(this))) {
961
    // 上面通过ObTenantSwitchGuard中会创建一个新的TenantBase线程局部变量,而不是存TenantBase的指针,
962
    // 目的是通过MTL()访问时减少一次内存跳转,但是设置的时mtl模块的指针还是nullptr, 所以在mtl创建完成时
963
    // 还需要设置一次。
964
  } else if (FALSE_IT(mtl_init = true)) {
965
  } else if (OB_FAIL(ObTenantBase::init_mtl_module())) {
966
    LOG_ERROR("init mtl module failed", K(tenant_id), K(ret));
967
  } else if (OB_FAIL(ObTenantBase::start_mtl_module())) {
968
    LOG_ERROR("start mtl module failed", K(tenant_id), K(ret));
969
  } else if (OB_FAIL(update_thread_cnt(max_cpu))) {
970
    LOG_ERROR("update mtl module thread cnt fail", K(tenant_id), K(ret));
971
  }
972

973

974
  FLOG_INFO("finish create mtl module>>>>", K(tenant_id), K(MTL_ID()), K(ret));
975

976
  if (OB_FAIL(ret)) {
977
    if (mtl_init) {
978
      ObTenantBase::stop_mtl_module();
979
      ObTenantBase::wait_mtl_module();
980
    }
981
    ObTenantBase::destroy_mtl_module();
982
  }
983

984
  return ret;
985
}
986

987
void ObTenant::sleep_and_warn(ObTenant* tenant)
988
{
989
  ob_usleep(10_ms);
990
  const int64_t ts = ObTimeUtility::current_time() - tenant->stopped_;
991
  if (ts >= 3L * 60 * 1000 * 1000 && TC_REACH_TIME_INTERVAL(3L * 60 * 1000 * 1000)) {
992
    LOG_ERROR_RET(OB_SUCCESS, "tenant destructed for too long time.", K_(tenant->id), K(ts));
993
  }
994
}
995

996
void* ObTenant::wait(void* t)
997
{
998
  int ret = OB_SUCCESS;
999
  ObTenant* tenant = (ObTenant*)t;
1000
  ob_get_tenant_id() = tenant->id_;
1001
  lib::set_thread_name("UnitGC");
1002
  lib::Thread::update_loop_ts();
1003
  tenant->handle_retry_req(true);
1004
  while (tenant->req_queue_.size() > 0
1005
    || (tenant->multi_level_queue_ != nullptr && tenant->multi_level_queue_->get_total_size() > 0)) {
1006
    sleep_and_warn(tenant);
1007
  }
1008
  while (tenant->workers_.get_size() > 0) {
1009
    if (OB_SUCC(tenant->workers_lock_.trylock())) {
1010
      DLIST_FOREACH_REMOVESAFE(wnode, tenant->workers_) {
1011
        const auto w = static_cast<ObThWorker*>(wnode->get_data());
1012
        tenant->workers_.remove(wnode);
1013
        destroy_worker(w);
1014
      }
1015
      IGNORE_RETURN tenant->workers_lock_.unlock();
1016
      if (REACH_TIME_INTERVAL(10_s)) {
1017
        LOG_INFO(
1018
            "Tenant has some workers need stop", K_(tenant->id),
1019
            "workers", tenant->workers_.get_size(),
1020
            K_(tenant->req_queue));
1021
      }
1022
    }
1023
    sleep_and_warn(tenant);
1024
  }
1025
  LOG_INFO("start remove nesting", K(tenant->nesting_workers_.get_size()), K_(tenant->id));
1026
  while (tenant->nesting_workers_.get_size() > 0) {
1027
    int ret = OB_SUCCESS;
1028
    if (OB_SUCC(tenant->workers_lock_.trylock())) {
1029
      DLIST_FOREACH_REMOVESAFE(wnode, tenant->nesting_workers_) {
1030
        auto w = static_cast<ObThWorker*>(wnode->get_data());
1031
        tenant->nesting_workers_.remove(wnode);
1032
        destroy_worker(w);
1033
      }
1034
      IGNORE_RETURN tenant->workers_lock_.unlock();
1035
      if (REACH_TIME_INTERVAL(10_s)) {
1036
        LOG_INFO(
1037
            "Tenant has some nesting workers need stop",
1038
            K_(tenant->id),
1039
            "nesting workers", tenant->nesting_workers_.get_size(),
1040
            K_(tenant->req_queue));
1041
      }
1042
    }
1043
    sleep_and_warn(tenant);
1044
  }
1045
  LOG_INFO("finish remove nesting", K(tenant->nesting_workers_.get_size()), K_(tenant->id));
1046
  LOG_INFO("start remove group_map", K_(tenant->id));
1047
  tenant->group_map_.wait_group();
1048
  LOG_INFO("finish remove group_map", K_(tenant->id));
1049
  if (!is_virtual_tenant_id(tenant->id_) && !tenant->wait_mtl_finished_) {
1050
    ObTenantSwitchGuard guard(tenant);
1051
    tenant->stop_mtl_module();
1052
    OB_PX_TARGET_MGR.delete_tenant(tenant->id_);
1053
    G_RES_MGR.get_col_mapping_rule_mgr().drop_tenant(tenant->id_);
1054
    tenant->wait_mtl_module();
1055
    tenant->wait_mtl_finished_ = true;
1056
  }
1057
  LOG_INFO("finish waiting", K_(tenant->id));
1058
  return nullptr;
1059
}
1060

1061
int ObTenant::try_wait()
1062
{
1063
  int ret = OB_SUCCESS;
1064
  if (OB_ISNULL(ATOMIC_LOAD(&gc_thread_))) {
1065
    if (!ATOMIC_BCAS(&has_created_, false, true)) {
1066
      // there will be double-try_wait when kill -15 or failure of locking,
1067
      // so we have to tolerate that and return OB_SUCCESS although it is not correct.
1068
      // ret = OB_ERR_UNEXPECTED;
1069
      LOG_WARN("try_wait again after wait successfully, there may be `kill -15` or failure of locking", K(id_), K(wait_mtl_finished_));
1070
    } else {
1071
      // it may takes too much time for killing session after remove_tenant, we should recalculate.
1072
      ATOMIC_STORE(&stopped_, ObTimeUtility::current_time()); // update, it is not 0 before here.
1073
      if (OB_FAIL(ob_pthread_create(&gc_thread_, wait, this))) {
1074
        ATOMIC_STORE(&has_created_, false);
1075
        LOG_ERROR("tenant gc thread create failed", K(ret), K(errno), K(id_));
1076
      } else {
1077
        ret = OB_EAGAIN;
1078
        LOG_INFO("tenant pthread_create gc thread successfully", K(id_), K(gc_thread_));
1079
      }
1080
    }
1081
  } else {
1082
    if (OB_FAIL(ob_pthread_tryjoin_np(gc_thread_))) {
1083
      LOG_WARN("tenant pthread_tryjoin_np failed", K(errno), K(id_));
1084
    } else {
1085
      ATOMIC_STORE(&gc_thread_, nullptr); // avoid try_wait again after wait success
1086
      LOG_INFO("tenant pthread_tryjoin_np successfully", K(id_));
1087
    }
1088
    const int64_t ts = ObTimeUtility::current_time() - stopped_;
1089
    // only warn for one time in all tenant.
1090
    if (ts >= 3L * 60 * 1000 * 1000 && REACH_TIME_INTERVAL(3L * 60 * 1000 * 1000)) {
1091
      LOG_ERROR_RET(OB_SUCCESS, "tenant destructed for too long time.", K_(id), K(ts));
1092
    }
1093
  }
1094
  return ret;
1095
}
1096

1097
void ObTenant::destroy()
1098
{
1099
  int tmp_ret = OB_SUCCESS;
1100
  if (ctx_ != nullptr) {
1101
    DESTROY_ENTITY(ctx_);
1102
    ctx_ = nullptr;
1103
  }
1104
  if (cgroup_ctrl_.is_valid()
1105
      && OB_SUCCESS != (tmp_ret = cgroup_ctrl_.remove_tenant_cgroup(id_))) {
1106
    LOG_WARN_RET(tmp_ret, "remove tenant cgroup failed", K(tmp_ret), K_(id));
1107
  }
1108
  group_map_.destroy_group();
1109
  ObTenantSwitchGuard guard(this);
1110
  destroy_mtl_module();
1111
  // 1.some mtl module(eg: ObDataAccessService) remove tmp file when destroy,
1112
  //   so remove_tenant_file must be after destroy_mtl_module.
1113
  // 2.there is tg in ObTmpTenantMemBlockManager, so remove_tenant_file must be before
1114
  //   ObTenantBase::destroy() in which tg leak is checked.
1115
  if (OB_TMP_FAIL(FILE_MANAGER_INSTANCE_V2.remove_tenant_file(id_))) {
1116
    if (OB_ENTRY_NOT_EXIST == tmp_ret) {
1117
      tmp_ret = OB_SUCCESS;
1118
    } else {
1119
      LOG_WARN_RET(tmp_ret, "fail to free tmp tenant file store", K(ret), K_(id));
1120
    }
1121
  }
1122
  ObTenantBase::destroy();
1123

1124
  if (nullptr != multi_level_queue_) {
1125
    common::ob_delete(multi_level_queue_);
1126
    multi_level_queue_ = nullptr;
1127
  }
1128
  if (nullptr != rpc_stat_info_) {
1129
    common::ob_delete(rpc_stat_info_);
1130
    rpc_stat_info_ = nullptr;
1131
  }
1132
  if (nullptr != mtl_init_ctx_) {
1133
    common::ob_delete(mtl_init_ctx_);
1134
    mtl_init_ctx_ = nullptr;
1135
  }
1136
}
1137

1138
void ObTenant::set_unit_max_cpu(double cpu)
1139
{
1140
  int tmp_ret = OB_SUCCESS;
1141
  unit_max_cpu_ = cpu;
1142
  int32_t cfs_period_us = 0;
1143
  int32_t cfs_period_us_new = 0;
1144
  if (!cgroup_ctrl_.is_valid() || is_meta_tenant(id_)) {
1145
    // do nothing
1146
  } else if (is_sys_tenant(id_)) {
1147
    int32_t sys_cfs_quota_us = -1;
1148
    if (OB_TMP_FAIL(cgroup_ctrl_.set_cpu_cfs_quota(sys_cfs_quota_us, id_))) {
1149
      LOG_WARN_RET(tmp_ret, "set sys tennat cpu cfs quota failed", K(tmp_ret), K_(id), K(sys_cfs_quota_us));
1150
    }
1151
  } else if (OB_TMP_FAIL(cgroup_ctrl_.get_cpu_cfs_period(cfs_period_us_new, id_, INT64_MAX))) {
1152
    LOG_WARN_RET(tmp_ret, "fail get cpu cfs period", K_(id));
1153
  } else {
1154
    uint32_t loop_times = 0;
1155
    // to avoid kernel scaling cfs_period_us after get cpu_cfs_period,
1156
    // we should check whether cfs_period_us has been changed after set cpu_cfs_quota.
1157
    while (OB_SUCCESS == tmp_ret && cfs_period_us_new != cfs_period_us) {
1158
      cfs_period_us = cfs_period_us_new;
1159
      int32_t cfs_quota_us = static_cast<int32_t>(cfs_period_us * cpu);
1160
      if (OB_TMP_FAIL(cgroup_ctrl_.set_cpu_cfs_quota(cfs_quota_us, id_))) {
1161
        LOG_WARN_RET(tmp_ret, "set cpu cfs quota failed", K_(id), K(cfs_quota_us));
1162
      } else if (OB_TMP_FAIL(cgroup_ctrl_.get_cpu_cfs_period(cfs_period_us_new, id_, INT64_MAX))) {
1163
        LOG_ERROR_RET(tmp_ret, "fail get cpu cfs period", K_(id));
1164
      } else {
1165
        loop_times++;
1166
        if (loop_times > 3) {
1167
          tmp_ret = OB_ERR_UNEXPECTED;
1168
          LOG_ERROR_RET(tmp_ret, "cpu_cfs_period has been always changing, thread may be hung", K_(id), K(cfs_period_us), K(cfs_period_us_new), K(cfs_quota_us));
1169
        }
1170
      }
1171
    }
1172
  }
1173
}
1174

1175
void ObTenant::set_unit_min_cpu(double cpu)
1176
{
1177
  int tmp_ret = OB_SUCCESS;
1178
  unit_min_cpu_ = cpu;
1179
  const double default_cpu_shares = 1024.0;
1180
  int32_t cpu_shares = static_cast<int32_t>(default_cpu_shares * cpu);
1181
  if (cgroup_ctrl_.is_valid()
1182
      && OB_SUCCESS != (tmp_ret = cgroup_ctrl_.set_cpu_shares(cpu_shares, id_))) {
1183
    LOG_WARN_RET(tmp_ret, "set cpu shares failed", K(tmp_ret), K_(id), K(cpu_shares));
1184
  }
1185
}
1186

1187
int64_t ObTenant::cpu_quota_concurrency() const
1188
{
1189
  ObTenantConfigGuard tenant_config(TENANT_CONF(id_));
1190
  return static_cast<int64_t>((tenant_config.is_valid() ? tenant_config->cpu_quota_concurrency : 4));
1191
}
1192

1193
int64_t ObTenant::min_worker_cnt() const
1194
{
1195
  ObTenantConfigGuard tenant_config(TENANT_CONF(id_));
1196
  return 2 + std::max(1L, static_cast<int64_t>(unit_min_cpu() * (tenant_config.is_valid() ? tenant_config->cpu_quota_concurrency : 4)));
1197
}
1198

1199
int64_t ObTenant::max_worker_cnt() const
1200
{
1201
  return std::max(tenant_meta_.unit_.config_.memory_size() / 20 / (GCONF.stack_size + (3 << 20) + (512 << 10)),
1202
                  150L);
1203
}
1204

1205
int ObTenant::get_new_request(
1206
    ObThWorker &w,
1207
    int64_t timeout,
1208
    rpc::ObRequest *&req)
1209
{
1210
  int ret = OB_SUCCESS;
1211
  ObLink* task = nullptr;
1212

1213
  req = nullptr;
1214
  int wk_level = 0;
1215
  Thread::WaitGuard guard(Thread::WAIT_IN_TENANT_QUEUE);
1216
  if (w.is_group_worker()) {
1217
    w.set_large_query(false);
1218
    w.set_curr_request_level(0);
1219
    wk_level = w.get_worker_level();
1220
    if (wk_level < 0 || wk_level >= MAX_REQUEST_LEVEL) {
1221
      ret = OB_ERR_UNEXPECTED;
1222
      LOG_ERROR("unexpected level", K(wk_level), K(id_));
1223
    } else if (wk_level >= MAX_REQUEST_LEVEL - 1) {
1224
      ret = w.get_group()->multi_level_queue_.pop_timeup(task, wk_level, timeout);
1225
      if ((ret == OB_SUCCESS && nullptr == task) || ret == OB_ENTRY_NOT_EXIST) {
1226
        ret = OB_ENTRY_NOT_EXIST;
1227
        usleep(10 * 1000L);
1228
      } else if (ret == OB_SUCCESS){
1229
        rpc::ObRequest *tmp_req = static_cast<rpc::ObRequest*>(task);
1230
        LOG_WARN("req is timeout and discard", "tenant_id", id_, K(tmp_req));
1231
      } else {
1232
        LOG_ERROR("pop queue err", "tenant_id", id_, K(ret));
1233
      }
1234
    } else if (w.is_level_worker()) {
1235
      ret = w.get_group()->multi_level_queue_.pop(task, wk_level, timeout);
1236
    } else {
1237
      for (int32_t level = MAX_REQUEST_LEVEL - 1; level >= GROUP_MULTI_LEVEL_THRESHOLD; level--) {
1238
        IGNORE_RETURN w.get_group()->multi_level_queue_.try_pop(task, level);
1239
        if (nullptr != task) {
1240
          ret = OB_SUCCESS;
1241
          break;
1242
        }
1243
      }
1244
      if (nullptr == task) {
1245
        ret = w.get_group()->req_queue_.pop(task, timeout);
1246
      }
1247
    }
1248
  } else {
1249
    w.set_large_query(false);
1250
    w.set_curr_request_level(0);
1251
    wk_level = w.get_worker_level();
1252
    if (wk_level < 0 || wk_level >= MAX_REQUEST_LEVEL) {
1253
      ret = OB_ERR_UNEXPECTED;
1254
      LOG_ERROR("unexpected level", K(wk_level), K(id_));
1255
    } else if (wk_level >= MAX_REQUEST_LEVEL - 1) {
1256
      ret = multi_level_queue_->pop_timeup(task, wk_level, timeout);
1257
      if ((ret == OB_SUCCESS && nullptr == task) || ret == OB_ENTRY_NOT_EXIST) {
1258
        ret = OB_ENTRY_NOT_EXIST; // If the pop comes out and finds that there is not enough time, then push the front back, ret is succ,
1259
                                  // But because of this situation, the subsequent processing strategy should be the same as the original queue itself is empty.
1260
                                  // So set ret to be the same as the queue empty situation, that is, set to entry not exist
1261
        ob_usleep(10 * 1000L);
1262
      } else if (ret == OB_SUCCESS){
1263
        rpc::ObRequest *tmp_req = static_cast<rpc::ObRequest*>(task);
1264
        LOG_WARN("req is timeout and discard", "tenant_id", id_, K(tmp_req));
1265
      } else {
1266
        LOG_ERROR("pop queue err", "tenant_id", id_, K(ret));
1267
      }
1268
    } else if (w.is_level_worker()) {
1269
      ret = multi_level_queue_->pop(task, wk_level, timeout);
1270
    } else {
1271
      if (w.is_default_worker()) {
1272
        for (int32_t level = MAX_REQUEST_LEVEL - 1; level >= 1; level--) { // Level 0 threads also need to look at the requests of non-level 0 queues first
1273
          IGNORE_RETURN multi_level_queue_->try_pop(task, level);
1274
          if (nullptr != task) {
1275
            ret = OB_SUCCESS;
1276
            break;
1277
          }
1278
        }
1279
      }
1280
      if (OB_ISNULL(task)) {
1281
        if (OB_UNLIKELY(w.is_high_priority())) {
1282
          // We must ensure at least one worker can process the highest
1283
          // priority task.
1284
          ret = req_queue_.pop_high(task, timeout);
1285
        } else if (OB_UNLIKELY(w.is_normal_priority())) {
1286
          // We must ensure at least number of tokens of workers which don't
1287
          // process low priority task.
1288
          ret = req_queue_.pop_normal(task, timeout);
1289
        } else {
1290
          // If large requests exist and this worker doesn't have LQT but
1291
          // can acquire, do it.
1292
          ATOMIC_INC(&pop_normal_cnt_);
1293
          ret = req_queue_.pop(task, timeout);
1294
        }
1295
      }
1296
    }
1297
  }
1298

1299
  if (OB_SUCC(ret)) {
1300
    EVENT_INC(REQUEST_DEQUEUE_COUNT);
1301
    if (nullptr == req && nullptr != task) {
1302
      req = static_cast<rpc::ObRequest*>(task);
1303
    }
1304
    if (nullptr != req) {
1305
      if (w.is_group_worker() && req->large_retry_flag()) {
1306
        w.set_large_query();
1307
      }
1308
      if (req->get_type() == ObRequest::OB_RPC) {
1309
        using obrpc::ObRpcPacket;
1310
        const ObRpcPacket &pkt
1311
          = static_cast<const ObRpcPacket&>(req->get_packet());
1312
        w.set_curr_request_level(pkt.get_request_level());
1313
      }
1314
    }
1315
  }
1316
  return ret;
1317
}
1318

1319
using oceanbase::obrpc::ObRpcPacket;
1320
inline bool is_high_prio(const ObRpcPacket &pkt)
1321
{
1322
  return pkt.get_priority() < 5;
1323
}
1324

1325
inline bool is_normal_prio(const ObRpcPacket &pkt)
1326
{
1327
  return pkt.get_priority() == 5;
1328
}
1329

1330
inline bool is_low_prio(const ObRpcPacket &pkt)
1331
{
1332
  return pkt.get_priority() > 5 && pkt.get_priority() < 10;
1333
}
1334

1335
inline bool is_ddl(const ObRpcPacket &pkt)
1336
{
1337
  return pkt.get_priority() == 10;
1338
}
1339

1340
inline bool is_warmup(const ObRpcPacket &pkt)
1341
{
1342
  return pkt.get_priority() == 11;
1343
}
1344

1345
int ObTenant::recv_group_request(ObRequest &req, int64_t group_id)
1346
{
1347
  int ret = OB_SUCCESS;
1348
  int64_t now = ObTimeUtility::current_time();
1349
  req.set_enqueue_timestamp(now);
1350
  ObResourceGroup* group = nullptr;
1351
  ObResourceGroupNode* node = nullptr;
1352
  ObResourceGroupNode key(group_id);
1353
  int req_level = 0;
1354
  if (OB_SUCC(GroupMap::err_code_map(group_map_.get(&key, node)))) {
1355
    group = static_cast<ObResourceGroup*>(node);
1356
  } else if (OB_FAIL(group_map_.create_and_insert_group(group_id, this,  &cgroup_ctrl_, group))) {
1357
    if (OB_ENTRY_EXIST == ret && OB_SUCC(GroupMap::err_code_map(group_map_.get(&key, node)))) {
1358
      group = static_cast<ObResourceGroup*>(node);
1359
    } else {
1360
      LOG_WARN("failed to create and insert group", K(ret), K(group_id), K(id_));
1361
    }
1362
  } else {
1363
    LOG_INFO("create group successfully", K_(id), K(group_id), K(group));
1364
  }
1365
  if (OB_SUCC(ret)) {
1366
    if (req.get_type() == ObRequest::OB_RPC) {
1367
      using obrpc::ObRpcPacket;
1368
      const ObRpcPacket &pkt
1369
          = static_cast<const ObRpcPacket&>(req.get_packet());
1370
      req_level = min(pkt.get_request_level(), MAX_REQUEST_LEVEL - 1);
1371
    }
1372
    if (req_level < 0) {
1373
      ret = OB_ERR_UNEXPECTED;
1374
      LOG_ERROR("unexpected level", K(req_level), K(id_), K(group_id));
1375
    } else if (is_user_group(group_id) && req_level >= GROUP_MULTI_LEVEL_THRESHOLD) {
1376
      group->recv_level_rpc_cnt_.atomic_inc(req_level);
1377
      if (OB_FAIL(group->multi_level_queue_.push(req, req_level, 0))) {
1378
        LOG_WARN("push request to queue fail", K(req_level), K(id_), K(group_id));
1379
      }
1380
    } else {
1381
      group->atomic_inc_recv_cnt();
1382
      if (OB_FAIL(group->req_queue_.push(&req, 0))) {
1383
        LOG_ERROR("push request to queue fail", K(id_), K(group_id));
1384
      }
1385
    }
1386
    int tmp_ret = OB_SUCCESS;
1387
    if (!share::ObCgSet::instance().is_group_critical(group_id) && 0 == group->workers_.get_size()) {
1388
      if (OB_SUCCESS == (tmp_ret = group->workers_lock_.trylock())) {
1389
        if (0 == group->workers_.get_size()) {
1390
          int64_t succ_num = 0L;
1391
          group->token_change_ts_ = now;
1392
          ATOMIC_STORE(&group->shrink_, false);
1393
          group->acquire_more_worker(1, succ_num, /* force */ true);
1394
          LOG_INFO("worker thread created", K(id()), K(group->group_id_));
1395
        }
1396
        IGNORE_RETURN group->workers_lock_.unlock();
1397
      } else {
1398
        LOG_WARN("failed to lock group workers", K(ret), K(id_), K(group_id));
1399
      }
1400
    }
1401
  }
1402
  return ret;
1403
}
1404

1405
int ObTenant::recv_request(ObRequest &req)
1406
{
1407
  int ret = OB_SUCCESS;
1408
  int req_level = 0;
1409
  if (has_stopped()) {
1410
    ret = OB_TENANT_NOT_IN_SERVER;
1411
    LOG_WARN("receive request but tenant has already stopped", K(ret), K(id_));
1412
  } else if (0 != req.get_group_id()) {
1413
    if (OB_FAIL(recv_group_request(req, req.get_group_id()))) {
1414
      LOG_ERROR("recv group request failed", K(ret), K(id_), K(req.get_group_id()));
1415
    }
1416
  } else {
1417
    // Request would been pushed into corresponding queue by rule.
1418
    //
1419
    //   1. RPC with high or normal priority goes into quick queue.
1420
    //   2. RPC with low priority, usually trivial task, goes into normal queue with low priority.
1421
    //   3. SQL goes into normal queue with normal priority.
1422
    //   4. Server task, session close task, goes into normal queue with high priority.
1423
    //
1424
    req.set_enqueue_timestamp(ObTimeUtility::current_time());
1425
    req.set_trace_point(ObRequest::OB_EASY_REQUEST_TENANT_RECEIVED);
1426
    switch (req.get_type()) {
1427
      case ObRequest::OB_RPC: {
1428
        using obrpc::ObRpcPacket;
1429
        const ObRpcPacket& pkt = static_cast<const ObRpcPacket&>(req.get_packet());
1430
        req_level = min(pkt.get_request_level(), MAX_REQUEST_LEVEL - 1); // Requests that exceed the limit are pushed to the highest-level queue
1431
        if (req_level < 0) {
1432
          ret = OB_ERR_UNEXPECTED;
1433
          LOG_ERROR("unexpected level", K(req_level), K(id_));
1434
        } else if (req_level >= MULTI_LEVEL_THRESHOLD) {
1435
          recv_level_rpc_cnt_.atomic_inc(req_level);
1436
          if (OB_FAIL(multi_level_queue_->push(req, req_level, 0))) {
1437
            LOG_WARN("push request to queue fail", K(ret), K(this));
1438
          }
1439
        } else {
1440
          // (0,5) High priority
1441
          //  [5,10) Normal priority
1442
          //  10 is the low priority used by ddl and should not appear here
1443
          //  11 Ultra-low priority for preheating
1444
          if (is_high_prio(pkt)) {  // the less number the higher priority
1445
            ATOMIC_INC(&recv_hp_rpc_cnt_);
1446
            if (OB_FAIL(req_queue_.push(&req, QQ_HIGH))) {
1447
              if (REACH_TIME_INTERVAL(5 * 1000 * 1000)) {
1448
                LOG_WARN("push request to queue fail", K(ret), K(*this));
1449
              }
1450
            }
1451
          } else if (req.is_retry_on_lock())  {
1452
            ATOMIC_INC(&recv_retry_on_lock_rpc_cnt_);
1453
            if (OB_FAIL(req_queue_.push(&req, QQ_NORMAL))) {
1454
              LOG_WARN("push request to QQ_NORMAL queue fail", K(ret), K(this));
1455
            }
1456
          } else if (pkt.is_kv_request()) {
1457
            // the same as sql request, kv request use q4
1458
            ATOMIC_INC(&recv_np_rpc_cnt_);
1459
            if (OB_FAIL(req_queue_.push(&req, RQ_NORMAL))) {
1460
              LOG_WARN("push kv request to queue fail", K(ret), K(this));
1461
            }
1462
          } else if (is_normal_prio(pkt) || is_low_prio(pkt)) {
1463
            ATOMIC_INC(&recv_np_rpc_cnt_);
1464
            if (OB_FAIL(req_queue_.push(&req, QQ_LOW))) {
1465
              LOG_WARN("push request to queue fail", K(ret), K(this));
1466
            }
1467
          } else if (is_ddl(pkt)) {
1468
            ret = OB_ERR_UNEXPECTED;
1469
            LOG_WARN("priority 10 should not come here", K(ret));
1470
          } else if (is_warmup(pkt)) {
1471
            ATOMIC_INC(&recv_lp_rpc_cnt_);
1472
            if (OB_FAIL(req_queue_.push(&req, RQ_LOW))) {
1473
              LOG_WARN("push request to queue fail", K(ret), K(this));
1474
            }
1475
          } else {
1476
            ret = OB_ERR_UNEXPECTED;
1477
            LOG_ERROR("unexpected priority", K(ret), K(pkt.get_priority()));
1478
          }
1479
        }
1480
        break;
1481
      }
1482
      case ObRequest::OB_MYSQL: {
1483
        if (req.is_retry_on_lock()) {
1484
          ATOMIC_INC(&recv_retry_on_lock_mysql_cnt_);
1485
          if (OB_FAIL(req_queue_.push(&req, RQ_HIGH))) {
1486
            LOG_WARN("push request to RQ_HIGH queue fail", K(ret), K(this));
1487
          }
1488
        } else {
1489
          ATOMIC_INC(&recv_mysql_cnt_);
1490
          if (OB_FAIL(req_queue_.push(&req, RQ_NORMAL))) {
1491
            LOG_WARN("push request to queue fail", K(ret), K(this));
1492
          }
1493
        }
1494
        break;
1495
      }
1496
      case ObRequest::OB_TASK:
1497
      case ObRequest::OB_TS_TASK: {
1498
        ATOMIC_INC(&recv_task_cnt_);
1499
        if (OB_FAIL(req_queue_.push(&req, RQ_HIGH))) {
1500
          LOG_WARN("push request to queue fail", K(ret), K(this));
1501
        }
1502
        break;
1503
      }
1504
      case ObRequest::OB_SQL_TASK: {
1505
        ATOMIC_INC(&recv_sql_task_cnt_);
1506
        if (OB_FAIL(req_queue_.push(&req, RQ_NORMAL))) {
1507
          LOG_WARN("push request to queue fail", K(ret), K(this));
1508
        }
1509
        break;
1510
      }
1511
      default: {
1512
        ret = OB_ERR_UNEXPECTED;
1513
        LOG_ERROR("unknown request type", K(ret));
1514
        break;
1515
      }
1516
    }
1517
  }
1518

1519
  if (OB_SUCC(ret)) {
1520
    ObTenantStatEstGuard guard(id_);
1521
    EVENT_INC(REQUEST_ENQUEUE_COUNT);
1522
  }
1523

1524
  if (OB_SIZE_OVERFLOW == ret || req_queue_.size() >= FASTSTACK_REQ_QUEUE_SIZE_THRESHOLD) {
1525
    IGNORE_RETURN faststack();
1526
  }
1527

1528
  return ret;
1529
}
1530

1531
int ObTenant::recv_large_request(rpc::ObRequest &req)
1532
{
1533
  int ret = OB_SUCCESS;
1534
  req.set_enqueue_timestamp(ObTimeUtility::current_time());
1535
  req.set_large_retry_flag(true);
1536
  if (0 != req.get_group_id()) {
1537
    if (OB_FAIL(recv_group_request(req, req.get_group_id()))) {
1538
      LOG_WARN("tenant receive large retry request fail", K(ret));
1539
    }
1540
  } else if (OB_FAIL(recv_group_request(req, OBCG_LQ))){
1541
    LOG_ERROR("recv large request failed", K(id_));
1542
  } else {
1543
    ObTenantStatEstGuard guard(id_);
1544
    EVENT_INC(REQUEST_ENQUEUE_COUNT);
1545
  }
1546
  return ret;
1547
}
1548

1549
int ObTenant::push_retry_queue(rpc::ObRequest &req, const uint64_t timestamp)
1550
{
1551
  int ret = OB_SUCCESS;
1552
  if (has_stopped()) {
1553
    ret = OB_IN_STOP_STATE;
1554
    LOG_WARN("receive retry request but tenant has already stopped", K(ret), K(id_));
1555
  } else if (OB_FAIL(retry_queue_.push(req, timestamp))) {
1556
    LOG_ERROR("push retry queue failed", K(ret), K(id_));
1557
  }
1558
  return ret;
1559
}
1560

1561
int ObTenant::timeup()
1562
{
1563
  int ret = OB_SUCCESS;
1564
  ObLDHandle handle;
1565
  if (!has_stopped() && OB_SUCC(try_rdlock(handle))) {
1566
    // it may fail during drop tenant, try next time.
1567
    if (!has_stopped()) {
1568
      check_group_worker_count();
1569
      check_worker_count();
1570
      update_token_usage();
1571
      handle_retry_req();
1572
      update_queue_size();
1573
    }
1574
    IGNORE_RETURN unlock(handle);
1575
  }
1576
  return OB_SUCCESS;
1577
}
1578

1579
void ObTenant::handle_retry_req(bool need_clear)
1580
{
1581
  int ret = OB_SUCCESS;
1582
  ObLink* task = nullptr;
1583
  ObRequest *req = NULL;
1584
  while (OB_SUCC(retry_queue_.pop(task, need_clear))) {
1585
    req = static_cast<rpc::ObRequest*>(task);
1586
    if (OB_FAIL(recv_large_request(*req))) {
1587
      LOG_ERROR("tenant patrol push req fail", "tenant", id_);
1588
      break;
1589
    }
1590
  }
1591
}
1592

1593
void ObTenant::update_queue_size()
1594
{
1595
  ObResourceGroupNode* iter = NULL;
1596
  ObResourceGroup* group = nullptr;
1597
  while (NULL != (iter = group_map_.quick_next(iter))) {
1598
    group = static_cast<ObResourceGroup*>(iter);
1599
    group->update_queue_size();
1600
  }
1601
  req_queue_.set_limit(common::ObServerConfig::get_instance().tenant_task_queue_size);
1602
  if (nullptr != multi_level_queue_) {
1603
    multi_level_queue_->set_limit(common::ObServerConfig::get_instance().tenant_task_queue_size);
1604
  }
1605
}
1606

1607
void ObTenant::check_worker_count()
1608
{
1609
  int ret = OB_SUCCESS;
1610
  if (OB_SUCC(workers_lock_.trylock())) {
1611
    int64_t token = 3;
1612
    int64_t now = ObTimeUtility::current_time();
1613
    bool enable_dynamic_worker = true;
1614
    int64_t threshold = 3 * 1000;
1615
    {
1616
      ObTenantConfigGuard tenant_config(TENANT_CONF(id_));
1617
      enable_dynamic_worker = tenant_config.is_valid() ? tenant_config->_ob_enable_dynamic_worker : true;
1618
      threshold = tenant_config.is_valid() ? tenant_config->_stall_threshold_for_dynamic_worker : 3 * 1000;
1619
    }
1620
    // assume that high priority and normal priority were busy.
1621
    DLIST_FOREACH_REMOVESAFE(wnode, workers_) {
1622
      const auto w = static_cast<ObThWorker*>(wnode->get_data());
1623
      if (w->has_set_stop()) {
1624
        workers_.remove(wnode);
1625
        destroy_worker(w);
1626
      } else if (w->has_req_flag()
1627
                 && 0 != w->blocking_ts()
1628
                 && now - w->blocking_ts() >= threshold
1629
                 && w->is_default_worker()
1630
                 && enable_dynamic_worker) {
1631
        ++token;
1632
      }
1633
    }
1634
    int64_t succ_num = 0L;
1635
    token = std::max(token, min_worker_cnt());
1636
    token = std::min(token, max_worker_cnt());
1637
    if (OB_UNLIKELY(workers_.get_size() < min_worker_cnt())) {
1638
      const auto diff = min_worker_cnt() - workers_.get_size();
1639
      token_change_ts_ = now;
1640
      ATOMIC_STORE(&shrink_, false);
1641
      acquire_more_worker(diff, succ_num, /* force */ true);
1642
      LOG_INFO("worker thread created", K(id_), K(token));
1643
    } else if (OB_UNLIKELY(token > workers_.get_size())
1644
               && OB_LIKELY(ObMallocAllocator::get_instance()->get_tenant_remain(id_) > ObMallocAllocator::get_instance()->get_tenant_limit(id_) * 0.05)) {
1645
      ATOMIC_STORE(&shrink_, false);
1646
      if (OB_LIKELY(now - token_change_ts_ >= EXPAND_INTERVAL)) {
1647
        token_change_ts_ = now;
1648
        acquire_more_worker(1, succ_num);
1649
        LOG_INFO("worker thread created", K(id_), K(token));
1650
      }
1651
    } else if (OB_UNLIKELY(token < workers_.get_size())
1652
               && OB_LIKELY(now - token_change_ts_ >= SHRINK_INTERVAL)) {
1653
      token_change_ts_ = now;
1654
      ATOMIC_STORE(&shrink_, true);
1655
      LOG_INFO("worker thread began to shrink", K(id_), K(token));
1656
    }
1657
    IGNORE_RETURN workers_lock_.unlock();
1658
  }
1659

1660
  if (GCONF._enable_new_sql_nio && GCONF._enable_tenant_sql_net_thread &&
1661
      (is_sys_tenant(id_) || is_user_tenant(id_))) {
1662
    GCTX.net_frame_->reload_tenant_sql_thread_config(id_);
1663
  }
1664
}
1665

1666
void ObTenant::check_group_worker_count()
1667
{
1668
  ObResourceGroupNode* iter = NULL;
1669
  ObResourceGroup* group = nullptr;
1670
  while (NULL != (iter = group_map_.quick_next(iter))) {
1671
    group = static_cast<ObResourceGroup*>(iter);
1672
    group->check_worker_count();
1673
  }
1674
}
1675

1676
void ObTenant::check_worker_count(ObThWorker &w)
1677
{
1678
  int ret = OB_SUCCESS;
1679
  if (OB_LIKELY(w.is_default_worker())
1680
      && OB_UNLIKELY(ATOMIC_LOAD(&shrink_))
1681
      && OB_LIKELY(ATOMIC_BCAS(&shrink_, true, false))) {
1682
    w.stop();
1683
    if (cgroup_ctrl_.is_valid() && OB_FAIL(cgroup_ctrl_.remove_self_from_cgroup(id_))) {
1684
      LOG_WARN("remove thread from cgroup failed", K(ret), K_(id));
1685
    }
1686
    LOG_INFO("worker thread exit", K(id_), K(workers_.get_size()));
1687
  }
1688
}
1689

1690
int ObTenant::acquire_level_worker(int64_t num, int64_t &succ_num, int32_t level)
1691
{
1692
  int ret = OB_SUCCESS;
1693
  ObTenantSwitchGuard guard(this);
1694

1695
  const auto need_num = num;
1696
  succ_num = 0;
1697

1698
  if (level <= 0 || level > MAX_REQUEST_LEVEL) {
1699
    ret = OB_ERR_UNEXPECTED;
1700
    LOG_ERROR("unexpected level", K(level), K(id_));
1701
  } else {
1702
    while (OB_SUCC(ret) && need_num > succ_num) {
1703
      ObThWorker *w = nullptr;
1704
      if (OB_FAIL(create_worker(w, this, 0, level, true))) {
1705
        LOG_WARN("create worker failed", K(ret));
1706
      } else if (!nesting_workers_.add_last(&w->worker_node_)) {
1707
        OB_ASSERT(false);
1708
        ret = OB_ERR_UNEXPECTED;
1709
        LOG_ERROR("add worker to list fail", K(ret));
1710
      } else {
1711
        succ_num++;
1712
      }
1713
    }
1714
  }
1715

1716
  if (need_num != num ||  // Reach worker count bound,
1717
      succ_num != need_num  // or can't allocate enough worker.
1718
     ) {
1719
    if (TC_REACH_TIME_INTERVAL(10000000)) {
1720
      LOG_WARN("Alloc level worker less than lack", K(num), K(need_num), K(succ_num));
1721
    }
1722
  }
1723

1724
  return ret;
1725
}
1726

1727
// This interface is unnecessary after adding htap
1728
int ObTenant::acquire_more_worker(int64_t num, int64_t &succ_num, bool force)
1729
{
1730
  int ret = OB_SUCCESS;
1731
  succ_num = 0;
1732

1733
  ObTenantSwitchGuard guard(this);
1734
  while (OB_SUCC(ret) && num > succ_num) {
1735
    ObThWorker *w = nullptr;
1736
    if (OB_FAIL(create_worker(w, this, 0, 0, force))) {
1737
      LOG_WARN("create worker failed", K(ret));
1738
    } else if (!workers_.add_last(&w->worker_node_)) {
1739
      OB_ASSERT(false);
1740
      ret = OB_ERR_UNEXPECTED;
1741
      LOG_ERROR("add worker to list fail", K(ret));
1742
    } else {
1743
      succ_num++;
1744
    }
1745
  }
1746

1747
  return ret;
1748
}
1749

1750
void ObTenant::lq_end(ObThWorker &w)
1751
{
1752
  int ret = OB_SUCCESS;
1753
  if (w.is_lq_yield()) {
1754
    if (OB_FAIL(cgroup_ctrl_.add_self_to_cgroup(id_, w.get_group_id()))) {
1755
      LOG_WARN("move thread from lq group failed", K(ret), K(id_));
1756
    } else {
1757
      w.set_lq_yield(false);
1758
    }
1759
  }
1760
}
1761

1762
void ObTenant::lq_wait(ObThWorker &w)
1763
{
1764
  int64_t last_query_us = ObTimeUtility::current_time() - w.get_last_wakeup_ts();
1765
  int64_t lq_group_worker_cnt = w.get_group()->workers_.get_size();
1766
  int64_t default_group_worker_cnt = workers_.get_size();
1767
  double large_query_percentage = GCONF.large_query_worker_percentage / 100.0;
1768
  int64_t wait_us = static_cast<int64_t>(last_query_us * lq_group_worker_cnt /
1769
                                        (default_group_worker_cnt * large_query_percentage) -
1770
                                         last_query_us);
1771
  wait_us = std::min(wait_us, min(100 * 1000, w.get_timeout_remain()));
1772
  if (wait_us > 10 * 1000) {
1773
    usleep(wait_us);
1774
    w.set_last_wakeup_ts(ObTimeUtility::current_time());
1775
  }
1776
}
1777

1778
int ObTenant::lq_yield(ObThWorker &w)
1779
{
1780
  int ret = OB_SUCCESS;
1781
  ATOMIC_INC(&tt_large_quries_);
1782
  if (!cgroup_ctrl_.is_valid()) {
1783
    if (w.get_group_id() == share::OBCG_LQ) {
1784
      lq_wait(w);
1785
    }
1786
  } else if (w.is_lq_yield()) {
1787
    // avoid duplicate change group
1788
  } else if (OB_FAIL(cgroup_ctrl_.add_self_to_cgroup(id_, OBCG_LQ))) {
1789
    LOG_WARN("move thread to lq group failed", K(ret), K(id_));
1790
  } else {
1791
    w.set_lq_yield();
1792
  }
1793
  return ret;
1794
}
1795

1796
// thread unsafe
1797
void ObTenant::update_token_usage()
1798
{
1799
  int ret = OB_SUCCESS;
1800
  const auto now = ObTimeUtility::current_time();
1801
  const auto duration = static_cast<double>(now - token_usage_check_ts_);
1802
  if (duration >= 1000 * 1000 && OB_SUCC(workers_lock_.trylock())) {  // every second
1803
    ObResourceGroupNode* iter = NULL;
1804
    ObResourceGroup* group = nullptr;
1805
    int64_t idle_us = 0;
1806
    token_usage_check_ts_ = now;
1807
    DLIST_FOREACH_REMOVESAFE(wnode, workers_) {
1808
      const auto w = static_cast<ObThWorker*>(wnode->get_data());
1809
      idle_us += ATOMIC_SET(&w->idle_us_, 0);
1810
    }
1811
    DLIST_FOREACH_REMOVESAFE(wnode, nesting_workers_) {
1812
      const auto w = static_cast<ObThWorker*>(wnode->get_data());
1813
      idle_us += ATOMIC_SET(&w->idle_us_, 0);
1814
    }
1815
    while (OB_NOT_NULL(iter = group_map_.quick_next(iter))) {
1816
      group = static_cast<ObResourceGroup*>(iter);
1817
      DLIST_FOREACH_REMOVESAFE(wnode, group->workers_) {
1818
        const auto w = static_cast<ObThWorker*>(wnode->get_data());
1819
        idle_us += ATOMIC_SET(&w->idle_us_, 0);
1820
      }
1821
    }
1822
    workers_lock_.unlock();
1823
    const auto total_us = duration * total_worker_cnt_;
1824
    token_usage_ = std::max(.0, 1.0 * (total_us - idle_us) / total_us);
1825
    IGNORE_RETURN ATOMIC_FAA(&worker_us_, total_us - idle_us);
1826
  }
1827

1828
  if (OB_NOT_NULL(GCTX.cgroup_ctrl_) && GCTX.cgroup_ctrl_->is_valid()) {
1829
    //do nothing
1830
  } else if (duration >= 1000 * 1000 && OB_SUCC(thread_list_lock_.trylock())) {  // every second
1831
    int64_t cpu_time_inc = 0;
1832
    DLIST_FOREACH_REMOVESAFE(thread_list_node_, thread_list_)
1833
    {
1834
      Thread *thread = thread_list_node_->get_data();
1835
      int64_t inc = 0;
1836
      if (OB_SUCC(thread->get_cpu_time_inc(inc))) {
1837
        cpu_time_inc += inc;
1838
      }
1839
    }
1840
    thread_list_lock_.unlock();
1841
    IGNORE_RETURN ATOMIC_FAA(&cpu_time_us_, cpu_time_inc);
1842
  }
1843
}
1844

1845
void ObTenant::periodically_check()
1846
{
1847
  int ret = OB_SUCCESS;
1848
  WITH_ENTITY(ctx_) {
1849
    check_parallel_servers_target();
1850
    check_resource_manager_plan();
1851
    check_dtl();
1852
    check_px_thread_recycle();
1853
  }
1854
}
1855

1856
void ObTenant::check_resource_manager_plan()
1857
{
1858
  int ret = OB_SUCCESS;
1859
  ObString plan_name;
1860
  ObResourcePlanManager &plan_mgr = G_RES_MGR.get_plan_mgr();
1861
  ObResourceMappingRuleManager &rule_mgr = G_RES_MGR.get_mapping_rule_mgr();
1862
  ObResourceColMappingRuleManager &col_rule_mgr = G_RES_MGR.get_col_mapping_rule_mgr();
1863
  char data[OB_MAX_RESOURCE_PLAN_NAME_LENGTH];
1864
  ObDataBuffer allocator(data, OB_MAX_RESOURCE_PLAN_NAME_LENGTH);
1865
  if (OB_SYS_TENANT_ID != id_ && OB_MAX_RESERVED_TENANT_ID >= id_) {
1866
    // Except for system rental outside, internal tenants do not use resource plan for internal isolation
1867
  } else if (OB_FAIL(ObSchemaUtils::get_tenant_varchar_variable(
1868
              id_,
1869
              SYS_VAR_RESOURCE_MANAGER_PLAN,
1870
              allocator,
1871
              plan_name))) {
1872
    LOG_WARN("fail get tenant variable", K(id_), K(plan_name), K(ret));
1873
    // skip
1874
  } else if (OB_FAIL(rule_mgr.refresh_group_mapping_rule(id_, plan_name))) {
1875
    LOG_WARN("refresh group id name mapping rule fail."
1876
             "Tenant resource isolation may not work",
1877
             K(id_), K(plan_name), K(ret));
1878
  } else if (OB_FAIL(plan_mgr.refresh_resource_plan(id_, plan_name))) {
1879
    LOG_WARN("refresh resource plan fail."
1880
             "Tenant resource isolation may not work",
1881
             K(id_), K(plan_name), K(ret));
1882
  } else if (OB_FAIL(rule_mgr.refresh_resource_mapping_rule(id_, plan_name))) {
1883
    LOG_WARN("refresh resource mapping rule fail."
1884
             "Tenant resource isolation may not work",
1885
             K(id_), K(plan_name), K(ret));
1886
  } else if (OB_FAIL(col_rule_mgr.refresh_resource_column_mapping_rule(id_, get<ObPlanCache*>(),
1887
                                                                       plan_name))) {
1888
    LOG_WARN("refresh resource column mapping rule fail."
1889
             "Tenant resource isolation may not work",
1890
             K(id_), K(plan_name), K(ret));
1891
  }
1892
}
1893

1894
void ObTenant::check_dtl()
1895
{
1896
  int ret = OB_SUCCESS;
1897
  if (is_virtual_tenant_id(id_)) {
1898
    // Except for system rentals, internal tenants do not allocate px threads
1899
  } else {
1900
    ObTenantSwitchGuard guard(this);
1901
    auto tenant_dfc = MTL(ObTenantDfc*);
1902
    if (OB_NOT_NULL(tenant_dfc)) {
1903
      tenant_dfc->check_dtl(id_);
1904
    } else {
1905
      ret = OB_ERR_UNEXPECTED;
1906
      LOG_WARN("failed to switch to tenant", K(id_), K(ret));
1907
    }
1908
  }
1909
}
1910

1911
void ObTenant::check_das()
1912
{
1913
  int ret = OB_SUCCESS;
1914
  if (!is_virtual_tenant_id(id_)) {
1915
    ObTenantSwitchGuard guard(this);
1916
    if (OB_ISNULL(MTL(ObDataAccessService *))) {
1917
      ret = OB_ERR_UNEXPECTED;
1918
      LOG_WARN("failed to get das ptr", K(MTL_ID()));
1919
    } else {
1920
      double min_cpu = .0;
1921
      double max_cpu = .0;
1922
      if (OB_FAIL(GCTX.omt_->get_tenant_cpu(MTL_ID(), min_cpu, max_cpu))) {
1923
        LOG_WARN("failed to set das task max concurrency", K(MTL_ID()));
1924
      } else {
1925
        MTL(ObDataAccessService *)->set_max_concurrency(min_cpu);
1926
      }
1927
    }
1928
  }
1929
}
1930

1931
void ObTenant::check_parallel_servers_target()
1932
{
1933
  int ret = OB_SUCCESS;
1934
  int64_t val = 0;
1935
  if (is_virtual_tenant_id(id_)) {
1936
    // Except for system rentals, internal tenants do not allocate px threads
1937
  } else if (OB_FAIL(ObSchemaUtils::get_tenant_int_variable(
1938
              id_,
1939
              SYS_VAR_PARALLEL_SERVERS_TARGET,
1940
              val))) {
1941
    LOG_WARN("fail read tenant variable", K_(id), K(ret));
1942
  } else if (OB_FAIL(OB_PX_TARGET_MGR.set_parallel_servers_target(id_, val))) {
1943
    LOG_WARN("set parallel_servers_target failed", K(ret), K(id_), K(val));
1944
  }
1945
}
1946

1947
void ObTenant::check_px_thread_recycle()
1948
{
1949
  int ret = OB_SUCCESS;
1950
  if (is_virtual_tenant_id(id_)) {
1951
    // Except for system rentals, internal tenants do not allocate px threads
1952
  } else {
1953
    ObTenantSwitchGuard guard(this);
1954
    auto px_pools = MTL(ObPxPools*);
1955
    if (OB_NOT_NULL(px_pools)) {
1956
      px_pools->thread_recycle();
1957
    } else {
1958
      ret = OB_ERR_UNEXPECTED;
1959
      LOG_WARN("failed to switch to tenant", K(id_), K(ret));
1960
    }
1961
  }
1962
}
1963

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.