oceanbase

Форк
0
/
ob_create_standby_from_net_actor.cpp 
295 строк · 11.4 Кб
1
/**
2
 * Copyright (c) 2021 OceanBase
3
 * OceanBase CE is licensed under Mulan PubL v2.
4
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
5
 * You may obtain a copy of Mulan PubL v2 at:
6
 *          http://license.coscl.org.cn/MulanPubL-2.0
7
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
 * See the Mulan PubL v2 for more details.
11
 */
12

13
#define USING_LOG_PREFIX RS
14
#include "ob_create_standby_from_net_actor.h"
15
#include "share/schema/ob_multi_version_schema_service.h" // ObMultiVersionSchemaService
16
#include "observer/ob_server_struct.h"        // GCTX
17
#include "observer/ob_service.h" // ObService
18
#include "share/rc/ob_tenant_base.h"    // MTL_ID
19
#include "rootserver/ob_tenant_info_loader.h" // ObTenantInfoLoader
20
#include "share/ob_rpc_struct.h" // ObCreateTenantEndArg
21
#include "rootserver/restore/ob_restore_scheduler.h" //reset_schema_status
22
#include "rootserver/ob_rs_async_rpc_proxy.h" // ObSwitchSchemaProxy
23
#include "share/ob_common_rpc_proxy.h" // create_tenant_end
24
#include "share/ob_schema_status_proxy.h"//ObSchemaStatusProxy
25
#include "share/ob_rpc_struct.h" // ObBroadcastSchemaArg
26
#include "share/schema/ob_multi_version_schema_service.h" // for GSCHEMASERVICE
27

28
// File-local logging helpers. Every message goes through RS_LOG at the given
// level with the "[NET_STANDBY_TNT_SERVICE] " prefix glued onto the format
// string. ISTAT / WSTAT / TSTAT are shorthands for the INFO / WARN / TRACE
// levels. The GNU `, ##__VA_ARGS__` form drops the trailing comma when no
// key/value arguments are passed (e.g. ISTAT("msg")).
#define STAT(level, fmt, ...) RS_LOG(level, "[NET_STANDBY_TNT_SERVICE] " fmt, ##__VA_ARGS__)
#define ISTAT(fmt, ...) STAT(INFO, fmt, ##__VA_ARGS__)
#define WSTAT(fmt, ...) STAT(WARN, fmt, ##__VA_ARGS__)
#define TSTAT(fmt, ...) STAT(TRACE, fmt, ##__VA_ARGS__)
32

