oceanbase

Форк
0
/
ob_tablet_balance_allocator.cpp 
410 строк · 13.9 Кб
1
/**
2
 * Copyright (c) 2021 OceanBase
3
 * OceanBase CE is licensed under Mulan PubL v2.
4
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
5
 * You may obtain a copy of Mulan PubL v2 at:
6
 *          http://license.coscl.org.cn/MulanPubL-2.0
7
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
 * See the Mulan PubL v2 for more details.
11
 */
12

13
#define USING_LOG_PREFIX SHARE_SCHEMA
14

15
#include "rootserver/parallel_ddl/ob_tablet_balance_allocator.h"
16
#include "rootserver/ob_balance_group_ls_stat_operator.h"
17
#include "share/ob_share_util.h"
18
#include "observer/omt/ob_tenant_config_mgr.h"
19

20
using namespace oceanbase::lib;
21
using namespace oceanbase::common;
22
using namespace oceanbase::share;
23
using namespace oceanbase::share::schema;
24
using namespace oceanbase::rootserver;
25

26

27
// Per-tenant cache of (ls_id, tablet count) pairs used to place tablets of
// non-partitioned tables.
// - allocator_ is tagged "NonPartTabtCac" under OB_SYS_TENANT_ID / SCHEMA_SERVICE
//   and backs the pages of cache_ (through ModulePageAllocator).
// - Both timestamps start as OB_INVALID_TIMESTAMP: the cache counts as "not
//   loaded yet" and has never been dumped.
ObNonPartitionedTableTabletCache::ObNonPartitionedTableTabletCache(
  const uint64_t tenant_id,
  common::ObMySQLProxy &sql_proxy)
  : mutex_(),
    tenant_id_(tenant_id),
    sql_proxy_(sql_proxy),
    allocator_(ObMemAttr(OB_SYS_TENANT_ID, "NonPartTabtCac", ObCtxIds::SCHEMA_SERVICE), PAGE_SIZE),
    cache_(ARRAY_BLOCK_SIZE, ModulePageAllocator(allocator_)),
    loaded_timestamp_(OB_INVALID_TIMESTAMP),
    dump_timestamp_(OB_INVALID_TIMESTAMP)
{
  // all state is set up in the member initializer list
}
40

41
void ObNonPartitionedTableTabletCache::reset_cache()
42
{
43
  lib::ObMutexGuard guard(mutex_);
44
  (void) inner_reset_cache_();
45
}
46

47
void ObNonPartitionedTableTabletCache::inner_reset_cache_()
48
{
49
  cache_.reset();
50
  allocator_.reset();
51
  loaded_timestamp_ = OB_INVALID_TIMESTAMP;
52
  LOG_INFO("[NON PARTITIONED TABLET CACHE] reset cache", K_(tenant_id));
53
}
54

55
// In the following cases, cache will be reload first:
56
// 1. cache_ is empty
57
// 2. cache_ is expire (consider transfer may change the placement of related tablets)
58
// 3. cache_ and avaliable_ls_ids are not matched (ls cnt or status changed)
59
bool ObNonPartitionedTableTabletCache::should_reload_cache_(
60
     const common::ObIArray<share::ObLSID> &avaliable_ls_ids)
61
{
62
  bool bret = false;
63
  int64_t interval = INT64_MAX;
64
  {
65
    omt::ObTenantConfigGuard tenant_config(OTC_MGR.get_tenant_config_with_lock(tenant_id_));
66
    if (tenant_config.is_valid()) {
67
      interval = tenant_config->partition_balance_schedule_interval;
68
    }
69
  }
70
  if (loaded_timestamp_ < 0) {
71
    bret = true; // case 1
72
    LOG_INFO("[NON PARTITIONED TABLET CACHE] failure/non parallel ddl occur or cache is empty, should be reloaded", K_(tenant_id));
73
  } else if (ObTimeUtility::current_time() - loaded_timestamp_ >= interval) {
74
    bret = true; // case 2
75
    LOG_INFO("[NON PARTITIONED TABLET CACHE] cache is expire, should be reloaded", K_(tenant_id));
76
  } else {
77
    // case 3
78
    if (avaliable_ls_ids.count() != cache_.count()) {
79
      bret = true;
80
    } else {
81
      for (int64_t i = 0; !bret && i < cache_.count(); i++) {
82
        ObLSID ls_id = cache_.at(i).get_ls_id();
83
        if (!has_exist_in_array(avaliable_ls_ids, ls_id)) {
84
          bret = true;
85
        }
86
      } // end for
87
    }
88
    if (bret) {
89
      LOG_INFO("[NON PARTITIONED TABLET CACHE] ls is changed, should be reloaded", K_(tenant_id));
90
    }
91
  }
92
  return bret;
93
}
94

