oceanbase
511 строк · 15.4 Кб
/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */
12
13#define USING_LOG_PREFIX RS
14#include "ob_zone_unit_provider.h"
15
16using namespace oceanbase::common;
17using namespace oceanbase::share;
18
19namespace oceanbase
20{
21namespace rootserver
22{
23
24const ObUnitInfo *ObAliveZoneUnitAdaptor::at(int64_t idx) const
25{
26ObUnitInfo *info = NULL;
27int ret = OB_SUCCESS;
28if (OB_UNLIKELY(NULL == zu_)) {
29ret = OB_NOT_INIT;
30LOG_ERROR("unexpected null zu_. bad code.");
31} else if (OB_UNLIKELY(idx < 0 || idx >= zu_->count())) {
32ret = OB_INVALID_ARGUMENT;
33LOG_ERROR("unexpected idx", K(idx), "count", zu_->count(), K(ret));
34} else {
35info = zu_->at(idx);
36}
37return info;
38}
39
40int64_t ObAliveZoneUnitAdaptor::count() const
41{
42int64_t cnt = 0;
43if (OB_UNLIKELY(NULL == zu_)) {
44LOG_ERROR_RET(OB_ERR_UNEXPECTED, "unexpected null zu_");
45} else {
46cnt = zu_->count();
47}
48return cnt;
49}
50
51int ObAliveZoneUnitAdaptor::get_target_unit_idx(
52const int64_t unit_offset,
53common::hash::ObHashSet<int64_t> &unit_set,
54const bool is_primary_partition,
55int64_t &unit_idx) const
56{
57int ret = OB_SUCCESS;
58UNUSED(is_primary_partition);
59if (count() <= 0) {
60ret = OB_MACHINE_RESOURCE_NOT_ENOUGH;
61LOG_WARN("no available unit to alloc replica", K(ret));
62} else {
63int64_t idx = unit_offset % count();
64const int64_t guard = idx;
65do {
66ret = unit_set.exist_refactored(at(idx)->unit_.unit_id_);
67if (OB_HASH_EXIST == ret) {
68idx++;
69idx %= count();
70}
71} while (OB_HASH_EXIST == ret && idx != guard);
72if (OB_HASH_NOT_EXIST == ret) {
73ret = OB_SUCCESS;
74unit_idx = idx;
75} else if (OB_HASH_EXIST == ret) {
76ret = OB_MACHINE_RESOURCE_NOT_ENOUGH;
77} else {
78LOG_WARN("fail to alloc replica", K(ret));
79}
80}
81return ret;
82}
83
84int ObAliveZoneUnitAdaptor::update_tg_pg_count(
85const int64_t unit_idx,
86const bool is_primary_partition)
87{
88UNUSED(unit_idx);
89UNUSED(is_primary_partition);
90return OB_SUCCESS;
91}
92
93bool ObAliveZoneUnitsProvider::UnitSortOp::operator()(
94share::ObUnitInfo *left,
95share::ObUnitInfo *right)
96{
97bool bool_ret = false;
98if (OB_UNLIKELY(common::OB_SUCCESS != ret_)) {
99// jump out
100} else if (OB_UNLIKELY(nullptr == left || nullptr == right)) {
101ret_ = common::OB_ERR_UNEXPECTED;
102LOG_WARN_RET(ret_, "left or right ptr is null", K(ret_), KP(left), KP(right));
103} else if (left->unit_.unit_id_ < right->unit_.unit_id_) {
104bool_ret = true;
105} else {
106bool_ret = false;
107}
108return bool_ret;
109}
110
111bool ObAliveZoneUnitsProvider::ZoneUnitSortOp::operator()(
112UnitPtrArray &left,
113UnitPtrArray &right)
114{
115bool bool_ret = false;
116if (OB_UNLIKELY(common::OB_SUCCESS != ret_)) {
117// jump out
118} else if (left.count() <= 0 || right.count() <= 0) {
119ret_ = OB_ERR_UNEXPECTED;
120LOG_WARN_RET(ret_, "left or right unit array empty", K(ret_),
121"left_count", left.count(), "right_count", right.count());
122} else if (nullptr == left.at(0) || nullptr == right.at(0)) {
123ret_ = OB_ERR_UNEXPECTED;
124LOG_WARN_RET(ret_, "unit ptr is null", K(ret_), "left_ptr", left.at(0), "right_ptr", right.at(0));
125} else if (left.at(0)->unit_.zone_ < right.at(0)->unit_.zone_) {
126bool_ret = true;
127} else {
128bool_ret = false;
129}
130return bool_ret;
131}
132
133int ObAliveZoneUnitsProvider::init(
134const ZoneUnitPtrArray &all_zone_units)
135{
136int ret = OB_SUCCESS;
137if (OB_UNLIKELY(inited_)) {
138ret = OB_INIT_TWICE;
139LOG_WARN("init twice", K(ret));
140} else {
141UnitPtrArray unit_ptr_array;
142for (int64_t i = 0; OB_SUCC(ret) && i < all_zone_units.count(); ++i) {
143const UnitPtrArray &unit_array = all_zone_units.at(i);
144unit_ptr_array.reuse();
145for (int64_t j = 0; OB_SUCC(ret) && j < unit_array.count(); ++j) {
146share::ObUnitInfo *unit_info = unit_array.at(j);
147if (OB_UNLIKELY(nullptr == unit_info)) {
148ret = OB_ERR_UNEXPECTED;
149LOG_WARN("unit info ptr is null", K(ret));
150} else if (OB_FAIL(unit_ptr_array.push_back(unit_info))) {
151LOG_WARN("fail to push back", K(ret));
152}
153}
154if (OB_SUCC(ret)) {
155UnitSortOp unit_sort_operator;
156std::sort(unit_ptr_array.begin(), unit_ptr_array.end(), unit_sort_operator);
157if (OB_FAIL(unit_sort_operator.get_ret())) {
158LOG_WARN("fail to sort unit in zone", K(ret));
159} else if (OB_FAIL(all_zone_unit_ptrs_.push_back(unit_ptr_array))) {
160LOG_WARN("fail to push back", K(ret));
161}
162}
163}
164if (OB_SUCC(ret)) {
165ZoneUnitSortOp zone_unit_sort_operator;
166std::sort(all_zone_unit_ptrs_.begin(), all_zone_unit_ptrs_.end(), zone_unit_sort_operator);
167if (OB_FAIL(zone_unit_sort_operator.get_ret())) {
168LOG_WARN("fail to sort zone unit", K(ret));
169}
170}
171if (OB_SUCC(ret)) {
172inited_ = true;
173}
174}
175LOG_INFO("alive zone unit provider init", K(ret), K(all_zone_unit_ptrs_), K(all_zone_units));
176return ret;
177}
178
179int ObAliveZoneUnitsProvider::prepare_for_next_partition(
180const common::hash::ObHashSet<int64_t> &unit_set)
181{
182int ret = OB_SUCCESS;
183if (OB_UNLIKELY(!inited_)) {
184ret = OB_NOT_INIT;
185LOG_WARN("not init", K(ret));
186} else {
187available_zone_unit_ptrs_.reset();
188UnitPtrArray unit_ptr_array;
189for (int64_t i = 0; OB_SUCC(ret) && i < all_zone_unit_ptrs_.count(); ++i) {
190unit_ptr_array.reuse();
191const UnitPtrArray &this_unit_ptr_array = all_zone_unit_ptrs_.at(i);
192for (int64_t j = 0; OB_SUCC(ret) && j < this_unit_ptr_array.count(); ++j) {
193share::ObUnitInfo *unit_info = this_unit_ptr_array.at(j);
194if (OB_UNLIKELY(nullptr == unit_info)) {
195ret = OB_ERR_UNEXPECTED;
196LOG_WARN("unit info ptr is null", K(ret));
197} else {
198int tmp_ret = unit_set.exist_refactored(unit_info->unit_.unit_id_);
199if (OB_HASH_EXIST == tmp_ret) {
200// bypass
201} else if (OB_HASH_NOT_EXIST == tmp_ret) {
202if (OB_FAIL(unit_ptr_array.push_back(unit_info))) {
203LOG_WARN("fail to push back", K(ret));
204}
205} else {
206ret = OB_ERR_UNEXPECTED;
207LOG_WARN("check unit set exist failed", K(ret), K(tmp_ret));
208}
209}
210}
211if (OB_SUCC(ret) && unit_ptr_array.count() > 0) {
212if (OB_FAIL(available_zone_unit_ptrs_.push_back(unit_ptr_array))) {
213LOG_WARN("fail to push back", K(ret));
214}
215}
216}
217}
218LOG_INFO("units prepare for next partition", K(ret), K(available_zone_unit_ptrs_));
219return ret;
220}
221
222int ObAliveZoneUnitsProvider::get_all_zone_units(
223ZoneUnitArray& zone_unit) const
224{
225UNUSED(zone_unit);
226return OB_NOT_IMPLEMENT;
227}
228
229int ObAliveZoneUnitsProvider::get_all_ptr_zone_units(
230ZoneUnitPtrArray& zone_unit_ptr) const
231{
232return zone_unit_ptr.assign(all_zone_unit_ptrs_);
233}
234
235int ObAliveZoneUnitsProvider::find_zone(
236const common::ObZone &zone,
237const ObZoneUnitAdaptor *&zua)
238{
239int ret = OB_SUCCESS;
240zua = NULL;
241if (OB_UNLIKELY(!inited_)) {
242ret = OB_NOT_INIT;
243LOG_WARN("not init", K(ret));
244} else {
245FOREACH_CNT_X(zu, available_zone_unit_ptrs_, OB_SUCCESS == ret) {
246if (zu->count() <= 0) {
247ret = OB_ERR_UNEXPECTED;
248LOG_WARN("invalid zone unit count. should not be zero.", K(ret));
249} else if (zu->at(0)->unit_.zone_ == zone) {
250zone_unit_adaptor_.set_zone_unit(zu);
251zua = &zone_unit_adaptor_;
252break;
253}
254}
255}
256return ret;
257}
258
259const ObUnitInfo *ObAllZoneUnitAdaptor::at(int64_t idx) const
260{
261ObUnitInfo *info = NULL;
262int ret = OB_SUCCESS;
263if (OB_UNLIKELY(NULL == all_unit_)) {
264ret = OB_NOT_INIT;
265LOG_ERROR("unexpected null all_unit_. bad code.");
266} else if (OB_UNLIKELY(idx < 0 || idx >= all_unit_->count())) {
267ret = OB_INVALID_ARGUMENT;
268LOG_ERROR("unexpected idx", K(idx), "count", all_unit_->count(), K(ret));
269} else {
270info = &all_unit_->at(idx)->info_;
271}
272return info;
273}
274
275int64_t ObAllZoneUnitAdaptor::count() const
276{
277int64_t cnt = 0;
278if (OB_UNLIKELY(NULL == all_unit_)) {
279LOG_ERROR_RET(OB_ERR_UNEXPECTED, "unexpected null all_unit_");
280} else {
281cnt = all_unit_->count();
282}
283return cnt;
284}
285
286int ObAllZoneUnitAdaptor::get_target_unit_idx(
287const int64_t unit_offset,
288common::hash::ObHashSet<int64_t> &unit_set,
289const bool is_primary_partition,
290int64_t &unit_idx) const
291{
292int ret = OB_SUCCESS;
293if (OB_UNLIKELY(nullptr == all_unit_)) {
294ret = OB_NOT_INIT;
295LOG_ERROR("unexpected null all_unit_. bad code", K(ret));
296} else if (all_unit_->count() <= 0) {
297ret = OB_MACHINE_RESOURCE_NOT_ENOUGH;
298LOG_WARN("no available unit to alloc replica, may be migrate blocked", K(ret));
299} else if (is_primary_partition) {
300unit_idx = -1;
301const int64_t start = unit_offset % all_unit_->count();
302const int64_t end = start + all_unit_->count();
303for (int64_t i = start; OB_SUCC(ret) && i < end; ++i) {
304const int64_t idx = i % all_unit_->count();
305const UnitStat *this_unit = all_unit_->at(idx);
306if (OB_UNLIKELY(nullptr == this_unit)) {
307ret = OB_ERR_UNEXPECTED;
308LOG_WARN("unit ptr is null", K(ret));
309} else if (nullptr == this_unit->server_) {
310ret = OB_ERR_UNEXPECTED;
311LOG_WARN("this server ptr is null", K(ret));
312} else if (!this_unit->server_->active_
313|| !this_unit->server_->online_
314|| this_unit->server_->blocked_) {
315// bypass, since server not available
316} else if (OB_HASH_EXIST == unit_set.exist_refactored(this_unit->info_.unit_.unit_id_)) {
317// by pass
318} else if (-1 == unit_idx) {
319unit_idx = idx;
320} else if (all_unit_->at(unit_idx)->tg_pg_cnt_ > this_unit->tg_pg_cnt_) {
321unit_idx = idx;
322}
323}
324if (OB_SUCC(ret) && -1 == unit_idx) {
325ret = OB_MACHINE_RESOURCE_NOT_ENOUGH;
326}
327} else {
328int64_t idx = unit_offset % all_unit_->count();
329const int64_t guard = idx;
330do {
331ret = unit_set.exist_refactored(at(idx)->unit_.unit_id_);
332if (OB_HASH_EXIST == ret) {
333idx++;
334idx %= count();
335}
336} while (OB_HASH_EXIST == ret && idx != guard);
337if (OB_HASH_NOT_EXIST == ret) {
338ret = OB_SUCCESS;
339unit_idx = idx;
340} else if (OB_HASH_EXIST == ret) {
341ret = OB_MACHINE_RESOURCE_NOT_ENOUGH;
342} else {
343LOG_WARN("fail to alloc replica", K(ret));
344}
345}
346return ret;
347}
348
349int ObAllZoneUnitAdaptor::update_tg_pg_count(
350const int64_t unit_idx,
351const bool is_primary_partition)
352{
353int ret = OB_SUCCESS;
354if (unit_idx >= count()) {
355ret = OB_INVALID_ARGUMENT;
356LOG_WARN("invalid argument", K(ret), K(unit_idx));
357} else if (OB_UNLIKELY(nullptr == all_unit_)) {
358ret = OB_NOT_INIT;
359LOG_ERROR("unexpected null all_unit_. bad code", K(ret));
360} else if (is_primary_partition) {
361UnitStat *this_unit = const_cast<UnitStat *>(all_unit_->at(unit_idx));
362if (OB_UNLIKELY(nullptr == this_unit)) {
363ret = OB_ERR_UNEXPECTED;
364LOG_WARN("unit ptr is null", K(ret));
365} else {
366++this_unit->tg_pg_cnt_;
367}
368} else {
369// not primary partition, no need to update tg pg cnt
370}
371return ret;
372}
373
374bool ObZoneLogonlyUnitProvider::exist(const ObZone &zone, const uint64_t unit_id) const
375{
376bool bret = false;
377FOREACH_CNT(zu, all_zone_units_) {
378if (zu->zone_ == zone) {
379FOREACH_CNT(us, zu->all_unit_) {
380if ((*us)->info_.unit_.replica_type_ == REPLICA_TYPE_LOGONLY
381&& (*us)->info_.unit_.unit_id_ == unit_id) {
382bret = true;
383break;
384}
385}
386}
387}
388return bret;
389}
390
391int ObZoneLogonlyUnitProvider::get_all_zone_units(ZoneUnitArray& zone_unit) const
392{
393return zone_unit.assign(all_zone_units_);
394}
395
396int ObZoneLogonlyUnitProvider::get_all_ptr_zone_units(ZoneUnitPtrArray& zone_unit) const
397{
398UNUSED(zone_unit);
399return OB_NOT_IMPLEMENT;
400}
401
402int ObZoneLogonlyUnitProvider::find_zone(const common::ObZone &zone,
403const ObZoneUnitAdaptor *&zua)
404{
405int ret = OB_SUCCESS;
406zua = NULL;
407FOREACH_CNT_X(zu, all_zone_units_, OB_SUCCESS == ret) {
408if (zu->zone_ == zone) {
409// construct atemporay ZoneUnit
410all_unit_.reuse();
411FOREACH_CNT_X(us, zu->all_unit_, OB_SUCC(ret)) {
412const ServerStat *server = (*us)->server_;
413if (OB_ISNULL(server)) {
414ret = OB_ERR_UNEXPECTED;
415} else if (!server->can_migrate_in()) {
416// ignore
417} else if (REPLICA_TYPE_LOGONLY != (*us)->info_.unit_.replica_type_) {
418//nothing todo
419} else if (OB_FAIL(all_unit_.push_back(const_cast<UnitStat *>(*us)))) {
420LOG_WARN("fail add alive unit to zone_unit", K(ret));
421}
422}
423zone_unit_adaptor_.set_zone_unit(&all_unit_);
424zua = &zone_unit_adaptor_;
425break;
426}
427}
428return ret;
429}
430
431int ObZoneUnitsWithoutLogonlyProvider::get_all_zone_units(ZoneUnitArray& zone_unit) const
432{
433return zone_unit.assign(all_zone_units_);
434}
435
436int ObZoneUnitsWithoutLogonlyProvider::get_all_ptr_zone_units(ZoneUnitPtrArray& zone_unit) const
437{
438UNUSED(zone_unit);
439return OB_NOT_IMPLEMENT;
440}
441
442int ObZoneUnitsWithoutLogonlyProvider::find_zone(const common::ObZone &zone,
443const ObZoneUnitAdaptor *&zua)
444{
445int ret = OB_SUCCESS;
446zua = NULL;
447FOREACH_CNT_X(zu, all_zone_units_, OB_SUCCESS == ret) {
448if (zu->zone_ == zone) {
449// construct a temporary ZoneUnit
450all_unit_.reuse();
451FOREACH_CNT_X(us, zu->all_unit_, OB_SUCC(ret)) {
452const ServerStat *server = (*us)->server_;
453if (OB_ISNULL(server)) {
454ret = OB_ERR_UNEXPECTED;
455} else if (!server->can_migrate_in()) {
456// ignore
457} else if (REPLICA_TYPE_LOGONLY == (*us)->info_.unit_.replica_type_) {
458//nothing todo
459} else if (OB_FAIL(all_unit_.push_back(const_cast<UnitStat *>(*us)))) {
460LOG_WARN("fail add alive unit to zone_unit", K(ret));
461}
462}
463zone_unit_adaptor_.set_zone_unit(&all_unit_);
464zua = &zone_unit_adaptor_;
465break;
466}
467}
468return ret;
469}
470
471int ObAllZoneUnitsProvider::get_all_zone_units(ZoneUnitArray& zone_unit) const
472{
473return zone_unit.assign(all_zone_units_);
474}
475
476int ObAllZoneUnitsProvider::get_all_ptr_zone_units(ZoneUnitPtrArray& zone_unit) const
477{
478UNUSED(zone_unit);
479return OB_NOT_IMPLEMENT;
480}
481
482int ObAllZoneUnitsProvider::find_zone(const common::ObZone &zone,
483const ObZoneUnitAdaptor *&zua)
484{
485int ret = OB_SUCCESS;
486zua = NULL;
487FOREACH_CNT_X(zu, all_zone_units_, OB_SUCCESS == ret) {
488if (zu->zone_ == zone) {
489// construct a temporary ZoneUnit
490all_unit_.reuse();
491FOREACH_CNT_X(us, zu->all_unit_, OB_SUCC(ret)) {
492const ServerStat *server = (*us)->server_;
493if (OB_ISNULL(server)) {
494ret = OB_ERR_UNEXPECTED;
495} else if (!server->can_migrate_in()) {
496// ignore
497} else if (OB_FAIL(all_unit_.push_back(const_cast<UnitStat *>(*us)))) {
498LOG_WARN("fail add alive unit to zone_unit", K(ret));
499}
500}
501zone_unit_adaptor_.set_zone_unit(&all_unit_);
502zua = &zone_unit_adaptor_;
503break;
504}
505}
506return ret;
507}
508
509
510}/* ns rootserver*/
511}/* ns oceanbase */
512