oceanbase

Форк
0
/
ob_zone_unit_provider.cpp 
511 строк · 15.4 Кб
1
/**
2
 * Copyright (c) 2021 OceanBase
3
 * OceanBase CE is licensed under Mulan PubL v2.
4
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
5
 * You may obtain a copy of Mulan PubL v2 at:
6
 *          http://license.coscl.org.cn/MulanPubL-2.0
7
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
 * See the Mulan PubL v2 for more details.
11
 */
12

13
#define USING_LOG_PREFIX RS
14
#include "ob_zone_unit_provider.h"
15

16
using namespace oceanbase::common;
17
using namespace oceanbase::share;
18

19
namespace oceanbase
20
{
21
namespace rootserver
22
{
23

24
const ObUnitInfo *ObAliveZoneUnitAdaptor::at(int64_t idx) const
25
{
26
  ObUnitInfo *info = NULL;
27
  int ret = OB_SUCCESS;
28
  if (OB_UNLIKELY(NULL == zu_)) {
29
    ret = OB_NOT_INIT;
30
    LOG_ERROR("unexpected null zu_. bad code.");
31
  } else if (OB_UNLIKELY(idx < 0 || idx >= zu_->count())) {
32
    ret = OB_INVALID_ARGUMENT;
33
    LOG_ERROR("unexpected idx", K(idx), "count", zu_->count(), K(ret));
34
  } else {
35
    info = zu_->at(idx);
36
  }
37
  return info;
38
}
39

40
int64_t ObAliveZoneUnitAdaptor::count() const
41
{
42
  int64_t cnt = 0;
43
  if (OB_UNLIKELY(NULL == zu_)) {
44
    LOG_ERROR_RET(OB_ERR_UNEXPECTED, "unexpected null zu_");
45
  } else {
46
    cnt = zu_->count();
47
  }
48
  return cnt;
49
}
50

51
int ObAliveZoneUnitAdaptor::get_target_unit_idx(
52
    const int64_t unit_offset,
53
    common::hash::ObHashSet<int64_t> &unit_set,
54
    const bool is_primary_partition,
55
    int64_t &unit_idx) const
56
{
57
  int ret = OB_SUCCESS;
58
  UNUSED(is_primary_partition);
59
  if (count() <= 0) {
60
    ret = OB_MACHINE_RESOURCE_NOT_ENOUGH;
61
    LOG_WARN("no available unit to alloc replica", K(ret));
62
  } else {
63
    int64_t idx = unit_offset % count();
64
    const int64_t guard = idx;
65
    do {
66
      ret = unit_set.exist_refactored(at(idx)->unit_.unit_id_);
67
      if (OB_HASH_EXIST == ret) {
68
        idx++;
69
        idx %= count();
70
      }
71
    } while (OB_HASH_EXIST == ret && idx != guard);
72
    if (OB_HASH_NOT_EXIST == ret) {
73
      ret = OB_SUCCESS;
74
      unit_idx = idx;
75
    } else if (OB_HASH_EXIST == ret) {
76
      ret = OB_MACHINE_RESOURCE_NOT_ENOUGH;
77
    } else {
78
      LOG_WARN("fail to alloc replica", K(ret));
79
    }
80
  }
81
  return ret;
82
}
83

84
int ObAliveZoneUnitAdaptor::update_tg_pg_count(
85
    const int64_t unit_idx,
86
    const bool is_primary_partition)
87
{
88
  UNUSED(unit_idx);
89
  UNUSED(is_primary_partition);
90
  return OB_SUCCESS;
91
}
92

93
bool ObAliveZoneUnitsProvider::UnitSortOp::operator()(
94
     share::ObUnitInfo *left,
95
     share::ObUnitInfo *right)
96
{
97
  bool bool_ret = false;
98
  if (OB_UNLIKELY(common::OB_SUCCESS != ret_)) {
99
    // jump out
100
  } else if (OB_UNLIKELY(nullptr == left || nullptr == right)) {
101
    ret_ = common::OB_ERR_UNEXPECTED;
102
    LOG_WARN_RET(ret_, "left or right ptr is null", K(ret_), KP(left), KP(right));
103
  } else if (left->unit_.unit_id_ < right->unit_.unit_id_) {
104
    bool_ret = true;
105
  } else {
106
    bool_ret = false;
107
  }
108
  return bool_ret;
109
}
110

111
bool ObAliveZoneUnitsProvider::ZoneUnitSortOp::operator()(
112
     UnitPtrArray &left,
113
     UnitPtrArray &right)
114
{
115
  bool bool_ret = false;
116
  if (OB_UNLIKELY(common::OB_SUCCESS != ret_)) {
117
    // jump out
118
  } else if (left.count() <= 0 || right.count() <= 0) {
119
    ret_ = OB_ERR_UNEXPECTED;
120
    LOG_WARN_RET(ret_, "left or right unit array empty", K(ret_),
121
             "left_count", left.count(), "right_count", right.count());
122
  } else if (nullptr == left.at(0) || nullptr == right.at(0)) {
123
    ret_ = OB_ERR_UNEXPECTED;
124
    LOG_WARN_RET(ret_, "unit ptr is null", K(ret_), "left_ptr", left.at(0), "right_ptr", right.at(0));
125
  } else if (left.at(0)->unit_.zone_ < right.at(0)->unit_.zone_) {
126
    bool_ret = true;
127
  } else {
128
    bool_ret = false;
129
  }
130
  return bool_ret;
131
}
132

133
int ObAliveZoneUnitsProvider::init(
134
    const ZoneUnitPtrArray &all_zone_units)
135
{
136
  int ret = OB_SUCCESS;
137
  if (OB_UNLIKELY(inited_)) {
138
    ret = OB_INIT_TWICE;
139
    LOG_WARN("init twice", K(ret));
140
  } else {
141
    UnitPtrArray unit_ptr_array;
142
    for (int64_t i = 0; OB_SUCC(ret) && i < all_zone_units.count(); ++i) {
143
      const UnitPtrArray &unit_array = all_zone_units.at(i);
144
      unit_ptr_array.reuse();
145
      for (int64_t j = 0; OB_SUCC(ret) && j < unit_array.count(); ++j) {
146
        share::ObUnitInfo *unit_info = unit_array.at(j);
147
        if (OB_UNLIKELY(nullptr == unit_info)) {
148
          ret = OB_ERR_UNEXPECTED;
149
          LOG_WARN("unit info ptr is null", K(ret));
150
        } else if (OB_FAIL(unit_ptr_array.push_back(unit_info))) {
151
          LOG_WARN("fail to push back", K(ret));
152
        }
153
      }
154
      if (OB_SUCC(ret)) {
155
        UnitSortOp unit_sort_operator;
156
        std::sort(unit_ptr_array.begin(), unit_ptr_array.end(), unit_sort_operator);
157
        if (OB_FAIL(unit_sort_operator.get_ret())) {
158
          LOG_WARN("fail to sort unit in zone", K(ret));
159
        } else if (OB_FAIL(all_zone_unit_ptrs_.push_back(unit_ptr_array))) {
160
          LOG_WARN("fail to push back", K(ret));
161
        }
162
      }
163
    }
164
    if (OB_SUCC(ret)) {
165
      ZoneUnitSortOp zone_unit_sort_operator;
166
      std::sort(all_zone_unit_ptrs_.begin(), all_zone_unit_ptrs_.end(), zone_unit_sort_operator);
167
      if (OB_FAIL(zone_unit_sort_operator.get_ret())) {
168
        LOG_WARN("fail to sort zone unit", K(ret));
169
      }
170
    }
171
    if (OB_SUCC(ret)) {
172
      inited_ = true;
173
    }
174
  }
175
  LOG_INFO("alive zone unit provider init", K(ret), K(all_zone_unit_ptrs_), K(all_zone_units));
176
  return ret;
177
}
178

179
int ObAliveZoneUnitsProvider::prepare_for_next_partition(
180
    const common::hash::ObHashSet<int64_t> &unit_set)
181
{
182
  int ret = OB_SUCCESS;
183
  if (OB_UNLIKELY(!inited_)) {
184
    ret = OB_NOT_INIT;
185
    LOG_WARN("not init", K(ret));
186
  } else {
187
    available_zone_unit_ptrs_.reset();
188
    UnitPtrArray unit_ptr_array;
189
    for (int64_t i = 0; OB_SUCC(ret) && i < all_zone_unit_ptrs_.count(); ++i) {
190
      unit_ptr_array.reuse();
191
      const UnitPtrArray &this_unit_ptr_array = all_zone_unit_ptrs_.at(i);
192
      for (int64_t j = 0; OB_SUCC(ret) && j < this_unit_ptr_array.count(); ++j) {
193
        share::ObUnitInfo *unit_info = this_unit_ptr_array.at(j);
194
        if (OB_UNLIKELY(nullptr == unit_info)) {
195
          ret = OB_ERR_UNEXPECTED;
196
          LOG_WARN("unit info ptr is null", K(ret));
197
        } else {
198
          int tmp_ret = unit_set.exist_refactored(unit_info->unit_.unit_id_);
199
          if (OB_HASH_EXIST == tmp_ret) {
200
            // bypass
201
          } else if (OB_HASH_NOT_EXIST == tmp_ret) {
202
            if (OB_FAIL(unit_ptr_array.push_back(unit_info))) {
203
              LOG_WARN("fail to push back", K(ret));
204
            }
205
          } else {
206
            ret = OB_ERR_UNEXPECTED;
207
            LOG_WARN("check unit set exist failed", K(ret), K(tmp_ret));
208
          }
209
        }
210
      }
211
      if (OB_SUCC(ret) && unit_ptr_array.count() > 0) {
212
        if (OB_FAIL(available_zone_unit_ptrs_.push_back(unit_ptr_array))) {
213
          LOG_WARN("fail to push back", K(ret));
214
        }
215
      }
216
    }
217
  }
218
  LOG_INFO("units prepare for next partition", K(ret), K(available_zone_unit_ptrs_));
219
  return ret;
220
}
221

222
int ObAliveZoneUnitsProvider::get_all_zone_units(
223
    ZoneUnitArray& zone_unit) const
224
{
225
  UNUSED(zone_unit);
226
  return OB_NOT_IMPLEMENT;
227
}
228

229
int ObAliveZoneUnitsProvider::get_all_ptr_zone_units(
230
    ZoneUnitPtrArray& zone_unit_ptr) const
231
{
232
  return zone_unit_ptr.assign(all_zone_unit_ptrs_);
233
}
234

235
int ObAliveZoneUnitsProvider::find_zone(
236
    const common::ObZone &zone,
237
    const ObZoneUnitAdaptor *&zua)
238
{
239
  int ret = OB_SUCCESS;
240
  zua = NULL;
241
  if (OB_UNLIKELY(!inited_)) {
242
    ret = OB_NOT_INIT;
243
    LOG_WARN("not init", K(ret));
244
  } else {
245
    FOREACH_CNT_X(zu, available_zone_unit_ptrs_, OB_SUCCESS == ret) {
246
      if (zu->count() <= 0) {
247
        ret = OB_ERR_UNEXPECTED;
248
        LOG_WARN("invalid zone unit count. should not be zero.", K(ret));
249
      } else if (zu->at(0)->unit_.zone_ == zone) {
250
        zone_unit_adaptor_.set_zone_unit(zu);
251
        zua = &zone_unit_adaptor_;
252
        break;
253
      }
254
    }
255
  }
256
  return ret;
257
}
258

259
const ObUnitInfo *ObAllZoneUnitAdaptor::at(int64_t idx) const
260
{
261
  ObUnitInfo *info = NULL;
262
  int ret = OB_SUCCESS;
263
  if (OB_UNLIKELY(NULL == all_unit_)) {
264
    ret = OB_NOT_INIT;
265
    LOG_ERROR("unexpected null all_unit_. bad code.");
266
  } else if (OB_UNLIKELY(idx < 0 || idx >= all_unit_->count())) {
267
    ret = OB_INVALID_ARGUMENT;
268
    LOG_ERROR("unexpected idx", K(idx), "count", all_unit_->count(), K(ret));
269
  } else {
270
    info = &all_unit_->at(idx)->info_;
271
  }
272
  return info;
273
}
274

275
int64_t ObAllZoneUnitAdaptor::count() const
276
{
277
  int64_t cnt = 0;
278
  if (OB_UNLIKELY(NULL == all_unit_)) {
279
    LOG_ERROR_RET(OB_ERR_UNEXPECTED, "unexpected null all_unit_");
280
  } else {
281
    cnt = all_unit_->count();
282
  }
283
  return cnt;
284
}
285

286
int ObAllZoneUnitAdaptor::get_target_unit_idx(
287
    const int64_t unit_offset,
288
    common::hash::ObHashSet<int64_t> &unit_set,
289
    const bool is_primary_partition,
290
    int64_t &unit_idx) const
291
{
292
  int ret = OB_SUCCESS;
293
  if (OB_UNLIKELY(nullptr == all_unit_)) {
294
    ret = OB_NOT_INIT;
295
    LOG_ERROR("unexpected null all_unit_. bad code", K(ret));
296
  } else if (all_unit_->count() <= 0) {
297
    ret = OB_MACHINE_RESOURCE_NOT_ENOUGH;
298
    LOG_WARN("no available unit to alloc replica, may be migrate blocked", K(ret));
299
  } else if (is_primary_partition) {
300
    unit_idx = -1;
301
    const int64_t start = unit_offset % all_unit_->count();
302
    const int64_t end = start + all_unit_->count();
303
    for (int64_t i = start; OB_SUCC(ret) && i < end; ++i) {
304
      const int64_t idx = i % all_unit_->count();
305
      const UnitStat *this_unit = all_unit_->at(idx);
306
      if (OB_UNLIKELY(nullptr == this_unit)) {
307
        ret = OB_ERR_UNEXPECTED;
308
        LOG_WARN("unit ptr is null", K(ret));
309
      } else if (nullptr == this_unit->server_) {
310
        ret = OB_ERR_UNEXPECTED;
311
        LOG_WARN("this server ptr is null", K(ret));
312
      } else if (!this_unit->server_->active_
313
          || !this_unit->server_->online_
314
          || this_unit->server_->blocked_) {
315
        // bypass, since server not available
316
      } else if (OB_HASH_EXIST == unit_set.exist_refactored(this_unit->info_.unit_.unit_id_)) {
317
        // by pass
318
      } else if (-1 == unit_idx) {
319
        unit_idx = idx;
320
      } else if (all_unit_->at(unit_idx)->tg_pg_cnt_ > this_unit->tg_pg_cnt_) {
321
        unit_idx = idx;
322
      }
323
    }
324
    if (OB_SUCC(ret) && -1 == unit_idx) {
325
      ret = OB_MACHINE_RESOURCE_NOT_ENOUGH;
326
    } 
327
  } else {
328
    int64_t idx = unit_offset % all_unit_->count();
329
    const int64_t guard = idx;
330
    do {
331
      ret = unit_set.exist_refactored(at(idx)->unit_.unit_id_);
332
      if (OB_HASH_EXIST == ret) {
333
        idx++;
334
        idx %= count();
335
      }
336
    } while (OB_HASH_EXIST == ret && idx != guard);
337
    if (OB_HASH_NOT_EXIST == ret) {
338
      ret = OB_SUCCESS;
339
      unit_idx = idx;
340
    } else if (OB_HASH_EXIST == ret) {
341
      ret = OB_MACHINE_RESOURCE_NOT_ENOUGH;
342
    } else {
343
      LOG_WARN("fail to alloc replica", K(ret));
344
    }
345
  }
346
  return ret;
347
}
348

349
int ObAllZoneUnitAdaptor::update_tg_pg_count(
350
    const int64_t unit_idx,
351
    const bool is_primary_partition)
352
{
353
  int ret = OB_SUCCESS;
354
  if (unit_idx >= count()) {
355
    ret = OB_INVALID_ARGUMENT;
356
    LOG_WARN("invalid argument", K(ret), K(unit_idx));
357
  } else if (OB_UNLIKELY(nullptr == all_unit_)) {
358
    ret = OB_NOT_INIT;
359
    LOG_ERROR("unexpected null all_unit_. bad code", K(ret));
360
  } else if (is_primary_partition) {
361
    UnitStat *this_unit = const_cast<UnitStat *>(all_unit_->at(unit_idx));
362
    if (OB_UNLIKELY(nullptr == this_unit)) {
363
      ret = OB_ERR_UNEXPECTED;
364
      LOG_WARN("unit ptr is null", K(ret));
365
    } else {
366
      ++this_unit->tg_pg_cnt_;
367
    }
368
  } else {
369
    // not primary partition, no need to update tg pg cnt
370
  }
371
  return ret;
372
}
373

374
bool ObZoneLogonlyUnitProvider::exist(const ObZone &zone, const uint64_t unit_id) const
375
{
376
  bool bret = false;
377
  FOREACH_CNT(zu, all_zone_units_) {
378
    if (zu->zone_ == zone) {
379
      FOREACH_CNT(us, zu->all_unit_) {
380
        if ((*us)->info_.unit_.replica_type_ == REPLICA_TYPE_LOGONLY
381
            && (*us)->info_.unit_.unit_id_ == unit_id) {
382
          bret = true;
383
          break;
384
        }
385
      }
386
    }
387
  }
388
  return bret;
389
}
390

391
int ObZoneLogonlyUnitProvider::get_all_zone_units(ZoneUnitArray& zone_unit) const
392
{
393
  return zone_unit.assign(all_zone_units_);
394
}
395

396
int ObZoneLogonlyUnitProvider::get_all_ptr_zone_units(ZoneUnitPtrArray& zone_unit) const
397
{
398
  UNUSED(zone_unit);
399
  return OB_NOT_IMPLEMENT;
400
}
401

402
int ObZoneLogonlyUnitProvider::find_zone(const common::ObZone &zone,
403
                                      const ObZoneUnitAdaptor *&zua)
404
{
405
  int ret = OB_SUCCESS;
406
  zua = NULL;
407
  FOREACH_CNT_X(zu, all_zone_units_, OB_SUCCESS == ret) {
408
    if (zu->zone_ == zone) {
409
      // construct atemporay ZoneUnit
410
      all_unit_.reuse();
411
      FOREACH_CNT_X(us, zu->all_unit_, OB_SUCC(ret)) {
412
        const ServerStat *server = (*us)->server_;
413
        if (OB_ISNULL(server)) {
414
          ret = OB_ERR_UNEXPECTED;
415
        } else if (!server->can_migrate_in()) {
416
          // ignore
417
        } else if (REPLICA_TYPE_LOGONLY != (*us)->info_.unit_.replica_type_) {
418
          //nothing todo
419
        } else if (OB_FAIL(all_unit_.push_back(const_cast<UnitStat *>(*us)))) {
420
            LOG_WARN("fail add alive unit to zone_unit", K(ret));
421
        }
422
      }
423
      zone_unit_adaptor_.set_zone_unit(&all_unit_);
424
      zua = &zone_unit_adaptor_;
425
      break;
426
    }
427
  }
428
  return ret;
429
}
430

431
int ObZoneUnitsWithoutLogonlyProvider::get_all_zone_units(ZoneUnitArray& zone_unit) const
432
{
433
  return zone_unit.assign(all_zone_units_);
434
}
435

436
int ObZoneUnitsWithoutLogonlyProvider::get_all_ptr_zone_units(ZoneUnitPtrArray& zone_unit) const
437
{
438
  UNUSED(zone_unit);
439
  return OB_NOT_IMPLEMENT;
440
}
441

442
int ObZoneUnitsWithoutLogonlyProvider::find_zone(const common::ObZone &zone,
443
                                      const ObZoneUnitAdaptor *&zua)
444
{
445
  int ret = OB_SUCCESS;
446
  zua = NULL;
447
  FOREACH_CNT_X(zu, all_zone_units_, OB_SUCCESS == ret) {
448
    if (zu->zone_ == zone) {
449
      // construct a temporary ZoneUnit
450
      all_unit_.reuse();
451
      FOREACH_CNT_X(us, zu->all_unit_, OB_SUCC(ret)) {
452
        const ServerStat *server = (*us)->server_;
453
        if (OB_ISNULL(server)) {
454
          ret = OB_ERR_UNEXPECTED;
455
        } else if (!server->can_migrate_in()) {
456
          // ignore
457
        } else if (REPLICA_TYPE_LOGONLY == (*us)->info_.unit_.replica_type_) {
458
          //nothing todo
459
        } else if (OB_FAIL(all_unit_.push_back(const_cast<UnitStat *>(*us)))) {
460
            LOG_WARN("fail add alive unit to zone_unit", K(ret));
461
        }
462
      }
463
      zone_unit_adaptor_.set_zone_unit(&all_unit_);
464
      zua = &zone_unit_adaptor_;
465
      break;
466
    }
467
  }
468
  return ret;
469
}
470

471
int ObAllZoneUnitsProvider::get_all_zone_units(ZoneUnitArray& zone_unit) const
472
{
473
  return zone_unit.assign(all_zone_units_);
474
}
475

476
int ObAllZoneUnitsProvider::get_all_ptr_zone_units(ZoneUnitPtrArray& zone_unit) const
477
{
478
  UNUSED(zone_unit);
479
  return OB_NOT_IMPLEMENT;
480
}
481

482
int ObAllZoneUnitsProvider::find_zone(const common::ObZone &zone,
483
                                      const ObZoneUnitAdaptor *&zua)
484
{
485
  int ret = OB_SUCCESS;
486
  zua = NULL;
487
  FOREACH_CNT_X(zu, all_zone_units_, OB_SUCCESS == ret) {
488
    if (zu->zone_ == zone) {
489
      // construct a temporary ZoneUnit
490
      all_unit_.reuse();
491
      FOREACH_CNT_X(us, zu->all_unit_, OB_SUCC(ret)) {
492
        const ServerStat *server = (*us)->server_;
493
        if (OB_ISNULL(server)) {
494
          ret = OB_ERR_UNEXPECTED;
495
        } else if (!server->can_migrate_in()) {
496
          // ignore
497
        } else if (OB_FAIL(all_unit_.push_back(const_cast<UnitStat *>(*us)))) {
498
            LOG_WARN("fail add alive unit to zone_unit", K(ret));
499
        }
500
      }
501
      zone_unit_adaptor_.set_zone_unit(&all_unit_);
502
      zua = &zone_unit_adaptor_;
503
      break;
504
    }
505
  }
506
  return ret;
507
}
508

509

510
}/* ns rootserver*/
511
}/* ns oceanbase */
512

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.