95
int ObNonPartitionedTableTabletCache::reload_cache_(
96
    const common::ObIArray<share::ObLSID> &avaliable_ls_ids)
97
{
98
  int ret = OB_SUCCESS;
99
  (void) inner_reset_cache_();
100

101
  ObBalanceGroupLSStatOperator op;
102
  common::ObArray<ObBalanceGroupLSStat> bg_ls_stat_array;
103
  ObBalanceGroupID bg_id(0, 0); // for non-partitioned table
104
  ObString bg_name(rootserver::ObBalanceGroup::NON_PART_BG_NAME);
105
  const int64_t default_timeout = GCONF.internal_sql_execute_timeout;
106
  int64_t start_time = ObTimeUtility::current_time();
107
  if (OB_FAIL(op.init(&sql_proxy_))) {
108
    LOG_WARN("fail to init ObBalanceGroupLSStatOperator", KR(ret));
109
  } else if (OB_FAIL(op.get_balance_group_ls_stat(
110
             default_timeout,
111
             sql_proxy_,
112
             tenant_id_,
113
             bg_id,
114
             false, /*for update*/
115
             bg_ls_stat_array))) {
116
    LOG_WARN("fail to get balance ls stat array", KR(ret), K_(tenant_id));
117
  } else {
118
    // 1. get existed ls stat
119
    common::ObArray<ObBalanceGroupLSStat> new_ls_stat_array;
120
    for (int64_t i = 0; OB_SUCC(ret) && i < bg_ls_stat_array.count(); i++) {
121
      const ObBalanceGroupLSStat &ls_stat = bg_ls_stat_array.at(i);
122
      ObLSID ls_id = ls_stat.get_ls_id();
123
      if (has_exist_in_array(avaliable_ls_ids, ls_id)) {
124
        if (OB_FAIL(new_ls_stat_array.push_back(ls_stat))) {
125
          LOG_WARN("fail to push back ObBalanceGroupLSStat", KR(ret), K(ls_stat));
126
        }
127
      }
128
    } // end for
129

130
    // 2. insert missing ls stat
131
    common::ObArray<ObBalanceGroupLSStat> miss_ls_stat_array;
132
    if (OB_SUCC(ret)) {
133
      for (int64_t i = 0; OB_SUCC(ret) && i < avaliable_ls_ids.count(); i++) {
134
        const ObLSID &ls_id = avaliable_ls_ids.at(i);
135
        bool finded = false;
136
        for (int64_t j = 0; !finded && OB_SUCC(ret) && j < bg_ls_stat_array.count(); j++) {
137
          const ObBalanceGroupLSStat &ls_stat = bg_ls_stat_array.at(j);
138
          if (ls_id == ls_stat.get_ls_id()) {
139
            finded = true;
140
          }
141
        } // end for
142
        if (OB_SUCC(ret) && !finded) {
143
          ObBalanceGroupLSStat ls_stat;
144
          if (OB_FAIL(ls_stat.build(tenant_id_, bg_id, ls_id, 0 /*bg cnt*/, bg_name))) {
145
            LOG_WARN("fail to build ls_stat", KR(ret), K_(tenant_id), K(ls_id));
146
          } else if (OB_FAIL(miss_ls_stat_array.push_back(ls_stat))) {
147
            LOG_WARN("fail to push back miss ls stat", KR(ret), K(ls_stat));
148
          }
149
        }
150
      } // end for
151

152
      if (OB_SUCC(ret) && miss_ls_stat_array.count() > 0) {
153
        if (OB_FAIL(op.insert_update_balance_group_ls_stat(
154
            default_timeout, tenant_id_, bg_id, miss_ls_stat_array))) {
155
          LOG_WARN("fail to insert miss ls stat", KR(ret), K_(tenant_id), K(miss_ls_stat_array));
156
        }
157
      }
158
    }
159

160
    // 3. store in cache
161
    if (FAILEDx(append(new_ls_stat_array, miss_ls_stat_array))) {
162
      LOG_WARN("fail to append ls stat array", KR(ret), K_(tenant_id), K(miss_ls_stat_array));
163
    } else {
164
      for (int64_t i = 0; OB_SUCC(ret) && i < new_ls_stat_array.count(); i++) {
165
        const ObBalanceGroupLSStat &ls_stat = new_ls_stat_array.at(i);
166
        Pair pair(ls_stat.get_ls_id(), ls_stat.get_tablet_group_count());
167
        if (OB_FAIL(cache_.push_back(pair))) {
168
          LOG_WARN("fail to push back pair", KR(ret), K_(tenant_id), K(ls_stat));
169
        }
170
      } // end for
171
      if (OB_FAIL(ret)) {
172
        (void) inner_reset_cache_();
173
      }
174
    }
175

176
    if (OB_SUCC(ret)) {
177
      loaded_timestamp_ = ObTimeUtility::current_time();
178
    }
179
  }
180
  LOG_INFO("[NON PARTITIONED TABLET CACHE] reload cache",
181
           KR(ret), K_(tenant_id), "cost", ObTimeUtility::current_time() - start_time);
182
  return ret;
183
}
184

