oceanbase

Форк
0
/
ob_migrate_unit_finish_checker.cpp 
293 строки · 10.6 Кб
1
/**
2
 * Copyright (c) 2021 OceanBase
3
 * OceanBase CE is licensed under Mulan PubL v2.
4
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
5
 * You may obtain a copy of Mulan PubL v2 at:
6
 *          http://license.coscl.org.cn/MulanPubL-2.0
7
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
 * See the Mulan PubL v2 for more details.
11
 */
12

13
#define USING_LOG_PREFIX RS
14
#include "ob_migrate_unit_finish_checker.h"
15
#include "lib/container/ob_array.h"
16
#include "lib/container/ob_array_iterator.h"
17
#include "lib/container/ob_se_array.h"
18
#include "lib/container/ob_se_array_iterator.h"
19
#include "share/ls/ob_ls_status_operator.h"
20
#include "share/ls/ob_ls_table_operator.h"
21
#include "ob_unit_manager.h"
22
#include "ob_zone_manager.h"
23

24
using namespace oceanbase::common;
25
using namespace oceanbase::share;
26
using namespace oceanbase::rootserver;
27

28
ObMigrateUnitFinishChecker::ObMigrateUnitFinishChecker(volatile bool &stop)
29
    : inited_(false),
30
      unit_mgr_(nullptr),
31
      zone_mgr_(nullptr),
32
      schema_service_(nullptr),
33
      sql_proxy_(nullptr),
34
      lst_operator_(nullptr),
35
      stop_(stop)
36
{
37
}
38

39
ObMigrateUnitFinishChecker::~ObMigrateUnitFinishChecker()
40
{
41
}
42

43
int ObMigrateUnitFinishChecker::check_stop() const
44
{
45
  int ret = OB_SUCCESS;
46
  if (stop_) {
47
    ret = OB_CANCELED;
48
    LOG_WARN("ObMigrateUnitFinishChecker stopped", KR(ret), K(stop_));
49
  }
50
  return ret;
51
}
52

53
int ObMigrateUnitFinishChecker::init(
54
    ObUnitManager &unit_mgr,
55
    ObZoneManager &zone_mgr,
56
    share::schema::ObMultiVersionSchemaService &schema_service,
57
    common::ObMySQLProxy &sql_proxy,
58
    share::ObLSTableOperator &lst_operator)
59
{
60
  int ret = OB_SUCCESS;
61
  if (OB_UNLIKELY(inited_)) {
62
    ret = OB_INIT_TWICE;
63
    LOG_WARN("init twice", KR(ret));
64
  } else {
65
    unit_mgr_ = &unit_mgr;
66
    zone_mgr_ = &zone_mgr;
67
    schema_service_ = &schema_service;
68
    sql_proxy_ = &sql_proxy;
69
    lst_operator_ = &lst_operator;
70
    inited_ = true;
71
  }
72
  return ret;
73
}
74

75
int ObMigrateUnitFinishChecker::check()
76
{
77
  int ret = OB_SUCCESS;
78
  int tmp_ret = OB_SUCCESS;
79
  LOG_INFO("start check unit migrate finish");
80
  ObArray<uint64_t> tenant_id_array;
81
  if (OB_UNLIKELY(!inited_)) {
82
    ret = OB_NOT_INIT;
83
    LOG_WARN("not init", K(ret));
84
  } else if (OB_FAIL(check_stop())) {
85
    LOG_WARN("ObMigrateUnitFinishChecker stopped", KR(ret));
86
  } else if (OB_UNLIKELY(ObTenantUtils::get_tenant_ids(schema_service_, tenant_id_array))) {
87
    LOG_WARN("fail to get tenant id array", KR(ret));
88
  } else {
89
    for (int64_t i = 0; OB_SUCC(ret) && i < tenant_id_array.count(); ++i) {
90
      const uint64_t tenant_id = tenant_id_array.at(i);
91
      if (is_meta_tenant(tenant_id)) {
92
        // bypass
93
      } else {
94
        // check unit belongs to existed tenant
95
        if (OB_SUCCESS != (tmp_ret = try_check_migrate_unit_finish_by_tenant(tenant_id))) {
96
          LOG_WARN("fail to try check migrate unit finish by tenant", KR(tmp_ret), K(tenant_id));
97
        }
98
        // check unit not in locality but in zone list
99
        if (OB_SUCCESS != (tmp_ret = try_check_migrate_unit_finish_not_in_locality(tenant_id))) {
100
          LOG_WARN("fail to try check migrate unit finish not in locality", KR(tmp_ret), K(tenant_id));
101
        }
102
      }
103
    }
104
  }
105
  // check unit not in tenant
106
  if (OB_SUCCESS != (tmp_ret = try_check_migrate_unit_finish_not_in_tenant())) {
107
    LOG_WARN("fail to try check migrate unit finish not in tenant", KR(tmp_ret));
108
  }
109
  return ret;
110
}
111

