oceanbase

Форк
0
/
ob_restore_common_util.cpp 
396 строк · 16.8 Кб
1
/**
2
 * Copyright (c) 2021 OceanBase
3
 * OceanBase CE is licensed under Mulan PubL v2.
4
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
5
 * You may obtain a copy of Mulan PubL v2 at:
6
 *          http://license.coscl.org.cn/MulanPubL-2.0
7
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
 * See the Mulan PubL v2 for more details.
11
 */
12

13
#define USING_LOG_PREFIX RS_RESTORE
14

15
#include "ob_restore_common_util.h"
16
#include "share/ls/ob_ls_status_operator.h" //ObLSStatusOperator
17
#include "share/ls/ob_ls_operator.h"//ObLSAttr
18
#include "rootserver/ob_ls_service_helper.h"
19
#include "rootserver/ob_tenant_role_transition_service.h"
20
#include "src/share/ob_schema_status_proxy.h"
21
#include "src/share/ob_rpc_struct.h"
22
#include "rootserver/ob_ddl_service.h"
23
#ifdef OB_BUILD_TDE_SECURITY
24
#include "share/ob_master_key_getter.h"
25
#endif
26

27
using namespace oceanbase::share::schema;
28
using namespace oceanbase::rootserver;
29
using namespace oceanbase::share;
30
using namespace oceanbase::common;
31

32
int ObRestoreCommonUtil::notify_root_key(
33
    obrpc::ObSrvRpcProxy *srv_rpc_proxy_,
34
    common::ObMySQLProxy *sql_proxy_,
35
    const uint64_t tenant_id,
36
    const share::ObRootKey &root_key)
37
{
38
  int ret = OB_SUCCESS;
39
#ifdef OB_BUILD_TDE_SECURITY
40
  if (OB_UNLIKELY(!is_user_tenant(tenant_id))) {
41
    ret = OB_INVALID_ARGUMENT;
42
    LOG_WARN("invalid tenant_id", KR(ret), K(tenant_id));
43
  } else if (OB_UNLIKELY(obrpc::RootKeyType::INVALID == root_key.key_type_)) {
44
    ret = OB_INVALID_ARGUMENT;
45
    LOG_WARN("invalid root_key", KR(ret), K(tenant_id));
46
  } else if (OB_ISNULL(srv_rpc_proxy_) || OB_ISNULL(sql_proxy_)) {
47
    ret = OB_ERR_UNEXPECTED;
48
    LOG_WARN("unexpected null svr rpc proxy or sql proxy", KR(ret),
49
                                    KP(srv_rpc_proxy_), KP(sql_proxy_));
50
  } else {
51
    obrpc::ObRootKeyArg arg;
52
    obrpc::ObRootKeyResult result;
53
    ObUnitTableOperator unit_operator;
54
    ObArray<ObUnit> units;
55
    ObArray<ObAddr> addrs;
56
    arg.tenant_id_ = tenant_id;
57
    arg.is_set_ = true;
58
    arg.key_type_ = root_key.key_type_;
59
    arg.root_key_ = root_key.key_;
60
    if (OB_FAIL(unit_operator.init(*sql_proxy_))) {
61
      LOG_WARN("failed to init unit operator", KR(ret));
62
    } else if (OB_FAIL(unit_operator.get_units_by_tenant(tenant_id, units))) {
63
      LOG_WARN("failed to get tenant unit", KR(ret), K(tenant_id));
64
    }
65
    for (int64_t i = 0; OB_SUCC(ret) && i < units.count(); i++) {
66
      const ObUnit &unit = units.at(i);
67
      if (OB_FAIL(addrs.push_back(unit.server_))) {
68
        LOG_WARN("failed to push back addr", KR(ret));
69
      }
70
    }
71
    if (OB_FAIL(ret)) {
72
    } else if (OB_FAIL(ObDDLService::notify_root_key(*srv_rpc_proxy_, arg, addrs, result))) {
73
      LOG_WARN("failed to notify root key", KR(ret));
74
    }
75
  }
76
#endif
77
  return ret;
78
}
79

80
int ObRestoreCommonUtil::create_all_ls(
81
    common::ObMySQLProxy *sql_proxy,
82
    const uint64_t tenant_id,
83
    const share::schema::ObTenantSchema &tenant_schema,
84
    const common::ObIArray<share::ObLSAttr> &ls_attr_array,
85
    const uint64_t source_tenant_id)