185
int ObNonPartitionedTableTabletCache::alloc_tablet(
186
    const common::ObIArray<share::ObLSID> &avaliable_ls_ids,
187
    share::ObLSID &ls_id)
188
{
189
  int ret = OB_SUCCESS;
190
  ls_id.reset();
191
  lib::ObMutexGuard guard(mutex_);
192
  if (OB_UNLIKELY(avaliable_ls_ids.empty())) {
193
    ret = OB_INVALID_ARGUMENT;
194
    LOG_WARN("invalid arg", KR(ret), K(avaliable_ls_ids.count()));
195
  } else if (should_reload_cache_(avaliable_ls_ids)) {
196
    if (OB_FAIL(reload_cache_(avaliable_ls_ids))) {
197
      LOG_WARN("fail to reload cache", KR(ret), K_(tenant_id), K(avaliable_ls_ids));
198
    }
199
  }
200
  // find ls which has min tablet cnt
201
  if (OB_SUCC(ret)) {
202
    int64_t min_tablet_cnt = INT64_MAX;
203
    int64_t pos = OB_INVALID_INDEX;
204
    for (int64_t i = 0; OB_SUCC(ret) && i < cache_.count(); i++) {
205
      if (min_tablet_cnt > cache_.at(i).get_tablet_cnt()) {
206
        min_tablet_cnt = cache_.at(i).get_tablet_cnt();
207
        pos = i;
208
      }
209
    } // end for
210
    if (OB_UNLIKELY(OB_INVALID_INDEX == pos
211
        || pos >= cache_.count())) {
212
      ret = OB_ERR_UNEXPECTED;
213
      LOG_WARN("fail to find ls has min tablet cnt",
214
               KR(ret), K_(tenant_id), K(pos), K_(cache));
215
    } else {
216
      Pair &target_pair = cache_.at(pos);
217
      const int64_t tablet_cnt = target_pair.get_tablet_cnt() + 1;
218
      ls_id = target_pair.get_ls_id();
219
      target_pair.set_tablet_cnt(tablet_cnt);
220
    }
221
  }
222
  (void) dump_cache_();
223
  return ret;
224
}
225

226
void ObNonPartitionedTableTabletCache::dump_cache_()
227
{
228
  const int64_t DUMP_INTERVAL = 10 * 60 * 1000 * 1000L; // 10min
229
  const int64_t current_time = ObTimeUtility::current_time();
230
  if (current_time - dump_timestamp_ >= DUMP_INTERVAL) {
231
    LOG_INFO("[NON PARTITIONED TABLET CACHE] dump cache", K_(tenant_id),
232
             K_(loaded_timestamp), K_(dump_timestamp), K_(cache));
233
    dump_timestamp_ = current_time;
234
  }
235
}
236

