// oceanbase
// 293 lines · 10.6 KB
/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */

13#define USING_LOG_PREFIX RS
14#include "ob_migrate_unit_finish_checker.h"
15#include "lib/container/ob_array.h"
16#include "lib/container/ob_array_iterator.h"
17#include "lib/container/ob_se_array.h"
18#include "lib/container/ob_se_array_iterator.h"
19#include "share/ls/ob_ls_status_operator.h"
20#include "share/ls/ob_ls_table_operator.h"
21#include "ob_unit_manager.h"
22#include "ob_zone_manager.h"
23
24using namespace oceanbase::common;
25using namespace oceanbase::share;
26using namespace oceanbase::rootserver;
27
28ObMigrateUnitFinishChecker::ObMigrateUnitFinishChecker(volatile bool &stop)
29: inited_(false),
30unit_mgr_(nullptr),
31zone_mgr_(nullptr),
32schema_service_(nullptr),
33sql_proxy_(nullptr),
34lst_operator_(nullptr),
35stop_(stop)
36{
37}
38
39ObMigrateUnitFinishChecker::~ObMigrateUnitFinishChecker()
40{
41}
42
43int ObMigrateUnitFinishChecker::check_stop() const
44{
45int ret = OB_SUCCESS;
46if (stop_) {
47ret = OB_CANCELED;
48LOG_WARN("ObMigrateUnitFinishChecker stopped", KR(ret), K(stop_));
49}
50return ret;
51}
52
53int ObMigrateUnitFinishChecker::init(
54ObUnitManager &unit_mgr,
55ObZoneManager &zone_mgr,
56share::schema::ObMultiVersionSchemaService &schema_service,
57common::ObMySQLProxy &sql_proxy,
58share::ObLSTableOperator &lst_operator)
59{
60int ret = OB_SUCCESS;
61if (OB_UNLIKELY(inited_)) {
62ret = OB_INIT_TWICE;
63LOG_WARN("init twice", KR(ret));
64} else {
65unit_mgr_ = &unit_mgr;
66zone_mgr_ = &zone_mgr;
67schema_service_ = &schema_service;
68sql_proxy_ = &sql_proxy;
69lst_operator_ = &lst_operator;
70inited_ = true;
71}
72return ret;
73}
74
75int ObMigrateUnitFinishChecker::check()
76{
77int ret = OB_SUCCESS;
78int tmp_ret = OB_SUCCESS;
79LOG_INFO("start check unit migrate finish");
80ObArray<uint64_t> tenant_id_array;
81if (OB_UNLIKELY(!inited_)) {
82ret = OB_NOT_INIT;
83LOG_WARN("not init", K(ret));
84} else if (OB_FAIL(check_stop())) {
85LOG_WARN("ObMigrateUnitFinishChecker stopped", KR(ret));
86} else if (OB_UNLIKELY(ObTenantUtils::get_tenant_ids(schema_service_, tenant_id_array))) {
87LOG_WARN("fail to get tenant id array", KR(ret));
88} else {
89for (int64_t i = 0; OB_SUCC(ret) && i < tenant_id_array.count(); ++i) {
90const uint64_t tenant_id = tenant_id_array.at(i);
91if (is_meta_tenant(tenant_id)) {
92// bypass
93} else {
94// check unit belongs to existed tenant
95if (OB_SUCCESS != (tmp_ret = try_check_migrate_unit_finish_by_tenant(tenant_id))) {
96LOG_WARN("fail to try check migrate unit finish by tenant", KR(tmp_ret), K(tenant_id));
97}
98// check unit not in locality but in zone list
99if (OB_SUCCESS != (tmp_ret = try_check_migrate_unit_finish_not_in_locality(tenant_id))) {
100LOG_WARN("fail to try check migrate unit finish not in locality", KR(tmp_ret), K(tenant_id));
101}
102}
103}
104}
105// check unit not in tenant
106if (OB_SUCCESS != (tmp_ret = try_check_migrate_unit_finish_not_in_tenant())) {
107LOG_WARN("fail to try check migrate unit finish not in tenant", KR(tmp_ret));
108}
109return ret;
110}
111
112int ObMigrateUnitFinishChecker::try_check_migrate_unit_finish_not_in_locality(
113const uint64_t &tenant_id)
114{
115int ret = OB_SUCCESS;
116ObArray<common::ObZone> zone_list;
117ObSchemaGetterGuard schema_guard;
118if (OB_UNLIKELY(!inited_)) {
119ret = OB_NOT_INIT;
120LOG_WARN("not init", KR(ret));
121} else if (OB_FAIL(check_stop())) {
122LOG_WARN("ObMigrateUnitFinishChecker stopped", KR(ret));
123} else if (OB_FAIL(unit_mgr_->get_tenant_pool_zone_list(tenant_id, zone_list))) {
124LOG_WARN("fail to get tenant pool zone list", KR(ret));
125} else if (OB_FAIL(schema_service_->get_tenant_schema_guard(tenant_id, schema_guard))) {
126LOG_WARN("get schema guard failed", KR(ret), K(tenant_id));
127} else if (OB_FAIL(unit_mgr_->finish_migrate_unit_not_in_locality(
128tenant_id, &schema_guard, zone_list))) {
129LOG_WARN("fail to finish migrat eunit not in locality", KR(ret), K(tenant_id), K(zone_list));
130}
131return ret;
132}
133
134int ObMigrateUnitFinishChecker::try_check_migrate_unit_finish_not_in_tenant()
135{
136int ret = OB_SUCCESS;
137ObArray<share::ObResourcePool> pools;
138if (OB_UNLIKELY(!inited_)) {
139ret = OB_NOT_INIT;
140LOG_WARN("not init", KR(ret));
141} else if (OB_FAIL(check_stop())) {
142LOG_WARN("ObMigrateUnitFinishChecker stopped", KR(ret));
143} else if (OB_FAIL(unit_mgr_->get_pools(pools))) {
144LOG_WARN("fail to get pools", KR(ret));
145} else {
146FOREACH_CNT_X(pool, pools, OB_SUCC(ret)) {
147if (OB_FAIL(unit_mgr_->finish_migrate_unit_not_in_tenant(pool))) {
148LOG_WARN("fail to finish migrate unit not in tenant", KR(ret));
149}
150ret = OB_SUCCESS; //ignore ret 保证所有的pool都能运行
151}
152}
153return ret;
154}
155
156int ObMigrateUnitFinishChecker::try_check_migrate_unit_finish_by_tenant(
157const uint64_t tenant_id)
158{
159int ret = OB_SUCCESS;
160if (OB_UNLIKELY(!inited_)) {
161ret = OB_NOT_INIT;
162LOG_WARN("not init", KR(ret));
163} else if (OB_FAIL(check_stop())) {
164LOG_WARN("ObMigrateUnitFinishChecker stopped", KR(ret));
165} else if (OB_UNLIKELY(OB_INVALID_ID == tenant_id)) {
166ret = OB_INVALID_ARGUMENT;
167LOG_WARN("invalid argument", KR(ret), K(tenant_id));
168} else if (OB_ISNULL(sql_proxy_)) {
169ret = OB_ERR_UNEXPECTED;
170LOG_WARN("sql proxy is null", KR(ret));
171} else {
172LOG_INFO("try check migrate unit finish by tenant", K(tenant_id));
173DRLSInfo dr_ls_info(gen_user_tenant_id(tenant_id),
174zone_mgr_,
175schema_service_);
176ObLSStatusInfoArray ls_status_info_array;
177share::ObLSStatusOperator ls_status_operator;
178if (OB_FAIL(ls_status_operator.get_all_tenant_related_ls_status_info(
179*sql_proxy_, tenant_id, ls_status_info_array))) {
180LOG_WARN("fail to get all ls status", KR(ret), K(tenant_id));
181} else if (OB_FAIL(dr_ls_info.init())) {
182LOG_WARN("fail to init disaster log stream info", KR(ret));
183} else {
184for (int64_t i = 0; OB_SUCC(ret) && i < ls_status_info_array.count(); ++i) {
185share::ObLSInfo ls_info;
186share::ObLSStatusInfo &ls_status_info = ls_status_info_array.at(i);
187if (OB_FAIL(check_stop())) {
188LOG_WARN("DRWorker stopped", KR(ret));
189} else if (OB_FAIL(lst_operator_->get(
190GCONF.cluster_id,
191ls_status_info.tenant_id_,
192ls_status_info.ls_id_,
193share::ObLSTable::COMPOSITE_MODE,
194ls_info))) {
195LOG_WARN("fail to get log stream info", KR(ret));
196} else if (OB_FAIL(dr_ls_info.build_disaster_ls_info(
197ls_info,
198ls_status_info,
199true/*filter_readonly_replicas_with_flag*/))) {
200LOG_WARN("fail to generate dr log stream info", KR(ret));
201} else if (OB_FAIL(statistic_migrate_unit_by_ls(
202dr_ls_info,
203ls_status_info))) {
204LOG_WARN("fail to try log stream disaster recovery", KR(ret));
205}
206}
207}
208if (OB_SUCC(ret)) {
209if (OB_FAIL(try_finish_migrate_unit(
210dr_ls_info.get_unit_stat_info_map()))) {
211LOG_WARN("fail to try finish migrate unit", KR(ret));
212}
213}
214}
215return ret;
216}
217
218int ObMigrateUnitFinishChecker::statistic_migrate_unit_by_ls(
219DRLSInfo &dr_ls_info,
220share::ObLSStatusInfo &ls_status_info)
221{
222int ret = OB_SUCCESS;
223int64_t ls_replica_cnt = 0;
224if (OB_UNLIKELY(!inited_)) {
225ret = OB_NOT_INIT;
226LOG_WARN("not init", KR(ret));
227} else if (OB_FAIL(check_stop())) {
228LOG_WARN("ObMigrateUnitFinishChecker stopped", KR(ret));
229} else if (OB_FAIL(dr_ls_info.get_replica_cnt(ls_replica_cnt))) {
230LOG_WARN("fail to get replica cnt", KR(ret));
231} else {
232for (int64_t i = 0; OB_SUCC(ret) && i < ls_replica_cnt; ++i) {
233share::ObLSReplica *ls_replica = nullptr;
234DRServerStatInfo *server_stat_info = nullptr;
235DRUnitStatInfo *unit_stat_info = nullptr;
236DRUnitStatInfo *unit_in_group_stat_info = nullptr;
237if (OB_FAIL(dr_ls_info.get_replica_stat(
238i,
239ls_replica,
240server_stat_info,
241unit_stat_info,
242unit_in_group_stat_info))) {
243LOG_WARN("fail to get replica stat", KR(ret));
244} else if (OB_UNLIKELY(nullptr == ls_replica
245|| nullptr == server_stat_info
246|| nullptr == unit_stat_info
247|| nullptr == unit_in_group_stat_info)) {
248ret = OB_ERR_UNEXPECTED;
249LOG_WARN("replica stat unexpected", KR(ret),
250KP(ls_replica),
251KP(server_stat_info),
252KP(unit_stat_info),
253KP(unit_in_group_stat_info));
254} else if (server_stat_info->get_server() != unit_stat_info->get_unit().server_
255&& (ls_replica->is_in_service() || ls_status_info.ls_is_creating())) {
256unit_stat_info->inc_outside_replica_cnt();
257if (unit_stat_info->get_outside_replica_cnt() <= 2) { // print the first two outside replica
258LOG_INFO("outside replica", KPC(ls_replica), "unit", unit_stat_info->get_unit());
259}
260}
261}
262}
263return ret;
264}
265
266int ObMigrateUnitFinishChecker::try_finish_migrate_unit(
267const UnitStatInfoMap &unit_stat_info_map)
268{
269int ret = OB_SUCCESS;
270if (OB_UNLIKELY(!inited_)) {
271ret = OB_NOT_INIT;
272LOG_WARN("ObMigrateUnitFinishChecker not init", KR(ret));
273} else if (OB_UNLIKELY(nullptr == unit_mgr_)) {
274ret = OB_ERR_UNEXPECTED;
275LOG_WARN("unit mgr ptr is null", KR(ret), KP(unit_mgr_));
276} else {
277const UnitStatInfoMap::HashTable &inner_hash_table = unit_stat_info_map.get_hash_table();
278UnitStatInfoMap::HashTable::const_iterator iter = inner_hash_table.begin();
279for (; OB_SUCC(ret) && iter != inner_hash_table.end(); ++iter) {
280const DRUnitStatInfo &unit_stat_info = iter->v_;
281if (unit_stat_info.is_in_pool()
282&& unit_stat_info.get_unit().migrate_from_server_.is_valid()
283&& 0 == unit_stat_info.get_outside_replica_cnt()) {
284if (OB_FAIL(unit_mgr_->finish_migrate_unit(
285unit_stat_info.get_unit().unit_id_))) {
286LOG_WARN("fail to set unit migrate finish", KR(ret),
287"unit_id", unit_stat_info.get_unit().unit_id_);
288}
289}
290}
291}
292return ret;
293}
294