86
{
87
  int ret = OB_SUCCESS;
88
  ObLSStatusOperator status_op;
89
  ObLSStatusInfo status_info;
90
  if (OB_UNLIKELY(!is_user_tenant(tenant_id)
91
                  || !tenant_schema.is_valid()
92
                  || ls_attr_array.empty()
93
                  || OB_ISNULL(sql_proxy))) {
94
    ret = OB_INVALID_ARGUMENT;
95
    LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(tenant_schema),
96
                                              K(ls_attr_array), KP(sql_proxy));
97
  } else {
98
    common::ObMySQLTransaction trans;
99
    const int64_t exec_tenant_id = ObLSLifeIAgent::get_exec_tenant_id(tenant_id);
100

101
    if (OB_FAIL(trans.start(sql_proxy, exec_tenant_id))) {
102
      LOG_WARN("failed to start trans", KR(ret), K(exec_tenant_id));
103
    } else {
104
      //must be in trans
105
      //Multiple LS groups will be created here.
106
      //In order to ensure that each LS group can be evenly distributed in the unit group,
107
      //it is necessary to read the distribution of LS groups within the transaction.
108
      ObTenantLSInfo tenant_stat(sql_proxy, &tenant_schema, tenant_id, &trans);
109
      for (int64_t i = 0; OB_SUCC(ret) && i < ls_attr_array.count(); ++i) {
110
        const ObLSAttr &ls_info = ls_attr_array.at(i);
111
        ObLSFlag ls_flag = ls_info.get_ls_flag();
112
        if (ls_info.get_ls_id().is_sys_ls()) {
113
        } else if (OB_SUCC(status_op.get_ls_status_info(tenant_id, ls_info.get_ls_id(),
114
                status_info, trans))) {
115
          LOG_INFO("[RESTORE] ls already exist", K(ls_info), K(tenant_id));
116
        } else if (OB_ENTRY_NOT_EXIST != ret) {
117
          LOG_WARN("failed to get ls status info", KR(ret), K(tenant_id), K(ls_info));
118
        } else if (OB_FAIL(ObLSServiceHelper::create_new_ls_in_trans(
119
                   ls_info.get_ls_id(), ls_info.get_ls_group_id(), ls_info.get_create_scn(),
120
                   share::NORMAL_SWITCHOVER_STATUS, tenant_stat, trans, ls_flag, source_tenant_id))) {
121
          LOG_WARN("failed to add new ls status info", KR(ret), K(ls_info), K(source_tenant_id));
122
        }
123
        LOG_INFO("create init ls", KR(ret), K(ls_info), K(source_tenant_id));
124
      }
125
    }
126
    int tmp_ret = OB_SUCCESS;
127
    if (OB_SUCCESS != (tmp_ret = trans.end(OB_SUCC(ret)))) {
128
      ret = OB_SUCC(ret) ? tmp_ret : ret;
129
      LOG_WARN("failed to end trans", KR(ret), KR(tmp_ret));
130
    }
131
  }
132
  return ret;
133
}
134

135
int ObRestoreCommonUtil::finish_create_ls(
136
    common::ObMySQLProxy *sql_proxy,
137
    const share::schema::ObTenantSchema &tenant_schema,
138
    const common::ObIArray<share::ObLSAttr> &ls_attr_array)