237
// Builds an uninitialized allocator. init() must be called before use.
// allocator_ (tagged "NonPartTenCac") provides the memory for the per-tenant
// cache objects stored in tenant_cache_.
ObNonPartitionedTableTabletAllocator::ObNonPartitionedTableTabletAllocator()
  : rwlock_(),
    allocator_(ObMemAttr(OB_SYS_TENANT_ID, "NonPartTenCac", ObCtxIds::SCHEMA_SERVICE)),
    tenant_cache_(),
    sql_proxy_(NULL),
    inited_(false)
{
  // nothing else to do; see init()
}
245

246
// Releases every tenant cache; safe even if init() was never called,
// because destroy() checks inited_.
ObNonPartitionedTableTabletAllocator::~ObNonPartitionedTableTabletAllocator()
{
  this->destroy();
}
250

251
// One-time initialization: create the tenant_id -> cache hash map and keep a
// pointer to sql_proxy for the caches that are created later.
// @return OB_INIT_TWICE if already initialized, or the hash map creation error.
int ObNonPartitionedTableTabletAllocator::init(common::ObMySQLProxy &sql_proxy)
{
  int ret = OB_SUCCESS;
  const int64_t BUCKET_NUM = 1024;
  SpinWLockGuard guard(rwlock_);
  if (inited_) {
    ret = OB_INIT_TWICE;
    LOG_WARN("init twice", KR(ret));
  } else if (OB_FAIL(tenant_cache_.create(BUCKET_NUM, "NonPartTenMap", "NonPartTenMap"))) {
    LOG_WARN("fail to create hash map", KR(ret));
  } else {
    sql_proxy_ = &sql_proxy;
    inited_ = true;
  }
  return ret;
}
269

270
void ObNonPartitionedTableTabletAllocator::destroy()
271
{
272
  SpinWLockGuard guard(rwlock_);
273
  if (inited_) {
274
    FOREACH(it, tenant_cache_) {
275
      if (OB_NOT_NULL(it->second)) {
276
        (it->second)->~ObNonPartitionedTableTabletCache();
277
        it->second = NULL;
278
      }
279
    }
280
    tenant_cache_.destroy();
281
    allocator_.reset();
282
    sql_proxy_ = NULL;
283
    inited_ = false;
284
  }
285
}
286

287
void ObNonPartitionedTableTabletAllocator::reset_all_cache()
288
{
289
  int ret = OB_SUCCESS;
290
  SpinRLockGuard guard(rwlock_);
291
  if (inited_) {
292
    FOREACH(it, tenant_cache_) {
293
      if (OB_NOT_NULL(it->second)) {
294
        (void) (it->second)->reset_cache();
295
      }
296
    }
297
  }
298
}
299

300
int ObNonPartitionedTableTabletAllocator::reset_cache(
301
    const uint64_t tenant_id)
302
{
303
  int ret = OB_SUCCESS;
304
  SpinRLockGuard guard(rwlock_);
305
  if (OB_UNLIKELY(!inited_)) {
306
    ret = OB_NOT_INIT;
307
    LOG_WARN("not init", KR(ret));
308
  } else {
309
    ObNonPartitionedTableTabletCache *cache = NULL;
310
    if (OB_FAIL(tenant_cache_.get_refactored(tenant_id, cache))) {
311
      if (OB_HASH_NOT_EXIST != ret) {
312
        LOG_WARN("fail to get refactored", KR(ret), K(tenant_id));
313
      } else {
314
        // tenant not in cache, just skip
315
        ret = OB_SUCCESS;
316
      }
317
    } else if (OB_ISNULL(cache)) {
318
      ret = OB_ERR_UNEXPECTED;
319
      LOG_WARN("cache is null", KR(ret), K(tenant_id));
320
    } else {
321
      (void) cache->reset_cache();
322
    }
323
  }
324
  return ret;
325
}
326

327
int ObNonPartitionedTableTabletAllocator::try_init_cache_(
328
    const uint64_t tenant_id)
