oceanbase
396 строк · 16.8 Кб
1/**
2* Copyright (c) 2021 OceanBase
3* OceanBase CE is licensed under Mulan PubL v2.
4* You can use this software according to the terms and conditions of the Mulan PubL v2.
5* You may obtain a copy of Mulan PubL v2 at:
6* http://license.coscl.org.cn/MulanPubL-2.0
7* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10* See the Mulan PubL v2 for more details.
11*/
12
13#define USING_LOG_PREFIX RS_RESTORE
14
15#include "ob_restore_common_util.h"
16#include "share/ls/ob_ls_status_operator.h" //ObLSStatusOperator
17#include "share/ls/ob_ls_operator.h"//ObLSAttr
18#include "rootserver/ob_ls_service_helper.h"
19#include "rootserver/ob_tenant_role_transition_service.h"
20#include "src/share/ob_schema_status_proxy.h"
21#include "src/share/ob_rpc_struct.h"
22#include "rootserver/ob_ddl_service.h"
23#ifdef OB_BUILD_TDE_SECURITY
24#include "share/ob_master_key_getter.h"
25#endif
26
27using namespace oceanbase::share::schema;
28using namespace oceanbase::rootserver;
29using namespace oceanbase::share;
30using namespace oceanbase::common;
31
32int ObRestoreCommonUtil::notify_root_key(
33obrpc::ObSrvRpcProxy *srv_rpc_proxy_,
34common::ObMySQLProxy *sql_proxy_,
35const uint64_t tenant_id,
36const share::ObRootKey &root_key)
37{
38int ret = OB_SUCCESS;
39#ifdef OB_BUILD_TDE_SECURITY
40if (OB_UNLIKELY(!is_user_tenant(tenant_id))) {
41ret = OB_INVALID_ARGUMENT;
42LOG_WARN("invalid tenant_id", KR(ret), K(tenant_id));
43} else if (OB_UNLIKELY(obrpc::RootKeyType::INVALID == root_key.key_type_)) {
44ret = OB_INVALID_ARGUMENT;
45LOG_WARN("invalid root_key", KR(ret), K(tenant_id));
46} else if (OB_ISNULL(srv_rpc_proxy_) || OB_ISNULL(sql_proxy_)) {
47ret = OB_ERR_UNEXPECTED;
48LOG_WARN("unexpected null svr rpc proxy or sql proxy", KR(ret),
49KP(srv_rpc_proxy_), KP(sql_proxy_));
50} else {
51obrpc::ObRootKeyArg arg;
52obrpc::ObRootKeyResult result;
53ObUnitTableOperator unit_operator;
54ObArray<ObUnit> units;
55ObArray<ObAddr> addrs;
56arg.tenant_id_ = tenant_id;
57arg.is_set_ = true;
58arg.key_type_ = root_key.key_type_;
59arg.root_key_ = root_key.key_;
60if (OB_FAIL(unit_operator.init(*sql_proxy_))) {
61LOG_WARN("failed to init unit operator", KR(ret));
62} else if (OB_FAIL(unit_operator.get_units_by_tenant(tenant_id, units))) {
63LOG_WARN("failed to get tenant unit", KR(ret), K(tenant_id));
64}
65for (int64_t i = 0; OB_SUCC(ret) && i < units.count(); i++) {
66const ObUnit &unit = units.at(i);
67if (OB_FAIL(addrs.push_back(unit.server_))) {
68LOG_WARN("failed to push back addr", KR(ret));
69}
70}
71if (OB_FAIL(ret)) {
72} else if (OB_FAIL(ObDDLService::notify_root_key(*srv_rpc_proxy_, arg, addrs, result))) {
73LOG_WARN("failed to notify root key", KR(ret));
74}
75}
76#endif
77return ret;
78}
79
80int ObRestoreCommonUtil::create_all_ls(
81common::ObMySQLProxy *sql_proxy,
82const uint64_t tenant_id,
83const share::schema::ObTenantSchema &tenant_schema,
84const common::ObIArray<share::ObLSAttr> &ls_attr_array,
85const uint64_t source_tenant_id)
86{
87int ret = OB_SUCCESS;
88ObLSStatusOperator status_op;
89ObLSStatusInfo status_info;
90if (OB_UNLIKELY(!is_user_tenant(tenant_id)
91|| !tenant_schema.is_valid()
92|| ls_attr_array.empty()
93|| OB_ISNULL(sql_proxy))) {
94ret = OB_INVALID_ARGUMENT;
95LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(tenant_schema),
96K(ls_attr_array), KP(sql_proxy));
97} else {
98common::ObMySQLTransaction trans;
99const int64_t exec_tenant_id = ObLSLifeIAgent::get_exec_tenant_id(tenant_id);
100
101if (OB_FAIL(trans.start(sql_proxy, exec_tenant_id))) {
102LOG_WARN("failed to start trans", KR(ret), K(exec_tenant_id));
103} else {
104//must be in trans
105//Multiple LS groups will be created here.
106//In order to ensure that each LS group can be evenly distributed in the unit group,
107//it is necessary to read the distribution of LS groups within the transaction.
108ObTenantLSInfo tenant_stat(sql_proxy, &tenant_schema, tenant_id, &trans);
109for (int64_t i = 0; OB_SUCC(ret) && i < ls_attr_array.count(); ++i) {
110const ObLSAttr &ls_info = ls_attr_array.at(i);
111ObLSFlag ls_flag = ls_info.get_ls_flag();
112if (ls_info.get_ls_id().is_sys_ls()) {
113} else if (OB_SUCC(status_op.get_ls_status_info(tenant_id, ls_info.get_ls_id(),
114status_info, trans))) {
115LOG_INFO("[RESTORE] ls already exist", K(ls_info), K(tenant_id));
116} else if (OB_ENTRY_NOT_EXIST != ret) {
117LOG_WARN("failed to get ls status info", KR(ret), K(tenant_id), K(ls_info));
118} else if (OB_FAIL(ObLSServiceHelper::create_new_ls_in_trans(
119ls_info.get_ls_id(), ls_info.get_ls_group_id(), ls_info.get_create_scn(),
120share::NORMAL_SWITCHOVER_STATUS, tenant_stat, trans, ls_flag, source_tenant_id))) {
121LOG_WARN("failed to add new ls status info", KR(ret), K(ls_info), K(source_tenant_id));
122}
123LOG_INFO("create init ls", KR(ret), K(ls_info), K(source_tenant_id));
124}
125}
126int tmp_ret = OB_SUCCESS;
127if (OB_SUCCESS != (tmp_ret = trans.end(OB_SUCC(ret)))) {
128ret = OB_SUCC(ret) ? tmp_ret : ret;
129LOG_WARN("failed to end trans", KR(ret), KR(tmp_ret));
130}
131}
132return ret;
133}
134
135int ObRestoreCommonUtil::finish_create_ls(
136common::ObMySQLProxy *sql_proxy,
137const share::schema::ObTenantSchema &tenant_schema,
138const common::ObIArray<share::ObLSAttr> &ls_attr_array)
139{
140int ret = OB_SUCCESS;
141if (OB_UNLIKELY(!tenant_schema.is_valid()
142|| OB_ISNULL(sql_proxy))) {
143ret = OB_INVALID_ARGUMENT;
144LOG_WARN("invalid argument", KR(ret), K(tenant_schema), KP(sql_proxy));
145} else {
146const uint64_t tenant_id = tenant_schema.get_tenant_id();
147const int64_t exec_tenant_id = ObLSLifeIAgent::get_exec_tenant_id(tenant_id);
148common::ObMySQLTransaction trans;
149ObLSStatusOperator status_op;
150ObLSStatusInfoArray ls_array;
151ObLSStatus ls_info = share::OB_LS_EMPTY;//ls status in __all_ls
152if (OB_FAIL(status_op.get_all_ls_status_by_order(tenant_id, ls_array,
153*sql_proxy))) {
154LOG_WARN("failed to get all ls status", KR(ret), K(tenant_id));
155} else if (OB_FAIL(trans.start(sql_proxy, exec_tenant_id))) {
156LOG_WARN("failed to start trans", KR(ret), K(exec_tenant_id));
157} else {
158for (int64_t i = 0; OB_SUCC(ret) && i < ls_array.count(); ++i) {
159const ObLSStatusInfo &status_info = ls_array.at(i);
160if (OB_UNLIKELY(status_info.ls_is_creating())) {
161ret = OB_ERR_UNEXPECTED;
162LOG_WARN("ls should be created", KR(ret), K(status_info));
163} else {
164ret = OB_ENTRY_NOT_EXIST;
165for (int64_t j = 0; OB_ENTRY_NOT_EXIST == ret && j < ls_attr_array.count(); ++j) {
166if (ls_attr_array.at(i).get_ls_id() == status_info.ls_id_) {
167ret = OB_SUCCESS;
168ls_info = ls_attr_array.at(i).get_ls_status();
169}
170}
171if (OB_FAIL(ret)) {
172LOG_WARN("failed to find ls in attr", KR(ret), K(status_info), K(ls_attr_array));
173} else if (share::OB_LS_CREATING == ls_info) {
174//no need to update
175} else if (ls_info == status_info.status_) {
176//no need update
177} else if (OB_FAIL(status_op.update_ls_status_in_trans(
178tenant_id, status_info.ls_id_, status_info.status_,
179ls_info, share::NORMAL_SWITCHOVER_STATUS, trans))) {
180LOG_WARN("failed to update status", KR(ret), K(tenant_id), K(status_info), K(ls_info));
181} else {
182LOG_INFO("[RESTORE] update ls status", K(tenant_id), K(status_info), K(ls_info));
183}
184}
185}
186}
187int tmp_ret = OB_SUCCESS;
188if (OB_SUCCESS != (tmp_ret = trans.end(OB_SUCC(ret)))) {
189ret = OB_SUCC(ret) ? tmp_ret : ret;
190LOG_WARN("failed to end trans", KR(ret), KR(tmp_ret));
191}
192}
193return ret;
194}
195
196int ObRestoreCommonUtil::try_update_tenant_role(common::ObMySQLProxy *sql_proxy,
197const uint64_t tenant_id,
198const share::SCN &restore_scn,
199const bool is_clone,
200bool &sync_satisfied)
201{
202int ret = OB_SUCCESS;
203sync_satisfied = true;
204ObAllTenantInfo all_tenant_info;
205int64_t new_switch_ts = 0;
206bool need_update = false;
207
208if (OB_UNLIKELY(!is_user_tenant(tenant_id)
209|| OB_ISNULL(sql_proxy)
210|| OB_ISNULL(GCTX.srv_rpc_proxy_))) {
211ret = OB_ERR_UNEXPECTED;
212LOG_WARN("not user tenant or proxy is null", KR(ret), K(tenant_id),
213KP(sql_proxy), KP(GCTX.srv_rpc_proxy_));
214} else if (OB_FAIL(ObAllTenantInfoProxy::load_tenant_info(tenant_id, sql_proxy,
215false/*for_update*/, all_tenant_info))) {
216LOG_WARN("failed to load tenant info", KR(ret), K(tenant_id));
217} else if (!is_clone && all_tenant_info.is_restore()) {
218need_update = true;
219} else if (is_clone && all_tenant_info.is_clone()) {
220need_update = true;
221}
222
223if (OB_SUCC(ret) && need_update) {
224//update tenant role to standby tenant
225if (all_tenant_info.get_sync_scn() != restore_scn) {
226sync_satisfied = false;
227LOG_WARN("tenant sync scn not equal to restore scn", KR(ret),
228K(all_tenant_info), K(restore_scn));
229} else if (OB_FAIL(ObAllTenantInfoProxy::update_tenant_role(
230tenant_id, sql_proxy, all_tenant_info.get_switchover_epoch(),
231share::STANDBY_TENANT_ROLE, all_tenant_info.get_switchover_status(),
232share::NORMAL_SWITCHOVER_STATUS, new_switch_ts))) {
233LOG_WARN("failed to update tenant role", KR(ret), K(tenant_id), K(all_tenant_info));
234} else {
235ObTenantRoleTransitionService role_transition_service(tenant_id, sql_proxy,
236GCTX.srv_rpc_proxy_, obrpc::ObSwitchTenantArg::OpType::INVALID);
237(void)role_transition_service.broadcast_tenant_info(
238ObTenantRoleTransitionConstants::RESTORE_TO_STANDBY_LOG_MOD_STR);
239}
240}
241return ret;
242}
243
244int ObRestoreCommonUtil::process_schema(common::ObMySQLProxy *sql_proxy,
245const uint64_t tenant_id)
246{
247int ret = OB_SUCCESS;
248
249if (OB_UNLIKELY(!is_user_tenant(tenant_id))) {
250ret = OB_INVALID_ARGUMENT;
251LOG_WARN("invalid argument", KR(ret), K(tenant_id));
252} else if (OB_ISNULL(sql_proxy)) {
253ret = OB_ERR_UNEXPECTED;
254LOG_WARN("sql proxy is null", KR(ret), KP(sql_proxy));
255} else {
256//reset schema status
257ObSchemaStatusProxy proxy(*sql_proxy);
258ObRefreshSchemaStatus schema_status(tenant_id, OB_INVALID_TIMESTAMP, OB_INVALID_VERSION);
259if (OB_FAIL(proxy.init())) {
260LOG_WARN("failed to init schema proxy", KR(ret));
261} else if (OB_FAIL(proxy.set_tenant_schema_status(schema_status))) {
262LOG_WARN("failed to update schema status", KR(ret), K(schema_status));
263}
264}
265
266if (OB_SUCC(ret)) {
267obrpc::ObBroadcastSchemaArg arg;
268arg.tenant_id_ = tenant_id;
269if (OB_ISNULL(GCTX.rs_rpc_proxy_) || OB_ISNULL(GCTX.rs_mgr_)) {
270ret = OB_ERR_UNEXPECTED;
271LOG_WARN("rs_rpc_proxy_ or rs_mgr_is null", KR(ret), KP(GCTX.rs_rpc_proxy_), KP(GCTX.rs_mgr_));
272} else if (OB_FAIL(GCTX.rs_rpc_proxy_->to_rs(*GCTX.rs_mgr_).broadcast_schema(arg))) {
273LOG_WARN("failed to broadcast schema", KR(ret), K(arg));
274}
275}
276
277return ret;
278}
279
280int ObRestoreCommonUtil::check_tenant_is_existed(ObMultiVersionSchemaService *schema_service,
281const uint64_t tenant_id,
282bool &is_existed)
283{
284int ret = OB_SUCCESS;
285is_existed = true;
286ObSchemaGetterGuard schema_guard;
287bool tenant_dropped = false;
288
289if (OB_INVALID_TENANT_ID == tenant_id) {
290//maybe failed to create tenant
291is_existed = false;
292LOG_INFO("tenant maybe failed to create", KR(ret));
293} else if (OB_ISNULL(schema_service)) {
294ret = OB_ERR_UNEXPECTED;
295LOG_WARN("schema service is null", KR(ret), KP(schema_service));
296} else if (OB_FAIL(schema_service->get_tenant_schema_guard(OB_SYS_TENANT_ID, schema_guard))) {
297LOG_WARN("fail to get tenant schema guard", KR(ret));
298} else if (OB_SUCCESS != schema_guard.check_formal_guard()) {
299ret = OB_SCHEMA_ERROR;
300LOG_WARN("failed to check formal gurad", KR(ret));
301} else if (OB_FAIL(schema_guard.check_if_tenant_has_been_dropped(tenant_id, tenant_dropped))) {
302LOG_WARN("failed to check tenant is beed dropped", KR(ret), K(tenant_id));
303} else if (tenant_dropped) {
304is_existed = false;
305LOG_INFO("restore tenant has been dropped", KR(ret), K(tenant_id));
306} else {
307//check restore tenant's meta tenant is valid to read
308const share::schema::ObTenantSchema *tenant_schema = NULL;
309const uint64_t meta_tenant_id = gen_meta_tenant_id(tenant_id);
310if (OB_FAIL(schema_guard.get_tenant_info(meta_tenant_id, tenant_schema))) {
311LOG_WARN("failed to get tenant info", KR(ret), K(meta_tenant_id));
312} else if (OB_ISNULL(tenant_schema)) {
313ret = OB_TENANT_NOT_EXIST;
314LOG_WARN("tenant not exist", KR(ret), K(meta_tenant_id));
315} else if (tenant_schema->is_normal()) {
316is_existed = true;
317} else {
318//other status cannot get result from meta
319is_existed = false;
320LOG_WARN("meta tenant of restore tenant not normal", KR(ret), KPC(tenant_schema));
321}
322}
323return ret;
324}
325
326int ObRestoreCommonUtil::set_tde_parameters(common::ObMySQLProxy *sql_proxy,
327obrpc::ObCommonRpcProxy *rpc_proxy,
328const uint64_t tenant_id,
329const ObString &tde_method,
330const ObString &kms_info)
331{
332int ret = OB_SUCCESS;
333#ifdef OB_BUILD_TDE_SECURITY
334ObSqlString sql;
335int64_t affected_row = 0;
336if (OB_UNLIKELY(!is_user_tenant(tenant_id)
337|| !ObTdeMethodUtil::is_valid(tde_method)
338|| NULL == sql_proxy
339|| NULL == rpc_proxy)) {
340ret = OB_INVALID_ARGUMENT;
341LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(tde_method), KP(sql_proxy), KP(rpc_proxy));
342} else if (OB_FAIL(sql.assign_fmt("ALTER SYSTEM SET tde_method = '%.*s'",
343tde_method.length(), tde_method.ptr()))) {
344LOG_WARN("failed to assign fmt", KR(ret), K(tde_method));
345} else if (OB_FAIL(sql_proxy->write(tenant_id, sql.ptr(), affected_row))) {
346LOG_WARN("failed to execute", KR(ret), K(tenant_id), K(sql));
347} else if (ObTdeMethodUtil::is_internal(tde_method)) {
348// do nothing
349} else if (FALSE_IT(sql.reset())) {
350} else if (OB_UNLIKELY(kms_info.empty())) {
351ret = OB_INVALID_ARGUMENT;
352LOG_WARN("kms_info should not be empty", KR(ret));
353} else if (OB_FAIL(sql.assign_fmt("ALTER SYSTEM SET external_kms_info= '%.*s'",
354kms_info.length(), kms_info.ptr()))) {
355LOG_WARN("failed to assign fmt", KR(ret));
356} else if (OB_FAIL(sql_proxy->write(tenant_id, sql.ptr(), affected_row))) {
357LOG_WARN("failed to execute", KR(ret), K(tenant_id));
358}
359if (OB_SUCC(ret)) {
360const int64_t DEFAULT_TIMEOUT = GCONF.internal_sql_execute_timeout;
361obrpc::ObReloadMasterKeyArg arg;
362obrpc::ObReloadMasterKeyResult result;
363arg.tenant_id_ = tenant_id;
364if (OB_FAIL(rpc_proxy->timeout(DEFAULT_TIMEOUT).reload_master_key(arg, result))) {
365LOG_WARN("fail to reload master key", KR(ret), K(arg), K(DEFAULT_TIMEOUT));
366} else if (result.master_key_id_ > 0 ) {
367bool is_active = false;
368const int64_t SLEEP_US = 5 * 1000 * 1000L; // 5s
369const int64_t MAX_WAIT_US = 60 * 1000 * 1000L; // 60s
370const int64_t start = ObTimeUtility::current_time();
371char master_key[OB_MAX_MASTER_KEY_LENGTH] = {'\0'};
372int64_t master_key_len = 0;
373uint64_t master_key_id = 0;
374while (OB_SUCC(ret) && !is_active) {
375if (ObTimeUtility::current_time() - start > MAX_WAIT_US) {
376ret = OB_TIMEOUT;
377LOG_WARN("use too much time", KR(ret), "cost_us", ObTimeUtility::current_time() - start);
378} else if (OB_FAIL(ObMasterKeyGetter::get_active_master_key(tenant_id, master_key,
379OB_MAX_MASTER_KEY_LENGTH,
380master_key_len, master_key_id))) {
381if (OB_KEYSTORE_OPEN_NO_MASTER_KEY == ret) {
382ret = OB_SUCCESS;
383LOG_INFO("master key is not active, need wait", K(tenant_id));
384usleep(SLEEP_US);
385} else {
386LOG_WARN("fail to get active master key", KR(ret), K(tenant_id));
387}
388} else {
389is_active = true;
390}
391}
392}
393}
394#endif
395return ret;
396}
397