13
#define USING_LOG_PREFIX SERVER_OMT
16
#include "share/ob_define.h"
17
#include "lib/container/ob_vector.h"
18
#include "lib/time/ob_time_utility.h"
19
#include "lib/stat/ob_diagnose_info.h"
20
#include "lib/stat/ob_session_stat.h"
21
#include "share/config/ob_server_config.h"
22
#include "sql/engine/px/ob_px_admission.h"
23
#include "share/interrupt/ob_global_interrupt_call.h"
24
#include "ob_th_worker.h"
25
#include "ob_multi_tenant.h"
26
#include "observer/ob_server_struct.h"
27
#include "share/schema/ob_schema_getter_guard.h"
28
#include "share/schema/ob_schema_struct.h"
29
#include "share/schema/ob_schema_utils.h"
30
#include "share/resource_manager/ob_resource_manager.h"
31
#include "sql/engine/px/ob_px_target_mgr.h"
32
#include "logservice/palf/palf_options.h"
33
#include "sql/dtl/ob_dtl_fc_server.h"
34
#include "observer/mysql/ob_mysql_request_manager.h"
35
#include "observer/ob_srv_deliver.h"
36
#include "observer/ob_srv_network_frame.h"
37
#include "storage/tx/wrs/ob_tenant_weak_read_service.h"
38
#include "sql/engine/ob_tenant_sql_memory_manager.h"
39
#include "storage/meta_mem/ob_tenant_meta_mem_mgr.h"
40
#include "lib/worker.h"
41
#include "ob_tenant_mtl_helper.h"
42
#include "storage/ob_file_system_router.h"
43
#include "storage/slog/ob_storage_logger.h"
44
#include "storage/slog/ob_storage_logger_manager.h"
45
#include "storage/ob_file_system_router.h"
46
#include "common/ob_smart_var.h"
47
#include "rpc/obmysql/ob_sql_nio_server.h"
48
#include "rpc/obrpc/ob_rpc_stat.h"
49
#include "rpc/obrpc/ob_rpc_packet.h"
50
#include "lib/container/ob_array.h"
51
#include "share/rc/ob_tenant_module_init_ctx.h"
52
#include "share/resource_manager/ob_cgroup_ctrl.h"
53
#include "sql/engine/px/ob_px_worker.h"
54
#include "lib/thread/protected_stack_allocator.h"
56
using namespace oceanbase::lib;
57
using namespace oceanbase::common;
58
using namespace oceanbase::omt;
59
using namespace oceanbase::rpc;
60
using namespace oceanbase::share;
61
using namespace oceanbase::share::schema;
62
using namespace oceanbase::storage;
63
using namespace oceanbase::sql::dtl;
64
using namespace oceanbase::obrpc;
66
#define EXPAND_INTERVAL (1 * 1000 * 1000)
67
#define SHRINK_INTERVAL (1 * 1000 * 1000)
68
#define SLEEP_INTERVAL (60 * 1000 * 1000)
70
// Queue-length threshold above which fast-stack diagnostics would fire for a
// tenant request queue; INT64_MAX means effectively disabled by default.
// NOTE(review): semantics inferred from the name only — confirm against the consumer.
int64_t FASTSTACK_REQ_QUEUE_SIZE_THRESHOLD = INT64_MAX;
73
// Forward declarations of pthread wrappers defined elsewhere; the thread
// handle is passed through an opaque void* (see gc_thread_ usage below).
int ob_pthread_create(void **ptr, void *(*start_routine) (void *), void *arg);
74
int ob_pthread_tryjoin_np(void *ptr);
76
// Atomically bump the request counter for one nesting level; out-of-range
// levels are logged with OB_ERR_UNEXPECTED.
// NOTE(review): braces/else lines are missing from this extraction — in the
// visible text the increment appears to run even for bad levels; verify
// against the original source before editing.
void MultiLevelReqCnt::atomic_inc(const int32_t level)
78
if (level < 0 || level >= MAX_REQUEST_LEVEL) {
79
LOG_WARN_RET(OB_ERR_UNEXPECTED, "unexpected level", K(level));
81
ATOMIC_INC(&cnt_[level]);
85
// Initialize the per-tenant PX pool registry: record the tenant id and
// create the group-id -> ObPxPool* hash map.
// NOTE(review): extraction is truncated (brace/return lines missing).
int ObPxPools::init(uint64_t tenant_id)
87
static int PX_POOL_COUNT = 128;  // hash bucket count for pool_map_
89
tenant_id_ = tenant_id;
90
ObMemAttr attr(tenant_id, "PxPoolBkt");
91
if (OB_FAIL(pool_map_.create(PX_POOL_COUNT, attr, attr))) {
92
LOG_WARN("fail init pool map", K(ret));
97
// Look up the PX pool for group_id, lazily creating it on first access.
// NOTE(review): extraction is truncated (braces/else lines missing).
int ObPxPools::get_or_create(int64_t group_id, ObPxPool *&pool)
100
if (!pool_map_.created()) {
102
} else if (OB_FAIL(pool_map_.get_refactored(group_id, pool))) {
103
if (OB_HASH_NOT_EXIST == ret) {  // first request for this group
104
if (OB_FAIL(create_pool(group_id, pool))) {
105
LOG_WARN("fail create pool", K(ret), K(group_id));
108
LOG_WARN("fail get group id from hashmap", K(ret), K(group_id));
114
// Create, start and register a new ObPxPool for group_id. Takes the writer
// lock and re-checks the map so concurrent callers cannot create the pool
// twice; the new pool runs inside the tenant's MTL context (set_run_wrapper).
// NOTE(review): extraction is truncated (braces/else/cleanup lines missing).
int ObPxPools::create_pool(int64_t group_id, ObPxPool *&pool)
116
static constexpr uint64_t MAX_TASKS_PER_CPU = 1;
117
int ret = OB_SUCCESS;
118
common::SpinWLockGuard g(lock_);
119
if (OB_FAIL(pool_map_.get_refactored(group_id, pool))) {
120
if (OB_HASH_NOT_EXIST == ret) {  // double-checked under the write lock
121
pool = OB_NEW(ObPxPool, ObMemAttr(tenant_id_, "PxPool"));
122
if (OB_ISNULL(pool)) {
123
ret = common::OB_ALLOCATE_MEMORY_FAILED;
125
pool->set_tenant_id(tenant_id_);
126
pool->set_group_id(group_id);
127
pool->set_run_wrapper(MTL_CTX());
128
if (OB_FAIL(pool->start())) {
129
LOG_WARN("fail startup px pool", K(group_id), K(tenant_id_), K(ret));
130
} else if (OB_FAIL(pool_map_.set_refactored(group_id, pool))) {
131
LOG_WARN("fail set pool to hashmap", K(group_id), K(ret));
135
LOG_WARN("fail get group id from hashmap", K(ret), K(group_id));
141
// Walk every registered PX pool under the writer lock and let each one
// recycle its idle worker threads.
int ObPxPools::thread_recycle()
143
int ret = OB_SUCCESS;
144
common::SpinWLockGuard g(lock_);
145
ThreadRecyclePoolFunc recycle_pool_func;
146
if (OB_FAIL(pool_map_.foreach_refactored(recycle_pool_func))) {
147
LOG_WARN("failed to do foreach", K(ret));
152
// foreach callback: trigger thread recycling on one pool entry; a null pool
// is only logged, and recycle errors are deliberately ignored (best effort).
int ObPxPools::ThreadRecyclePoolFunc::operator() (common::hash::HashMapPair<int64_t, ObPxPool*> &kv)
154
int ret = OB_SUCCESS;
155
int64_t &group_id = kv.first;
156
ObPxPool *pool = kv.second;
158
LOG_WARN("pool is null", K(group_id));
160
IGNORE_RETURN pool->thread_recycle();
165
// foreach callback, teardown step 1: mark a pool stopped (the stop call
// itself is in a line missing from this extraction — see DEL_POOL_STEP_1 log).
int ObPxPools::StopPoolFunc::operator() (common::hash::HashMapPair<int64_t, ObPxPool*> &kv)
167
int ret = OB_SUCCESS;
168
int64_t &group_id = kv.first;
169
ObPxPool *pool = kv.second;
171
LOG_WARN("pool is null", K(group_id));
174
LOG_INFO("DEL_POOL_STEP_1: mark px pool stop succ!", K(group_id));
179
// foreach callback, teardown steps 2-3: wait for the pool to drain, destroy
// it, then free its memory with ob_delete.
int ObPxPools::DeletePoolFunc::operator() (common::hash::HashMapPair<int64_t, ObPxPool*> &kv)
181
int ret = OB_SUCCESS;
182
int64_t &group_id = kv.first;
183
ObPxPool *pool = kv.second;
185
LOG_WARN("pool is null", K(group_id));
188
LOG_INFO("DEL_POOL_STEP_2: wait pool empty succ!", K(group_id));
190
LOG_INFO("DEL_POOL_STEP_3: pool destroy succ!", K(group_id), K(pool->get_queue_size()));
191
common::ob_delete(pool);
196
// MTL stop hook: under the writer lock, mark every PX pool of the tenant
// stopped (teardown step 1). A null pools pointer is only logged.
void ObPxPools::mtl_stop(ObPxPools *&pools)
198
int ret = OB_SUCCESS;
199
if (OB_ISNULL(pools)) {
201
LOG_WARN("pools is null");
203
common::SpinWLockGuard g(pools->lock_);
204
StopPoolFunc stop_pool_func;
205
if (OB_FAIL(pools->pool_map_.foreach_refactored(stop_pool_func))) {
206
LOG_WARN("failed to do foreach", K(ret));
211
// Destroy all pools (drain + delete via DeletePoolFunc) and invalidate the
// tenant id so the registry cannot be reused.
void ObPxPools::destroy()
213
int ret = OB_SUCCESS;
214
common::SpinWLockGuard g(lock_);
215
DeletePoolFunc free_pool_func;
216
if (OB_FAIL(pool_map_.foreach_refactored(free_pool_func))) {
217
LOG_WARN("failed to do foreach", K(ret));
220
tenant_id_ = OB_INVALID_ID;
224
// Enqueue one PX task. The concurrency counter is incremented optimistically;
// if no idle thread can take the task (active_threads_ < concurrency_) the
// call fails with OB_SIZE_OVERFLOW, and any failure rolls the counter back.
// NOTE(review): extraction is truncated (braces/else lines missing).
int ObPxPool::submit(const RunFuncT &func)
226
int ret = OB_SUCCESS;
228
queue_.set_limit(common::ObServerConfig::get_instance().tenant_task_queue_size);
232
ATOMIC_INC(&concurrency_);
233
if (ATOMIC_LOAD(&active_threads_) < ATOMIC_LOAD(&concurrency_)) {
234
ret = OB_SIZE_OVERFLOW;  // not enough live threads to take the task
236
Task *t = OB_NEW(Task, ObMemAttr(tenant_id_, "PxTask"), func);
238
ret = OB_ALLOCATE_MEMORY_FAILED;
239
} else if (OB_FAIL(queue_.push(static_cast<ObLink*>(t), 0))) {
240
LOG_ERROR("px push queue failed", K(ret));
243
if (ret != OB_SUCCESS) {
244
ATOMIC_DEC(&concurrency_);  // undo the optimistic increment on failure
250
// Run (or discard) one dequeued task, free it, and release one unit of
// concurrency. A null task is only logged.
// NOTE(review): the line invoking the task functor is missing from this
// extraction (see need_exec).
void ObPxPool::handle(ObLink *task)
252
Task *t = static_cast<Task*>(task);
254
LOG_ERROR_RET(OB_INVALID_ARGUMENT, "px task is invalid");
256
bool need_exec = true;
258
OB_DELETE(Task, "PxTask", t);
260
ATOMIC_DEC(&concurrency_);
263
// Name the current thread "PX_G<group_id>" and bind it to this tenant.
// NOTE(review): the declaration of `buf` is missing from this extraction.
void ObPxPool::set_px_thread_name()
266
snprintf(buf, 32, "PX_G%ld", group_id_);
267
ob_get_tenant_id() = tenant_id_;
268
lib::set_thread_name(buf);
271
// Thread entry for a PX worker: install the thread-local worker, tenant
// allocator and page-manager context, join the tenant/group cgroup, then
// loop popping tasks until stopped, tracking idle time for self-recycling.
// NOTE(review): extraction is truncated (worker declaration, braces and
// the task-handling branch body are missing).
void ObPxPool::run(int64_t idx)
273
ATOMIC_INC(&active_threads_);
277
Worker::set_worker_to_thread_local(&worker);
284
int ret = OB_SUCCESS;
285
set_px_thread_name();
286
ObTLTaGuard ta_guard(tenant_id_);
287
auto *pm = common::ObPageManager::thread_local_instance();
288
if (OB_LIKELY(nullptr != pm)) {
289
pm->set_tenant_ctx(tenant_id_, common::ObCtxIds::DEFAULT_CTX_ID);
292
CLEAR_INTERRUPTABLE();
293
ObCgroupCtrl *cgroup_ctrl = GCTX.cgroup_ctrl_;
294
LOG_INFO("run px pool", K(group_id_), K(tenant_id_), K_(active_threads));
295
if (nullptr != cgroup_ctrl && OB_LIKELY(cgroup_ctrl->is_valid())) {
296
cgroup_ctrl->add_self_to_cgroup(tenant_id_, group_id_);
297
LOG_INFO("add thread to group succ", K(tenant_id_), K(group_id_));
301
queue_.set_limit(common::ObServerConfig::get_instance().tenant_task_queue_size);
305
ObLink *task = nullptr;
306
int64_t idle_time = 0;
307
while (!Thread::current().has_set_stop()) {
309
ob_usleep(10 * 1000L);
311
if (OB_SUCC(queue_.pop(task, QUEUE_WAIT_TIME))) {
315
idle_time += QUEUE_WAIT_TIME;  // accumulated on pop timeout
317
try_recycle(idle_time);
323
// Self-recycle an idle worker: after 10 minutes idle while the thread count
// is at least N, or unconditionally after 1 hour. The trylock ensures at
// most one worker exits per round; the exiting worker stops itself.
// NOTE(review): the declaration/derivation of N is missing from this extraction.
void ObPxPool::try_recycle(int64_t idle_time)
331
if ((idle_time > 10LL * 60 * 1000 * 1000 && get_thread_count() >= N)
332
|| idle_time > 60LL * 60 * 1000 * 1000) {
333
if (OB_SUCCESS == recycle_lock_.trylock()) {
334
if (ATOMIC_LOAD(&active_threads_) > ATOMIC_LOAD(&concurrency_)) {
335
ATOMIC_DEC(&active_threads_);
338
Thread::current().stop();  // current worker exits
340
recycle_lock_.unlock();
347
// NOTE(review): the enclosing function signature is missing from this
// extraction. The visible body drains the task queue, deleting each pending
// task without executing it (need_exec = false) and releasing its
// concurrency slot — presumably a stop/teardown path; confirm upstream.
int ret = OB_SUCCESS;
349
ObLink *task = nullptr;
350
bool need_exec = false;
351
while (OB_SUCC(queue_.pop(task, QUEUE_WAIT_TIME))) {
352
Task *t = static_cast<Task*>(task);
353
if (OB_NOT_NULL(t)) {
355
OB_DELETE(Task, "PxTask", t);
357
ATOMIC_DEC(&concurrency_);
361
// Construct a resource group bound to its tenant: shares the tenant's
// workers_lock_ and records the cgroup controller for this group.
// NOTE(review): several initializer-list entries are missing from this extraction.
ObResourceGroup::ObResourceGroup(int32_t group_id, ObTenant* tenant, share::ObCgroupCtrl *cgroup_ctrl):
362
ObResourceGroupNode(group_id),
363
workers_lock_(tenant->workers_lock_),
368
nesting_worker_cnt_(0),
370
cgroup_ctrl_(cgroup_ctrl)
374
// Initialize the group's queues with the tenant-level task queue limit;
// fails with OB_ERR_UNEXPECTED when the tenant back-pointer is null.
int ObResourceGroup::init()
376
int ret = OB_SUCCESS;
377
if (nullptr == tenant_) {
378
ret = OB_ERR_UNEXPECTED;
379
LOG_ERROR("group init failed");
380
} else if (FALSE_IT(multi_level_queue_.set_limit(common::ObServerConfig::get_instance().tenant_task_queue_size))) {
381
LOG_WARN("multi level queue set limit failed", K(ret), K(tenant_->id()), K(group_id_), K(*this));
383
req_queue_.set_limit(common::ObServerConfig::get_instance().tenant_task_queue_size);
389
// Refresh the request-queue limit from the current server config value.
void ObResourceGroup::update_queue_size()
391
req_queue_.set_limit(common::ObServerConfig::get_instance().tenant_task_queue_size);
394
// Create one nesting (multi-level) worker for the given request level and
// link it into nesting_workers_; levels outside (0, MAX_REQUEST_LEVEL] are
// rejected with OB_ERR_UNEXPECTED.
int ObResourceGroup::acquire_level_worker(int32_t level)
396
int ret = OB_SUCCESS;
397
ObTenantSwitchGuard guard(tenant_);
399
if (level <= 0 || level > MAX_REQUEST_LEVEL) {
400
ret = OB_ERR_UNEXPECTED;
401
LOG_ERROR("unexpected level", K(level), K(tenant_->id()));
403
ObThWorker *w = nullptr;
404
if (OB_FAIL(create_worker(w, tenant_, group_id_, level, true , this))) {
405
LOG_WARN("create worker failed", K(ret));
406
} else if (!nesting_workers_.add_last(&w->worker_node_)) {
408
ret = OB_ERR_UNEXPECTED;
409
LOG_ERROR("add worker to list fail", K(ret));
416
// Create up to `num` regular workers for this group (level INT32_MAX),
// reporting the count actually created through succ_num; warns (rate
// limited to every 10s) when fewer workers than requested were created.
int ObResourceGroup::acquire_more_worker(int64_t num, int64_t &succ_num, bool force)
418
int ret = OB_SUCCESS;
419
ObTenantSwitchGuard guard(tenant_);
421
const auto need_num = num;
424
while (OB_SUCC(ret) && need_num > succ_num) {
425
ObThWorker *w = nullptr;
426
if (OB_FAIL(create_worker(w, tenant_, group_id_, INT32_MAX, force, this))) {
427
LOG_WARN("create worker failed", K(ret));
428
} else if (!workers_.add_last(&w->worker_node_)) {
430
ret = OB_ERR_UNEXPECTED;
431
LOG_ERROR("add worker to list fail", K(ret));
437
if (need_num != num ||
440
if (TC_REACH_TIME_INTERVAL(10000000)) {
441
LOG_WARN("Alloc group worker less than lack", K(num), K(need_num), K(succ_num));
448
// Periodic worker-count control for the group (runs only when the trylock
// succeeds): (1) ensure user groups have their nesting workers; (2) purge
// stopped workers and count blocked ones; (3) compute a target token from
// blocking_cnt / queue size / min-max worker counts; (4) expand immediately
// when below target_min, expand by one when below token (rate limited by
// EXPAND_INTERVAL and a 5% tenant-memory headroom check), or mark shrink_
// when above token (rate limited by shrink_ts).
// NOTE(review): extraction is truncated — declarations of `token` and
// `shrink_ts` and several braces/else lines are missing.
void ObResourceGroup::check_worker_count()
450
int ret = OB_SUCCESS;
451
if (OB_SUCC(workers_lock_.trylock())) {
452
if (is_user_group(group_id_)
453
&& nesting_worker_cnt_ < (MAX_REQUEST_LEVEL - GROUP_MULTI_LEVEL_THRESHOLD)) {
454
for (int level = GROUP_MULTI_LEVEL_THRESHOLD + nesting_worker_cnt_; OB_SUCC(ret) && level < MAX_REQUEST_LEVEL; level++) {
455
if (OB_SUCC(acquire_level_worker(level))) {
456
nesting_worker_cnt_ = nesting_worker_cnt_ + 1;
460
int64_t now = ObTimeUtility::current_time();
461
bool enable_dynamic_worker = true;
462
int64_t threshold = 3 * 1000;
464
ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_->id()));
465
enable_dynamic_worker = tenant_config.is_valid() ? tenant_config->_ob_enable_dynamic_worker : true;
466
threshold = tenant_config.is_valid() ? tenant_config->_stall_threshold_for_dynamic_worker : 3 * 1000;
468
int64_t blocking_cnt = 0;
469
DLIST_FOREACH_REMOVESAFE(wnode, workers_) {
470
const auto w = static_cast<ObThWorker*>(wnode->get_data());
471
if (w->has_set_stop()) {
472
workers_.remove(wnode);  // reap workers that already stopped
474
} else if (w->has_req_flag()
475
&& 0 != w->blocking_ts()
476
&& now - w->blocking_ts() >= threshold
477
&& enable_dynamic_worker) {
482
int64_t target_min = 0;
484
bool is_group_critical = share::ObCgSet::instance().is_group_critical(group_id_);
485
if (is_group_critical) {
486
target_min = min_worker_cnt();
487
token = 1 + blocking_cnt;
488
token = std::min(token, max_worker_cnt());
489
token = std::max(token, target_min);
491
target_min = std::min(req_queue_.size(), min_worker_cnt());
492
if (blocking_cnt == 0 && req_queue_.size() == 0) {
495
token = 1 + blocking_cnt;
496
token = std::min(token, max_worker_cnt());
500
int64_t succ_num = 0L;
502
(!is_group_critical && workers_.get_size() == 1 && token == 0) ? SLEEP_INTERVAL : SHRINK_INTERVAL;
503
if (OB_UNLIKELY(workers_.get_size() < target_min)) {
504
const int64_t diff = target_min - workers_.get_size();
505
token_change_ts_ = now;
506
ATOMIC_STORE(&shrink_, false);
507
acquire_more_worker(diff, succ_num, true);  // forced: restore the floor
508
LOG_INFO("worker thread created", K(tenant_->id()), K(group_id_), K(token));
509
} else if (OB_UNLIKELY(workers_.get_size() < token) &&
510
OB_LIKELY(ObMallocAllocator::get_instance()->get_tenant_remain(tenant_->id()) >
511
ObMallocAllocator::get_instance()->get_tenant_limit(tenant_->id()) * 0.05)) {
512
ATOMIC_STORE(&shrink_, false);
513
if (OB_LIKELY(now - token_change_ts_ >= EXPAND_INTERVAL)) {
514
token_change_ts_ = now;
515
acquire_more_worker(1, succ_num);  // gradual expansion, one at a time
516
LOG_INFO("worker thread created", K(tenant_->id()), K(group_id_), K(token));
518
} else if (OB_UNLIKELY(workers_.get_size() > token) && OB_LIKELY(now - token_change_ts_ >= shrink_ts)) {
519
token_change_ts_ = now;
520
ATOMIC_STORE(&shrink_, true);  // a worker will see this and exit
521
LOG_INFO("worker thread began to shrink", K(tenant_->id()), K(group_id_), K(token));
523
IGNORE_RETURN workers_lock_.unlock();
527
// Worker-side shrink handler: the first worker to CAS shrink_ true->false
// wins the right to exit — it leaves the tenant cgroup and logs its exit.
void ObResourceGroup::check_worker_count(ObThWorker &w)
529
int ret = OB_SUCCESS;
530
if (OB_UNLIKELY(ATOMIC_LOAD(&shrink_))
531
&& OB_LIKELY(ATOMIC_BCAS(&shrink_, true, false))) {
533
if (cgroup_ctrl_->is_valid() && OB_FAIL(cgroup_ctrl_->remove_self_from_cgroup(tenant_->id()))) {
534
LOG_WARN("remove thread from cgroup failed", K(ret), "tenant:", tenant_->id(), K_(group_id));
536
LOG_INFO("worker thread exit", K(tenant_->id()), K(workers_.get_size()));
540
// Teardown: wait (under the workers lock) for both request queues to drain,
// then stop and unlink all nesting workers followed by all regular workers,
// logging a progress warning every 10 seconds while any remain.
// NOTE(review): extraction is truncated — the per-worker stop/destroy calls
// inside the removal loops are missing.
int ObResourceGroup::clear_worker()
542
int ret = OB_SUCCESS;
543
ObMutexGuard guard(workers_lock_);
544
while (req_queue_.size() > 0
545
|| (multi_level_queue_.get_total_size() > 0)) {
546
ob_usleep(10L * 1000L);  // wait for in-flight requests to drain
548
while (nesting_workers_.get_size() > 0) {
549
int ret = OB_SUCCESS;
550
DLIST_FOREACH_REMOVESAFE(wnode, nesting_workers_) {
551
ObThWorker *w = static_cast<ObThWorker*>(wnode->get_data());
552
nesting_workers_.remove(wnode);
555
if (REACH_TIME_INTERVAL(10 * 1000L * 1000L)) {
557
"Tenant has some group nesting workers need stop",
559
"group nesting workers", nesting_workers_.get_size(),
560
"group id", get_group_id());
563
while (workers_.get_size() > 0) {
564
int ret = OB_SUCCESS;
565
DLIST_FOREACH_REMOVESAFE(wnode, workers_) {
566
const auto w = static_cast<ObThWorker*>(wnode->get_data());
567
workers_.remove(wnode);
570
if (REACH_TIME_INTERVAL(10 * 1000L * 1000L)) {
572
"Tenant has some group workers need stop",
574
"group workers", workers_.get_size(),
575
"group id", get_group_id());
577
ob_usleep(10L * 1000L);
582
// Allocate, construct, init and register a new ObResourceGroup keyed by
// group_id; on any failure the partially-built group is destructed, and on
// success an initial worker-count check is kicked off.
// NOTE(review): extraction is truncated (braces, ob_free on failure path
// and return lines are missing).
int GroupMap::create_and_insert_group(int32_t group_id, ObTenant *tenant, ObCgroupCtrl *cgroup_ctrl, ObResourceGroup *&group)
584
int ret = OB_SUCCESS;
585
if (nullptr == tenant
586
|| nullptr == cgroup_ctrl) {
587
ret = OB_INVALID_ARGUMENT;
589
const int64_t alloc_size = sizeof(ObResourceGroup);
590
ObResourceGroup *buf = nullptr;
591
if (nullptr == (buf = (ObResourceGroup*)ob_malloc(alloc_size, ObMemAttr(tenant->id(), "ResourceGroup")))) {
592
ret = OB_ALLOCATE_MEMORY_FAILED;
594
group = new(buf)ObResourceGroup(group_id, tenant, cgroup_ctrl);
595
if (OB_FAIL(group->init())) {
596
LOG_ERROR("group init failed", K(ret), K(group_id));
597
} else if (OB_FAIL(err_code_map(insert(group)))) {
598
LOG_WARN("groupmap insert group failed", K(ret), K(group->get_group_id()), K(tenant->id()));
600
if (OB_SUCCESS != ret) {
601
group->~ObResourceGroup();  // roll back placement-new on failure
604
group->check_worker_count();  // spin up the group's initial workers
611
// Iterate every registered group and wait for it to stop and release all
// of its workers (clear_worker blocks until queues drain and lists empty).
void GroupMap::wait_group()
613
int ret = OB_SUCCESS;
614
ObResourceGroupNode* iter = NULL;
615
while (nullptr != (iter = quick_next(iter))) {
616
ObResourceGroup *group = static_cast<ObResourceGroup*>(iter);
617
if (OB_FAIL(group->clear_worker())) {
618
LOG_ERROR("group clear worker failed", K(ret));
623
// Remove every group from the map and destruct it; deletion failures are
// logged as errors.
// NOTE(review): the ob_free of the group's memory is not visible in this
// extraction — confirm upstream that it is not leaked.
void GroupMap::destroy_group()
625
int ret = OB_SUCCESS;
626
ObResourceGroupNode* iter = NULL;
627
while (nullptr != (iter = quick_next(iter))) {
628
ObResourceGroup *group = static_cast<ObResourceGroup*>(iter);
629
if (OB_SUCC(err_code_map(del(iter, iter)))) {
630
group->~ObResourceGroup();
634
LOG_ERROR("drop group failed", K(ret));
639
// Translate the link-hashmap's errno-style return codes into OB error codes.
// NOTE(review): the `switch (err)` header line is missing from this extraction.
int GroupMap::err_code_map(int err)
641
int ret = OB_SUCCESS;
643
case 0: ret = OB_SUCCESS; break;
644
case -ENOENT: ret = OB_ENTRY_NOT_EXIST; break;
645
case -EAGAIN: ret = OB_EAGAIN; break;
646
case -ENOMEM: ret = OB_ALLOCATE_MEMORY_FAILED; break;
647
case -EEXIST: ret = OB_ENTRY_EXIST; break;
648
case -EOVERFLOW: ret = OB_SIZE_OVERFLOW; break;
649
default: ret = OB_ERROR;
654
// Diagnostic formatter: collect the per-pcode dcount stats, heap-sort them
// descending (operator< is inverted on purpose), print the top 5 into buf,
// then reset every pcode's dcount for the next interval.
// NOTE(review): extraction is truncated (struct braces, `pos`, item/piece
// declarations and the return are missing).
int64_t RpcStatInfo::to_string(char *buf, const int64_t len) const
657
int ret = OB_SUCCESS;
659
obrpc::ObRpcPacketCode pcode_;
661
bool operator <(const PcodeDcount &other) const { return dcount_ > other.dcount_; }
662
int64_t to_string(char* buf, const int64_t len) const { UNUSED(buf); UNUSED(len); return 0L; }
664
SMART_VAR(ObArray<PcodeDcount>, pd_array) {
665
ObRpcPacketSet &set = ObRpcPacketSet::instance();
666
for (int64_t pcode_idx = 0; (OB_SUCCESS == ret) && (pcode_idx < ObRpcPacketSet::THE_PCODE_COUNT); pcode_idx++) {
669
if (OB_FAIL(rpc_stat_srv_.get(pcode_idx, item))) {
671
} else if (item.dcount_ != 0) {
672
pd_item.pcode_ = set.pcode_of_idx(pcode_idx);
673
pd_item.dcount_ = item.dcount_;
674
if (OB_FAIL(pd_array.push_back(pd_item))) {
679
if (OB_SUCC(ret) && pd_array.size() > 0) {
680
std::make_heap(pd_array.begin(), pd_array.end());
681
std::sort_heap(pd_array.begin(), pd_array.end());
682
for (int i = 0; i < min(5, pd_array.size()); i++) {
683
databuff_printf(buf, len, pos, " pcode=0x%x:cnt=%ld",
684
pd_array.at(i).pcode_, pd_array.at(i).dcount_);
688
for (int64_t pcode_idx = 0; pcode_idx < ObRpcPacketSet::THE_PCODE_COUNT; pcode_idx++) {
690
piece.reset_dcount_ = true;
691
rpc_stat_srv_.add(pcode_idx, piece);
697
// Construct a tenant: zero/null all counters and lazily-created members
// (multi_level_queue_, rpc_stat_info_, mtl_init_ctx_), bind the cgroup
// controller, and enable lock diagnostics.
// NOTE(review): many initializer-list entries are missing from this extraction.
ObTenant::ObTenant(const int64_t id,
698
const int64_t times_of_workers,
699
ObCgroupCtrl &cgroup_ctrl)
700
: ObTenantBase(id, true),
704
total_worker_cnt_(0),
708
wait_mtl_finished_(false),
710
multi_level_queue_(nullptr),
716
recv_sql_task_cnt_(0),
717
recv_large_req_cnt_(0),
720
recv_retry_on_lock_rpc_cnt_(0),
721
recv_retry_on_lock_mysql_cnt_(0),
724
group_map_(group_map_buf_, sizeof(group_map_buf_)),
726
rpc_stat_info_(nullptr),
727
mtl_init_ctx_(nullptr),
728
workers_lock_(common::ObLatchIds::TENANT_WORKER_LOCK),
729
cgroup_ctrl_(cgroup_ctrl),
730
disable_user_sched_(false),
732
token_usage_check_ts_(0),
740
token_usage_check_ts_ = ObTimeUtility::current_time();
741
lock_.set_diagnose(true);
744
// Trivial destructor: real teardown happens in destroy()/wait(), not here.
ObTenant::~ObTenant() {}
746
// Create the tenant's memory/resource entity (ctx_) via CREATE_ENTITY and
// verify the result is non-null.
int ObTenant::init_ctx()
748
int ret = OB_SUCCESS;
749
if (OB_FAIL(CREATE_ENTITY(ctx_, this))) {
750
LOG_WARN("create tenant ctx failed", K(ret));
751
} else if (OB_ISNULL(ctx_)) {
752
ret = OB_ERR_UNEXPECTED;
753
LOG_WARN("NULL ptr", K(ret));
758
// Full tenant initialization: base init, request queues, RPC stats, MTL
// init context, unit resources (cpu/memory/mini-mode), MTL modules for
// non-virtual tenants (plus PX target mgr and resource-column-mapping
// registration), two initial bootstrap workers (first pinned QQ_HIGH,
// second QQ_NORMAL), and one nesting worker per multi-level request level.
// NOTE(review): extraction is truncated (braces/else and cleanup lines missing).
int ObTenant::init(const ObTenantMeta &meta)
760
int ret = OB_SUCCESS;
762
if (OB_FAIL(ObTenantBase::init(&cgroup_ctrl_))) {
763
LOG_WARN("fail to init tenant base", K(ret));
764
} else if (FALSE_IT(req_queue_.set_limit(GCONF.tenant_task_queue_size))) {
765
} else if (OB_ISNULL(multi_level_queue_ = OB_NEW(ObMultiLevelQueue, ObMemAttr(id_, "MulLevelQueue")))) {
766
ret = OB_ALLOCATE_MEMORY_FAILED;
767
LOG_WARN("alloc ObMultiLevelQueue failed", K(ret), K(*this));
768
} else if (FALSE_IT(multi_level_queue_->set_limit(common::ObServerConfig::get_instance().tenant_task_queue_size))) {
769
} else if (OB_ISNULL(rpc_stat_info_ = OB_NEW(RpcStatInfo, ObMemAttr(id_, "RpcStatInfo"), id_))) {
770
ret = OB_ALLOCATE_MEMORY_FAILED;
771
LOG_WARN("alloc RpcStatInfo failed", K(ret), K(*this));
772
} else if (OB_FAIL(construct_mtl_init_ctx(meta, mtl_init_ctx_))) {
773
LOG_WARN("construct_mtl_init_ctx failed", KR(ret), K(*this));
775
ObTenantBase::mtl_init_ctx_ = mtl_init_ctx_;
777
set_unit_min_cpu(meta.unit_.config_.min_cpu());
778
set_unit_max_cpu(meta.unit_.config_.max_cpu());
779
const int64_t memory_size = static_cast<double>(tenant_meta_.unit_.config_.memory_size());
780
set_unit_memory_size(memory_size);
781
constexpr static int64_t MINI_MEM_UPPER = 1L<<30;  // 1 GiB: mini-mode cutoff
782
update_mini_mode(memory_size <= MINI_MEM_UPPER);
784
if (!is_virtual_tenant_id(id_)) {
785
if (OB_FAIL(create_tenant_module())) {
787
} else if (OB_FAIL(OB_PX_TARGET_MGR.add_tenant(id_))) {
788
LOG_WARN("add tenant into px target mgr failed", K(ret), K(id_));
789
} else if (OB_FAIL(G_RES_MGR.get_col_mapping_rule_mgr().add_tenant(id_))) {
790
LOG_WARN("add tenant into res col maping rule mgr failed", K(ret), K(id_));
793
disable_user_sched();  // virtual tenants take no user-scheduled load
798
int64_t succ_cnt = 0L;
799
if (OB_FAIL(acquire_more_worker(2, succ_cnt))) {
800
LOG_WARN("create worker in init failed", K(ret), K(succ_cnt));
803
static_cast<ObThWorker*>(workers_.get_first()->get_data())->set_priority_limit(QQ_HIGH);
804
static_cast<ObThWorker*>(workers_.get_last()->get_data())->set_priority_limit(QQ_NORMAL);
805
for (int level = MULTI_LEVEL_THRESHOLD; OB_SUCC(ret) && level < MAX_REQUEST_LEVEL; level++) {
806
if (OB_SUCC(acquire_level_worker(1, succ_cnt, level))) {
815
LOG_ERROR("fail to create tenant module", K(ret));
823
// Build the tenant's MTL init context: clog directory plus palf disk
// options (usage limit from the unit's log_disk_size, 80%/95% utilization
// thresholds, 100% throttling percentage, 2h max throttling duration,
// writer parallelism 3, overridable from tenant config when available).
int ObTenant::construct_mtl_init_ctx(const ObTenantMeta &meta, share::ObTenantModuleInitCtx *&ctx)
825
int ret = OB_SUCCESS;
826
if (OB_ISNULL(ctx = OB_NEW(share::ObTenantModuleInitCtx, ObMemAttr(id_, "ModuleInitCtx")))) {
827
ret = OB_ALLOCATE_MEMORY_FAILED;
828
LOG_WARN("alloc ObTenantModuleInitCtx failed", K(ret));
829
} else if (OB_FAIL(OB_FILE_SYSTEM_ROUTER.get_tenant_clog_dir(id_, mtl_init_ctx_->tenant_clog_dir_))) {
830
LOG_ERROR("get_tenant_clog_dir failed", K(ret));
832
mtl_init_ctx_->palf_options_.disk_options_.log_disk_usage_limit_size_ = meta.unit_.config_.log_disk_size();
833
mtl_init_ctx_->palf_options_.disk_options_.log_disk_utilization_threshold_ = 80;
834
mtl_init_ctx_->palf_options_.disk_options_.log_disk_utilization_limit_threshold_ = 95;
835
mtl_init_ctx_->palf_options_.disk_options_.log_disk_throttling_percentage_ = 100;
836
mtl_init_ctx_->palf_options_.disk_options_.log_disk_throttling_maximum_duration_ = 2 * 60 * 60 * 1000 * 1000L;
837
mtl_init_ctx_->palf_options_.disk_options_.log_writer_parallelism_ = 3;
838
ObTenantConfigGuard tenant_config(TENANT_CONF(MTL_ID()));
839
if (OB_UNLIKELY(!tenant_config.is_valid())) {
840
ret = is_virtual_tenant_id(id_) ? OB_SUCCESS : OB_ENTRY_NOT_EXIST;  // virtual tenants have no config
842
mtl_init_ctx_->palf_options_.disk_options_.log_writer_parallelism_ = tenant_config->_log_writer_parallelism;
844
LOG_INFO("construct_mtl_init_ctx success", "palf_options", mtl_init_ctx_->palf_options_.disk_options_);
848
// Read the hidden flag from the super block under the meta read lock.
bool ObTenant::is_hidden()
850
TCRLockGuard guard(meta_lock_);
851
return tenant_meta_.super_block_.is_hidden_;
854
// Read the tenant create status under the meta read lock.
ObTenantCreateStatus ObTenant::get_create_status()
856
TCRLockGuard guard(meta_lock_);
857
return tenant_meta_.create_status_;
859
// Update the create status under the meta write lock, logging the
// old and new values for traceability.
void ObTenant::set_create_status(const ObTenantCreateStatus status)
861
TCWLockGuard guard(meta_lock_);
862
LOG_INFO("set create status",
864
"unit_id", tenant_meta_.unit_.unit_id_,
865
"new_status", status,
866
"old_status", tenant_meta_.create_status_,
868
tenant_meta_.create_status_ = status;
871
// Return a copy of the tenant meta under the meta read lock.
// NOTE(review): the return statement is missing from this extraction.
ObTenantMeta ObTenant::get_tenant_meta()
873
TCRLockGuard guard(meta_lock_);
877
// Return a copy of the unit config under the meta read lock.
ObUnitInfoGetter::ObTenantConfig ObTenant::get_unit()
879
TCRLockGuard guard(meta_lock_);
880
return tenant_meta_.unit_;
883
// Return the unit id under the meta read lock.
uint64_t ObTenant::get_unit_id()
885
TCRLockGuard guard(meta_lock_);
886
return tenant_meta_.unit_.unit_id_;
889
// Return a copy of the super block under the meta read lock.
ObTenantSuperBlock ObTenant::get_super_block()
891
TCRLockGuard guard(meta_lock_);
892
return tenant_meta_.super_block_;
895
// Replace the unit config under the meta write lock.
void ObTenant::set_tenant_unit(const ObUnitInfoGetter::ObTenantConfig &unit)
897
TCWLockGuard guard(meta_lock_);
898
tenant_meta_.unit_ = unit;
901
// Replace the super block under the meta write lock.
void ObTenant::set_tenant_super_block(const ObTenantSuperBlock &super_block)
903
TCWLockGuard guard(meta_lock_);
904
tenant_meta_.super_block_ = super_block;
907
// Return the tenant compatibility mode (MySQL/Oracle) under the read lock.
Worker::CompatMode ObTenant::get_compat_mode() const
909
TCRLockGuard guard(meta_lock_);
910
return tenant_meta_.unit_.mode_;
913
// Update the unit status under the meta write lock, logging old and new
// values in human-readable form.
void ObTenant::set_unit_status(const ObUnitInfoGetter::ObUnitStatus status)
915
TCWLockGuard guard(meta_lock_);
916
LOG_INFO("set unit status",
918
"unit_id", tenant_meta_.unit_.unit_id_,
919
"new_status", ObUnitInfoGetter::get_unit_status_str(status),
920
"old_status", ObUnitInfoGetter::get_unit_status_str(tenant_meta_.unit_.unit_status_),
922
tenant_meta_.unit_.unit_status_ = status;
925
// Read the unit status under the meta read lock.
ObUnitInfoGetter::ObUnitStatus ObTenant::get_unit_status()
927
TCRLockGuard guard(meta_lock_);
928
return tenant_meta_.unit_.unit_status_;
931
// Mark the unit as removed under the meta write lock (one-way flag).
void ObTenant::mark_tenant_is_removed()
933
TCWLockGuard guard(meta_lock_);
934
LOG_INFO("mark tenant is removed",
936
"unit_id", tenant_meta_.unit_.unit_id_,
938
tenant_meta_.unit_.is_removed_ = true;
941
// Error-injection point: lets tests force create_tenant_module() to fail
// right after create_mtl_module() succeeds (see usage below).
ERRSIM_POINT_DEF(CREATE_MTL_MODULE_FAIL)
943
// Create, init and start every MTL module for this tenant inside a tenant
// switch guard, then size worker threads from the unit's max_cpu. On any
// failure after creation began, rolls back with stop/wait/destroy of the
// MTL modules.
// NOTE(review): extraction is truncated (braces and the failure-branch
// guards around the rollback calls are missing).
int ObTenant::create_tenant_module()
945
int ret = OB_SUCCESS;
946
const uint64_t &tenant_id = id_;
947
const double max_cpu = static_cast<double>(tenant_meta_.unit_.config_.max_cpu());
949
ObTenantSwitchGuard guard(this);
951
FLOG_INFO("begin create mtl module>>>>", K(tenant_id), K(MTL_ID()));
953
bool mtl_init = false;
954
if (OB_FAIL(ObTenantBase::create_mtl_module())) {
955
LOG_ERROR("create mtl module failed", K(tenant_id), K(ret));
956
} else if (CREATE_MTL_MODULE_FAIL) {  // errsim-forced failure for testing
957
ret = CREATE_MTL_MODULE_FAIL;
958
LOG_ERROR("create_tenant_module failed because of tracepoint CREATE_MTL_MODULE_FAIL",
959
K(tenant_id), K(ret));
960
} else if (FALSE_IT(ObTenantEnv::set_tenant(this))) {
964
} else if (FALSE_IT(mtl_init = true)) {
965
} else if (OB_FAIL(ObTenantBase::init_mtl_module())) {
966
LOG_ERROR("init mtl module failed", K(tenant_id), K(ret));
967
} else if (OB_FAIL(ObTenantBase::start_mtl_module())) {
968
LOG_ERROR("start mtl module failed", K(tenant_id), K(ret));
969
} else if (OB_FAIL(update_thread_cnt(max_cpu))) {
970
LOG_ERROR("update mtl module thread cnt fail", K(tenant_id), K(ret));
974
FLOG_INFO("finish create mtl module>>>>", K(tenant_id), K(MTL_ID()), K(ret));
978
ObTenantBase::stop_mtl_module();
979
ObTenantBase::wait_mtl_module();
981
ObTenantBase::destroy_mtl_module();
987
// GC-thread helper: sleep briefly while waiting for the tenant to drain,
// and escalate to an ERROR log if the tenant has been stopping for more
// than 3 minutes (rate limited to once per 3 minutes).
// NOTE(review): the sleep call itself is missing from this extraction.
void ObTenant::sleep_and_warn(ObTenant* tenant)
990
const int64_t ts = ObTimeUtility::current_time() - tenant->stopped_;
991
if (ts >= 3L * 60 * 1000 * 1000 && TC_REACH_TIME_INTERVAL(3L * 60 * 1000 * 1000)) {
992
LOG_ERROR_RET(OB_SUCCESS, "tenant destructed for too long time.", K_(tenant->id), K(ts));
996
// GC-thread entry ("UnitGC"): drains and tears down a stopping tenant.
// Order: flush retry requests, wait for both request queues to empty, stop
// and remove all regular workers, then all nesting workers, then all
// resource groups, and finally stop/wait the MTL modules (non-virtual
// tenants only). Progress is logged every 10s while workers remain.
// NOTE(review): extraction is truncated (braces, worker stop calls and the
// return are missing).
void* ObTenant::wait(void* t)
998
int ret = OB_SUCCESS;
999
ObTenant* tenant = (ObTenant*)t;
1000
ob_get_tenant_id() = tenant->id_;
1001
lib::set_thread_name("UnitGC");
1002
lib::Thread::update_loop_ts();
1003
tenant->handle_retry_req(true);
1004
while (tenant->req_queue_.size() > 0
1005
|| (tenant->multi_level_queue_ != nullptr && tenant->multi_level_queue_->get_total_size() > 0)) {
1006
sleep_and_warn(tenant);
1008
while (tenant->workers_.get_size() > 0) {
1009
if (OB_SUCC(tenant->workers_lock_.trylock())) {
1010
DLIST_FOREACH_REMOVESAFE(wnode, tenant->workers_) {
1011
const auto w = static_cast<ObThWorker*>(wnode->get_data());
1012
tenant->workers_.remove(wnode);
1015
IGNORE_RETURN tenant->workers_lock_.unlock();
1016
if (REACH_TIME_INTERVAL(10_s)) {
1018
"Tenant has some workers need stop", K_(tenant->id),
1019
"workers", tenant->workers_.get_size(),
1020
K_(tenant->req_queue));
1023
sleep_and_warn(tenant);
1025
LOG_INFO("start remove nesting", K(tenant->nesting_workers_.get_size()), K_(tenant->id));
1026
while (tenant->nesting_workers_.get_size() > 0) {
1027
int ret = OB_SUCCESS;
1028
if (OB_SUCC(tenant->workers_lock_.trylock())) {
1029
DLIST_FOREACH_REMOVESAFE(wnode, tenant->nesting_workers_) {
1030
auto w = static_cast<ObThWorker*>(wnode->get_data());
1031
tenant->nesting_workers_.remove(wnode);
1034
IGNORE_RETURN tenant->workers_lock_.unlock();
1035
if (REACH_TIME_INTERVAL(10_s)) {
1037
"Tenant has some nesting workers need stop",
1039
"nesting workers", tenant->nesting_workers_.get_size(),
1040
K_(tenant->req_queue));
1043
sleep_and_warn(tenant);
1045
LOG_INFO("finish remove nesting", K(tenant->nesting_workers_.get_size()), K_(tenant->id));
1046
LOG_INFO("start remove group_map", K_(tenant->id));
1047
tenant->group_map_.wait_group();
1048
LOG_INFO("finish remove group_map", K_(tenant->id));
1049
if (!is_virtual_tenant_id(tenant->id_) && !tenant->wait_mtl_finished_) {
1050
ObTenantSwitchGuard guard(tenant);
1051
tenant->stop_mtl_module();
1052
OB_PX_TARGET_MGR.delete_tenant(tenant->id_);
1053
G_RES_MGR.get_col_mapping_rule_mgr().drop_tenant(tenant->id_);
1054
tenant->wait_mtl_module();
1055
tenant->wait_mtl_finished_ = true;  // destroy() relies on this flag
1057
LOG_INFO("finish waiting", K_(tenant->id));
1061
// Non-blocking tenant wait: on first call, spawn the detached-style GC
// thread running ObTenant::wait (guarded by a CAS on has_created_); on
// later calls try a non-blocking join, clearing gc_thread_ when it exits.
// Warns if the teardown has been running longer than 3 minutes.
// NOTE(review): extraction is truncated (braces/else and OB_EAGAIN mapping
// lines are missing).
int ObTenant::try_wait()
1063
int ret = OB_SUCCESS;
1064
if (OB_ISNULL(ATOMIC_LOAD(&gc_thread_))) {
1065
if (!ATOMIC_BCAS(&has_created_, false, true)) {  // only one creator wins
1069
LOG_WARN("try_wait again after wait successfully, there may be `kill -15` or failure of locking", K(id_), K(wait_mtl_finished_));
1072
ATOMIC_STORE(&stopped_, ObTimeUtility::current_time());
1073
if (OB_FAIL(ob_pthread_create(&gc_thread_, wait, this))) {
1074
ATOMIC_STORE(&has_created_, false);  // allow retry after create failure
1075
LOG_ERROR("tenant gc thread create failed", K(ret), K(errno), K(id_));
1078
LOG_INFO("tenant pthread_create gc thread successfully", K(id_), K(gc_thread_));
1082
if (OB_FAIL(ob_pthread_tryjoin_np(gc_thread_))) {
1083
LOG_WARN("tenant pthread_tryjoin_np failed", K(errno), K(id_));
1085
ATOMIC_STORE(&gc_thread_, nullptr);  // joined: thread is gone
1086
LOG_INFO("tenant pthread_tryjoin_np successfully", K(id_));
1088
const int64_t ts = ObTimeUtility::current_time() - stopped_;
1090
if (ts >= 3L * 60 * 1000 * 1000 && REACH_TIME_INTERVAL(3L * 60 * 1000 * 1000)) {
1091
LOG_ERROR_RET(OB_SUCCESS, "tenant destructed for too long time.", K_(id), K(ts));
1097
// Final teardown after wait(): destroy the memory entity, remove the
// tenant cgroup, destroy all resource groups and MTL modules, remove the
// tenant's tmp files (ENOENT tolerated), tear down the base class, then
// free the lazily-allocated members.
// NOTE(review): extraction is truncated (braces/guard lines missing).
void ObTenant::destroy()
1099
int tmp_ret = OB_SUCCESS;
1100
if (ctx_ != nullptr) {
1101
DESTROY_ENTITY(ctx_);
1104
if (cgroup_ctrl_.is_valid()
1105
&& OB_SUCCESS != (tmp_ret = cgroup_ctrl_.remove_tenant_cgroup(id_))) {
1106
LOG_WARN_RET(tmp_ret, "remove tenant cgroup failed", K(tmp_ret), K_(id));
1108
group_map_.destroy_group();
1109
ObTenantSwitchGuard guard(this);
1110
destroy_mtl_module();
1115
if (OB_TMP_FAIL(FILE_MANAGER_INSTANCE_V2.remove_tenant_file(id_))) {
1116
if (OB_ENTRY_NOT_EXIST == tmp_ret) {
1117
tmp_ret = OB_SUCCESS;  // no tmp files is fine
1119
LOG_WARN_RET(tmp_ret, "fail to free tmp tenant file store", K(ret), K_(id));
1122
ObTenantBase::destroy();
1124
if (nullptr != multi_level_queue_) {
1125
common::ob_delete(multi_level_queue_);
1126
multi_level_queue_ = nullptr;
1128
if (nullptr != rpc_stat_info_) {
1129
common::ob_delete(rpc_stat_info_);
1130
rpc_stat_info_ = nullptr;
1132
if (nullptr != mtl_init_ctx_) {
1133
common::ob_delete(mtl_init_ctx_);
1134
mtl_init_ctx_ = nullptr;
1138
// Apply the unit's max_cpu as a cgroup CFS quota. The sys tenant gets an
// unlimited quota (-1); other tenants get quota = period * cpu, re-read in
// a loop because writing the quota can change the effective period. Bails
// out with an ERROR after several iterations to avoid hanging.
// NOTE(review): extraction is truncated (braces/else and the loop_times
// increment are missing).
void ObTenant::set_unit_max_cpu(double cpu)
1140
int tmp_ret = OB_SUCCESS;
1141
unit_max_cpu_ = cpu;
1142
int32_t cfs_period_us = 0;
1143
int32_t cfs_period_us_new = 0;
1144
if (!cgroup_ctrl_.is_valid() || is_meta_tenant(id_)) {
1146
} else if (is_sys_tenant(id_)) {
1147
int32_t sys_cfs_quota_us = -1;  // -1 = unlimited CFS quota
1148
if (OB_TMP_FAIL(cgroup_ctrl_.set_cpu_cfs_quota(sys_cfs_quota_us, id_))) {
1149
LOG_WARN_RET(tmp_ret, "set sys tennat cpu cfs quota failed", K(tmp_ret), K_(id), K(sys_cfs_quota_us));
1151
} else if (OB_TMP_FAIL(cgroup_ctrl_.get_cpu_cfs_period(cfs_period_us_new, id_, INT64_MAX))) {
1152
LOG_WARN_RET(tmp_ret, "fail get cpu cfs period", K_(id));
1154
uint32_t loop_times = 0;
1157
while (OB_SUCCESS == tmp_ret && cfs_period_us_new != cfs_period_us) {
1158
cfs_period_us = cfs_period_us_new;
1159
int32_t cfs_quota_us = static_cast<int32_t>(cfs_period_us * cpu);
1160
if (OB_TMP_FAIL(cgroup_ctrl_.set_cpu_cfs_quota(cfs_quota_us, id_))) {
1161
LOG_WARN_RET(tmp_ret, "set cpu cfs quota failed", K_(id), K(cfs_quota_us));
1162
} else if (OB_TMP_FAIL(cgroup_ctrl_.get_cpu_cfs_period(cfs_period_us_new, id_, INT64_MAX))) {
1163
LOG_ERROR_RET(tmp_ret, "fail get cpu cfs period", K_(id));
1166
if (loop_times > 3) {
1167
tmp_ret = OB_ERR_UNEXPECTED;
1168
LOG_ERROR_RET(tmp_ret, "cpu_cfs_period has been always changing, thread may be hung", K_(id), K(cfs_period_us), K(cfs_period_us_new), K(cfs_quota_us));
1175
void ObTenant::set_unit_min_cpu(double cpu)
1177
int tmp_ret = OB_SUCCESS;
1178
unit_min_cpu_ = cpu;
1179
const double default_cpu_shares = 1024.0;
1180
int32_t cpu_shares = static_cast<int32_t>(default_cpu_shares * cpu);
1181
if (cgroup_ctrl_.is_valid()
1182
&& OB_SUCCESS != (tmp_ret = cgroup_ctrl_.set_cpu_shares(cpu_shares, id_))) {
1183
LOG_WARN_RET(tmp_ret, "set cpu shares failed", K(tmp_ret), K_(id), K(cpu_shares));
1187
// Per-CPU worker concurrency taken from tenant config; falls back to 4
// when the tenant config is not (yet) available.
// NOTE(review): reconstructed from corrupted text — only brace lines were
// missing; verify against version control.
int64_t ObTenant::cpu_quota_concurrency() const
{
  ObTenantConfigGuard tenant_config(TENANT_CONF(id_));
  return static_cast<int64_t>((tenant_config.is_valid() ? tenant_config->cpu_quota_concurrency : 4));
}
1193
// Lower bound on worker threads: 2 reserved plus at least one per
// unit_min_cpu * cpu_quota_concurrency (concurrency defaults to 4 when
// tenant config is unavailable).
// NOTE(review): reconstructed from corrupted text — only brace lines were
// missing; verify against version control.
int64_t ObTenant::min_worker_cnt() const
{
  ObTenantConfigGuard tenant_config(TENANT_CONF(id_));
  return 2 + std::max(1L, static_cast<int64_t>(unit_min_cpu() * (tenant_config.is_valid() ? tenant_config->cpu_quota_concurrency : 4)));
}
1199
// Upper bound on worker threads, derived from the unit memory: roughly 5%
// of tenant memory divided by the per-thread footprint
// (stack size + 3MB + 512KB).
// NOTE(review): this chunk is corrupted — original line numbers are
// interleaved and the second std::max argument plus the function's braces
// were dropped. Restore this function from version control; do not edit
// the text below by hand.
int64_t ObTenant::max_worker_cnt() const
1201
return std::max(tenant_meta_.unit_.config_.memory_size() / 20 / (GCONF.stack_size + (3 << 20) + (512 << 10)),
1205
// Pop the next request for worker `w` into `req`, waiting up to `timeout`.
// Routing depends on the worker kind: group workers drain their group's
// multi-level / request queues; nested level workers drain the tenant
// multi-level queue for their level; default workers first scavenge the
// multi-level queues, then pop the tenant queue by the worker's priority
// class. Top-level (MAX_REQUEST_LEVEL - 1) workers additionally discard
// requests that timed out in the queue.
// NOTE(review): this chunk is corrupted — bare original line numbers are
// interleaved and several lines are missing (the leading parameters of the
// signature, brace/else scaffolding, and interior statements at the gaps
// in the residue numbering). Restore from version control before editing.
int ObTenant::get_new_request(
1208
rpc::ObRequest *&req)
1210
int ret = OB_SUCCESS;
1211
ObLink* task = nullptr;
1215
Thread::WaitGuard guard(Thread::WAIT_IN_TENANT_QUEUE);
1216
if (w.is_group_worker()) {
1217
w.set_large_query(false);
1218
w.set_curr_request_level(0);
1219
wk_level = w.get_worker_level();
1220
if (wk_level < 0 || wk_level >= MAX_REQUEST_LEVEL) {
1221
ret = OB_ERR_UNEXPECTED;
1222
LOG_ERROR("unexpected level", K(wk_level), K(id_));
1223
} else if (wk_level >= MAX_REQUEST_LEVEL - 1) {
1224
ret = w.get_group()->multi_level_queue_.pop_timeup(task, wk_level, timeout);
1225
if ((ret == OB_SUCCESS && nullptr == task) || ret == OB_ENTRY_NOT_EXIST) {
1226
ret = OB_ENTRY_NOT_EXIST;
1228
} else if (ret == OB_SUCCESS){
1229
rpc::ObRequest *tmp_req = static_cast<rpc::ObRequest*>(task);
1230
LOG_WARN("req is timeout and discard", "tenant_id", id_, K(tmp_req));
1232
LOG_ERROR("pop queue err", "tenant_id", id_, K(ret));
1234
} else if (w.is_level_worker()) {
1235
ret = w.get_group()->multi_level_queue_.pop(task, wk_level, timeout);
1237
for (int32_t level = MAX_REQUEST_LEVEL - 1; level >= GROUP_MULTI_LEVEL_THRESHOLD; level--) {
1238
IGNORE_RETURN w.get_group()->multi_level_queue_.try_pop(task, level);
1239
if (nullptr != task) {
1244
if (nullptr == task) {
1245
ret = w.get_group()->req_queue_.pop(task, timeout);
1249
w.set_large_query(false);
1250
w.set_curr_request_level(0);
1251
wk_level = w.get_worker_level();
1252
if (wk_level < 0 || wk_level >= MAX_REQUEST_LEVEL) {
1253
ret = OB_ERR_UNEXPECTED;
1254
LOG_ERROR("unexpected level", K(wk_level), K(id_));
1255
} else if (wk_level >= MAX_REQUEST_LEVEL - 1) {
1256
ret = multi_level_queue_->pop_timeup(task, wk_level, timeout);
1257
if ((ret == OB_SUCCESS && nullptr == task) || ret == OB_ENTRY_NOT_EXIST) {
1258
ret = OB_ENTRY_NOT_EXIST;
1261
ob_usleep(10 * 1000L);
1262
} else if (ret == OB_SUCCESS){
1263
rpc::ObRequest *tmp_req = static_cast<rpc::ObRequest*>(task);
1264
LOG_WARN("req is timeout and discard", "tenant_id", id_, K(tmp_req));
1266
LOG_ERROR("pop queue err", "tenant_id", id_, K(ret));
1268
} else if (w.is_level_worker()) {
1269
ret = multi_level_queue_->pop(task, wk_level, timeout);
1271
if (w.is_default_worker()) {
1272
for (int32_t level = MAX_REQUEST_LEVEL - 1; level >= 1; level--) {
1273
IGNORE_RETURN multi_level_queue_->try_pop(task, level);
1274
if (nullptr != task) {
1280
if (OB_ISNULL(task)) {
1281
if (OB_UNLIKELY(w.is_high_priority())) {
1284
ret = req_queue_.pop_high(task, timeout);
1285
} else if (OB_UNLIKELY(w.is_normal_priority())) {
1288
ret = req_queue_.pop_normal(task, timeout);
1292
ATOMIC_INC(&pop_normal_cnt_);
1293
ret = req_queue_.pop(task, timeout);
1300
EVENT_INC(REQUEST_DEQUEUE_COUNT);
1301
if (nullptr == req && nullptr != task) {
1302
req = static_cast<rpc::ObRequest*>(task);
1304
if (nullptr != req) {
1305
if (w.is_group_worker() && req->large_retry_flag()) {
1306
w.set_large_query();
1308
if (req->get_type() == ObRequest::OB_RPC) {
1309
using obrpc::ObRpcPacket;
1310
const ObRpcPacket &pkt
1311
= static_cast<const ObRpcPacket&>(req->get_packet());
1312
w.set_curr_request_level(pkt.get_request_level());
1319
using oceanbase::obrpc::ObRpcPacket;
1320
inline bool is_high_prio(const ObRpcPacket &pkt)
1322
return pkt.get_priority() < 5;
1325
inline bool is_normal_prio(const ObRpcPacket &pkt)
1327
return pkt.get_priority() == 5;
1330
inline bool is_low_prio(const ObRpcPacket &pkt)
1332
return pkt.get_priority() > 5 && pkt.get_priority() < 10;
1335
inline bool is_ddl(const ObRpcPacket &pkt)
1337
return pkt.get_priority() == 10;
1340
inline bool is_warmup(const ObRpcPacket &pkt)
1342
return pkt.get_priority() == 11;
1345
// Enqueue `req` into the resource group identified by `group_id`, creating
// the group on first use (racing creators fall back to a second lookup on
// OB_ENTRY_EXIST). High-level RPC requests of user groups go to the
// group's multi-level queue; everything else goes to the group's request
// queue. For non-critical groups with no workers, a first worker is
// spawned under the group's workers_lock_ so the request can be served.
// NOTE(review): this chunk is corrupted — bare original line numbers are
// interleaved and lines are missing at the numbering gaps (e.g. the
// req_level declaration and brace/else scaffolding). Restore from version
// control before editing.
int ObTenant::recv_group_request(ObRequest &req, int64_t group_id)
1347
int ret = OB_SUCCESS;
1348
int64_t now = ObTimeUtility::current_time();
1349
req.set_enqueue_timestamp(now);
1350
ObResourceGroup* group = nullptr;
1351
ObResourceGroupNode* node = nullptr;
1352
ObResourceGroupNode key(group_id);
1354
if (OB_SUCC(GroupMap::err_code_map(group_map_.get(&key, node)))) {
1355
group = static_cast<ObResourceGroup*>(node);
1356
} else if (OB_FAIL(group_map_.create_and_insert_group(group_id, this, &cgroup_ctrl_, group))) {
1357
if (OB_ENTRY_EXIST == ret && OB_SUCC(GroupMap::err_code_map(group_map_.get(&key, node)))) {
1358
group = static_cast<ObResourceGroup*>(node);
1360
LOG_WARN("failed to create and insert group", K(ret), K(group_id), K(id_));
1363
LOG_INFO("create group successfully", K_(id), K(group_id), K(group));
1366
if (req.get_type() == ObRequest::OB_RPC) {
1367
using obrpc::ObRpcPacket;
1368
const ObRpcPacket &pkt
1369
= static_cast<const ObRpcPacket&>(req.get_packet());
1370
req_level = min(pkt.get_request_level(), MAX_REQUEST_LEVEL - 1);
1372
if (req_level < 0) {
1373
ret = OB_ERR_UNEXPECTED;
1374
LOG_ERROR("unexpected level", K(req_level), K(id_), K(group_id));
1375
} else if (is_user_group(group_id) && req_level >= GROUP_MULTI_LEVEL_THRESHOLD) {
1376
group->recv_level_rpc_cnt_.atomic_inc(req_level);
1377
if (OB_FAIL(group->multi_level_queue_.push(req, req_level, 0))) {
1378
LOG_WARN("push request to queue fail", K(req_level), K(id_), K(group_id));
1381
group->atomic_inc_recv_cnt();
1382
if (OB_FAIL(group->req_queue_.push(&req, 0))) {
1383
LOG_ERROR("push request to queue fail", K(id_), K(group_id));
1386
int tmp_ret = OB_SUCCESS;
1387
if (!share::ObCgSet::instance().is_group_critical(group_id) && 0 == group->workers_.get_size()) {
1388
if (OB_SUCCESS == (tmp_ret = group->workers_lock_.trylock())) {
1389
if (0 == group->workers_.get_size()) {
1390
int64_t succ_num = 0L;
1391
group->token_change_ts_ = now;
1392
ATOMIC_STORE(&group->shrink_, false);
1393
group->acquire_more_worker(1, succ_num, true);
1394
LOG_INFO("worker thread created", K(id()), K(group->group_id_));
1396
IGNORE_RETURN group->workers_lock_.unlock();
1398
LOG_WARN("failed to lock group workers", K(ret), K(id_), K(group_id));
1405
// Main request-ingress path for the tenant. Rejects requests once the
// tenant has stopped; requests with a group id are delegated to
// recv_group_request(); the rest are classified by request type and RPC
// priority band and pushed to the matching tenant queue (multi-level
// queue for deep-level RPCs, QQ_*/RQ_* priority slots otherwise). On
// queue overflow (or a very long queue) a fast stack dump is triggered.
// NOTE(review): this chunk is corrupted — bare original line numbers are
// interleaved and lines are missing at the numbering gaps (e.g. original
// lines 1415-1423 between the group branch and the enqueue path, plus
// brace/else/break scaffolding). Restore from version control before
// editing.
int ObTenant::recv_request(ObRequest &req)
1407
int ret = OB_SUCCESS;
1409
if (has_stopped()) {
1410
ret = OB_TENANT_NOT_IN_SERVER;
1411
LOG_WARN("receive request but tenant has already stopped", K(ret), K(id_));
1412
} else if (0 != req.get_group_id()) {
1413
if (OB_FAIL(recv_group_request(req, req.get_group_id()))) {
1414
LOG_ERROR("recv group request failed", K(ret), K(id_), K(req.get_group_id()));
1424
req.set_enqueue_timestamp(ObTimeUtility::current_time());
1425
req.set_trace_point(ObRequest::OB_EASY_REQUEST_TENANT_RECEIVED);
1426
switch (req.get_type()) {
1427
case ObRequest::OB_RPC: {
1428
using obrpc::ObRpcPacket;
1429
const ObRpcPacket& pkt = static_cast<const ObRpcPacket&>(req.get_packet());
1430
req_level = min(pkt.get_request_level(), MAX_REQUEST_LEVEL - 1);
1431
if (req_level < 0) {
1432
ret = OB_ERR_UNEXPECTED;
1433
LOG_ERROR("unexpected level", K(req_level), K(id_));
1434
} else if (req_level >= MULTI_LEVEL_THRESHOLD) {
1435
recv_level_rpc_cnt_.atomic_inc(req_level);
1436
if (OB_FAIL(multi_level_queue_->push(req, req_level, 0))) {
1437
LOG_WARN("push request to queue fail", K(ret), K(this));
1444
if (is_high_prio(pkt)) {
1445
ATOMIC_INC(&recv_hp_rpc_cnt_);
1446
if (OB_FAIL(req_queue_.push(&req, QQ_HIGH))) {
1447
if (REACH_TIME_INTERVAL(5 * 1000 * 1000)) {
1448
LOG_WARN("push request to queue fail", K(ret), K(*this));
1451
} else if (req.is_retry_on_lock()) {
1452
ATOMIC_INC(&recv_retry_on_lock_rpc_cnt_);
1453
if (OB_FAIL(req_queue_.push(&req, QQ_NORMAL))) {
1454
LOG_WARN("push request to QQ_NORMAL queue fail", K(ret), K(this));
1456
} else if (pkt.is_kv_request()) {
1458
ATOMIC_INC(&recv_np_rpc_cnt_);
1459
if (OB_FAIL(req_queue_.push(&req, RQ_NORMAL))) {
1460
LOG_WARN("push kv request to queue fail", K(ret), K(this));
1462
} else if (is_normal_prio(pkt) || is_low_prio(pkt)) {
1463
ATOMIC_INC(&recv_np_rpc_cnt_);
1464
if (OB_FAIL(req_queue_.push(&req, QQ_LOW))) {
1465
LOG_WARN("push request to queue fail", K(ret), K(this));
1467
} else if (is_ddl(pkt)) {
1468
ret = OB_ERR_UNEXPECTED;
1469
LOG_WARN("priority 10 should not come here", K(ret));
1470
} else if (is_warmup(pkt)) {
1471
ATOMIC_INC(&recv_lp_rpc_cnt_);
1472
if (OB_FAIL(req_queue_.push(&req, RQ_LOW))) {
1473
LOG_WARN("push request to queue fail", K(ret), K(this));
1476
ret = OB_ERR_UNEXPECTED;
1477
LOG_ERROR("unexpected priority", K(ret), K(pkt.get_priority()));
1482
case ObRequest::OB_MYSQL: {
1483
if (req.is_retry_on_lock()) {
1484
ATOMIC_INC(&recv_retry_on_lock_mysql_cnt_);
1485
if (OB_FAIL(req_queue_.push(&req, RQ_HIGH))) {
1486
LOG_WARN("push request to RQ_HIGH queue fail", K(ret), K(this));
1489
ATOMIC_INC(&recv_mysql_cnt_);
1490
if (OB_FAIL(req_queue_.push(&req, RQ_NORMAL))) {
1491
LOG_WARN("push request to queue fail", K(ret), K(this));
1496
case ObRequest::OB_TASK:
1497
case ObRequest::OB_TS_TASK: {
1498
ATOMIC_INC(&recv_task_cnt_);
1499
if (OB_FAIL(req_queue_.push(&req, RQ_HIGH))) {
1500
LOG_WARN("push request to queue fail", K(ret), K(this));
1504
case ObRequest::OB_SQL_TASK: {
1505
ATOMIC_INC(&recv_sql_task_cnt_);
1506
if (OB_FAIL(req_queue_.push(&req, RQ_NORMAL))) {
1507
LOG_WARN("push request to queue fail", K(ret), K(this));
1512
ret = OB_ERR_UNEXPECTED;
1513
LOG_ERROR("unknown request type", K(ret));
1520
ObTenantStatEstGuard guard(id_);
1521
EVENT_INC(REQUEST_ENQUEUE_COUNT);
1524
if (OB_SIZE_OVERFLOW == ret || req_queue_.size() >= FASTSTACK_REQ_QUEUE_SIZE_THRESHOLD) {
1525
IGNORE_RETURN faststack();
1531
// Re-enqueue a large (long-running) request: stamp it, set the
// large-retry flag, then route it to its own group, or to the dedicated
// OBCG_LQ group when it carries no group id.
// NOTE(review): reconstructed from corrupted text — the numbering gaps
// correspond to brace/else scaffolding and `return ret;`; verify against
// version control.
int ObTenant::recv_large_request(rpc::ObRequest &req)
{
  int ret = OB_SUCCESS;
  req.set_enqueue_timestamp(ObTimeUtility::current_time());
  req.set_large_retry_flag(true);
  if (0 != req.get_group_id()) {
    if (OB_FAIL(recv_group_request(req, req.get_group_id()))) {
      LOG_WARN("tenant receive large retry request fail", K(ret));
    }
  } else if (OB_FAIL(recv_group_request(req, OBCG_LQ))){
    LOG_ERROR("recv large request failed", K(id_));
  } else {
    // account the successful enqueue under this tenant's stats
    ObTenantStatEstGuard guard(id_);
    EVENT_INC(REQUEST_ENQUEUE_COUNT);
  }
  return ret;
}
1549
// Queue a request for timed retry at `timestamp`; rejected with
// OB_IN_STOP_STATE once the tenant has stopped.
// NOTE(review): reconstructed from corrupted text — only braces and
// `return ret;` were missing; verify against version control.
int ObTenant::push_retry_queue(rpc::ObRequest &req, const uint64_t timestamp)
{
  int ret = OB_SUCCESS;
  if (has_stopped()) {
    ret = OB_IN_STOP_STATE;
    LOG_WARN("receive retry request but tenant has already stopped", K(ret), K(id_));
  } else if (OB_FAIL(retry_queue_.push(req, timestamp))) {
    LOG_ERROR("push retry queue failed", K(ret), K(id_));
  }
  return ret;
}
1561
// Periodic tick: under a read lock (and only while the tenant is still
// running) maintains group and tenant worker counts, refreshes token
// usage and queue limits.
// NOTE(review): this chunk is corrupted — bare original line numbers are
// interleaved and lines are missing at the numbering gaps (the lock
// handle declaration, one statement between update_token_usage() and
// update_queue_size(), and closing scaffolding). Restore from version
// control before editing.
int ObTenant::timeup()
1563
int ret = OB_SUCCESS;
1565
if (!has_stopped() && OB_SUCC(try_rdlock(handle))) {
1567
if (!has_stopped()) {
1568
check_group_worker_count();
1569
check_worker_count();
1570
update_token_usage();
1572
update_queue_size();
1574
IGNORE_RETURN unlock(handle);
1579
// Drain due entries from the retry queue (all entries when `need_clear`
// is set, presumably during stop — TODO confirm) and re-submit each as a
// large request.
// NOTE(review): this chunk is corrupted — bare original line numbers are
// interleaved and the error-path lines after the LOG_ERROR (numbering gap
// 1588-1592) are missing. Restore from version control before editing.
void ObTenant::handle_retry_req(bool need_clear)
1581
int ret = OB_SUCCESS;
1582
ObLink* task = nullptr;
1583
ObRequest *req = NULL;
1584
while (OB_SUCC(retry_queue_.pop(task, need_clear))) {
1585
req = static_cast<rpc::ObRequest*>(task);
1586
if (OB_FAIL(recv_large_request(*req))) {
1587
LOG_ERROR("tenant patrol push req fail", "tenant", id_);
1593
void ObTenant::update_queue_size()
1595
ObResourceGroupNode* iter = NULL;
1596
ObResourceGroup* group = nullptr;
1597
while (NULL != (iter = group_map_.quick_next(iter))) {
1598
group = static_cast<ObResourceGroup*>(iter);
1599
group->update_queue_size();
1601
req_queue_.set_limit(common::ObServerConfig::get_instance().tenant_task_queue_size);
1602
if (nullptr != multi_level_queue_) {
1603
multi_level_queue_->set_limit(common::ObServerConfig::get_instance().tenant_task_queue_size);
1607
// Adjust the tenant's default worker pool under workers_lock_ (trylock —
// skipped if contended): reap stopped workers, detect workers blocked
// longer than _stall_threshold_for_dynamic_worker, then grow to
// min_worker_cnt() immediately, expand by one (rate-limited by
// EXPAND_INTERVAL and 5% tenant-memory headroom), or mark the pool for
// shrinking (SHRINK_INTERVAL). Finally reloads per-tenant SQL net thread
// config when the new SQL NIO is enabled.
// NOTE(review): this chunk is corrupted — bare original line numbers are
// interleaved and lines are missing at the numbering gaps (notably the
// token computation around original lines 1631-1633 and brace/else
// scaffolding). Restore from version control before editing.
void ObTenant::check_worker_count()
1609
int ret = OB_SUCCESS;
1610
if (OB_SUCC(workers_lock_.trylock())) {
1612
int64_t now = ObTimeUtility::current_time();
1613
bool enable_dynamic_worker = true;
1614
int64_t threshold = 3 * 1000;
1616
ObTenantConfigGuard tenant_config(TENANT_CONF(id_));
1617
enable_dynamic_worker = tenant_config.is_valid() ? tenant_config->_ob_enable_dynamic_worker : true;
1618
threshold = tenant_config.is_valid() ? tenant_config->_stall_threshold_for_dynamic_worker : 3 * 1000;
1621
DLIST_FOREACH_REMOVESAFE(wnode, workers_) {
1622
const auto w = static_cast<ObThWorker*>(wnode->get_data());
1623
if (w->has_set_stop()) {
1624
workers_.remove(wnode);
1626
} else if (w->has_req_flag()
1627
&& 0 != w->blocking_ts()
1628
&& now - w->blocking_ts() >= threshold
1629
&& w->is_default_worker()
1630
&& enable_dynamic_worker) {
1634
int64_t succ_num = 0L;
1635
token = std::max(token, min_worker_cnt());
1636
token = std::min(token, max_worker_cnt());
1637
if (OB_UNLIKELY(workers_.get_size() < min_worker_cnt())) {
1638
const auto diff = min_worker_cnt() - workers_.get_size();
1639
token_change_ts_ = now;
1640
ATOMIC_STORE(&shrink_, false);
1641
acquire_more_worker(diff, succ_num, true);
1642
LOG_INFO("worker thread created", K(id_), K(token));
1643
} else if (OB_UNLIKELY(token > workers_.get_size())
1644
&& OB_LIKELY(ObMallocAllocator::get_instance()->get_tenant_remain(id_) > ObMallocAllocator::get_instance()->get_tenant_limit(id_) * 0.05)) {
1645
ATOMIC_STORE(&shrink_, false);
1646
if (OB_LIKELY(now - token_change_ts_ >= EXPAND_INTERVAL)) {
1647
token_change_ts_ = now;
1648
acquire_more_worker(1, succ_num);
1649
LOG_INFO("worker thread created", K(id_), K(token));
1651
} else if (OB_UNLIKELY(token < workers_.get_size())
1652
&& OB_LIKELY(now - token_change_ts_ >= SHRINK_INTERVAL)) {
1653
token_change_ts_ = now;
1654
ATOMIC_STORE(&shrink_, true);
1655
LOG_INFO("worker thread began to shrink", K(id_), K(token));
1657
IGNORE_RETURN workers_lock_.unlock();
1660
if (GCONF._enable_new_sql_nio && GCONF._enable_tenant_sql_net_thread &&
1661
(is_sys_tenant(id_) || is_user_tenant(id_))) {
1662
GCTX.net_frame_->reload_tenant_sql_thread_config(id_);
1666
void ObTenant::check_group_worker_count()
1668
ObResourceGroupNode* iter = NULL;
1669
ObResourceGroup* group = nullptr;
1670
while (NULL != (iter = group_map_.quick_next(iter))) {
1671
group = static_cast<ObResourceGroup*>(iter);
1672
group->check_worker_count();
1676
// Per-worker shrink check: when the tenant is marked for shrinking and
// this is a default worker, atomically claim the shrink flag (BCAS) and
// retire this worker, removing its thread from the tenant cgroup.
// NOTE(review): this chunk is corrupted — bare original line numbers are
// interleaved and a line is missing right after the BCAS condition
// (numbering gap 1682; presumably the statement that stops the worker —
// confirm against version control) plus closing scaffolding.
void ObTenant::check_worker_count(ObThWorker &w)
1678
int ret = OB_SUCCESS;
1679
if (OB_LIKELY(w.is_default_worker())
1680
&& OB_UNLIKELY(ATOMIC_LOAD(&shrink_))
1681
&& OB_LIKELY(ATOMIC_BCAS(&shrink_, true, false))) {
1683
if (cgroup_ctrl_.is_valid() && OB_FAIL(cgroup_ctrl_.remove_self_from_cgroup(id_))) {
1684
LOG_WARN("remove thread from cgroup failed", K(ret), K_(id));
1686
LOG_INFO("worker thread exit", K(id_), K(workers_.get_size()));
1690
// Create up to `num` nested workers bound to request `level`
// (1..MAX_REQUEST_LEVEL) and append them to nesting_workers_; reports the
// number actually created via `succ_num` and rate-limit-logs any deficit.
// NOTE(review): this chunk is corrupted — bare original line numbers are
// interleaved and lines are missing at the numbering gaps (e.g. succ_num
// initialization, brace/else scaffolding, the success-path increment).
// Restore from version control before editing.
int ObTenant::acquire_level_worker(int64_t num, int64_t &succ_num, int32_t level)
1692
int ret = OB_SUCCESS;
1693
ObTenantSwitchGuard guard(this);
1695
const auto need_num = num;
1698
if (level <= 0 || level > MAX_REQUEST_LEVEL) {
1699
ret = OB_ERR_UNEXPECTED;
1700
LOG_ERROR("unexpected level", K(level), K(id_));
1702
while (OB_SUCC(ret) && need_num > succ_num) {
1703
ObThWorker *w = nullptr;
1704
if (OB_FAIL(create_worker(w, this, 0, level, true))) {
1705
LOG_WARN("create worker failed", K(ret));
1706
} else if (!nesting_workers_.add_last(&w->worker_node_)) {
1708
ret = OB_ERR_UNEXPECTED;
1709
LOG_ERROR("add worker to list fail", K(ret));
1716
if (need_num != num ||
1717
succ_num != need_num
1719
if (TC_REACH_TIME_INTERVAL(10000000)) {
1720
LOG_WARN("Alloc level worker less than lack", K(num), K(need_num), K(succ_num));
1728
// Create up to `num` default-level workers (level 0) and append them to
// workers_; `succ_num` reports how many were created, `force` is passed
// through to create_worker.
// NOTE(review): this chunk is corrupted — bare original line numbers are
// interleaved and lines are missing at the numbering gaps (succ_num
// initialization at 1731-1732, the success-path increment and closing
// scaffolding at 1739/1742-1749). Restore from version control before
// editing.
int ObTenant::acquire_more_worker(int64_t num, int64_t &succ_num, bool force)
1730
int ret = OB_SUCCESS;
1733
ObTenantSwitchGuard guard(this);
1734
while (OB_SUCC(ret) && num > succ_num) {
1735
ObThWorker *w = nullptr;
1736
if (OB_FAIL(create_worker(w, this, 0, 0, force))) {
1737
LOG_WARN("create worker failed", K(ret));
1738
} else if (!workers_.add_last(&w->worker_node_)) {
1740
ret = OB_ERR_UNEXPECTED;
1741
LOG_ERROR("add worker to list fail", K(ret));
1750
// Finish a large-query yield: move the worker's thread back from the LQ
// cgroup to its own group's cgroup and clear the yield mark. No-op if the
// worker never yielded.
// NOTE(review): reconstructed from corrupted text — the numbering gaps
// correspond to brace/else scaffolding; verify against version control.
void ObTenant::lq_end(ObThWorker &w)
{
  int ret = OB_SUCCESS;
  if (w.is_lq_yield()) {
    if (OB_FAIL(cgroup_ctrl_.add_self_to_cgroup(id_, w.get_group_id()))) {
      LOG_WARN("move thread from lq group failed", K(ret), K(id_));
    } else {
      w.set_lq_yield(false);
    }
  }
}
1762
// Throttle a large-query worker: compute a wait proportional to the time
// spent on the last query and the LQ-to-default worker ratio (scaled by
// large_query_worker_percentage), capped at min(100ms, remaining
// timeout); only waits when the computed time exceeds 10ms.
// NOTE(review): this chunk is corrupted — bare original line numbers are
// interleaved and lines are missing at the numbering gaps (the tail of
// the wait_us expression at 1770, the actual sleep call at 1773, and
// closing scaffolding). Restore from version control before editing.
void ObTenant::lq_wait(ObThWorker &w)
1764
int64_t last_query_us = ObTimeUtility::current_time() - w.get_last_wakeup_ts();
1765
int64_t lq_group_worker_cnt = w.get_group()->workers_.get_size();
1766
int64_t default_group_worker_cnt = workers_.get_size();
1767
double large_query_percentage = GCONF.large_query_worker_percentage / 100.0;
1768
int64_t wait_us = static_cast<int64_t>(last_query_us * lq_group_worker_cnt /
1769
(default_group_worker_cnt * large_query_percentage) -
1771
wait_us = std::min(wait_us, min(100 * 1000, w.get_timeout_remain()));
1772
if (wait_us > 10 * 1000) {
1774
w.set_last_wakeup_ts(ObTimeUtility::current_time());
1778
// Yield a worker that is executing a large query: counts the event, and
// either falls back to a timed wait (no cgroup support, LQ-group workers
// only) or moves the worker's thread into the OBCG_LQ cgroup; a worker
// that has already yielded is left alone.
// NOTE(review): this chunk is corrupted — bare original line numbers are
// interleaved and branch bodies are missing at the numbering gaps
// (1784-1785 and 1790-1796, including the success path and return).
// Restore from version control before editing.
int ObTenant::lq_yield(ObThWorker &w)
1780
int ret = OB_SUCCESS;
1781
ATOMIC_INC(&tt_large_quries_);
1782
if (!cgroup_ctrl_.is_valid()) {
1783
if (w.get_group_id() == share::OBCG_LQ) {
1786
} else if (w.is_lq_yield()) {
1788
} else if (OB_FAIL(cgroup_ctrl_.add_self_to_cgroup(id_, OBCG_LQ))) {
1789
LOG_WARN("move thread to lq group failed", K(ret), K(id_));
1797
// Recompute token usage roughly once per second: under workers_lock_
// (trylock), collect-and-reset idle_us from default, nesting and group
// workers, then derive token_usage_ = busy/total and accumulate worker_us_.
// When cgroup control is unavailable, additionally sums per-thread CPU
// time increments from thread_list_ into cpu_time_us_.
// NOTE(review): this chunk is corrupted — bare original line numbers are
// interleaved and lines are missing at the numbering gaps (brace
// scaffolding, the `inc` declaration around 1835, and the function's
// open/close braces). Restore from version control before editing.
void ObTenant::update_token_usage()
1799
int ret = OB_SUCCESS;
1800
const auto now = ObTimeUtility::current_time();
1801
const auto duration = static_cast<double>(now - token_usage_check_ts_);
1802
if (duration >= 1000 * 1000 && OB_SUCC(workers_lock_.trylock())) {
1803
ObResourceGroupNode* iter = NULL;
1804
ObResourceGroup* group = nullptr;
1805
int64_t idle_us = 0;
1806
token_usage_check_ts_ = now;
1807
DLIST_FOREACH_REMOVESAFE(wnode, workers_) {
1808
const auto w = static_cast<ObThWorker*>(wnode->get_data());
1809
idle_us += ATOMIC_SET(&w->idle_us_, 0);
1811
DLIST_FOREACH_REMOVESAFE(wnode, nesting_workers_) {
1812
const auto w = static_cast<ObThWorker*>(wnode->get_data());
1813
idle_us += ATOMIC_SET(&w->idle_us_, 0);
1815
while (OB_NOT_NULL(iter = group_map_.quick_next(iter))) {
1816
group = static_cast<ObResourceGroup*>(iter);
1817
DLIST_FOREACH_REMOVESAFE(wnode, group->workers_) {
1818
const auto w = static_cast<ObThWorker*>(wnode->get_data());
1819
idle_us += ATOMIC_SET(&w->idle_us_, 0);
1822
workers_lock_.unlock();
1823
const auto total_us = duration * total_worker_cnt_;
1824
token_usage_ = std::max(.0, 1.0 * (total_us - idle_us) / total_us);
1825
IGNORE_RETURN ATOMIC_FAA(&worker_us_, total_us - idle_us);
1828
if (OB_NOT_NULL(GCTX.cgroup_ctrl_) && GCTX.cgroup_ctrl_->is_valid()) {
1830
} else if (duration >= 1000 * 1000 && OB_SUCC(thread_list_lock_.trylock())) {
1831
int64_t cpu_time_inc = 0;
1832
DLIST_FOREACH_REMOVESAFE(thread_list_node_, thread_list_)
1834
Thread *thread = thread_list_node_->get_data();
1836
if (OB_SUCC(thread->get_cpu_time_inc(inc))) {
1837
cpu_time_inc += inc;
1840
thread_list_lock_.unlock();
1841
IGNORE_RETURN ATOMIC_FAA(&cpu_time_us_, cpu_time_inc);
1845
// Periodic maintenance hook: refreshes parallel-servers target, resource
// manager plan, and recycles idle PX threads.
// NOTE(review): this chunk is corrupted — bare original line numbers are
// interleaved and lines are missing at the numbering gaps (1848, 1851,
// 1853-1855; at least one more check call appears to be dropped between
// check_resource_manager_plan() and check_px_thread_recycle() — confirm
// against version control).
void ObTenant::periodically_check()
1847
int ret = OB_SUCCESS;
1849
check_parallel_servers_target();
1850
check_resource_manager_plan();
1852
check_px_thread_recycle();
1856
// Refresh the tenant's resource manager plan: read the
// RESOURCE_MANAGER_PLAN system variable, then refresh the group-id/name
// mapping, the plan itself, the resource mapping rules, and the column
// mapping rules. Skipped for reserved (non-sys) tenant ids. Each failure
// is logged because resource isolation silently degrades.
// NOTE(review): this chunk is corrupted — bare original line numbers are
// interleaved and lines are missing at the numbering gaps (e.g. the
// plan_name declaration, call-argument lines at 1868/1870-1871/1887, and
// brace scaffolding). Restore from version control before editing.
void ObTenant::check_resource_manager_plan()
1858
int ret = OB_SUCCESS;
1860
ObResourcePlanManager &plan_mgr = G_RES_MGR.get_plan_mgr();
1861
ObResourceMappingRuleManager &rule_mgr = G_RES_MGR.get_mapping_rule_mgr();
1862
ObResourceColMappingRuleManager &col_rule_mgr = G_RES_MGR.get_col_mapping_rule_mgr();
1863
char data[OB_MAX_RESOURCE_PLAN_NAME_LENGTH];
1864
ObDataBuffer allocator(data, OB_MAX_RESOURCE_PLAN_NAME_LENGTH);
1865
if (OB_SYS_TENANT_ID != id_ && OB_MAX_RESERVED_TENANT_ID >= id_) {
1867
} else if (OB_FAIL(ObSchemaUtils::get_tenant_varchar_variable(
1869
SYS_VAR_RESOURCE_MANAGER_PLAN,
1872
LOG_WARN("fail get tenant variable", K(id_), K(plan_name), K(ret));
1874
} else if (OB_FAIL(rule_mgr.refresh_group_mapping_rule(id_, plan_name))) {
1875
LOG_WARN("refresh group id name mapping rule fail."
1876
"Tenant resource isolation may not work",
1877
K(id_), K(plan_name), K(ret));
1878
} else if (OB_FAIL(plan_mgr.refresh_resource_plan(id_, plan_name))) {
1879
LOG_WARN("refresh resource plan fail."
1880
"Tenant resource isolation may not work",
1881
K(id_), K(plan_name), K(ret));
1882
} else if (OB_FAIL(rule_mgr.refresh_resource_mapping_rule(id_, plan_name))) {
1883
LOG_WARN("refresh resource mapping rule fail."
1884
"Tenant resource isolation may not work",
1885
K(id_), K(plan_name), K(ret));
1886
} else if (OB_FAIL(col_rule_mgr.refresh_resource_column_mapping_rule(id_, get<ObPlanCache*>(),
1888
LOG_WARN("refresh resource column mapping rule fail."
1889
"Tenant resource isolation may not work",
1890
K(id_), K(plan_name), K(ret));
1894
void ObTenant::check_dtl()
1896
int ret = OB_SUCCESS;
1897
if (is_virtual_tenant_id(id_)) {
1900
ObTenantSwitchGuard guard(this);
1901
auto tenant_dfc = MTL(ObTenantDfc*);
1902
if (OB_NOT_NULL(tenant_dfc)) {
1903
tenant_dfc->check_dtl(id_);
1905
ret = OB_ERR_UNEXPECTED;
1906
LOG_WARN("failed to switch to tenant", K(id_), K(ret));
1911
void ObTenant::check_das()
1913
int ret = OB_SUCCESS;
1914
if (!is_virtual_tenant_id(id_)) {
1915
ObTenantSwitchGuard guard(this);
1916
if (OB_ISNULL(MTL(ObDataAccessService *))) {
1917
ret = OB_ERR_UNEXPECTED;
1918
LOG_WARN("failed to get das ptr", K(MTL_ID()));
1920
double min_cpu = .0;
1921
double max_cpu = .0;
1922
if (OB_FAIL(GCTX.omt_->get_tenant_cpu(MTL_ID(), min_cpu, max_cpu))) {
1923
LOG_WARN("failed to set das task max concurrency", K(MTL_ID()));
1925
MTL(ObDataAccessService *)->set_max_concurrency(min_cpu);
1931
void ObTenant::check_parallel_servers_target()
1933
int ret = OB_SUCCESS;
1935
if (is_virtual_tenant_id(id_)) {
1937
} else if (OB_FAIL(ObSchemaUtils::get_tenant_int_variable(
1939
SYS_VAR_PARALLEL_SERVERS_TARGET,
1941
LOG_WARN("fail read tenant variable", K_(id), K(ret));
1942
} else if (OB_FAIL(OB_PX_TARGET_MGR.set_parallel_servers_target(id_, val))) {
1943
LOG_WARN("set parallel_servers_target failed", K(ret), K(id_), K(val));
1947
void ObTenant::check_px_thread_recycle()
1949
int ret = OB_SUCCESS;
1950
if (is_virtual_tenant_id(id_)) {
1953
ObTenantSwitchGuard guard(this);
1954
auto px_pools = MTL(ObPxPools*);
1955
if (OB_NOT_NULL(px_pools)) {
1956
px_pools->thread_recycle();
1958
ret = OB_ERR_UNEXPECTED;
1959
LOG_WARN("failed to switch to tenant", K(id_), K(ret));