139
{
140
  int ret = OB_SUCCESS;
141
  if (OB_UNLIKELY(!tenant_schema.is_valid()
142
                  || OB_ISNULL(sql_proxy))) {
143
    ret = OB_INVALID_ARGUMENT;
144
    LOG_WARN("invalid argument", KR(ret), K(tenant_schema), KP(sql_proxy));
145
  } else {
146
    const uint64_t tenant_id = tenant_schema.get_tenant_id();
147
    const int64_t exec_tenant_id = ObLSLifeIAgent::get_exec_tenant_id(tenant_id);
148
    common::ObMySQLTransaction trans;
149
    ObLSStatusOperator status_op;
150
    ObLSStatusInfoArray ls_array;
151
    ObLSStatus ls_info = share::OB_LS_EMPTY;//ls status in __all_ls
152
    if (OB_FAIL(status_op.get_all_ls_status_by_order(tenant_id, ls_array,
153
                                                     *sql_proxy))) {
154
      LOG_WARN("failed to get all ls status", KR(ret), K(tenant_id));
155
    } else if (OB_FAIL(trans.start(sql_proxy, exec_tenant_id))) {
156
      LOG_WARN("failed to start trans", KR(ret), K(exec_tenant_id));
157
    } else {
158
      for (int64_t i = 0; OB_SUCC(ret) && i < ls_array.count(); ++i) {
159
        const ObLSStatusInfo &status_info = ls_array.at(i);
160
        if (OB_UNLIKELY(status_info.ls_is_creating())) {
161
          ret = OB_ERR_UNEXPECTED;
162
          LOG_WARN("ls should be created", KR(ret), K(status_info));
163
        } else {
164
          ret = OB_ENTRY_NOT_EXIST;
165
          for (int64_t j = 0; OB_ENTRY_NOT_EXIST == ret && j < ls_attr_array.count(); ++j) {
166
            if (ls_attr_array.at(i).get_ls_id() == status_info.ls_id_) {
167
              ret = OB_SUCCESS;
168
              ls_info = ls_attr_array.at(i).get_ls_status();
169
            }
170
          }
171
          if (OB_FAIL(ret)) {
172
            LOG_WARN("failed to find ls in attr", KR(ret), K(status_info), K(ls_attr_array));
173
          } else if (share::OB_LS_CREATING == ls_info) {
174
            //no need to update
175
          } else if (ls_info == status_info.status_) {
176
            //no need update
177
          } else if (OB_FAIL(status_op.update_ls_status_in_trans(
178
                  tenant_id, status_info.ls_id_, status_info.status_,
179
                  ls_info, share::NORMAL_SWITCHOVER_STATUS, trans))) {
180
            LOG_WARN("failed to update status", KR(ret), K(tenant_id), K(status_info), K(ls_info));
181
          } else {
182
            LOG_INFO("[RESTORE] update ls status", K(tenant_id), K(status_info), K(ls_info));
183
          }
184
        }
185
      }
186
    }
187
    int tmp_ret = OB_SUCCESS;
188
    if (OB_SUCCESS != (tmp_ret = trans.end(OB_SUCC(ret)))) {
189
      ret = OB_SUCC(ret) ? tmp_ret : ret;
190
      LOG_WARN("failed to end trans", KR(ret), KR(tmp_ret));
191
    }
192
  }
193
  return ret;
194
}
195

196
int ObRestoreCommonUtil::try_update_tenant_role(common::ObMySQLProxy *sql_proxy,
197
                                                const uint64_t tenant_id,
198
                                                const share::SCN &restore_scn,
199
                                                const bool is_clone,
200
                                                bool &sync_satisfied)
201
{
202
  int ret = OB_SUCCESS;
203
  sync_satisfied = true;
204
  ObAllTenantInfo all_tenant_info;
205
  int64_t new_switch_ts = 0;
206
  bool need_update = false;
207

208
  if (OB_UNLIKELY(!is_user_tenant(tenant_id)
209
                  || OB_ISNULL(sql_proxy)
210
                  || OB_ISNULL(GCTX.srv_rpc_proxy_))) {
211
    ret = OB_ERR_UNEXPECTED;
212
    LOG_WARN("not user tenant or proxy is null", KR(ret), K(tenant_id),
213
                          KP(sql_proxy), KP(GCTX.srv_rpc_proxy_));
214
  } else if (OB_FAIL(ObAllTenantInfoProxy::load_tenant_info(tenant_id, sql_proxy,
215
          false/*for_update*/, all_tenant_info))) {
216
    LOG_WARN("failed to load tenant info", KR(ret), K(tenant_id));
217
  } else if (!is_clone && all_tenant_info.is_restore()) {
218
    need_update = true;
219
  } else if (is_clone && all_tenant_info.is_clone()) {
220
    need_update = true;
221
  }
222

223
  if (OB_SUCC(ret) && need_update) {
224
    //update tenant role to standby tenant
225
    if (all_tenant_info.get_sync_scn() != restore_scn) {
226
      sync_satisfied = false;
227
      LOG_WARN("tenant sync scn not equal to restore scn", KR(ret),
228
                      K(all_tenant_info), K(restore_scn));
229
    } else if (OB_FAIL(ObAllTenantInfoProxy::update_tenant_role(
230
            tenant_id, sql_proxy, all_tenant_info.get_switchover_epoch(),
231
            share::STANDBY_TENANT_ROLE, all_tenant_info.get_switchover_status(),
232
            share::NORMAL_SWITCHOVER_STATUS, new_switch_ts))) {
233
      LOG_WARN("failed to update tenant role", KR(ret), K(tenant_id), K(all_tenant_info));
234
    } else {
235
      ObTenantRoleTransitionService role_transition_service(tenant_id, sql_proxy,
236
                                GCTX.srv_rpc_proxy_, obrpc::ObSwitchTenantArg::OpType::INVALID);
237
      (void)role_transition_service.broadcast_tenant_info(
238
            ObTenantRoleTransitionConstants::RESTORE_TO_STANDBY_LOG_MOD_STR);
239
    }
240
  }
241
  return ret;
242
}
243