112
int ObMigrateUnitFinishChecker::try_check_migrate_unit_finish_not_in_locality(
113
    const uint64_t &tenant_id)
114
{
115
  int ret = OB_SUCCESS;
116
  ObArray<common::ObZone> zone_list;
117
  ObSchemaGetterGuard schema_guard;
118
  if (OB_UNLIKELY(!inited_)) {
119
    ret = OB_NOT_INIT;
120
    LOG_WARN("not init", KR(ret));
121
  } else if (OB_FAIL(check_stop())) {
122
    LOG_WARN("ObMigrateUnitFinishChecker stopped", KR(ret));
123
  } else if (OB_FAIL(unit_mgr_->get_tenant_pool_zone_list(tenant_id, zone_list))) {
124
    LOG_WARN("fail to get tenant pool zone list", KR(ret));
125
  } else if (OB_FAIL(schema_service_->get_tenant_schema_guard(tenant_id, schema_guard))) {
126
    LOG_WARN("get schema guard failed", KR(ret), K(tenant_id));
127
  } else if (OB_FAIL(unit_mgr_->finish_migrate_unit_not_in_locality(
128
             tenant_id, &schema_guard, zone_list))) {
129
    LOG_WARN("fail to finish migrat eunit not in locality", KR(ret), K(tenant_id), K(zone_list));
130
  }
131
  return ret;
132
}
133

134
int ObMigrateUnitFinishChecker::try_check_migrate_unit_finish_not_in_tenant()
135
{
136
  int ret = OB_SUCCESS;
137
  ObArray<share::ObResourcePool> pools;
138
  if (OB_UNLIKELY(!inited_)) {
139
    ret = OB_NOT_INIT;
140
    LOG_WARN("not init", KR(ret));
141
  } else if (OB_FAIL(check_stop())) {
142
    LOG_WARN("ObMigrateUnitFinishChecker stopped", KR(ret));
143
  } else if (OB_FAIL(unit_mgr_->get_pools(pools))) {
144
    LOG_WARN("fail to get pools", KR(ret));
145
  } else {
146
    FOREACH_CNT_X(pool, pools, OB_SUCC(ret)) {
147
      if (OB_FAIL(unit_mgr_->finish_migrate_unit_not_in_tenant(pool))) {
148
        LOG_WARN("fail to finish migrate unit not in tenant", KR(ret));
149
      }
150
      ret = OB_SUCCESS; //ignore ret 保证所有的pool都能运行
151
    }
152
  }
153
  return ret;
154
}
155

156
int ObMigrateUnitFinishChecker::try_check_migrate_unit_finish_by_tenant(
157
    const uint64_t tenant_id)
158
{
159
  int ret = OB_SUCCESS;
160
  if (OB_UNLIKELY(!inited_)) {
161
    ret = OB_NOT_INIT;
162
    LOG_WARN("not init", KR(ret));
163
  } else if (OB_FAIL(check_stop())) {
164
    LOG_WARN("ObMigrateUnitFinishChecker stopped", KR(ret));
165
  } else if (OB_UNLIKELY(OB_INVALID_ID == tenant_id)) {
166
    ret = OB_INVALID_ARGUMENT;
167
    LOG_WARN("invalid argument", KR(ret), K(tenant_id));
168
  } else if (OB_ISNULL(sql_proxy_)) {
169
    ret = OB_ERR_UNEXPECTED;
170
    LOG_WARN("sql proxy is null", KR(ret));
171
  } else {
172
    LOG_INFO("try check migrate unit finish by tenant", K(tenant_id));
173
    DRLSInfo dr_ls_info(gen_user_tenant_id(tenant_id),
174
                        zone_mgr_,
175
                        schema_service_);
176
    ObLSStatusInfoArray ls_status_info_array;
177
    share::ObLSStatusOperator ls_status_operator;
178
    if (OB_FAIL(ls_status_operator.get_all_tenant_related_ls_status_info(
179
      *sql_proxy_, tenant_id, ls_status_info_array))) {
180
      LOG_WARN("fail to get all ls status", KR(ret), K(tenant_id));
181
    } else if (OB_FAIL(dr_ls_info.init())) {
182
      LOG_WARN("fail to init disaster log stream info", KR(ret));
183
    } else {
184
      for (int64_t i = 0; OB_SUCC(ret) && i < ls_status_info_array.count(); ++i) {
185
        share::ObLSInfo ls_info;
186
        share::ObLSStatusInfo &ls_status_info = ls_status_info_array.at(i);
187
        if (OB_FAIL(check_stop())) {
188
          LOG_WARN("DRWorker stopped", KR(ret));
189
        } else if (OB_FAIL(lst_operator_->get(
190
                GCONF.cluster_id,
191
                ls_status_info.tenant_id_,
192
                ls_status_info.ls_id_,
193
                share::ObLSTable::COMPOSITE_MODE,
194
                ls_info))) {
195
          LOG_WARN("fail to get log stream info", KR(ret));
196
        } else if (OB_FAIL(dr_ls_info.build_disaster_ls_info(
197
                ls_info,
198
                ls_status_info,
199
                true/*filter_readonly_replicas_with_flag*/))) {
200
          LOG_WARN("fail to generate dr log stream info", KR(ret));
201
        } else if (OB_FAIL(statistic_migrate_unit_by_ls(
202
                dr_ls_info,
203
                ls_status_info))) {
204
          LOG_WARN("fail to try log stream disaster recovery", KR(ret));
205
        }
206
      }
207
    }
208
    if (OB_SUCC(ret)) {
209
      if (OB_FAIL(try_finish_migrate_unit(
210
              dr_ls_info.get_unit_stat_info_map()))) {
211
        LOG_WARN("fail to try finish migrate unit", KR(ret));
212
      }
213
    }
214
  }
215
  return ret;
216
}
217

