oceanbase
295 строк · 11.4 Кб
/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */
12
13#define USING_LOG_PREFIX RS
14#include "ob_create_standby_from_net_actor.h"
15#include "share/schema/ob_multi_version_schema_service.h" // ObMultiVersionSchemaService
16#include "observer/ob_server_struct.h" // GCTX
17#include "observer/ob_service.h" // ObService
18#include "share/rc/ob_tenant_base.h" // MTL_ID
19#include "rootserver/ob_tenant_info_loader.h" // ObTenantInfoLoader
20#include "share/ob_rpc_struct.h" // ObCreateTenantEndArg
21#include "rootserver/restore/ob_restore_scheduler.h" //reset_schema_status
22#include "rootserver/ob_rs_async_rpc_proxy.h" // ObSwitchSchemaProxy
23#include "share/ob_common_rpc_proxy.h" // create_tenant_end
24#include "share/ob_schema_status_proxy.h"//ObSchemaStatusProxy
25#include "share/ob_rpc_struct.h" // ObBroadcastSchemaArg
26#include "share/schema/ob_multi_version_schema_service.h" // for GSCHEMASERVICE
27
// File-local logging helpers. Every message is routed through RS_LOG and
// prefixed with the "[NET_STANDBY_TNT_SERVICE] " tag; STAT takes an explicit
// level, while ISTAT / WSTAT / TSTAT pin it to INFO / WARN / TRACE.
// `, ##__VA_ARGS__` drops the comma when no extra key/value args are passed.
#define STAT(level, fmt, ...) RS_LOG(level, "[NET_STANDBY_TNT_SERVICE] " fmt, ##__VA_ARGS__)
#define ISTAT(fmt, ...) STAT(INFO, fmt, ##__VA_ARGS__)
#define WSTAT(fmt, ...) STAT(WARN, fmt, ##__VA_ARGS__)
#define TSTAT(fmt, ...) STAT(TRACE, fmt, ##__VA_ARGS__)
32
33namespace oceanbase
34{
35using namespace share;
36using namespace common;
37namespace rootserver
38{
39
40int ObCreateStandbyFromNetActor::set_idle_interval_us_(const int64_t idle_time)
41{
42int ret = OB_SUCCESS;
43if (idle_time <= 0 || idle_time > MAX_IDLE_TIME) {
44ret = OB_INVALID_ARGUMENT;
45WSTAT("invalid idle_time", KR(ret), K(idle_time));
46} else {
47(void)ATOMIC_STORE(&idle_time_, idle_time);
48}
49return ret;
50}
51
52
53int ObCreateStandbyFromNetActor::init()
54{
55int ret = OB_SUCCESS;
56sql_proxy_ = GCTX.sql_proxy_;
57
58if (IS_INIT) {
59ret = OB_INIT_TWICE;
60WSTAT("init twice", KR(ret));
61} else if (OB_ISNULL(sql_proxy_)) {
62ret = OB_ERR_UNEXPECTED;
63WSTAT("sql proxy is null", KR(ret));
64} else if (OB_FAIL(ObTenantThreadHelper::create("NetStandbyCT",
65lib::TGDefIDs::ObCreateStandbyFromNetActor, *this))) {
66WSTAT("failed to create NET_STANDBY_TNT_SERVICE", KR(ret));
67} else if (OB_FAIL(ObTenantThreadHelper::start())) {
68WSTAT("failed to start NET_STANDBY_TNT_SERVICE", KR(ret));
69} else {
70tenant_id_ = MTL_ID();
71schema_broadcasted_ = false;
72is_inited_ = true;
73}
74
75return ret;
76}
77
78void ObCreateStandbyFromNetActor::do_work()
79{
80ISTAT("create standby from net actor start");
81int ret = OB_SUCCESS;
82if (OB_FAIL(check_inner_stat_())) {
83WSTAT("inner stat error", KR(ret), K_(is_inited));
84} else {
85while (!has_set_stop()) {
86ObCurTraceId::init(GCONF.self_addr_);
87if (OB_FAIL(do_creating_standby_tenant())) {
88WSTAT("create standby from net actor failed", KR(ret));
89}
90
91ISTAT("finish one round", KR(ret));
92idle(get_idle_interval_us_());
93}
94}
95}
96
97void ObCreateStandbyFromNetActor::destroy()
98{
99ISTAT("create standby from net actor destory", KPC(this));
100ObTenantThreadHelper::destroy();
101is_inited_ = false;
102tenant_id_ = OB_INVALID_TENANT_ID;
103sql_proxy_ = NULL;
104schema_broadcasted_ = false;
105}
106
107int ObCreateStandbyFromNetActor::check_inner_stat_()
108{
109int ret = OB_SUCCESS;
110if (OB_UNLIKELY(!is_inited_)) {
111ret = OB_NOT_INIT;
112WSTAT("not init", KR(ret));
113} else if (OB_ISNULL(sql_proxy_)) {
114ret = OB_ERR_UNEXPECTED;
115WSTAT("Member variables is NULL", KR(ret), KP(sql_proxy_));
116} else if (!is_user_tenant(tenant_id_)) {
117ret = OB_ERR_UNEXPECTED;
118WSTAT("shoule use meta tenant", K(tenant_id_));
119}
120return ret;
121}
122
123int ObCreateStandbyFromNetActor::check_has_user_ls(const uint64_t tenant_id, common::ObMySQLProxy *sql_proxy, bool &has_user_ls)
124{
125int ret = OB_SUCCESS;
126ObLSStatusOperator status_op;
127ObLSStatusInfoArray ls_array;
128has_user_ls = false;
129if (!is_user_tenant(tenant_id) || OB_ISNULL(sql_proxy)) {
130ret = OB_INVALID_ARGUMENT;
131WSTAT("invalid argument", K(tenant_id), KP(sql_proxy));
132} else if (OB_FAIL(status_op.get_all_ls_status_by_order(tenant_id, ls_array, *sql_proxy))) {
133WSTAT("failed to get all ls status", KR(ret), K(tenant_id));
134}
135for (int64_t i = 0; !has_user_ls && OB_SUCC(ret) && i < ls_array.count(); ++i) {
136const ObLSStatusInfo &info = ls_array.at(i);
137if (info.is_user_ls()) {
138has_user_ls = true;
139}
140}// end for
141return ret;
142}
143
144int ObCreateStandbyFromNetActor::finish_restore_if_possible_()
145{
146int ret = OB_SUCCESS;
147int tmp_ret = OB_SUCCESS;
148ObLSRecoveryStat recovery_stat;
149ObLSRecoveryStatOperator ls_recovery_operator;
150rootserver::ObTenantInfoLoader *tenant_info_loader = MTL(rootserver::ObTenantInfoLoader*);
151obrpc::ObCreateTenantEndArg arg;
152arg.exec_tenant_id_ = OB_SYS_TENANT_ID;
153arg.tenant_id_ = tenant_id_;
154obrpc::ObCommonRpcProxy *rs_rpc_proxy = GCTX.rs_rpc_proxy_;
155SCN min_user_ls_create_scn = SCN::base_scn();
156SCN readable_scn = SCN::base_scn();
157
158if (OB_ISNULL(rs_rpc_proxy)) {
159ret = OB_ERR_UNEXPECTED;
160WSTAT("pointer is null", KP(rs_rpc_proxy));
161} else if (OB_FAIL(check_inner_stat_())) {
162WSTAT("error unexpected", KR(ret), K(tenant_id_), KP(sql_proxy_));
163} else if (OB_FAIL(ls_recovery_operator.get_tenant_min_user_ls_create_scn(tenant_id_, *sql_proxy_,
164min_user_ls_create_scn))) {
165WSTAT("failed to get tenant min_user_ls_create_scn", KR(ret), K_(tenant_id));
166} else if (OB_ISNULL(tenant_info_loader)) {
167ret = OB_ERR_UNEXPECTED;
168WSTAT("tenant info loader should not be NULL", KR(ret), KP(tenant_info_loader));
169} else {
170ISTAT("start to wait whether can finish restore", K_(tenant_id), K(min_user_ls_create_scn));
171DEBUG_SYNC(BLOCK_CREATE_STANDBY_TENANT_END);
172
173int64_t retry_cnt_after_sync_user_ls = 0;
174// wait 1 minute, sleep 1s and retry 60 times
175for (int64_t retry_cnt = 60; OB_SUCC(ret) && retry_cnt > 0 && !has_set_stop(); --retry_cnt) {
176bool is_dropped = false;
177if (OB_FAIL(GSCHEMASERVICE.check_if_tenant_has_been_dropped(tenant_id_, is_dropped))) {
178LOG_WARN("tenant has been dropped", KR(ret), K_(tenant_id));
179} else if (is_dropped) {
180ret = OB_TENANT_HAS_BEEN_DROPPED;
181LOG_WARN("tenant has been dropped", KR(ret), K_(tenant_id));
182} else if (OB_FAIL(tenant_info_loader->get_readable_scn(readable_scn))) {
183WSTAT("failed to get readable_scn", KR(ret));
184} else if (readable_scn >= min_user_ls_create_scn) {
185retry_cnt_after_sync_user_ls++;
186ISTAT("tenant readable scn can read inner table", K(readable_scn), K(min_user_ls_create_scn),
187K(retry_cnt_after_sync_user_ls));
188
189bool is_refreshed = false;
190if (OB_FAIL(GSCHEMASERVICE.check_if_tenant_schema_has_been_refreshed(tenant_id_, is_refreshed))) {
191LOG_WARN("fail to check tenant schema has been refreshed", KR(ret), K_(tenant_id), K(retry_cnt_after_sync_user_ls));
192} else if (!is_refreshed || !schema_broadcasted_) {
193if (50 < retry_cnt_after_sync_user_ls) {
194WSTAT("schema has not refreshed", KR(ret), KR(tmp_ret), K(is_refreshed),
195K_(schema_broadcasted), K(retry_cnt_after_sync_user_ls));
196}
197if (OB_TMP_FAIL(refresh_schema_())) {
198WSTAT("failed to refresh schema", KR(ret), KR(tmp_ret), K(is_refreshed), K_(schema_broadcasted),
199K(retry_cnt_after_sync_user_ls));
200}
201} else if (OB_FAIL(rs_rpc_proxy->create_tenant_end(arg))) {
202WSTAT("fail to execute create tenant end", KR(ret), K_(tenant_id), K(arg), K(retry_cnt_after_sync_user_ls));
203} else {
204ISTAT("execute create_tenant_end", KR(ret), K_(tenant_id), K(arg), K(retry_cnt_after_sync_user_ls));
205break;
206}
207}
208usleep(1000 * 1000); // 1s
209}
210}
211
212ISTAT("finish_restore_if_possible", K(ret), K_(tenant_id), K(min_user_ls_create_scn), K(arg), K(readable_scn));
213
214return ret;
215}
216
217int ObCreateStandbyFromNetActor::refresh_schema_()
218{
219int ret = OB_SUCCESS;
220observer::ObService *ob_service = GCTX.ob_service_;
221int tmp_ret = OB_SUCCESS;
222ObSchemaStatusProxy *schema_status_proxy = GCTX.schema_status_proxy_;
223ObRefreshSchemaStatus refresh_schema_status;
224
225if (OB_ISNULL(ob_service) || OB_ISNULL(schema_status_proxy)) {
226ret = OB_ERR_UNEXPECTED;
227WSTAT("pointer is null", KP(ob_service), KP(schema_status_proxy));
228} else if (OB_FAIL(check_inner_stat_())) {
229WSTAT("error unexpected", KR(ret), K(tenant_id_), KP(sql_proxy_));
230} else if (OB_FAIL(schema_status_proxy->get_refresh_schema_status(tenant_id_, refresh_schema_status))) {
231LOG_WARN("fail to get refresh schema status", KR(ret), K_(tenant_id));
232} else if (refresh_schema_status.snapshot_timestamp_ == 0) {
233if (OB_FAIL(ObRestoreScheduler::reset_schema_status(tenant_id_, sql_proxy_))) {
234WSTAT("failed to reset schema status", KR(ret), K_(tenant_id));
235}
236}
237
238if (OB_SUCC(ret) && !schema_broadcasted_) {
239obrpc::ObBroadcastSchemaArg arg;
240arg.tenant_id_ = tenant_id_;
241if (OB_ISNULL(GCTX.rs_rpc_proxy_) || OB_ISNULL(GCTX.rs_mgr_)) {
242ret = OB_ERR_UNEXPECTED;
243LOG_WARN("pointer is null", KR(ret), KP(GCTX.rs_mgr_), KP(GCTX.rs_rpc_proxy_));
244} else if (OB_FAIL(GCTX.rs_rpc_proxy_->to_rs(*GCTX.rs_mgr_).broadcast_schema(arg))) {
245LOG_WARN("failed to broadcast schema", KR(ret), K(arg));
246} else {
247schema_broadcasted_ = true;
248}
249}
250
251ISTAT("refresh_schema finished", KR(ret), K_(tenant_id), K_(schema_broadcasted), K(refresh_schema_status));
252return ret;
253}
254
255int ObCreateStandbyFromNetActor::do_creating_standby_tenant()
256{
257int ret = OB_SUCCESS;
258ObSchemaGetterGuard schema_guard;
259const ObSimpleTenantSchema *tenant_schema = nullptr;
260bool has_user_ls = false;
261if (OB_FAIL(check_inner_stat_())) {
262WSTAT("error unexpected", KR(ret), K(tenant_id_), KP(sql_proxy_));
263} else if (OB_FAIL(GSCHEMASERVICE.get_tenant_schema_guard(OB_SYS_TENANT_ID, schema_guard))) {
264WSTAT("failed to get schema guard", KR(ret));
265} else if (OB_FAIL(schema_guard.get_tenant_info(tenant_id_, tenant_schema))) {
266WSTAT("failed to get tenant info", KR(ret), K_(tenant_id));
267} else if (OB_ISNULL(tenant_schema)) {
268ret = OB_ERR_UNEXPECTED;
269WSTAT("tenant_schema is null", KR(ret), K_(tenant_id));
270} else if (tenant_schema->is_creating_standby_tenant_status()) {
271if (OB_FAIL(set_idle_interval_us_(DEFAULT_IDLE_TIME))) {
272WSTAT("failed to set_idle_interval_us_", KR(ret), K(tenant_id_));
273} else if (OB_FAIL(check_has_user_ls(tenant_id_, sql_proxy_, has_user_ls))) {
274WSTAT("failed to check_has_user_ls", KR(ret), K_(tenant_id), KP(sql_proxy_));
275} else if (!has_user_ls) {
276TSTAT("has not recover user ls, keep wait", KR(ret), K_(tenant_id));
277} else if (OB_FAIL(finish_restore_if_possible_())) {
278WSTAT("failed to finish_restore_if_possible", KR(ret), K_(tenant_id));
279}
280} else {
281ISTAT("not creating standby status, do nothing", KR(ret), K(tenant_id_), K(tenant_schema));
282if (OB_FAIL(set_idle_interval_us_(MAX_IDLE_TIME))) {
283WSTAT("failed to set_idle_interval_us_", KR(ret), K(tenant_id_));
284}
285}
286return ret;
287}
288
289}
290}
291
// Remove the file-local logging helpers defined at the top of this file.
#undef TSTAT
#undef WSTAT
#undef ISTAT
#undef STAT
296