244
int ObRestoreCommonUtil::process_schema(common::ObMySQLProxy *sql_proxy,
245
                                        const uint64_t tenant_id)
246
{
247
  int ret = OB_SUCCESS;
248

249
  if (OB_UNLIKELY(!is_user_tenant(tenant_id))) {
250
    ret = OB_INVALID_ARGUMENT;
251
    LOG_WARN("invalid argument", KR(ret), K(tenant_id));
252
  } else if (OB_ISNULL(sql_proxy)) {
253
    ret = OB_ERR_UNEXPECTED;
254
    LOG_WARN("sql proxy is null", KR(ret), KP(sql_proxy));
255
  } else {
256
    //reset schema status
257
    ObSchemaStatusProxy proxy(*sql_proxy);
258
    ObRefreshSchemaStatus schema_status(tenant_id, OB_INVALID_TIMESTAMP, OB_INVALID_VERSION);
259
    if (OB_FAIL(proxy.init())) {
260
      LOG_WARN("failed to init schema proxy", KR(ret));
261
    } else if (OB_FAIL(proxy.set_tenant_schema_status(schema_status))) {
262
      LOG_WARN("failed to update schema status", KR(ret), K(schema_status));
263
    }
264
  }
265

266
  if (OB_SUCC(ret)) {
267
    obrpc::ObBroadcastSchemaArg arg;
268
    arg.tenant_id_ = tenant_id;
269
    if (OB_ISNULL(GCTX.rs_rpc_proxy_) || OB_ISNULL(GCTX.rs_mgr_)) {
270
      ret = OB_ERR_UNEXPECTED;
271
      LOG_WARN("rs_rpc_proxy_ or rs_mgr_is null", KR(ret), KP(GCTX.rs_rpc_proxy_), KP(GCTX.rs_mgr_));
272
    } else if (OB_FAIL(GCTX.rs_rpc_proxy_->to_rs(*GCTX.rs_mgr_).broadcast_schema(arg))) {
273
      LOG_WARN("failed to broadcast schema", KR(ret), K(arg));
274
    }
275
  }
276

277
  return ret;
278
}
279

280
int ObRestoreCommonUtil::check_tenant_is_existed(ObMultiVersionSchemaService *schema_service,
281
                                                 const uint64_t tenant_id,
282
                                                 bool &is_existed)
283
{
284
  int ret = OB_SUCCESS;
285
  is_existed = true;
286
  ObSchemaGetterGuard schema_guard;
287
  bool tenant_dropped = false;
288

289
  if (OB_INVALID_TENANT_ID == tenant_id) {
290
    //maybe failed to create tenant
291
    is_existed = false;
292
    LOG_INFO("tenant maybe failed to create", KR(ret));
293
  } else if (OB_ISNULL(schema_service)) {
294
    ret = OB_ERR_UNEXPECTED;
295
    LOG_WARN("schema service is null", KR(ret), KP(schema_service));
296
  } else if (OB_FAIL(schema_service->get_tenant_schema_guard(OB_SYS_TENANT_ID, schema_guard))) {
297
    LOG_WARN("fail to get tenant schema guard", KR(ret));
298
  } else if (OB_SUCCESS != schema_guard.check_formal_guard()) {
299
    ret = OB_SCHEMA_ERROR;
300
    LOG_WARN("failed to check formal gurad", KR(ret));
301
  } else if (OB_FAIL(schema_guard.check_if_tenant_has_been_dropped(tenant_id, tenant_dropped))) {
302
    LOG_WARN("failed to check tenant is beed dropped", KR(ret), K(tenant_id));
303
  } else if (tenant_dropped) {
304
    is_existed = false;
305
    LOG_INFO("restore tenant has been dropped", KR(ret), K(tenant_id));
306
  } else {
307
    //check restore tenant's meta tenant is valid to read
308
    const share::schema::ObTenantSchema *tenant_schema = NULL;
309
    const uint64_t meta_tenant_id = gen_meta_tenant_id(tenant_id);
310
    if (OB_FAIL(schema_guard.get_tenant_info(meta_tenant_id, tenant_schema))) {
311
      LOG_WARN("failed to get tenant info", KR(ret), K(meta_tenant_id));
312
    } else if (OB_ISNULL(tenant_schema)) {
313
      ret = OB_TENANT_NOT_EXIST;
314
      LOG_WARN("tenant not exist", KR(ret), K(meta_tenant_id));
315
    } else if (tenant_schema->is_normal()) {
316
      is_existed = true;
317
    } else {
318
      //other status cannot get result from meta
319
      is_existed = false;
320
      LOG_WARN("meta tenant of restore tenant not normal", KR(ret), KPC(tenant_schema));
321
    }
322
  }
323
  return ret;
324
}
325

