oceanbase
400 строк · 12.8 Кб
/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */
12
13#define USING_LOG_PREFIX RS
14
15#include "ob_root_minor_freeze.h"
16
17#include "share/ob_srv_rpc_proxy.h"
18#include "share/location_cache/ob_location_service.h"
19#include "share/ob_all_server_tracer.h"
20#include "lib/container/ob_se_array.h"
21#include "rootserver/ddl_task/ob_ddl_scheduler.h"
22#include "rootserver/ob_unit_manager.h"
23#include "rootserver/ob_rs_async_rpc_proxy.h"
24
25namespace oceanbase
26{
27using namespace common;
28using namespace obrpc;
29using namespace share;
30using namespace share::schema;
31
32namespace rootserver
33{
34ObRootMinorFreeze::ObRootMinorFreeze()
35:inited_(false),
36stopped_(false),
37rpc_proxy_(NULL),
38unit_manager_(NULL)
39{
40}
41
42ObRootMinorFreeze::~ObRootMinorFreeze()
43{
44int ret = OB_SUCCESS;
45if (OB_FAIL(destroy())) {
46LOG_WARN("destroy failed", K(ret));
47}
48}
49
50int ObRootMinorFreeze::init(ObSrvRpcProxy &rpc_proxy,
51ObUnitManager &unit_manager)
52{
53int ret = OB_SUCCESS;
54if (inited_) {
55ret = OB_INIT_TWICE;
56LOG_WARN("init twice", K(ret));
57} else {
58rpc_proxy_ = &rpc_proxy;
59unit_manager_ = &unit_manager;
60stopped_ = false;
61inited_ = true;
62}
63
64return ret;
65}
66
67void ObRootMinorFreeze::start()
68{
69ATOMIC_STORE(&stopped_, false);
70}
71
72void ObRootMinorFreeze::stop()
73{
74ATOMIC_STORE(&stopped_, true);
75}
76
77int ObRootMinorFreeze::destroy()
78{
79int ret = OB_SUCCESS;
80inited_ = false;
81return ret;
82}
83
84inline
85int ObRootMinorFreeze::check_cancel() const
86{
87int ret = OB_SUCCESS;
88if (!inited_) {
89ret = OB_NOT_INIT;
90LOG_WARN("not init", K(ret));
91} else if (ATOMIC_LOAD(&stopped_)) {
92ret = OB_CANCELED;
93LOG_WARN("rs is stopped", K(ret));
94}
95return ret;
96}
97
98inline
99bool ObRootMinorFreeze::is_server_alive(const ObAddr &server) const
100{
101int ret = OB_SUCCESS;
102bool is_alive = false;
103
104if (OB_LIKELY(server.is_valid())) {
105if (OB_FAIL(SVR_TRACER.check_server_alive(server, is_alive))) {
106LOG_WARN("fail to check whether server is alive, ", K(server), K(ret));
107is_alive = false;
108}
109}
110
111return is_alive;
112}
113
114int ObRootMinorFreeze::try_minor_freeze(const obrpc::ObRootMinorFreezeArg &arg) const
115{
116int ret = OB_SUCCESS;
117if (!inited_) {
118ret = OB_NOT_INIT;
119LOG_WARN("ObRootMinorFreeze not init", K(ret));
120} else {
121ParamsContainer params;
122if ((arg.ls_id_.is_valid() && arg.ls_id_.id() > 0) || arg.tablet_id_.is_valid()) {
123if (1 == arg.tenant_ids_.count()) {
124if (OB_FAIL(init_params_by_ls_or_tablet(arg.tenant_ids_.at(0), arg.ls_id_, arg.tablet_id_, params))) {
125LOG_WARN("fail to init param by tablet_id");
126}
127} else {
128ret = OB_NOT_SUPPORTED;
129LOG_WARN("only one tenant is required for tablet_freeze", K(ret), K(arg));
130}
131} else if (arg.tenant_ids_.count() > 0) {
132if (OB_FAIL(init_params_by_tenant(arg.tenant_ids_, arg.zone_, arg.server_list_, params))) {
133LOG_WARN("fail to init param by tenant, ", K(ret), K(arg));
134}
135} else if (arg.server_list_.count() == 0 && arg.zone_.size() > 0) {
136if (OB_FAIL(init_params_by_zone(arg.zone_, params))) {
137LOG_WARN("fail to init param by zone, ", K(ret), K(arg));
138}
139} else {
140if (OB_FAIL(init_params_by_server(arg.server_list_, params))) {
141LOG_WARN("fail to init param by server, ", K(ret), K(arg));
142}
143}
144
145if (OB_SUCC(ret) && !params.is_empty()) {
146if (OB_FAIL(do_minor_freeze(params))) {
147LOG_WARN("fail to do minor freeze, ", K(ret));
148}
149}
150}
151
152return ret;
153}
154
155int ObRootMinorFreeze::do_minor_freeze(const ParamsContainer ¶ms) const
156{
157int ret = OB_SUCCESS;
158int tmp_ret = OB_SUCCESS;
159int64_t failure_cnt = 0;
160ObMinorFreezeProxy proxy(*rpc_proxy_, &ObSrvRpcProxy::minor_freeze);
161LOG_INFO("do minor freeze", K(params));
162
163for (int64_t i = 0; OB_SUCC(ret) && i < params.get_params().count(); ++i) {
164const MinorFreezeParam ¶m = params.get_params().at(i);
165if (OB_FAIL(check_cancel())) {
166LOG_WARN("fail to check cancel", KR(ret));
167} else if (OB_TMP_FAIL(proxy.call(param.server, MINOR_FREEZE_TIMEOUT, param.arg))) {
168LOG_WARN("proxy call failed", KR(tmp_ret), K(param.arg),
169"dest addr", param.server);
170failure_cnt++;
171}
172}
173
174ObArray<int> return_code_array;
175if (OB_TMP_FAIL(proxy.wait_all(return_code_array))) {
176LOG_WARN("proxy wait failed", KR(ret), KR(tmp_ret));
177ret = OB_SUCC(ret) ? tmp_ret : ret;
178} else if (OB_FAIL(ret)) {
179} else if (OB_FAIL(proxy.check_return_cnt(return_code_array.count()))) {
180LOG_WARN("return cnt not match", KR(ret), "return_cnt", return_code_array.count());
181} else {
182for (int i = 0; i < proxy.get_results().count(); ++i) {
183if (OB_TMP_FAIL(static_cast<int>(*proxy.get_results().at(i)))) {
184LOG_WARN("fail to do minor freeze on target server, ", K(tmp_ret),
185"dest addr:", proxy.get_dests().at(i),
186"param:", proxy.get_args().at(i));
187failure_cnt++;
188}
189}
190}
191
192if (OB_FAIL(ret)) {
193} else if (0 != failure_cnt) {
194ret = OB_PARTIAL_FAILED;
195LOG_WARN("minor freeze partial failed", KR(ret), K(failure_cnt));
196}
197
198return ret;
199}
200
201int ObRootMinorFreeze::is_server_belongs_to_zone(const ObAddr &addr,
202const ObZone &zone,
203bool &server_in_zone) const
204{
205int ret = OB_SUCCESS;
206ObZone server_zone;
207
208if (0 == zone.size()) {
209server_in_zone = true;
210} else if (OB_FAIL(SVR_TRACER.get_server_zone(addr, server_zone))) {
211LOG_WARN("fail to get server zone", KR(ret), K(addr));
212} else if (server_zone == zone) {
213server_in_zone = true;
214} else {
215server_in_zone = false;
216}
217
218return ret;
219}
220
221int ObRootMinorFreeze::init_params_by_ls_or_tablet(const uint64_t tenant_id,
222share::ObLSID ls_id,
223const common::ObTabletID &tablet_id,
224ParamsContainer ¶ms) const
225{
226int ret = OB_SUCCESS;
227
228const int64_t expire_renew_time = INT64_MAX;
229share::ObLSLocation location;
230bool is_cache_hit = false;
231if (OB_UNLIKELY(OB_ISNULL(GCTX.location_service_))) {
232ret = OB_ERR_UNEXPECTED;
233LOG_WARN("location service ptr is null", KR(ret));
234} else if (tablet_id.is_valid() && !ls_id.is_valid()) {
235// get ls id by tablet_id
236if (tablet_id.is_ls_inner_tablet()) {
237ret = OB_NOT_SUPPORTED;
238LOG_WARN("can not minor freeze inner tablet without specifying ls id", K(tenant_id), K(ls_id), K(tablet_id));
239} else if (OB_FAIL(GCTX.location_service_->get(tenant_id, tablet_id, expire_renew_time, is_cache_hit, ls_id))) {
240LOG_WARN("fail to get ls id according to tablet_id", K(ret), K(tenant_id), K(tablet_id));
241}
242}
243
244if (OB_FAIL(ret)) {
245} else if (ls_id.is_valid()) {
246// get ls location by ls_id
247if (OB_FAIL(GCTX.location_service_->get(
248GCONF.cluster_id, tenant_id, ls_id, expire_renew_time, is_cache_hit, location))) {
249LOG_WARN("get ls location failed", KR(ret), K(tenant_id), K(ls_id), K(tablet_id));
250}
251} else {
252ret = OB_ERR_UNEXPECTED;
253LOG_WARN("invalid ls_id or tablet_id", KR(ret), K(ls_id), K(tablet_id));
254}
255
256if (OB_FAIL(ret)) {
257} else {
258const ObIArray<ObLSReplicaLocation> &ls_locations = location.get_replica_locations();
259for (int i = 0; i < ls_locations.count() && OB_SUCC(ret); ++i) {
260const ObAddr &server = ls_locations.at(i).get_server();
261if (is_server_alive(server)) {
262if (OB_FAIL(params.push_back_param(server, tenant_id, ls_id, tablet_id))) {
263LOG_WARN("fail to add tenant & server, ", K(ret), K(tenant_id), K(ls_id), K(tablet_id));
264}
265} else {
266int tmp_ret = OB_SERVER_NOT_ACTIVE;
267LOG_WARN("server not alive or invalid", "server", server, K(tmp_ret), K(tenant_id), K(ls_id), K(tablet_id));
268}
269}
270}
271
272return ret;
273}
274
275int ObRootMinorFreeze::init_params_by_tenant(const ObIArray<uint64_t> &tenant_ids,
276const ObZone &zone,
277const ObIArray<ObAddr> &server_list,
278ParamsContainer ¶ms) const
279{
280int ret = OB_SUCCESS;
281ObSEArray<ObAddr, 256> target_server_list;
282
283for (int i = 0; i < tenant_ids.count() && OB_SUCC(ret); ++i) {
284if (server_list.count() > 0) {
285for (int j = 0; j < server_list.count() && OB_SUCC(ret); ++j) {
286if (is_server_alive(server_list.at(j))) {
287if (OB_FAIL(params.push_back_param(server_list.at(j), tenant_ids.at(i)))) {
288LOG_WARN("fail to add tenant & server, ", K(ret));
289}
290} else {
291ret = OB_SERVER_NOT_ACTIVE;
292LOG_WARN("server not alive or invalid", "server", server_list.at(j), K(ret));
293}
294}
295} else {
296// TODO: filter servers according to tenant_id
297if (OB_ISNULL(unit_manager_)) {
298ret = OB_ERR_UNEXPECTED;
299LOG_WARN("unit_manager_ is null", KR(ret), KP(unit_manager_));
300} else if (OB_FAIL(unit_manager_->get_tenant_alive_servers_non_block(tenant_ids.at(i), target_server_list))) {
301LOG_WARN("fail to get tenant server list, ", K(ret));
302} else {
303bool server_in_zone = false;
304for (int j = 0; j < target_server_list.count() && OB_SUCC(ret); ++j) {
305const ObAddr &server = target_server_list.at(j);
306if (OB_FAIL(is_server_belongs_to_zone(server, zone, server_in_zone))) {
307LOG_WARN("fail to check server", K(ret));
308} else if (server_in_zone && OB_FAIL(params.push_back_param(server, tenant_ids.at(i)))) {
309LOG_WARN("fail to add tenant & server", K(ret));
310}
311}
312}
313}
314}
315
316return ret;
317}
318
319int ObRootMinorFreeze::init_params_by_zone(const ObZone &zone,
320ParamsContainer ¶ms) const
321{
322int ret = OB_SUCCESS;
323ObArray<ObAddr> target_server_list;
324
325if (OB_UNLIKELY(0 == zone.size())) {
326ret = OB_ERR_UNEXPECTED;
327} else {
328if (OB_FAIL(SVR_TRACER.get_servers_of_zone(zone, target_server_list))) {
329LOG_WARN("fail to get tenant server list, ", KR(ret), K(zone));
330} else if (0 == target_server_list.count()) {
331ret = OB_ZONE_NOT_ACTIVE;
332LOG_WARN("empty zone or invalid", K(zone), K(ret));
333} else {
334for (int i = 0; i < target_server_list.count() && OB_SUCC(ret); ++i) {
335if (OB_FAIL(params.push_back_param(target_server_list.at(i)))) {
336LOG_WARN("fail to add server", K(ret));
337}
338}
339}
340}
341return ret;
342}
343
344int ObRootMinorFreeze::init_params_by_server(const ObIArray<ObAddr> &server_list,
345ParamsContainer ¶ms) const
346{
347int ret = OB_SUCCESS;
348if (server_list.count() > 0) {
349for (int i = 0; i < server_list.count() && OB_SUCC(ret); ++i) {
350if (is_server_alive(server_list.at(i))) {
351if (OB_FAIL(params.push_back_param(server_list.at(i)))) {
352LOG_WARN("fail to add server, ", K(ret));
353}
354} else {
355ret = OB_SERVER_NOT_ACTIVE;
356LOG_WARN("server not alive or invalid", "server", server_list.at(i), K(ret));
357}
358}
359} else {
360ObZone zone; // empty zone, get all server status
361ObSEArray<ObAddr, 256> target_server_list;
362
363// get all alive server
364if (OB_FAIL(SVR_TRACER.get_alive_servers(zone, target_server_list))) {
365LOG_WARN("fail to get alive servers, ", KR(ret), K(zone));
366} else {
367for (int i = 0; i < target_server_list.count() && OB_SUCC(ret); ++i) {
368if (OB_FAIL(params.push_back_param(target_server_list.at(i)))) {
369LOG_WARN("fail to add server, ", K(ret));
370}
371}
372}
373}
374
375return ret;
376}
377
378int ObRootMinorFreeze::ParamsContainer::push_back_param(const common::ObAddr &server,
379const uint64_t tenant_id,
380share::ObLSID ls_id,
381const common::ObTabletID &tablet_id)
382{
383int ret = OB_SUCCESS;
384
385MinorFreezeParam param;
386param.server = server;
387param.arg.ls_id_ = ls_id;
388param.arg.tablet_id_ = tablet_id;
389
390if (0 != tenant_id && OB_FAIL(param.arg.tenant_ids_.push_back(tenant_id))) {
391LOG_WARN("fail to push tenant_id, ", K(ret));
392} else if (OB_FAIL(params_.push_back(param))) {
393LOG_WARN("fail to push tenant_id & server, ", K(ret));
394}
395
396return ret;
397}
398
399} // namespace rootserver
400} // namespace oceanbase
401