329
{
330
  int ret = OB_SUCCESS;
331
  SpinWLockGuard guard(rwlock_);
332
  if (OB_UNLIKELY(!inited_)) {
333
    ret = OB_NOT_INIT;
334
    LOG_WARN("not init", KR(ret));
335
  } else if (OB_ISNULL(sql_proxy_)) {
336
    ret = OB_ERR_UNEXPECTED;
337
    LOG_WARN("sql_proxy is null", KR(ret));
338
  } else {
339
    ObNonPartitionedTableTabletCache *cache = NULL;
340
    if (OB_FAIL(tenant_cache_.get_refactored(tenant_id, cache))) {
341
      if (OB_HASH_NOT_EXIST != ret) {
342
        LOG_WARN("fail to get cache", KR(ret), K(tenant_id));
343
      } else {
344
        ret = OB_SUCCESS;
345
        cache = NULL;
346
        void *buf = NULL;
347
        if (OB_ISNULL(buf = allocator_.alloc(sizeof(ObNonPartitionedTableTabletCache)))) {
348
          ret = OB_ALLOCATE_MEMORY_FAILED;
349
          LOG_WARN("fail to alloc memory", KR(ret));
350
        } else if (FALSE_IT(cache = new (buf) ObNonPartitionedTableTabletCache(tenant_id, *sql_proxy_))) {
351
        } else if (OB_FAIL(tenant_cache_.set_refactored(tenant_id, cache))) {
352
          LOG_WARN("fail to set cache", KR(ret), K(tenant_id));
353
        }
354
      }
355
    } else {
356
      // cache exist, just skip
357
    }
358
  }
359
  return ret;
360
}
361

362
int ObNonPartitionedTableTabletAllocator::alloc_tablet(
363
    const uint64_t tenant_id,
364
    const common::ObIArray<share::ObLSID> &avaliable_ls_ids,
365
    share::ObLSID &ls_id)
366
{
367
  int ret = OB_SUCCESS;
368
  ls_id.reset();
369
  if (OB_UNLIKELY(!inited_)) {
370
    ret = OB_NOT_INIT;
371
    LOG_WARN("not init", KR(ret));
372
  } else if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id
373
             || avaliable_ls_ids.empty())) {
374
    ret = OB_INVALID_ARGUMENT;
375
    LOG_WARN("invalid arg", KR(ret), K(tenant_id), K(avaliable_ls_ids.count()));
376
  } else if (OB_FAIL(try_init_cache_(tenant_id))) {
377
    LOG_WARN("try to init cache failed", KR(ret), K(tenant_id));
378
  } else {
379
    {
380
      SpinRLockGuard guard(rwlock_);
381
      ObNonPartitionedTableTabletCache *cache = NULL;
382
      if (OB_FAIL(tenant_cache_.get_refactored(tenant_id, cache))) {
383
        LOG_WARN("fail to get refactored", KR(ret), K(tenant_id));
384
      } else if (OB_ISNULL(cache)) {
385
        ret = OB_ERR_UNEXPECTED;
386
        LOG_WARN("cache is null", KR(ret));
387
      } else if (OB_FAIL(cache->alloc_tablet(avaliable_ls_ids, ls_id))) {
388
        LOG_WARN("fail to alloc tablet", KR(ret), K(tenant_id));
389
      }
390
    }
391
    if (OB_SUCC(ret)) {
392
      // try update ls stat
393
      ObBalanceGroupLSStat ls_stat;
394
      const ObBalanceGroupID bg_id(0, 0); // for non-partitioned table
395
      const ObString bg_name(rootserver::ObBalanceGroup::NON_PART_BG_NAME);
396
      const int64_t inc_tablet_cnt = 1;
397
      const int64_t default_timeout = GCONF.internal_sql_execute_timeout;
398
      ObBalanceGroupLSStatOperator op;
399
      if (OB_FAIL(op.init(sql_proxy_))) {
400
        LOG_WARN("fail to init ObBalanceGroupLSStatOperator", KR(ret));
401
      } else if (OB_FAIL(ls_stat.build(tenant_id, bg_id, ls_id, inc_tablet_cnt, bg_name))) {
402
        LOG_WARN("fail to build ls_stat", KR(ret), K(tenant_id), K(ls_id));
403
      } else if (OB_FAIL(op.inc_balance_group_ls_stat(
404
                 default_timeout, *sql_proxy_, tenant_id, ls_stat))) {
405
        LOG_WARN("fail to inc ls stat", KR(ret), K(tenant_id), K(ls_stat));
406
      }
407
    }
408
  }
409
  return ret;
410
}
411

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.