oceanbase

Форк
0
/
ob_restore_service.cpp 
160 строк · 5.1 Кб
1
/**
2
 * Copyright (c) 2021 OceanBase
3
 * OceanBase CE is licensed under Mulan PubL v2.
4
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
5
 * You may obtain a copy of Mulan PubL v2 at:
6
 *          http://license.coscl.org.cn/MulanPubL-2.0
7
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
 * See the Mulan PubL v2 for more details.
11
 */
12

13
#define USING_LOG_PREFIX RS
14

15
#include "ob_restore_service.h"
16
#include "ob_recover_table_job_scheduler.h"
17
#include "share/restore/ob_import_table_struct.h"
18
#include "share/restore/ob_recover_table_persist_helper.h"
19

20
using namespace oceanbase;
21
using namespace rootserver;
22
using namespace share;
23

24

25
// Constructs the service in its "not initialized" state: collaborator pointers
// are NULL and the tenant id is invalid until init() copies the real values
// from GCTX. import_table_scheduler_ is not listed here, so it is
// default-initialized.
ObRestoreService::ObRestoreService()
  : inited_(false),
    schema_service_(NULL), sql_proxy_(NULL),
    rpc_proxy_(NULL), srv_rpc_proxy_(NULL),
    self_addr_(),
    tenant_id_(OB_INVALID_TENANT_ID),
    idle_time_us_(1),   // do_work() replaces this with GCONF._restore_idle_time before looping
    wakeup_cnt_(0),
    restore_scheduler_(),
    recover_table_scheduler_()
{}
40

41
// On destruction, calls stop() followed by wait() unless a stop has already
// been requested.
ObRestoreService::~ObRestoreService()
{
  const bool stop_already_requested = has_set_stop();
  if (!stop_already_requested) {
    stop();
    wait();
  }
}
48

49
void ObRestoreService::destroy()
50
{
51
  ObTenantThreadHelper::destroy();
52
  inited_ = false;
53
}
54

55
int ObRestoreService::init()
56
{
57
  int ret = OB_SUCCESS;
58
  if (inited_) {
59
    ret = OB_INIT_TWICE;
60
    LOG_WARN("init twice", KR(ret));
61
  } else if (OB_ISNULL(GCTX.schema_service_) || OB_ISNULL(GCTX.sql_proxy_)
62
      || OB_ISNULL(GCTX.rs_rpc_proxy_) || OB_ISNULL(GCTX.srv_rpc_proxy_)) {
63
    ret = OB_INVALID_ARGUMENT;
64
    LOG_WARN("invalid argument", KR(ret), KP(GCTX.schema_service_), KP(GCTX.sql_proxy_),
65
        KP(GCTX.rs_rpc_proxy_), KP(GCTX.srv_rpc_proxy_), KP(GCTX.lst_operator_));
66
  } else if (OB_FAIL(ObTenantThreadHelper::create("REST_SER", lib::TGDefIDs::SimpleLSService, *this))) {
67
    LOG_WARN("failed to create thread", KR(ret));
68
  } else if (OB_FAIL(ObTenantThreadHelper::start())) {
69
    LOG_WARN("fail to start thread", KR(ret));
70
  } else if (OB_FAIL(restore_scheduler_.init(*this))) {
71
    LOG_WARN("failed to init restore scheduler", K(ret));
72
  } else if (OB_FAIL(recover_table_scheduler_.init(
73
      *GCTX.schema_service_, *GCTX.sql_proxy_, *GCTX.rs_rpc_proxy_, *GCTX.srv_rpc_proxy_))) {
74
    LOG_WARN("failed to init recover table scheduler", K(ret));
75
  } else if (OB_FAIL(import_table_scheduler_.init(*GCTX.schema_service_, *GCTX.sql_proxy_))) {
76
    LOG_WARN("failed to init import table scheduler", K(ret));
77
  } else {
78
    schema_service_ = GCTX.schema_service_;
79
    sql_proxy_ = GCTX.sql_proxy_;
80
    rpc_proxy_ = GCTX.rs_rpc_proxy_;
81
    srv_rpc_proxy_ = GCTX.srv_rpc_proxy_;
82
    tenant_id_ = is_sys_tenant(MTL_ID()) ? MTL_ID() : gen_user_tenant_id(MTL_ID());
83
    self_addr_ = GCTX.self_addr();
84
    inited_ = true;
85
  }
86
  return ret;
87
}
88

89
int ObRestoreService::idle()
90
{
91
  int ret = OB_SUCCESS;
92
  if (!inited_) {
93
    ret = OB_NOT_INIT;
94
    LOG_WARN("not init", K(ret));
95
  } else if (!has_set_stop() && 0 < ATOMIC_LOAD(&wakeup_cnt_)) {
96
    ATOMIC_SET(&wakeup_cnt_, 0); // wake up immediately
97
  } else {
98
    ObTenantThreadHelper::idle(idle_time_us_);
99
    idle_time_us_ = GCONF._restore_idle_time;
100
  }
101
  return ret;
102
}
103

104
int ObRestoreService::check_stop() const
105
{
106
  int ret = OB_SUCCESS;
107
  if (has_set_stop()) {
108
    ret = OB_CANCELED;
109
    LOG_WARN("restore service stopped", K(ret));
110
  }
111
  return ret;
112
}
113

114
void ObRestoreService::do_work()
115
{
116
  LOG_INFO("[RESTORE] restore service start");
117
  int ret = OB_SUCCESS;
118
  if (!inited_) {
119
    ret = OB_NOT_INIT;
120
    LOG_WARN("not inited", K(ret));
121
  } else {
122
    ObRSThreadFlag rs_work;
123
    // avoid using default idle time when observer restarts.
124
    idle_time_us_ = GCONF._restore_idle_time;
125
    const uint64_t tenant_id = MTL_ID();
126
    while (!has_set_stop()) {
127
      {
128
        ObCurTraceId::init(GCTX.self_addr());
129
        ObArray<ObPhysicalRestoreJob> job_infos;
130
        share::schema::ObSchemaGetterGuard schema_guard;
131
        const share::schema::ObTenantSchema *tenant_schema = NULL;
132
        if (OB_ISNULL(GCTX.schema_service_)) {
133
          ret = OB_ERR_UNEXPECTED;
134
          LOG_WARN("unexpected error", KR(ret));
135
        } else if (OB_FAIL(GCTX.schema_service_->get_tenant_schema_guard(
136
                OB_SYS_TENANT_ID, schema_guard))) {
137
          LOG_WARN("fail to get schema guard", KR(ret));
138
        } else if (OB_FAIL(schema_guard.get_tenant_info(tenant_id, tenant_schema))) {
139
          LOG_WARN("failed to get tenant ids", KR(ret), K(tenant_id));
140
        } else if (OB_ISNULL(tenant_schema)) {
141
          ret = OB_TENANT_NOT_EXIST;
142
          LOG_WARN("tenant not exist", KR(ret), K(tenant_id));
143
        } else if (!tenant_schema->is_normal()) {
144
          //tenant not normal, maybe meta or sys tenant
145
          //while meta tenant not ready, cannot process tenant restore job
146
        } else {
147
          restore_scheduler_.do_work();
148
          recover_table_scheduler_.do_work();
149
          import_table_scheduler_.do_work();
150
          idle_time_us_ = 10;
151
        }
152
      }//for schema guard, must be free
153
      // retry until stopped, reset ret to OB_SUCCESS
154
      ret = OB_SUCCESS;
155
      idle();
156
    }
157
  }
158
  LOG_INFO("[RESTORE] restore service quit");
159
  return;
160
}

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.