oceanbase
160 строк · 5.1 Кб
1/**
2* Copyright (c) 2021 OceanBase
3* OceanBase CE is licensed under Mulan PubL v2.
4* You can use this software according to the terms and conditions of the Mulan PubL v2.
5* You may obtain a copy of Mulan PubL v2 at:
6* http://license.coscl.org.cn/MulanPubL-2.0
7* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10* See the Mulan PubL v2 for more details.
11*/
12
13#define USING_LOG_PREFIX RS
14
15#include "ob_restore_service.h"
16#include "ob_recover_table_job_scheduler.h"
17#include "share/restore/ob_import_table_struct.h"
18#include "share/restore/ob_recover_table_persist_helper.h"
19
20using namespace oceanbase;
21using namespace rootserver;
22using namespace share;
23
24
25ObRestoreService::ObRestoreService()
26: inited_(false),
27schema_service_(NULL),
28sql_proxy_(NULL),
29rpc_proxy_(NULL),
30srv_rpc_proxy_(NULL),
31self_addr_(),
32tenant_id_(OB_INVALID_TENANT_ID),
33idle_time_us_(1),
34wakeup_cnt_(0),
35restore_scheduler_(),
36recover_table_scheduler_()
37
38{
39}
40
41ObRestoreService::~ObRestoreService()
42{
43if (!has_set_stop()) {
44stop();
45wait();
46}
47}
48
49void ObRestoreService::destroy()
50{
51ObTenantThreadHelper::destroy();
52inited_ = false;
53}
54
55int ObRestoreService::init()
56{
57int ret = OB_SUCCESS;
58if (inited_) {
59ret = OB_INIT_TWICE;
60LOG_WARN("init twice", KR(ret));
61} else if (OB_ISNULL(GCTX.schema_service_) || OB_ISNULL(GCTX.sql_proxy_)
62|| OB_ISNULL(GCTX.rs_rpc_proxy_) || OB_ISNULL(GCTX.srv_rpc_proxy_)) {
63ret = OB_INVALID_ARGUMENT;
64LOG_WARN("invalid argument", KR(ret), KP(GCTX.schema_service_), KP(GCTX.sql_proxy_),
65KP(GCTX.rs_rpc_proxy_), KP(GCTX.srv_rpc_proxy_), KP(GCTX.lst_operator_));
66} else if (OB_FAIL(ObTenantThreadHelper::create("REST_SER", lib::TGDefIDs::SimpleLSService, *this))) {
67LOG_WARN("failed to create thread", KR(ret));
68} else if (OB_FAIL(ObTenantThreadHelper::start())) {
69LOG_WARN("fail to start thread", KR(ret));
70} else if (OB_FAIL(restore_scheduler_.init(*this))) {
71LOG_WARN("failed to init restore scheduler", K(ret));
72} else if (OB_FAIL(recover_table_scheduler_.init(
73*GCTX.schema_service_, *GCTX.sql_proxy_, *GCTX.rs_rpc_proxy_, *GCTX.srv_rpc_proxy_))) {
74LOG_WARN("failed to init recover table scheduler", K(ret));
75} else if (OB_FAIL(import_table_scheduler_.init(*GCTX.schema_service_, *GCTX.sql_proxy_))) {
76LOG_WARN("failed to init import table scheduler", K(ret));
77} else {
78schema_service_ = GCTX.schema_service_;
79sql_proxy_ = GCTX.sql_proxy_;
80rpc_proxy_ = GCTX.rs_rpc_proxy_;
81srv_rpc_proxy_ = GCTX.srv_rpc_proxy_;
82tenant_id_ = is_sys_tenant(MTL_ID()) ? MTL_ID() : gen_user_tenant_id(MTL_ID());
83self_addr_ = GCTX.self_addr();
84inited_ = true;
85}
86return ret;
87}
88
89int ObRestoreService::idle()
90{
91int ret = OB_SUCCESS;
92if (!inited_) {
93ret = OB_NOT_INIT;
94LOG_WARN("not init", K(ret));
95} else if (!has_set_stop() && 0 < ATOMIC_LOAD(&wakeup_cnt_)) {
96ATOMIC_SET(&wakeup_cnt_, 0); // wake up immediately
97} else {
98ObTenantThreadHelper::idle(idle_time_us_);
99idle_time_us_ = GCONF._restore_idle_time;
100}
101return ret;
102}
103
104int ObRestoreService::check_stop() const
105{
106int ret = OB_SUCCESS;
107if (has_set_stop()) {
108ret = OB_CANCELED;
109LOG_WARN("restore service stopped", K(ret));
110}
111return ret;
112}
113
114void ObRestoreService::do_work()
115{
116LOG_INFO("[RESTORE] restore service start");
117int ret = OB_SUCCESS;
118if (!inited_) {
119ret = OB_NOT_INIT;
120LOG_WARN("not inited", K(ret));
121} else {
122ObRSThreadFlag rs_work;
123// avoid using default idle time when observer restarts.
124idle_time_us_ = GCONF._restore_idle_time;
125const uint64_t tenant_id = MTL_ID();
126while (!has_set_stop()) {
127{
128ObCurTraceId::init(GCTX.self_addr());
129ObArray<ObPhysicalRestoreJob> job_infos;
130share::schema::ObSchemaGetterGuard schema_guard;
131const share::schema::ObTenantSchema *tenant_schema = NULL;
132if (OB_ISNULL(GCTX.schema_service_)) {
133ret = OB_ERR_UNEXPECTED;
134LOG_WARN("unexpected error", KR(ret));
135} else if (OB_FAIL(GCTX.schema_service_->get_tenant_schema_guard(
136OB_SYS_TENANT_ID, schema_guard))) {
137LOG_WARN("fail to get schema guard", KR(ret));
138} else if (OB_FAIL(schema_guard.get_tenant_info(tenant_id, tenant_schema))) {
139LOG_WARN("failed to get tenant ids", KR(ret), K(tenant_id));
140} else if (OB_ISNULL(tenant_schema)) {
141ret = OB_TENANT_NOT_EXIST;
142LOG_WARN("tenant not exist", KR(ret), K(tenant_id));
143} else if (!tenant_schema->is_normal()) {
144//tenant not normal, maybe meta or sys tenant
145//while meta tenant not ready, cannot process tenant restore job
146} else {
147restore_scheduler_.do_work();
148recover_table_scheduler_.do_work();
149import_table_scheduler_.do_work();
150idle_time_us_ = 10;
151}
152}//for schema guard, must be free
153// retry until stopped, reset ret to OB_SUCCESS
154ret = OB_SUCCESS;
155idle();
156}
157}
158LOG_INFO("[RESTORE] restore service quit");
159return;
160}