oceanbase
410 строк · 13.9 Кб
1/**
2* Copyright (c) 2021 OceanBase
3* OceanBase CE is licensed under Mulan PubL v2.
4* You can use this software according to the terms and conditions of the Mulan PubL v2.
5* You may obtain a copy of Mulan PubL v2 at:
6* http://license.coscl.org.cn/MulanPubL-2.0
7* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10* See the Mulan PubL v2 for more details.
11*/
12
13#define USING_LOG_PREFIX SHARE_SCHEMA
14
15#include "rootserver/parallel_ddl/ob_tablet_balance_allocator.h"
16#include "rootserver/ob_balance_group_ls_stat_operator.h"
17#include "share/ob_share_util.h"
18#include "observer/omt/ob_tenant_config_mgr.h"
19
20using namespace oceanbase::lib;
21using namespace oceanbase::common;
22using namespace oceanbase::share;
23using namespace oceanbase::share::schema;
24using namespace oceanbase::rootserver;
25
26
27ObNonPartitionedTableTabletCache::ObNonPartitionedTableTabletCache(
28const uint64_t tenant_id,
29common::ObMySQLProxy &sql_proxy)
30: mutex_(),
31tenant_id_(tenant_id),
32sql_proxy_(sql_proxy),
33allocator_(ObMemAttr(OB_SYS_TENANT_ID, "NonPartTabtCac", ObCtxIds::SCHEMA_SERVICE),
34PAGE_SIZE),
35cache_(ARRAY_BLOCK_SIZE, ModulePageAllocator(allocator_)),
36loaded_timestamp_(OB_INVALID_TIMESTAMP),
37dump_timestamp_(OB_INVALID_TIMESTAMP)
38{
39}
40
41void ObNonPartitionedTableTabletCache::reset_cache()
42{
43lib::ObMutexGuard guard(mutex_);
44(void) inner_reset_cache_();
45}
46
47void ObNonPartitionedTableTabletCache::inner_reset_cache_()
48{
49cache_.reset();
50allocator_.reset();
51loaded_timestamp_ = OB_INVALID_TIMESTAMP;
52LOG_INFO("[NON PARTITIONED TABLET CACHE] reset cache", K_(tenant_id));
53}
54
55// In the following cases, cache will be reload first:
56// 1. cache_ is empty
57// 2. cache_ is expire (consider transfer may change the placement of related tablets)
58// 3. cache_ and avaliable_ls_ids are not matched (ls cnt or status changed)
59bool ObNonPartitionedTableTabletCache::should_reload_cache_(
60const common::ObIArray<share::ObLSID> &avaliable_ls_ids)
61{
62bool bret = false;
63int64_t interval = INT64_MAX;
64{
65omt::ObTenantConfigGuard tenant_config(OTC_MGR.get_tenant_config_with_lock(tenant_id_));
66if (tenant_config.is_valid()) {
67interval = tenant_config->partition_balance_schedule_interval;
68}
69}
70if (loaded_timestamp_ < 0) {
71bret = true; // case 1
72LOG_INFO("[NON PARTITIONED TABLET CACHE] failure/non parallel ddl occur or cache is empty, should be reloaded", K_(tenant_id));
73} else if (ObTimeUtility::current_time() - loaded_timestamp_ >= interval) {
74bret = true; // case 2
75LOG_INFO("[NON PARTITIONED TABLET CACHE] cache is expire, should be reloaded", K_(tenant_id));
76} else {
77// case 3
78if (avaliable_ls_ids.count() != cache_.count()) {
79bret = true;
80} else {
81for (int64_t i = 0; !bret && i < cache_.count(); i++) {
82ObLSID ls_id = cache_.at(i).get_ls_id();
83if (!has_exist_in_array(avaliable_ls_ids, ls_id)) {
84bret = true;
85}
86} // end for
87}
88if (bret) {
89LOG_INFO("[NON PARTITIONED TABLET CACHE] ls is changed, should be reloaded", K_(tenant_id));
90}
91}
92return bret;
93}
94
95int ObNonPartitionedTableTabletCache::reload_cache_(
96const common::ObIArray<share::ObLSID> &avaliable_ls_ids)
97{
98int ret = OB_SUCCESS;
99(void) inner_reset_cache_();
100
101ObBalanceGroupLSStatOperator op;
102common::ObArray<ObBalanceGroupLSStat> bg_ls_stat_array;
103ObBalanceGroupID bg_id(0, 0); // for non-partitioned table
104ObString bg_name(rootserver::ObBalanceGroup::NON_PART_BG_NAME);
105const int64_t default_timeout = GCONF.internal_sql_execute_timeout;
106int64_t start_time = ObTimeUtility::current_time();
107if (OB_FAIL(op.init(&sql_proxy_))) {
108LOG_WARN("fail to init ObBalanceGroupLSStatOperator", KR(ret));
109} else if (OB_FAIL(op.get_balance_group_ls_stat(
110default_timeout,
111sql_proxy_,
112tenant_id_,
113bg_id,
114false, /*for update*/
115bg_ls_stat_array))) {
116LOG_WARN("fail to get balance ls stat array", KR(ret), K_(tenant_id));
117} else {
118// 1. get existed ls stat
119common::ObArray<ObBalanceGroupLSStat> new_ls_stat_array;
120for (int64_t i = 0; OB_SUCC(ret) && i < bg_ls_stat_array.count(); i++) {
121const ObBalanceGroupLSStat &ls_stat = bg_ls_stat_array.at(i);
122ObLSID ls_id = ls_stat.get_ls_id();
123if (has_exist_in_array(avaliable_ls_ids, ls_id)) {
124if (OB_FAIL(new_ls_stat_array.push_back(ls_stat))) {
125LOG_WARN("fail to push back ObBalanceGroupLSStat", KR(ret), K(ls_stat));
126}
127}
128} // end for
129
130// 2. insert missing ls stat
131common::ObArray<ObBalanceGroupLSStat> miss_ls_stat_array;
132if (OB_SUCC(ret)) {
133for (int64_t i = 0; OB_SUCC(ret) && i < avaliable_ls_ids.count(); i++) {
134const ObLSID &ls_id = avaliable_ls_ids.at(i);
135bool finded = false;
136for (int64_t j = 0; !finded && OB_SUCC(ret) && j < bg_ls_stat_array.count(); j++) {
137const ObBalanceGroupLSStat &ls_stat = bg_ls_stat_array.at(j);
138if (ls_id == ls_stat.get_ls_id()) {
139finded = true;
140}
141} // end for
142if (OB_SUCC(ret) && !finded) {
143ObBalanceGroupLSStat ls_stat;
144if (OB_FAIL(ls_stat.build(tenant_id_, bg_id, ls_id, 0 /*bg cnt*/, bg_name))) {
145LOG_WARN("fail to build ls_stat", KR(ret), K_(tenant_id), K(ls_id));
146} else if (OB_FAIL(miss_ls_stat_array.push_back(ls_stat))) {
147LOG_WARN("fail to push back miss ls stat", KR(ret), K(ls_stat));
148}
149}
150} // end for
151
152if (OB_SUCC(ret) && miss_ls_stat_array.count() > 0) {
153if (OB_FAIL(op.insert_update_balance_group_ls_stat(
154default_timeout, tenant_id_, bg_id, miss_ls_stat_array))) {
155LOG_WARN("fail to insert miss ls stat", KR(ret), K_(tenant_id), K(miss_ls_stat_array));
156}
157}
158}
159
160// 3. store in cache
161if (FAILEDx(append(new_ls_stat_array, miss_ls_stat_array))) {
162LOG_WARN("fail to append ls stat array", KR(ret), K_(tenant_id), K(miss_ls_stat_array));
163} else {
164for (int64_t i = 0; OB_SUCC(ret) && i < new_ls_stat_array.count(); i++) {
165const ObBalanceGroupLSStat &ls_stat = new_ls_stat_array.at(i);
166Pair pair(ls_stat.get_ls_id(), ls_stat.get_tablet_group_count());
167if (OB_FAIL(cache_.push_back(pair))) {
168LOG_WARN("fail to push back pair", KR(ret), K_(tenant_id), K(ls_stat));
169}
170} // end for
171if (OB_FAIL(ret)) {
172(void) inner_reset_cache_();
173}
174}
175
176if (OB_SUCC(ret)) {
177loaded_timestamp_ = ObTimeUtility::current_time();
178}
179}
180LOG_INFO("[NON PARTITIONED TABLET CACHE] reload cache",
181KR(ret), K_(tenant_id), "cost", ObTimeUtility::current_time() - start_time);
182return ret;
183}
184
185int ObNonPartitionedTableTabletCache::alloc_tablet(
186const common::ObIArray<share::ObLSID> &avaliable_ls_ids,
187share::ObLSID &ls_id)
188{
189int ret = OB_SUCCESS;
190ls_id.reset();
191lib::ObMutexGuard guard(mutex_);
192if (OB_UNLIKELY(avaliable_ls_ids.empty())) {
193ret = OB_INVALID_ARGUMENT;
194LOG_WARN("invalid arg", KR(ret), K(avaliable_ls_ids.count()));
195} else if (should_reload_cache_(avaliable_ls_ids)) {
196if (OB_FAIL(reload_cache_(avaliable_ls_ids))) {
197LOG_WARN("fail to reload cache", KR(ret), K_(tenant_id), K(avaliable_ls_ids));
198}
199}
200// find ls which has min tablet cnt
201if (OB_SUCC(ret)) {
202int64_t min_tablet_cnt = INT64_MAX;
203int64_t pos = OB_INVALID_INDEX;
204for (int64_t i = 0; OB_SUCC(ret) && i < cache_.count(); i++) {
205if (min_tablet_cnt > cache_.at(i).get_tablet_cnt()) {
206min_tablet_cnt = cache_.at(i).get_tablet_cnt();
207pos = i;
208}
209} // end for
210if (OB_UNLIKELY(OB_INVALID_INDEX == pos
211|| pos >= cache_.count())) {
212ret = OB_ERR_UNEXPECTED;
213LOG_WARN("fail to find ls has min tablet cnt",
214KR(ret), K_(tenant_id), K(pos), K_(cache));
215} else {
216Pair &target_pair = cache_.at(pos);
217const int64_t tablet_cnt = target_pair.get_tablet_cnt() + 1;
218ls_id = target_pair.get_ls_id();
219target_pair.set_tablet_cnt(tablet_cnt);
220}
221}
222(void) dump_cache_();
223return ret;
224}
225
226void ObNonPartitionedTableTabletCache::dump_cache_()
227{
228const int64_t DUMP_INTERVAL = 10 * 60 * 1000 * 1000L; // 10min
229const int64_t current_time = ObTimeUtility::current_time();
230if (current_time - dump_timestamp_ >= DUMP_INTERVAL) {
231LOG_INFO("[NON PARTITIONED TABLET CACHE] dump cache", K_(tenant_id),
232K_(loaded_timestamp), K_(dump_timestamp), K_(cache));
233dump_timestamp_ = current_time;
234}
235}
236
237ObNonPartitionedTableTabletAllocator::ObNonPartitionedTableTabletAllocator()
238: rwlock_(),
239allocator_(ObMemAttr(OB_SYS_TENANT_ID, "NonPartTenCac", ObCtxIds::SCHEMA_SERVICE)),
240tenant_cache_(),
241sql_proxy_(NULL),
242inited_(false)
243{
244}
245
246ObNonPartitionedTableTabletAllocator::~ObNonPartitionedTableTabletAllocator()
247{
248destroy();
249}
250
251int ObNonPartitionedTableTabletAllocator::init(common::ObMySQLProxy &sql_proxy)
252{
253int ret = OB_SUCCESS;
254SpinWLockGuard guard(rwlock_);
255if (inited_) {
256ret = OB_INIT_TWICE;
257LOG_WARN("init twice", KR(ret));
258} else {
259const int64_t BUCKET_NUM = 1024;
260if (OB_FAIL(tenant_cache_.create(BUCKET_NUM, "NonPartTenMap", "NonPartTenMap"))) {
261LOG_WARN("fail to create hash map", KR(ret));
262} else {
263sql_proxy_ = &sql_proxy;
264inited_ = true;
265}
266}
267return ret;
268}
269
270void ObNonPartitionedTableTabletAllocator::destroy()
271{
272SpinWLockGuard guard(rwlock_);
273if (inited_) {
274FOREACH(it, tenant_cache_) {
275if (OB_NOT_NULL(it->second)) {
276(it->second)->~ObNonPartitionedTableTabletCache();
277it->second = NULL;
278}
279}
280tenant_cache_.destroy();
281allocator_.reset();
282sql_proxy_ = NULL;
283inited_ = false;
284}
285}
286
287void ObNonPartitionedTableTabletAllocator::reset_all_cache()
288{
289int ret = OB_SUCCESS;
290SpinRLockGuard guard(rwlock_);
291if (inited_) {
292FOREACH(it, tenant_cache_) {
293if (OB_NOT_NULL(it->second)) {
294(void) (it->second)->reset_cache();
295}
296}
297}
298}
299
300int ObNonPartitionedTableTabletAllocator::reset_cache(
301const uint64_t tenant_id)
302{
303int ret = OB_SUCCESS;
304SpinRLockGuard guard(rwlock_);
305if (OB_UNLIKELY(!inited_)) {
306ret = OB_NOT_INIT;
307LOG_WARN("not init", KR(ret));
308} else {
309ObNonPartitionedTableTabletCache *cache = NULL;
310if (OB_FAIL(tenant_cache_.get_refactored(tenant_id, cache))) {
311if (OB_HASH_NOT_EXIST != ret) {
312LOG_WARN("fail to get refactored", KR(ret), K(tenant_id));
313} else {
314// tenant not in cache, just skip
315ret = OB_SUCCESS;
316}
317} else if (OB_ISNULL(cache)) {
318ret = OB_ERR_UNEXPECTED;
319LOG_WARN("cache is null", KR(ret), K(tenant_id));
320} else {
321(void) cache->reset_cache();
322}
323}
324return ret;
325}
326
327int ObNonPartitionedTableTabletAllocator::try_init_cache_(
328const uint64_t tenant_id)
329{
330int ret = OB_SUCCESS;
331SpinWLockGuard guard(rwlock_);
332if (OB_UNLIKELY(!inited_)) {
333ret = OB_NOT_INIT;
334LOG_WARN("not init", KR(ret));
335} else if (OB_ISNULL(sql_proxy_)) {
336ret = OB_ERR_UNEXPECTED;
337LOG_WARN("sql_proxy is null", KR(ret));
338} else {
339ObNonPartitionedTableTabletCache *cache = NULL;
340if (OB_FAIL(tenant_cache_.get_refactored(tenant_id, cache))) {
341if (OB_HASH_NOT_EXIST != ret) {
342LOG_WARN("fail to get cache", KR(ret), K(tenant_id));
343} else {
344ret = OB_SUCCESS;
345cache = NULL;
346void *buf = NULL;
347if (OB_ISNULL(buf = allocator_.alloc(sizeof(ObNonPartitionedTableTabletCache)))) {
348ret = OB_ALLOCATE_MEMORY_FAILED;
349LOG_WARN("fail to alloc memory", KR(ret));
350} else if (FALSE_IT(cache = new (buf) ObNonPartitionedTableTabletCache(tenant_id, *sql_proxy_))) {
351} else if (OB_FAIL(tenant_cache_.set_refactored(tenant_id, cache))) {
352LOG_WARN("fail to set cache", KR(ret), K(tenant_id));
353}
354}
355} else {
356// cache exist, just skip
357}
358}
359return ret;
360}
361
362int ObNonPartitionedTableTabletAllocator::alloc_tablet(
363const uint64_t tenant_id,
364const common::ObIArray<share::ObLSID> &avaliable_ls_ids,
365share::ObLSID &ls_id)
366{
367int ret = OB_SUCCESS;
368ls_id.reset();
369if (OB_UNLIKELY(!inited_)) {
370ret = OB_NOT_INIT;
371LOG_WARN("not init", KR(ret));
372} else if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id
373|| avaliable_ls_ids.empty())) {
374ret = OB_INVALID_ARGUMENT;
375LOG_WARN("invalid arg", KR(ret), K(tenant_id), K(avaliable_ls_ids.count()));
376} else if (OB_FAIL(try_init_cache_(tenant_id))) {
377LOG_WARN("try to init cache failed", KR(ret), K(tenant_id));
378} else {
379{
380SpinRLockGuard guard(rwlock_);
381ObNonPartitionedTableTabletCache *cache = NULL;
382if (OB_FAIL(tenant_cache_.get_refactored(tenant_id, cache))) {
383LOG_WARN("fail to get refactored", KR(ret), K(tenant_id));
384} else if (OB_ISNULL(cache)) {
385ret = OB_ERR_UNEXPECTED;
386LOG_WARN("cache is null", KR(ret));
387} else if (OB_FAIL(cache->alloc_tablet(avaliable_ls_ids, ls_id))) {
388LOG_WARN("fail to alloc tablet", KR(ret), K(tenant_id));
389}
390}
391if (OB_SUCC(ret)) {
392// try update ls stat
393ObBalanceGroupLSStat ls_stat;
394const ObBalanceGroupID bg_id(0, 0); // for non-partitioned table
395const ObString bg_name(rootserver::ObBalanceGroup::NON_PART_BG_NAME);
396const int64_t inc_tablet_cnt = 1;
397const int64_t default_timeout = GCONF.internal_sql_execute_timeout;
398ObBalanceGroupLSStatOperator op;
399if (OB_FAIL(op.init(sql_proxy_))) {
400LOG_WARN("fail to init ObBalanceGroupLSStatOperator", KR(ret));
401} else if (OB_FAIL(ls_stat.build(tenant_id, bg_id, ls_id, inc_tablet_cnt, bg_name))) {
402LOG_WARN("fail to build ls_stat", KR(ret), K(tenant_id), K(ls_id));
403} else if (OB_FAIL(op.inc_balance_group_ls_stat(
404default_timeout, *sql_proxy_, tenant_id, ls_stat))) {
405LOG_WARN("fail to inc ls stat", KR(ret), K(tenant_id), K(ls_stat));
406}
407}
408}
409return ret;
410}
411