33
namespace oceanbase
34
{
35
using namespace share;
36
using namespace common;
37
namespace rootserver
38
{
39

40
int ObCreateStandbyFromNetActor::set_idle_interval_us_(const int64_t idle_time)
41
{
42
  int ret = OB_SUCCESS;
43
  if (idle_time <= 0 || idle_time > MAX_IDLE_TIME) {
44
    ret = OB_INVALID_ARGUMENT;
45
    WSTAT("invalid idle_time", KR(ret), K(idle_time));
46
  } else {
47
    (void)ATOMIC_STORE(&idle_time_, idle_time);
48
  }
49
  return ret;
50
}
51

52

53
int ObCreateStandbyFromNetActor::init()
54
{
55
  int ret = OB_SUCCESS;
56
  sql_proxy_ = GCTX.sql_proxy_;
57

58
  if (IS_INIT) {
59
    ret = OB_INIT_TWICE;
60
    WSTAT("init twice", KR(ret));
61
  } else if (OB_ISNULL(sql_proxy_)) {
62
    ret = OB_ERR_UNEXPECTED;
63
    WSTAT("sql proxy is null", KR(ret));
64
  } else if (OB_FAIL(ObTenantThreadHelper::create("NetStandbyCT",
65
         lib::TGDefIDs::ObCreateStandbyFromNetActor, *this))) {
66
    WSTAT("failed to create NET_STANDBY_TNT_SERVICE", KR(ret));
67
  } else if (OB_FAIL(ObTenantThreadHelper::start())) {
68
    WSTAT("failed to start NET_STANDBY_TNT_SERVICE", KR(ret));
69
  } else {
70
    tenant_id_ = MTL_ID();
71
    schema_broadcasted_ = false;
72
    is_inited_ = true;
73
  }
74

75
  return ret;
76
}
77

78
void ObCreateStandbyFromNetActor::do_work()
79
{
80
  ISTAT("create standby from net actor start");
81
  int ret = OB_SUCCESS;
82
  if (OB_FAIL(check_inner_stat_())) {
83
    WSTAT("inner stat error", KR(ret), K_(is_inited));
84
  } else {
85
    while (!has_set_stop()) {
86
      ObCurTraceId::init(GCONF.self_addr_);
87
      if (OB_FAIL(do_creating_standby_tenant())) {
88
        WSTAT("create standby from net actor failed", KR(ret));
89
      }
90

91
      ISTAT("finish one round", KR(ret));
92
      idle(get_idle_interval_us_());
93
    }
94
  }
95
}
96

97
void ObCreateStandbyFromNetActor::destroy()
98
{
99
  ISTAT("create standby from net actor destory", KPC(this));
100
  ObTenantThreadHelper::destroy();
101
  is_inited_ = false;
102
  tenant_id_ = OB_INVALID_TENANT_ID;
103
  sql_proxy_ = NULL;
104
  schema_broadcasted_ = false;
105
}
106

107
int ObCreateStandbyFromNetActor::check_inner_stat_()
108
{
109
  int ret = OB_SUCCESS;
110
  if (OB_UNLIKELY(!is_inited_)) {
111
    ret = OB_NOT_INIT;
112
    WSTAT("not init", KR(ret));
113
  } else if (OB_ISNULL(sql_proxy_)) {
114
    ret = OB_ERR_UNEXPECTED;
115
    WSTAT("Member variables is NULL", KR(ret), KP(sql_proxy_));
116
  } else if (!is_user_tenant(tenant_id_)) {
117
    ret = OB_ERR_UNEXPECTED;
118
    WSTAT("shoule use meta tenant", K(tenant_id_));
119
  }
120
  return ret;
121
}
122

123
int ObCreateStandbyFromNetActor::check_has_user_ls(const uint64_t tenant_id, common::ObMySQLProxy *sql_proxy, bool &has_user_ls)
124
{
125
  int ret = OB_SUCCESS;
126
  ObLSStatusOperator status_op;
127
  ObLSStatusInfoArray ls_array;
128
  has_user_ls = false;
129
  if (!is_user_tenant(tenant_id) || OB_ISNULL(sql_proxy)) {
130
    ret = OB_INVALID_ARGUMENT;
131
    WSTAT("invalid argument", K(tenant_id), KP(sql_proxy));
132
  } else if (OB_FAIL(status_op.get_all_ls_status_by_order(tenant_id, ls_array, *sql_proxy))) {
133
    WSTAT("failed to get all ls status", KR(ret), K(tenant_id));
134
  }
135
  for (int64_t i = 0; !has_user_ls && OB_SUCC(ret) && i < ls_array.count(); ++i) {
136
    const ObLSStatusInfo &info = ls_array.at(i);
137
    if (info.is_user_ls()) {
138
      has_user_ls = true;
139
    }
140
  }// end for
141
  return ret;
142
}
143

144
int ObCreateStandbyFromNetActor::finish_restore_if_possible_()
145
{
146
  int ret = OB_SUCCESS;
147
  int tmp_ret = OB_SUCCESS;
148
  ObLSRecoveryStat recovery_stat;
149
  ObLSRecoveryStatOperator ls_recovery_operator;
150
  rootserver::ObTenantInfoLoader *tenant_info_loader = MTL(rootserver::ObTenantInfoLoader*);
151
  obrpc::ObCreateTenantEndArg arg;
152
  arg.exec_tenant_id_ = OB_SYS_TENANT_ID;
153
  arg.tenant_id_ = tenant_id_;
154
  obrpc::ObCommonRpcProxy *rs_rpc_proxy = GCTX.rs_rpc_proxy_;
155
  SCN min_user_ls_create_scn = SCN::base_scn();
156
  SCN readable_scn = SCN::base_scn();
157

158
  if (OB_ISNULL(rs_rpc_proxy)) {
159
    ret = OB_ERR_UNEXPECTED;
160
    WSTAT("pointer is null", KP(rs_rpc_proxy));
161
  } else if (OB_FAIL(check_inner_stat_())) {
162
    WSTAT("error unexpected", KR(ret), K(tenant_id_), KP(sql_proxy_));
163
  } else if (OB_FAIL(ls_recovery_operator.get_tenant_min_user_ls_create_scn(tenant_id_, *sql_proxy_,
164
                                                                        min_user_ls_create_scn))) {
165
    WSTAT("failed to get tenant min_user_ls_create_scn", KR(ret), K_(tenant_id));
166
  } else if (OB_ISNULL(tenant_info_loader)) {
167
    ret = OB_ERR_UNEXPECTED;
168
    WSTAT("tenant info loader should not be NULL", KR(ret), KP(tenant_info_loader));
169
  } else {
170
    ISTAT("start to wait whether can finish restore", K_(tenant_id), K(min_user_ls_create_scn));
171
    DEBUG_SYNC(BLOCK_CREATE_STANDBY_TENANT_END);
172

173
    int64_t retry_cnt_after_sync_user_ls = 0;
174
    // wait 1 minute, sleep 1s and retry 60 times
175
    for (int64_t retry_cnt = 60; OB_SUCC(ret) && retry_cnt > 0 && !has_set_stop(); --retry_cnt) {
176
      bool is_dropped = false;
177
      if (OB_FAIL(GSCHEMASERVICE.check_if_tenant_has_been_dropped(tenant_id_, is_dropped))) {
178
        LOG_WARN("tenant has been dropped", KR(ret), K_(tenant_id));
179
      } else if (is_dropped) {
180
        ret = OB_TENANT_HAS_BEEN_DROPPED;
181
        LOG_WARN("tenant has been dropped", KR(ret), K_(tenant_id));
182
      } else if (OB_FAIL(tenant_info_loader->get_readable_scn(readable_scn))) {
183
        WSTAT("failed to get readable_scn", KR(ret));
184
      } else if (readable_scn >= min_user_ls_create_scn) {
185
        retry_cnt_after_sync_user_ls++;
186
        ISTAT("tenant readable scn can read inner table", K(readable_scn), K(min_user_ls_create_scn),
187
                                                          K(retry_cnt_after_sync_user_ls));
188

189
        bool is_refreshed = false;
190
        if (OB_FAIL(GSCHEMASERVICE.check_if_tenant_schema_has_been_refreshed(tenant_id_, is_refreshed))) {
191
          LOG_WARN("fail to check tenant schema has been refreshed", KR(ret), K_(tenant_id), K(retry_cnt_after_sync_user_ls));
192
        } else if (!is_refreshed || !schema_broadcasted_) {
193
          if (50 < retry_cnt_after_sync_user_ls) {
194
            WSTAT("schema has not refreshed", KR(ret), KR(tmp_ret), K(is_refreshed),
195
                                        K_(schema_broadcasted), K(retry_cnt_after_sync_user_ls));
196
          }
197
          if (OB_TMP_FAIL(refresh_schema_())) {
198
            WSTAT("failed to refresh schema", KR(ret), KR(tmp_ret), K(is_refreshed), K_(schema_broadcasted),
199
                                              K(retry_cnt_after_sync_user_ls));
200
          }
201
        } else if (OB_FAIL(rs_rpc_proxy->create_tenant_end(arg))) {
202
          WSTAT("fail to execute create tenant end", KR(ret), K_(tenant_id), K(arg), K(retry_cnt_after_sync_user_ls));
203
        } else {
204
          ISTAT("execute create_tenant_end", KR(ret), K_(tenant_id), K(arg), K(retry_cnt_after_sync_user_ls));
205
          break;
206
        }
207
      }
208
      usleep(1000 * 1000); // 1s
209
    }
210
  }
211

212
  ISTAT("finish_restore_if_possible", K(ret), K_(tenant_id), K(min_user_ls_create_scn), K(arg), K(readable_scn));
213

214
  return ret;
215
}
216

217
int ObCreateStandbyFromNetActor::refresh_schema_()
218
{
219
  int ret = OB_SUCCESS;
220
  observer::ObService *ob_service = GCTX.ob_service_;
221
  int tmp_ret = OB_SUCCESS;
222
  ObSchemaStatusProxy *schema_status_proxy = GCTX.schema_status_proxy_;
223
  ObRefreshSchemaStatus refresh_schema_status;
224

225
  if (OB_ISNULL(ob_service) || OB_ISNULL(schema_status_proxy)) {
226
    ret = OB_ERR_UNEXPECTED;
227
    WSTAT("pointer is null", KP(ob_service), KP(schema_status_proxy));
228
  } else if (OB_FAIL(check_inner_stat_())) {
229
    WSTAT("error unexpected", KR(ret), K(tenant_id_), KP(sql_proxy_));
230
  } else if (OB_FAIL(schema_status_proxy->get_refresh_schema_status(tenant_id_, refresh_schema_status))) {
231
    LOG_WARN("fail to get refresh schema status", KR(ret), K_(tenant_id));
232
  } else if (refresh_schema_status.snapshot_timestamp_ == 0) {
233
    if (OB_FAIL(ObRestoreScheduler::reset_schema_status(tenant_id_, sql_proxy_))) {
234
      WSTAT("failed to reset schema status", KR(ret), K_(tenant_id));
235
    }
236
  }
237

238
  if (OB_SUCC(ret) && !schema_broadcasted_) {
239
    obrpc::ObBroadcastSchemaArg arg;
240
    arg.tenant_id_ = tenant_id_;
241
    if (OB_ISNULL(GCTX.rs_rpc_proxy_) || OB_ISNULL(GCTX.rs_mgr_)) {
242
      ret = OB_ERR_UNEXPECTED;
243
      LOG_WARN("pointer is null", KR(ret), KP(GCTX.rs_mgr_), KP(GCTX.rs_rpc_proxy_));
244
    } else if (OB_FAIL(GCTX.rs_rpc_proxy_->to_rs(*GCTX.rs_mgr_).broadcast_schema(arg))) {
245
      LOG_WARN("failed to broadcast schema", KR(ret), K(arg));
246
    } else {
247
      schema_broadcasted_ = true;
248
    }
249
  }
250

251
  ISTAT("refresh_schema finished", KR(ret), K_(tenant_id), K_(schema_broadcasted), K(refresh_schema_status));
252
  return ret;
253
}
254

255
int ObCreateStandbyFromNetActor::do_creating_standby_tenant()
256
{
257
  int ret = OB_SUCCESS;
258
  ObSchemaGetterGuard schema_guard;
259
  const ObSimpleTenantSchema *tenant_schema = nullptr;
260
  bool has_user_ls = false;
261
  if (OB_FAIL(check_inner_stat_())) {
262
    WSTAT("error unexpected", KR(ret), K(tenant_id_), KP(sql_proxy_));
263
  } else if (OB_FAIL(GSCHEMASERVICE.get_tenant_schema_guard(OB_SYS_TENANT_ID, schema_guard))) {
264
    WSTAT("failed to get schema guard", KR(ret));
265
  } else if (OB_FAIL(schema_guard.get_tenant_info(tenant_id_, tenant_schema))) {
266
    WSTAT("failed to get tenant info", KR(ret), K_(tenant_id));
267
  } else if (OB_ISNULL(tenant_schema)) {
268
    ret = OB_ERR_UNEXPECTED;
269
    WSTAT("tenant_schema is null", KR(ret), K_(tenant_id));
270
  } else if (tenant_schema->is_creating_standby_tenant_status()) {
271
    if (OB_FAIL(set_idle_interval_us_(DEFAULT_IDLE_TIME))) {
272
      WSTAT("failed to set_idle_interval_us_", KR(ret), K(tenant_id_));
273
    } else if (OB_FAIL(check_has_user_ls(tenant_id_, sql_proxy_, has_user_ls))) {
274
      WSTAT("failed to check_has_user_ls", KR(ret), K_(tenant_id), KP(sql_proxy_));
275
    } else if (!has_user_ls) {
276
      TSTAT("has not recover user ls, keep wait", KR(ret), K_(tenant_id));
277
    } else if (OB_FAIL(finish_restore_if_possible_())) {
278
      WSTAT("failed to finish_restore_if_possible", KR(ret), K_(tenant_id));
279
    }
280
  } else {
281
    ISTAT("not creating standby status, do nothing", KR(ret), K(tenant_id_), K(tenant_schema));
282
    if (OB_FAIL(set_idle_interval_us_(MAX_IDLE_TIME))) {
283
      WSTAT("failed to set_idle_interval_us_", KR(ret), K(tenant_id_));
284
    }
285
  }
286
  return ret;
287
}
288

289
}
290
}
291

292
#undef STAT
293
#undef ISTAT
294
#undef WSTAT
295
#undef TSTAT
296

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.