218
int ObMigrateUnitFinishChecker::statistic_migrate_unit_by_ls(
219
    DRLSInfo &dr_ls_info,
220
    share::ObLSStatusInfo &ls_status_info)
221
{
222
  int ret = OB_SUCCESS;
223
  int64_t ls_replica_cnt = 0;
224
  if (OB_UNLIKELY(!inited_)) {
225
    ret = OB_NOT_INIT;
226
    LOG_WARN("not init", KR(ret));
227
  } else if (OB_FAIL(check_stop())) {
228
    LOG_WARN("ObMigrateUnitFinishChecker stopped", KR(ret));
229
  } else if (OB_FAIL(dr_ls_info.get_replica_cnt(ls_replica_cnt))) {
230
    LOG_WARN("fail to get replica cnt", KR(ret));
231
  } else {
232
    for (int64_t i = 0; OB_SUCC(ret) && i < ls_replica_cnt; ++i) {
233
      share::ObLSReplica *ls_replica = nullptr;
234
      DRServerStatInfo *server_stat_info = nullptr;
235
      DRUnitStatInfo *unit_stat_info = nullptr;
236
      DRUnitStatInfo *unit_in_group_stat_info = nullptr;
237
      if (OB_FAIL(dr_ls_info.get_replica_stat(
238
              i,
239
              ls_replica,
240
              server_stat_info,
241
              unit_stat_info,
242
              unit_in_group_stat_info))) {
243
        LOG_WARN("fail to get replica stat", KR(ret));
244
      } else if (OB_UNLIKELY(nullptr == ls_replica
245
                             || nullptr == server_stat_info
246
                             || nullptr == unit_stat_info
247
                             || nullptr == unit_in_group_stat_info)) {
248
        ret = OB_ERR_UNEXPECTED;
249
        LOG_WARN("replica stat unexpected", KR(ret),
250
                 KP(ls_replica),
251
                 KP(server_stat_info),
252
                 KP(unit_stat_info),
253
                 KP(unit_in_group_stat_info));
254
      } else if (server_stat_info->get_server() != unit_stat_info->get_unit().server_
255
          && (ls_replica->is_in_service() || ls_status_info.ls_is_creating())) {
256
        unit_stat_info->inc_outside_replica_cnt();
257
        if (unit_stat_info->get_outside_replica_cnt() <= 2) { // print the first two outside replica
258
          LOG_INFO("outside replica", KPC(ls_replica), "unit", unit_stat_info->get_unit());
259
        }
260
      }
261
    }
262
  }
263
  return ret;
264
}
265

266
int ObMigrateUnitFinishChecker::try_finish_migrate_unit(
267
    const UnitStatInfoMap &unit_stat_info_map)
268
{
269
  int ret = OB_SUCCESS;
270
  if (OB_UNLIKELY(!inited_)) {
271
    ret = OB_NOT_INIT;
272
    LOG_WARN("ObMigrateUnitFinishChecker not init", KR(ret));
273
  } else if (OB_UNLIKELY(nullptr == unit_mgr_)) {
274
    ret = OB_ERR_UNEXPECTED;
275
    LOG_WARN("unit mgr ptr is null", KR(ret), KP(unit_mgr_));
276
  } else {
277
    const UnitStatInfoMap::HashTable &inner_hash_table = unit_stat_info_map.get_hash_table();
278
    UnitStatInfoMap::HashTable::const_iterator iter = inner_hash_table.begin();
279
    for (; OB_SUCC(ret) && iter != inner_hash_table.end(); ++iter) {
280
      const DRUnitStatInfo &unit_stat_info = iter->v_;
281
      if (unit_stat_info.is_in_pool()
282
          && unit_stat_info.get_unit().migrate_from_server_.is_valid()
283
          && 0 == unit_stat_info.get_outside_replica_cnt()) {
284
        if (OB_FAIL(unit_mgr_->finish_migrate_unit(
285
                unit_stat_info.get_unit().unit_id_))) {
286
          LOG_WARN("fail to set unit migrate finish", KR(ret),
287
                   "unit_id", unit_stat_info.get_unit().unit_id_);
288
        }
289
      }
290
    }
291
  }
292
  return ret;
293
}
294

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.