326
int ObRestoreCommonUtil::set_tde_parameters(common::ObMySQLProxy *sql_proxy,
327
                                            obrpc::ObCommonRpcProxy *rpc_proxy,
328
                                            const uint64_t tenant_id,
329
                                            const ObString &tde_method,
330
                                            const ObString &kms_info)
331
{
332
  int ret = OB_SUCCESS;
333
#ifdef OB_BUILD_TDE_SECURITY
334
  ObSqlString sql;
335
  int64_t affected_row = 0;
336
  if (OB_UNLIKELY(!is_user_tenant(tenant_id)
337
                  || !ObTdeMethodUtil::is_valid(tde_method)
338
                  || NULL == sql_proxy
339
                  || NULL == rpc_proxy)) {
340
    ret = OB_INVALID_ARGUMENT;
341
    LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(tde_method), KP(sql_proxy), KP(rpc_proxy));
342
  } else if (OB_FAIL(sql.assign_fmt("ALTER SYSTEM SET tde_method = '%.*s'",
343
                                                    tde_method.length(), tde_method.ptr()))) {
344
    LOG_WARN("failed to assign fmt", KR(ret), K(tde_method));
345
  } else if (OB_FAIL(sql_proxy->write(tenant_id, sql.ptr(), affected_row))) {
346
    LOG_WARN("failed to execute", KR(ret), K(tenant_id), K(sql));
347
  } else if (ObTdeMethodUtil::is_internal(tde_method)) {
348
    // do nothing
349
  } else if (FALSE_IT(sql.reset())) {
350
  } else if (OB_UNLIKELY(kms_info.empty())) {
351
    ret = OB_INVALID_ARGUMENT;
352
    LOG_WARN("kms_info should not be empty", KR(ret));
353
  } else if (OB_FAIL(sql.assign_fmt("ALTER SYSTEM SET external_kms_info= '%.*s'",
354
                                                    kms_info.length(), kms_info.ptr()))) {
355
    LOG_WARN("failed to assign fmt", KR(ret));
356
  } else if (OB_FAIL(sql_proxy->write(tenant_id, sql.ptr(), affected_row))) {
357
    LOG_WARN("failed to execute", KR(ret), K(tenant_id));
358
  }
359
  if (OB_SUCC(ret)) {
360
    const int64_t DEFAULT_TIMEOUT = GCONF.internal_sql_execute_timeout;
361
    obrpc::ObReloadMasterKeyArg arg;
362
    obrpc::ObReloadMasterKeyResult result;
363
    arg.tenant_id_ = tenant_id;
364
    if (OB_FAIL(rpc_proxy->timeout(DEFAULT_TIMEOUT).reload_master_key(arg, result))) {
365
      LOG_WARN("fail to reload master key", KR(ret), K(arg), K(DEFAULT_TIMEOUT));
366
    } else if (result.master_key_id_ > 0 ) {
367
      bool is_active = false;
368
      const int64_t SLEEP_US = 5 * 1000 * 1000L; // 5s
369
      const int64_t MAX_WAIT_US = 60 * 1000 * 1000L; // 60s
370
      const int64_t start = ObTimeUtility::current_time();
371
      char master_key[OB_MAX_MASTER_KEY_LENGTH] = {'\0'};
372
      int64_t master_key_len = 0;
373
      uint64_t master_key_id = 0;
374
      while (OB_SUCC(ret) && !is_active) {
375
        if (ObTimeUtility::current_time() - start > MAX_WAIT_US) {
376
          ret = OB_TIMEOUT;
377
          LOG_WARN("use too much time", KR(ret), "cost_us", ObTimeUtility::current_time() - start);
378
        } else if (OB_FAIL(ObMasterKeyGetter::get_active_master_key(tenant_id, master_key,
379
                                                                OB_MAX_MASTER_KEY_LENGTH,
380
                                                                master_key_len, master_key_id))) {
381
          if (OB_KEYSTORE_OPEN_NO_MASTER_KEY == ret) {
382
            ret = OB_SUCCESS;
383
            LOG_INFO("master key is not active, need wait", K(tenant_id));
384
            usleep(SLEEP_US);
385
          } else {
386
            LOG_WARN("fail to get active master key", KR(ret), K(tenant_id));
387
          }
388
        } else {
389
          is_active = true;
390
        }
391
      }
392
    }
393
  }
394
#endif
395
  return ret;
396
}
397

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.