oceanbase

Форк
0
/
ob_root_minor_freeze.cpp 
400 строк · 12.8 Кб
1
/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */
12

13
#define USING_LOG_PREFIX RS
14

15
#include "ob_root_minor_freeze.h"
16

17
#include "share/ob_srv_rpc_proxy.h"
18
#include "share/location_cache/ob_location_service.h"
19
#include "share/ob_all_server_tracer.h"
20
#include "lib/container/ob_se_array.h"
21
#include "rootserver/ddl_task/ob_ddl_scheduler.h"
22
#include "rootserver/ob_unit_manager.h"
23
#include "rootserver/ob_rs_async_rpc_proxy.h"
24

25
namespace oceanbase
26
{
27
using namespace common;
28
using namespace obrpc;
29
using namespace share;
30
using namespace share::schema;
31

32
namespace rootserver
33
{
34
// Builds an uninitialized, non-stopped instance; collaborators are attached by init().
ObRootMinorFreeze::ObRootMinorFreeze()
  : inited_(false), stopped_(false), rpc_proxy_(NULL), unit_manager_(NULL)
{
}
41

42
// Tears the instance down via destroy(). A destructor cannot report errors,
// so a failure is only logged.
ObRootMinorFreeze::~ObRootMinorFreeze()
{
  const int ret = destroy();
  if (OB_SUCCESS != ret) {
    LOG_WARN("destroy failed", K(ret));
  }
}
49

50
int ObRootMinorFreeze::init(ObSrvRpcProxy &rpc_proxy,
51
                            ObUnitManager &unit_manager)
52
{
53
  int ret = OB_SUCCESS;
54
  if (inited_) {
55
    ret = OB_INIT_TWICE;
56
    LOG_WARN("init twice", K(ret));
57
  } else {
58
    rpc_proxy_ = &rpc_proxy;
59
    unit_manager_ = &unit_manager;
60
    stopped_ = false;
61
    inited_ = true;
62
  }
63

64
  return ret;
65
}
66

67
void ObRootMinorFreeze::start()
68
{
69
  ATOMIC_STORE(&stopped_, false);
70
}
71

72
void ObRootMinorFreeze::stop()
73
{
74
  ATOMIC_STORE(&stopped_, true);
75
}
76

77
int ObRootMinorFreeze::destroy()
78
{
79
  int ret = OB_SUCCESS;
80
  inited_ = false;
81
  return ret;
82
}
83

84
inline
85
int ObRootMinorFreeze::check_cancel() const
86
{
87
  int ret = OB_SUCCESS;
88
  if (!inited_) {
89
    ret = OB_NOT_INIT;
90
    LOG_WARN("not init", K(ret));
91
  } else if (ATOMIC_LOAD(&stopped_)) {
92
    ret = OB_CANCELED;
93
    LOG_WARN("rs is stopped", K(ret));
94
  }
95
  return ret;
96
}
97

98
inline
99
bool ObRootMinorFreeze::is_server_alive(const ObAddr &server) const
100
{
101
  int ret = OB_SUCCESS;
102
  bool is_alive = false;
103

104
  if (OB_LIKELY(server.is_valid())) {
105
    if (OB_FAIL(SVR_TRACER.check_server_alive(server, is_alive))) {
106
      LOG_WARN("fail to check whether server is alive, ", K(server), K(ret));
107
      is_alive = false;
108
    }
109
  }
110

111
  return is_alive;
112
}
113

114
// Entry point for a root-service-driven minor freeze.
// Collects the (server, freeze-arg) pairs described by `arg`, then fans the
// RPCs out via do_minor_freeze(). Targets are chosen by the first rule that
// applies:
//   1. a positive ls_id or a valid tablet_id -> replicas of that log stream
//      (exactly one tenant must be given, else OB_NOT_SUPPORTED);
//   2. one or more tenant ids                -> init_params_by_tenant();
//   3. a zone and no explicit server list    -> init_params_by_zone();
//   4. otherwise                             -> init_params_by_server().
// If no target was collected, nothing is sent and OB_SUCCESS is returned.
// @return OB_SUCCESS, OB_NOT_INIT, OB_NOT_SUPPORTED, or an error from the
//         helpers / do_minor_freeze().
int ObRootMinorFreeze::try_minor_freeze(const obrpc::ObRootMinorFreezeArg &arg) const
{
  int ret = OB_SUCCESS;
  if (!inited_) {
    ret = OB_NOT_INIT;
    LOG_WARN("ObRootMinorFreeze not init", K(ret));
  } else {
    ParamsContainer params;
    if ((arg.ls_id_.is_valid() && arg.ls_id_.id() > 0) || arg.tablet_id_.is_valid()) {
      if (1 == arg.tenant_ids_.count()) {
        if (OB_FAIL(init_params_by_ls_or_tablet(arg.tenant_ids_.at(0), arg.ls_id_, arg.tablet_id_, params))) {
          // Record ret/arg so the failure is diagnosable, like the sibling branches.
          LOG_WARN("fail to init param by tablet_id", KR(ret), K(arg));
        }
      } else {
        ret = OB_NOT_SUPPORTED;
        LOG_WARN("only one tenant is required for tablet_freeze", K(ret), K(arg));
      }
    } else if (arg.tenant_ids_.count() > 0) {
      if (OB_FAIL(init_params_by_tenant(arg.tenant_ids_, arg.zone_, arg.server_list_, params))) {
        LOG_WARN("fail to init param by tenant, ", K(ret), K(arg));
      }
    } else if (arg.server_list_.count() == 0 && arg.zone_.size() > 0) {
      if (OB_FAIL(init_params_by_zone(arg.zone_, params))) {
        LOG_WARN("fail to init param by zone, ", K(ret), K(arg));
      }
    } else {
      if (OB_FAIL(init_params_by_server(arg.server_list_, params))) {
        LOG_WARN("fail to init param by server, ", K(ret), K(arg));
      }
    }

    if (OB_SUCC(ret) && !params.is_empty()) {
      if (OB_FAIL(do_minor_freeze(params))) {
        LOG_WARN("fail to do minor freeze, ", K(ret));
      }
    }
  }

  return ret;
}
154

155
int ObRootMinorFreeze::do_minor_freeze(const ParamsContainer &params) const
156
{
157
  int ret = OB_SUCCESS;
158
  int tmp_ret = OB_SUCCESS;
159
  int64_t failure_cnt = 0;
160
  ObMinorFreezeProxy proxy(*rpc_proxy_, &ObSrvRpcProxy::minor_freeze);
161
  LOG_INFO("do minor freeze", K(params));
162

163
  for (int64_t i = 0; OB_SUCC(ret) && i < params.get_params().count(); ++i) {
164
    const MinorFreezeParam &param = params.get_params().at(i);
165
    if (OB_FAIL(check_cancel())) {
166
      LOG_WARN("fail to check cancel", KR(ret));
167
    } else if (OB_TMP_FAIL(proxy.call(param.server, MINOR_FREEZE_TIMEOUT, param.arg))) {
168
      LOG_WARN("proxy call failed", KR(tmp_ret), K(param.arg),
169
               "dest addr", param.server);
170
      failure_cnt++;
171
    }
172
  }
173

174
  ObArray<int> return_code_array;
175
  if (OB_TMP_FAIL(proxy.wait_all(return_code_array))) {
176
    LOG_WARN("proxy wait failed", KR(ret), KR(tmp_ret));
177
    ret = OB_SUCC(ret) ? tmp_ret : ret;
178
  } else if (OB_FAIL(ret)) {
179
  } else if (OB_FAIL(proxy.check_return_cnt(return_code_array.count()))) {
180
    LOG_WARN("return cnt not match", KR(ret), "return_cnt", return_code_array.count());
181
  } else {
182
    for (int i = 0; i < proxy.get_results().count(); ++i) {
183
      if (OB_TMP_FAIL(static_cast<int>(*proxy.get_results().at(i)))) {
184
        LOG_WARN("fail to do minor freeze on target server, ", K(tmp_ret),
185
                 "dest addr:", proxy.get_dests().at(i),
186
                 "param:", proxy.get_args().at(i));
187
        failure_cnt++;
188
      }
189
    }
190
  }
191

192
  if (OB_FAIL(ret)) {
193
  } else if (0 != failure_cnt) {
194
    ret = OB_PARTIAL_FAILED;
195
    LOG_WARN("minor freeze partial failed", KR(ret), K(failure_cnt));
196
  }
197

198
  return ret;
199
}
200

201
int ObRootMinorFreeze::is_server_belongs_to_zone(const ObAddr &addr,
202
                                                 const ObZone &zone,
203
                                                 bool &server_in_zone) const
204
{
205
  int ret = OB_SUCCESS;
206
  ObZone server_zone;
207

208
  if (0 == zone.size()) {
209
    server_in_zone = true;
210
  } else if (OB_FAIL(SVR_TRACER.get_server_zone(addr, server_zone))) {
211
    LOG_WARN("fail to get server zone", KR(ret), K(addr));
212
  } else if (server_zone == zone) {
213
    server_in_zone = true;
214
  } else {
215
    server_in_zone = false;
216
  }
217

218
  return ret;
219
}
220

221
int ObRootMinorFreeze::init_params_by_ls_or_tablet(const uint64_t tenant_id,
222
                                                   share::ObLSID ls_id,
223
                                                   const common::ObTabletID &tablet_id,
224
                                                   ParamsContainer &params) const
225
{
226
  int ret = OB_SUCCESS;
227

228
  const int64_t expire_renew_time = INT64_MAX;
229
  share::ObLSLocation location;
230
  bool is_cache_hit = false;
231
  if (OB_UNLIKELY(OB_ISNULL(GCTX.location_service_))) {
232
    ret = OB_ERR_UNEXPECTED;
233
    LOG_WARN("location service ptr is null", KR(ret));
234
  } else if (tablet_id.is_valid() && !ls_id.is_valid()) {
235
    // get ls id by tablet_id
236
    if (tablet_id.is_ls_inner_tablet()) {
237
      ret = OB_NOT_SUPPORTED;
238
      LOG_WARN("can not minor freeze inner tablet without specifying ls id", K(tenant_id), K(ls_id), K(tablet_id));
239
    } else if (OB_FAIL(GCTX.location_service_->get(tenant_id, tablet_id, expire_renew_time, is_cache_hit, ls_id))) {
240
      LOG_WARN("fail to get ls id according to tablet_id", K(ret), K(tenant_id), K(tablet_id));
241
    }
242
  }
243

244
  if (OB_FAIL(ret)) {
245
  } else if (ls_id.is_valid()) {
246
    // get ls location by ls_id
247
    if (OB_FAIL(GCTX.location_service_->get(
248
            GCONF.cluster_id, tenant_id, ls_id, expire_renew_time, is_cache_hit, location))) {
249
      LOG_WARN("get ls location failed", KR(ret), K(tenant_id), K(ls_id), K(tablet_id));
250
    }
251
  } else {
252
    ret = OB_ERR_UNEXPECTED;
253
    LOG_WARN("invalid ls_id or tablet_id", KR(ret), K(ls_id), K(tablet_id));
254
  }
255

256
  if (OB_FAIL(ret)) {
257
  } else {
258
    const ObIArray<ObLSReplicaLocation> &ls_locations = location.get_replica_locations();
259
    for (int i = 0; i < ls_locations.count() && OB_SUCC(ret); ++i) {
260
      const ObAddr &server = ls_locations.at(i).get_server();
261
      if (is_server_alive(server)) {
262
        if (OB_FAIL(params.push_back_param(server, tenant_id, ls_id, tablet_id))) {
263
          LOG_WARN("fail to add tenant & server, ", K(ret), K(tenant_id), K(ls_id), K(tablet_id));
264
        }
265
      } else {
266
        int tmp_ret = OB_SERVER_NOT_ACTIVE;
267
        LOG_WARN("server not alive or invalid", "server", server, K(tmp_ret), K(tenant_id), K(ls_id), K(tablet_id));
268
      }
269
    }
270
  }
271

272
  return ret;
273
}
274

275
int ObRootMinorFreeze::init_params_by_tenant(const ObIArray<uint64_t> &tenant_ids,
276
                                             const ObZone &zone,
277
                                             const ObIArray<ObAddr> &server_list,
278
                                             ParamsContainer &params) const
279
{
280
  int ret = OB_SUCCESS;
281
  ObSEArray<ObAddr, 256> target_server_list;
282

283
  for (int i = 0; i < tenant_ids.count() && OB_SUCC(ret); ++i) {
284
    if (server_list.count() > 0) {
285
      for (int j = 0; j < server_list.count() && OB_SUCC(ret); ++j) {
286
        if (is_server_alive(server_list.at(j))) {
287
          if (OB_FAIL(params.push_back_param(server_list.at(j), tenant_ids.at(i)))) {
288
            LOG_WARN("fail to add tenant & server, ", K(ret));
289
          }
290
        } else {
291
          ret = OB_SERVER_NOT_ACTIVE;
292
          LOG_WARN("server not alive or invalid", "server", server_list.at(j), K(ret));
293
        }
294
      }
295
    } else {
296
      // TODO: filter servers according to tenant_id
297
      if (OB_ISNULL(unit_manager_)) {
298
        ret = OB_ERR_UNEXPECTED;
299
        LOG_WARN("unit_manager_ is null", KR(ret), KP(unit_manager_));
300
      } else if (OB_FAIL(unit_manager_->get_tenant_alive_servers_non_block(tenant_ids.at(i), target_server_list))) {
301
        LOG_WARN("fail to get tenant server list, ", K(ret));
302
      } else {
303
        bool server_in_zone = false;
304
        for (int j = 0; j < target_server_list.count() && OB_SUCC(ret); ++j) {
305
          const ObAddr &server = target_server_list.at(j);
306
          if (OB_FAIL(is_server_belongs_to_zone(server, zone, server_in_zone))) {
307
            LOG_WARN("fail to check server", K(ret));
308
          } else if (server_in_zone && OB_FAIL(params.push_back_param(server, tenant_ids.at(i)))) {
309
            LOG_WARN("fail to add tenant & server", K(ret));
310
          }
311
        }
312
      }
313
    }
314
  }
315

316
  return ret;
317
}
318

319
int ObRootMinorFreeze::init_params_by_zone(const ObZone &zone,
320
                                           ParamsContainer &params) const
321
{
322
  int ret = OB_SUCCESS;
323
  ObArray<ObAddr> target_server_list;
324

325
  if (OB_UNLIKELY(0 == zone.size())) {
326
    ret = OB_ERR_UNEXPECTED;
327
  } else {
328
    if (OB_FAIL(SVR_TRACER.get_servers_of_zone(zone, target_server_list))) {
329
      LOG_WARN("fail to get tenant server list, ", KR(ret), K(zone));
330
    } else if (0 == target_server_list.count()) {
331
      ret = OB_ZONE_NOT_ACTIVE;
332
      LOG_WARN("empty zone or invalid", K(zone), K(ret));
333
    } else {
334
      for (int i = 0; i < target_server_list.count() && OB_SUCC(ret); ++i) {
335
        if (OB_FAIL(params.push_back_param(target_server_list.at(i)))) {
336
          LOG_WARN("fail to add server", K(ret));
337
        }
338
      }
339
    }
340
  }
341
  return ret;
342
}
343

344
int ObRootMinorFreeze::init_params_by_server(const ObIArray<ObAddr> &server_list,
345
                                             ParamsContainer &params) const
346
{
347
  int ret = OB_SUCCESS;
348
  if (server_list.count() > 0) {
349
    for (int i = 0; i < server_list.count() && OB_SUCC(ret); ++i) {
350
      if (is_server_alive(server_list.at(i))) {
351
        if (OB_FAIL(params.push_back_param(server_list.at(i)))) {
352
          LOG_WARN("fail to add server, ", K(ret));
353
        }
354
      } else {
355
        ret = OB_SERVER_NOT_ACTIVE;
356
        LOG_WARN("server not alive or invalid", "server", server_list.at(i), K(ret));
357
      }
358
    }
359
  } else {
360
    ObZone zone; // empty zone, get all server status
361
    ObSEArray<ObAddr, 256> target_server_list;
362

363
    // get all alive server
364
    if (OB_FAIL(SVR_TRACER.get_alive_servers(zone, target_server_list))) {
365
      LOG_WARN("fail to get alive servers, ", KR(ret), K(zone));
366
    } else {
367
      for (int i = 0; i < target_server_list.count() && OB_SUCC(ret); ++i) {
368
        if (OB_FAIL(params.push_back_param(target_server_list.at(i)))) {
369
          LOG_WARN("fail to add server, ", K(ret));
370
        }
371
      }
372
    }
373
  }
374

375
  return ret;
376
}
377

378
// Appends one (server, freeze-arg) entry.
// A zero tenant_id is treated as "not specified" and is not recorded in the arg.
// The ls_id and tablet_id are copied into the arg as given.
int ObRootMinorFreeze::ParamsContainer::push_back_param(const common::ObAddr &server,
                                                        const uint64_t tenant_id,
                                                        share::ObLSID ls_id,
                                                        const common::ObTabletID &tablet_id)
{
  int ret = OB_SUCCESS;
  MinorFreezeParam entry;
  entry.server = server;
  entry.arg.ls_id_ = ls_id;
  entry.arg.tablet_id_ = tablet_id;

  if (0 != tenant_id) {
    if (OB_FAIL(entry.arg.tenant_ids_.push_back(tenant_id))) {
      LOG_WARN("fail to push tenant_id, ", K(ret));
    }
  }
  if (OB_SUCC(ret) && OB_FAIL(params_.push_back(entry))) {
    LOG_WARN("fail to push tenant_id & server, ", K(ret));
  }
  return ret;
}
398

399
} // namespace rootserver
400
} // namespace oceanbase
401

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.