oceanbase

Форк
0
/
ob_primary_ls_service.cpp 
556 строк · 23.5 Кб
1
/**
2
 * Copyright (c) 2021 OceanBase
3
 * OceanBase CE is licensed under Mulan PubL v2.
4
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
5
 * You may obtain a copy of Mulan PubL v2 at:
6
 *          http://license.coscl.org.cn/MulanPubL-2.0
7
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
 * See the Mulan PubL v2 for more details.
11
 */
12

13
#define USING_LOG_PREFIX RS
14
#include "ob_primary_ls_service.h"
15
#include "lib/profile/ob_trace_id.h"
16
#include "share/ob_errno.h"
17
#include "share/ls/ob_ls_creator.h" //ObLSCreator
18
#include "share/ls/ob_ls_life_manager.h"//ObLSLifeAgentManager
19
#include "share/ls/ob_ls_table_operator.h"//ls_opt
20
#include "share/ob_share_util.h"//ObShareUtil
21
#include "observer/ob_server_struct.h"//GCTX
22
#include "storage/tx_storage/ob_ls_service.h"
23
#include "storage/tx_storage/ob_ls_handle.h"  //ObLSHandle
24
#include "logservice/palf/palf_base_info.h"//PalfBaseInfo
25
#include "rootserver/ob_ls_service_helper.h"//ObTenantLSInfo
26
#include "rootserver/ob_ls_recovery_reportor.h"//update_ls_recovery
27
#include "rootserver/ob_tenant_info_loader.h"
28

29
namespace oceanbase
30
{
31
using namespace common;
32
using namespace share;
33
using namespace transaction;
34
using namespace palf;
35
namespace rootserver
36
{
37

38
//////////////ObPrimaryLSService
39
// Initialize the primary LS service for the current tenant: bind the
// service to the MTL tenant id, create the dedicated background thread
// ("PLSSer") and start it.
// @return OB_SUCCESS on success; OB_INIT_TWICE if already initialized;
//         the create/start error otherwise.
int ObPrimaryLSService::init()
{
  int ret = OB_SUCCESS;
  // the service always runs inside the tenant's own MTL context
  tenant_id_ = MTL_ID();
  if (OB_UNLIKELY(inited_)) {
    ret = OB_INIT_TWICE;
    LOG_WARN("has inited", KR(ret));
  } else if (OB_FAIL(ObTenantThreadHelper::create("PLSSer", 
          lib::TGDefIDs::SimpleLSService, *this))) {
    LOG_WARN("failed to create thread", KR(ret));
  } else if (OB_FAIL(ObTenantThreadHelper::start())) {
    LOG_WARN("fail to start", KR(ret));
  } else {
    // only mark inited after the worker thread is actually running
    inited_ = true;
  }
  return ret;
}
56

57
void ObPrimaryLSService::destroy()
58
{
59
  ObTenantThreadHelper::destroy();
60
  tenant_id_ = OB_INVALID_TENANT_ID;
61
  inited_ = false;
62
}
63

64
void ObPrimaryLSService::do_work()
65
{
66
  int ret = OB_SUCCESS;
67
  if (OB_UNLIKELY(!inited_)) {
68
    ret = OB_NOT_INIT;
69
    LOG_WARN("not init", K(ret));
70
  } else if (OB_FAIL(wait_tenant_schema_and_version_ready_(tenant_id_, DATA_VERSION_4_1_0_0))) {
71
    LOG_WARN("failed to wait tenant schema version ready", KR(ret), K(tenant_id_), K(DATA_CURRENT_VERSION));
72
  } else {
73
    int64_t idle_time_us = 1000 * 1000L;
74
    int tmp_ret = OB_SUCCESS;
75
    share::schema::ObTenantSchema tenant_schema;
76
    while (!has_set_stop()) {
77
      tenant_schema.reset();
78
      ObCurTraceId::init(GCONF.self_addr_);
79
      DEBUG_SYNC(STOP_PRIMARY_LS_THREAD);
80
      if (OB_FAIL(get_tenant_schema(tenant_id_, tenant_schema))) {
81
        LOG_WARN("failed to get tenant schema", KR(ret), K(tenant_id_));
82
      } else {
83
        if (OB_TMP_FAIL(process_all_ls(tenant_schema))) {
84
          ret = OB_SUCC(ret) ? tmp_ret : ret;
85
          LOG_WARN("failed to process user tenant thread0", KR(ret),
86
              KR(tmp_ret), K(tenant_id_));
87
        }
88
        if (OB_TMP_FAIL(process_all_ls_status_to_steady_(tenant_schema))) {
89
          ret = OB_SUCC(ret) ? tmp_ret : ret;
90
          LOG_WARN("failed to process user tenant thread1", KR(ret), KR(tmp_ret),
91
              K(tenant_id_));
92
        }
93
      }
94

95
      LOG_INFO("[PRIMARY_LS_SERVICE] finish one round", KR(ret), K(tenant_schema));
96
      tenant_schema.reset();
97
      idle(idle_time_us);
98
    }// end while
99
  }
100
}
101

102

103
// Drive the LS status machine of every LS of the tenant for one round.
// Builds a snapshot of all LS status machines from the inner tables, then:
//  - if the tenant is dropping: abort creating LS and push the rest toward
//    (pre_)tenant_dropping (set_tenant_dropping_status_);
//  - otherwise, or when the dropping stage issued no task this round:
//    advance each LS to its next status (try_set_next_ls_status_).
// @param tenant_schema schema of the tenant being processed; must be valid
//        and past the "creating" stage (OB_SCHEMA_EAGAIN otherwise).
int ObPrimaryLSService::process_all_ls(const share::schema::ObTenantSchema &tenant_schema)
{
  int ret = OB_SUCCESS;
  const uint64_t tenant_id = tenant_schema.get_tenant_id();
  common::ObArray<ObLSStatusMachineParameter> machine_array;
  // number of tenant-dropping tasks issued this round
  int64_t task_cnt = 0;
  if (OB_UNLIKELY(!inited_)) {
    ret = OB_NOT_INIT;
    LOG_WARN("not init", KR(ret));
  } else if (OB_UNLIKELY(!tenant_schema.is_valid())) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("tenant schema is invalid", KR(ret), K(tenant_schema));
  } else if (tenant_schema.is_creating()) {
    ret = OB_SCHEMA_EAGAIN;
    LOG_WARN("tenant schema not ready, no need process", KR(ret), K(tenant_schema));
  } else if (OB_FAIL(ObLSServiceHelper::construct_ls_status_machine(false, tenant_id,
             GCTX.sql_proxy_, machine_array))) {
    LOG_WARN("failed to construct ls status machine", KR(ret), K(tenant_id));
  } else if (tenant_schema.is_dropping()) {
    //if tenant schema is in dropping
    //set the creating ls to create_abort,
    //set the normal or dropping tenant to drop_tenant_pre
    if (OB_FAIL(set_tenant_dropping_status_(machine_array, task_cnt))) {
      LOG_WARN("failed to set tenant dropping status", KR(ret), K(task_cnt), K(machine_array));
    }
  }
  // only advance the regular status machines when the dropping stage did
  // not already issue tasks in this round
  if (OB_SUCC(ret) && 0 == task_cnt) {
    if (OB_FAIL(try_set_next_ls_status_(machine_array))) {
      LOG_WARN("failed to set next ls status", KR(ret), K(machine_array));
    }
  }

  LOG_INFO("[PRIMARY_LS_SERVICE] finish process tenant",
      KR(ret), K(tenant_id), K(task_cnt), K(machine_array), K(tenant_schema));
  return ret;
}
139

140
// Push every LS of a dropping tenant toward its drop status. Three passes
// over the status-machine snapshot:
//  1. SYS LS: normal -> pre_tenant_dropping; then read the ora_rowscn of
//     that row as the barrier scn (sys_ls_target_scn);
//  2. delete every LS still in creating status (create abort);
//  3. once the tenant sync scn has passed the barrier, set all remaining
//     user LS to tenant_dropping; returns OB_NEED_WAIT while the sync scn
//     is still behind the barrier.
// @param status_machine_array snapshot of all LS status machines
// @param[out] task_cnt incremented for every update issued (note: pass 1
//             increments even when the update itself failed), so the
//             caller can skip the regular status-advance step
int ObPrimaryLSService::set_tenant_dropping_status_(
    const common::ObIArray<ObLSStatusMachineParameter> &status_machine_array, int64_t &task_cnt)
{
  int ret = OB_SUCCESS;
  ObTenantInfoLoader *tenant_info_loader = MTL(rootserver::ObTenantInfoLoader*);
  if (OB_UNLIKELY(!inited_)) {
    ret = OB_NOT_INIT;
    LOG_WARN("not init", KR(ret));
  } else if (OB_ISNULL(tenant_info_loader)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("tenant_info_loader is null", KR(ret), KP(tenant_info_loader));
  } else {
    share::ObLSAttrOperator ls_operator(MTL_ID(), GCTX.sql_proxy_);
    const ObTenantSwitchoverStatus working_sw_status = share::NORMAL_SWITCHOVER_STATUS;
    // both start invalid; sys_ls_target_scn stays invalid when the SYS LS
    // is already past pre_tenant_dropping (no barrier check needed)
    share::SCN tenant_sync_scn, sys_ls_target_scn;
    tenant_sync_scn.set_invalid();
    sys_ls_target_scn.set_invalid();
    // pass 1: handle only the SYS LS, then break out of the loop
    for (int64_t i = 0; OB_SUCC(ret) && i < status_machine_array.count() && !has_set_stop(); ++i) {
      const share::ObLSAttr &attr = status_machine_array.at(i).ls_info_;
      if (attr.get_ls_id().is_sys_ls()) {
        if (attr.ls_is_normal()) {
          if (OB_FAIL(ls_operator.update_ls_status(attr.get_ls_id(),
          attr.get_ls_status(), share::OB_LS_PRE_TENANT_DROPPING, working_sw_status))) {
            LOG_WARN("failed to update ls status", KR(ret), K(attr));
          }
          task_cnt++;
          LOG_INFO("[PRIMARY_LS_SERVICE] set sys ls to pre tenant dropping", KR(ret), K(attr));
        }
        if (OB_FAIL(ret)) {
        } else if (!attr.ls_is_normal() && !attr.ls_is_pre_tenant_dropping()) {
          // if attr is normal, it means that the status has been switched to pre_tenant_dropping in this round
          // if attr is pre_tenant_dropping, it means that the status has been changed in a previous round
          // the other attr is tenant_dropping, we should skip checking
        } else if (OB_FAIL(ls_operator.get_pre_tenant_dropping_ora_rowscn(sys_ls_target_scn))) {
          LOG_WARN("fail to get sys_ls_end_scn", KR(ret), K(tenant_id_));
        }
        // find SYS LS
        break;
      }
    }//end for set sys ls change to pre tenant dropping

    //before check tenant_info sync scn larger than sys_ls pre tenant dropping scn
    //set creating ls to create_abort
    for (int64_t i = 0; OB_SUCC(ret) && i < status_machine_array.count() && !has_set_stop(); ++i) {
      const share::ObLSAttr &attr = status_machine_array.at(i).ls_info_;
      if (attr.ls_is_creating()) {
        task_cnt++;
        if (OB_FAIL(ls_operator.delete_ls(attr.get_ls_id(), attr.get_ls_status(), working_sw_status))) {
          LOG_WARN("failed to remove ls not normal", KR(ret), K(attr));
        }
        LOG_INFO("[PRIMARY_LS_SERVICE] tenant is dropping, delete ls in creating", KR(ret),
            K(attr));
      }
    }//end for process creating

    // barrier: user LS may only move to tenant_dropping after the tenant
    // sync scn has caught up with the SYS LS pre_tenant_dropping scn
    if (OB_SUCC(ret) && sys_ls_target_scn.is_valid()) {
      if (OB_FAIL(tenant_info_loader->get_sync_scn(tenant_sync_scn))) {
        LOG_WARN("get tenant_sync_scn failed", KR(ret));
      } else if (OB_UNLIKELY(!tenant_sync_scn.is_valid())) {
        ret = OB_ERR_UNEXPECTED;
        LOG_WARN("tenant_sync_scn not valid", KR(ret), K(tenant_sync_scn));
      } else if (tenant_sync_scn < sys_ls_target_scn) {
        ret = OB_NEED_WAIT;
        LOG_WARN("wait some time, tenant_sync_scn cannot be smaller than sys_ls_target_scn", KR(ret),
            K(tenant_id_), K(tenant_sync_scn), K(sys_ls_target_scn));
      }
    }
    // pass 3: move remaining user LS to tenant_dropping
    for (int64_t i = 0; OB_SUCC(ret) && i < status_machine_array.count() && !has_set_stop(); ++i) {
      const share::ObLSAttr &attr = status_machine_array.at(i).ls_info_;
      if (OB_UNLIKELY(!attr.is_valid()) || attr.get_ls_id().is_sys_ls() || attr.ls_is_creating()) {
        // invalid attr might happens if the ls is deleted in __all_ls table but still exists in __all_ls_status table
        // no need process sys ls and creating ls
      } else if (!attr.ls_is_tenant_dropping()) {
        task_cnt++;
        //no matter the status is in normal or dropping
        //may be the status in status info is created
        if (OB_FAIL(ls_operator.update_ls_status(
                attr.get_ls_id(), attr.get_ls_status(),
                share::OB_LS_TENANT_DROPPING, working_sw_status))) {
          LOG_WARN("failed to update ls status", KR(ret), K(attr));
        }
        LOG_INFO("[PRIMARY_LS_SERVICE] set ls to tenant dropping", KR(ret), K(attr), K(i),
            K(tenant_sync_scn), K(sys_ls_target_scn));
      }
    }//end for
  }
  // the loops bail out silently on stop; surface that as an explicit error
  if (OB_SUCC(ret) && has_set_stop()) {
    ret = OB_IN_STOP_STATE;
    LOG_WARN("[PRIMARY_LS_SERVICE] thread stop", KR(ret));
  }
  return ret;
}
232

233
// For each LS, compare the __all_ls record (ls_info_) with the
// __all_ls_status record (status_info_) and issue the transition that
// moves the pair toward a steady state:
//   creating + create_abort            -> delete from __all_ls
//   creating + created                 -> set __all_ls to normal
//   dropping / tenant_dropping (both tables agree) -> try_delete_ls_
//   pre_tenant_dropping (SYS LS only)  -> sys_ls_tenant_drop_
// Combinations the state machine can never produce fail with
// OB_ERR_UNEXPECTED.
// @param status_machine_array snapshot of all LS status machines
int ObPrimaryLSService::try_set_next_ls_status_(
    const common::ObIArray<ObLSStatusMachineParameter> &status_machine_array)
{
  int ret = OB_SUCCESS;
  if (OB_UNLIKELY(!inited_)) {
    ret = OB_NOT_INIT;
    LOG_WARN("not init", KR(ret));
  } else {
    share::ObLSAttrOperator ls_operator(MTL_ID(), GCTX.sql_proxy_);
    const ObTenantSwitchoverStatus working_sw_status =
        share::NORMAL_SWITCHOVER_STATUS;
    for (int64_t i = 0; OB_SUCC(ret) && i < status_machine_array.count() && !has_set_stop(); ++i) {
      const ObLSStatusMachineParameter &machine = status_machine_array.at(i);
      const share::ObLSStatusInfo &status_info = machine.status_info_;
      const share::ObLSAttr &ls_info =  machine.ls_info_;
      const uint64_t tenant_id = status_info.tenant_id_;
      if (OB_UNLIKELY(!machine.is_valid())) {
        ret = OB_INVALID_ARGUMENT;
        LOG_WARN("machine is invalid", KR(ret), K(machine));
      } else if (!ls_info.is_valid()) {
        // LS already removed from __all_ls; only the tail statuses of
        // __all_ls_status are acceptable here
        if (status_info.ls_is_wait_offline()) {
        } else if (status_info.ls_is_create_abort()
            || status_info.ls_is_creating()
            || status_info.ls_is_created()) {
          //in switchover/failover, need create abort ls
          //in drop tenant, __all_ls will be deleted while status is creating
        } else {
          ret = OB_ERR_UNEXPECTED;
          LOG_WARN("status info is invalid", KR(ret), K(machine));
        }
      } else if (ls_info.ls_is_creating()) {
        if (status_info.ls_is_create_abort()) {
          //delete ls, the ls must is creating
          if (OB_FAIL(ls_operator.delete_ls(
                  machine.ls_id_, share::OB_LS_CREATING, working_sw_status))) {
            LOG_WARN("failed to process creating info", KR(ret), K(machine));
          }
        } else if (status_info.ls_is_created()) {
          //set ls to normal
          if (OB_FAIL(ls_operator.update_ls_status(
                  machine.ls_id_, ls_info.get_ls_status(), share::OB_LS_NORMAL, working_sw_status))) {
            LOG_WARN("failed to update ls status", KR(ret), K(machine));
          }
        } else if (status_info.ls_is_creating()) {
          // both tables agree on creating: creation still in progress
        } else {
          ret = OB_ERR_UNEXPECTED;
          LOG_WARN("status info is invalid", KR(ret), K(machine));
        }
      } else if (ls_info.ls_is_normal()) {
        if (status_info.ls_is_normal()) {
          // steady state, nothing to do
        } else if (status_info.ls_is_created()) {
          // __all_ls_status will catch up in the steady-state processing
        } else {
          ret = OB_ERR_UNEXPECTED;
          LOG_WARN("status info is invalid", KR(ret), K(machine));
        }
      } else if (ls_info.ls_is_dropping()) {
        if (!status_info.ls_is_dropping()) {
          // wait until __all_ls_status is also dropping
        } else if (OB_FAIL(try_delete_ls_(status_info))) {
          LOG_WARN("failed to try delete ls", KR(ret), K(status_info));
        }
      } else if (ls_info.ls_is_pre_tenant_dropping()) {
        if (!machine.ls_id_.is_sys_ls()) {
          ret = OB_ERR_UNEXPECTED;
          LOG_WARN("normal ls can not in pre tenant dropping status", KR(ret), K(machine));
        } else if (!status_info.ls_is_pre_tenant_dropping()) {
          // wait until __all_ls_status catches up
        } else if (OB_FAIL(sys_ls_tenant_drop_(status_info))) {
          LOG_WARN("failed to process sys ls", KR(ret), K(status_info));
        }
      } else if (ls_info.ls_is_tenant_dropping()) {
        if (!status_info.ls_is_tenant_dropping()) {
          // __all_ls_status should also be tenant_dropping to notify GC module to offline LS
        } else if (OB_FAIL(try_delete_ls_(status_info))) {
          LOG_WARN("failed to try delete ls", KR(ret), K(machine), K(status_info));
        }
      } else {
        //other status can not be in __all_ls
        //such as created, wait_offline
        ret = OB_ERR_UNEXPECTED;
        LOG_WARN("the ls not expected in all_ls", KR(ret), K(machine));
      }
    }
  }
  // the loop bails out silently on stop; surface that as an explicit error
  if (OB_SUCC(ret) && has_set_stop()) {
    ret = OB_IN_STOP_STATE;
    LOG_WARN("[PRIMARY_LS_SERVICE] thread stop", KR(ret));
  }
  return ret;
}
321

322
// Try to finish off a dropping LS. Asks the LS leader via RPC whether it
// can go offline; if so, a user LS is deleted from __all_ls, while the
// SYS LS is instead moved to WAIT_OFFLINE in __all_ls_status (real
// deletion of SYS LS is blocked by the GC module).
// @param status_info LS to process; must be in dropping or
//        tenant_dropping status, and SYS LS may only be tenant_dropping.
int ObPrimaryLSService::try_delete_ls_(const share::ObLSStatusInfo &status_info)
{
  int ret = OB_SUCCESS;
  const int64_t start_time = ObTimeUtility::fast_current_time();
  bool can_offline = false;
  const ObTenantSwitchoverStatus working_sw_status = share::NORMAL_SWITCHOVER_STATUS;
  if (OB_UNLIKELY(!status_info.is_valid()
      || (!status_info.ls_is_dropping() && !status_info.ls_is_tenant_dropping())
      || (status_info.ls_id_.is_sys_ls() && !status_info.ls_is_tenant_dropping()))) {
    // SYS LS only can be in tenant_dropping, can not be in DROPPING
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("info not valid or not in dropping status or sys ls", KR(ret), K(status_info));
  } else {
    // send rpc to observer
    share::ObLSAttrOperator ls_operator(MTL_ID(), GCTX.sql_proxy_);
    if (OB_FAIL(check_ls_can_offline_by_rpc_(status_info, can_offline))) {
      LOG_WARN("failed to check ls can offline", KR(ret), K(status_info));
    } else if (can_offline) {
      // User LS should be deleted from __all_ls
      if (!status_info.ls_id_.is_sys_ls()) {
        if (OB_FAIL(ls_operator.delete_ls(status_info.ls_id_, status_info.status_, working_sw_status))) {
          LOG_WARN("failed to delete ls", KR(ret), K(status_info));
        }
      } else {
        // SYS LS can not be deleted from __all_ls, as SYS LS is blocked by GC module.
        // So, SYS LS should change __all_ls_status to WAIT_OFFLINE to end its status.
        if (OB_FAIL(ObLSServiceHelper::offline_ls(status_info.tenant_id_,
            status_info.ls_id_, status_info.status_, working_sw_status))) {
          LOG_WARN("failed to offline ls", KR(ret), K(status_info), K(working_sw_status));
        }
      }
    }
  }
  // always log the cost, both on success and on failure
  const int64_t cost = ObTimeUtility::fast_current_time() - start_time;
  LOG_INFO("[PRIMARY_LS_SERVICE] finish to try delete LS", KR(ret), K(status_info), K(cost), K(can_offline));
  return ret;
}
359

360
// Move the SYS LS from pre_tenant_dropping to tenant_dropping, but only
// once every user LS of the tenant has gone offline
// (check_sys_ls_can_offline_).
// @param info SYS LS status info; must be valid, must be the SYS LS, and
//        must currently be in pre_tenant_dropping status.
int ObPrimaryLSService::sys_ls_tenant_drop_(const share::ObLSStatusInfo &info)
{
  int ret = OB_SUCCESS;
  const ObLSStatus target_status = share::OB_LS_TENANT_DROPPING;
  const ObLSStatus pre_status = share::OB_LS_PRE_TENANT_DROPPING;
  const ObTenantSwitchoverStatus working_sw_status = share::NORMAL_SWITCHOVER_STATUS;
  bool can_offline = false;
  if (OB_UNLIKELY(!info.is_valid()
                  || !info.ls_id_.is_sys_ls())) {
      ret = OB_INVALID_ARGUMENT;
      LOG_WARN("invalid argument", KR(ret), K(info));
  } else if (pre_status != info.status_) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("sys ls can not in other status", KR(ret), K(info));
  } else if (OB_FAIL(check_sys_ls_can_offline_(can_offline))) {
    LOG_WARN("failed to check sys ls can offline", KR(ret));
  } else if (can_offline) {
    share::ObLSAttrOperator ls_operator(MTL_ID(), GCTX.sql_proxy_);
    if (OB_FAIL(ls_operator.update_ls_status(info.ls_id_, pre_status, target_status, working_sw_status))) {
      LOG_WARN("failed to update ls status", KR(ret), K(info), K(pre_status), K(target_status));
    }
  }
  // logged even when can_offline is false, to make waiting visible
  LOG_INFO("[PRIMARY_LS_SERVICE] set sys ls tenant dropping", KR(ret), K(info), K(can_offline));
  return ret;
}
385

386
int ObPrimaryLSService::check_sys_ls_can_offline_(bool &can_offline)
387
{
388
  int ret = OB_SUCCESS;
389
  share::ObLSStatusInfoArray status_info_array;
390
  can_offline = true;
391
  const uint64_t tenant_id = MTL_ID();
392
  share::ObLSStatusOperator status_operator;
393
  if (OB_ISNULL(GCTX.sql_proxy_)) {
394
    ret = OB_ERR_UNEXPECTED;
395
    LOG_WARN("sql proxy is null", KR(ret));
396
  } else if (OB_FAIL(status_operator.get_all_ls_status_by_order(
397
                 tenant_id, status_info_array, *GCTX.sql_proxy_))) {
398
    LOG_WARN("failed to get all ls status", KR(ret), K(tenant_id));
399
  } else if (0 == status_info_array.count()) {
400
    //sys ls not exist
401
    can_offline = true;
402
  }
403
  for (int64_t i = 0; OB_SUCC(ret) && i < status_info_array.count() && can_offline; ++i) {
404
    const share::ObLSStatusInfo &status_info = status_info_array.at(i);
405
    if (status_info.ls_id_.is_sys_ls()) {
406
    } else {
407
      can_offline = false;
408
      LOG_INFO("[PRIMARY_LS_SERVICE] sys ls can not offline", K(status_info));
409
    }
410
  }
411
  if (OB_SUCC(ret) && can_offline) {
412
    LOG_INFO("[PRIMARY_LS_SERVICE] sys ls can offline", K(status_info_array));
413
  }
414
  return ret;
415
}
416

417
// Ask the LS leader, via the check_ls_can_offline RPC, whether the LS can
// be offlined.
// @param info        status info of the LS to check; must be valid
// @param[out] can_offline set true only when the leader confirms the LS
//             can go offline; guaranteed false on every failure path
//             (previously it was left untouched on the early-error paths)
int ObPrimaryLSService::check_ls_can_offline_by_rpc_(const share::ObLSStatusInfo &info, bool &can_offline)
{
  int ret = OB_SUCCESS;
  ObAddr leader;
  // define the out-param on all paths, including early errors below
  can_offline = false;
  if (OB_UNLIKELY(!info.is_valid())) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("info not valid", KR(ret), K(info));
  } else if (OB_ISNULL(GCTX.location_service_) || OB_ISNULL(GCTX.srv_rpc_proxy_)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("location service or proxy is null", KR(ret), KP(GCTX.location_service_),
             KP(GCTX.srv_rpc_proxy_));
  } else if (OB_FAIL(GCTX.location_service_->get_leader(GCONF.cluster_id, info.tenant_id_,
             info.ls_id_, false, leader))) {
    LOG_WARN("failed to get ls leader", KR(ret), K(info));
  } else {
    const int64_t timeout = GCONF.rpc_timeout;
    obrpc::ObCheckLSCanOfflineArg arg;
    // tenant-dropping checks go through the dedicated DBA-command rpc group
    const uint64_t group_id = info.ls_is_tenant_dropping() ? OBCG_DBA_COMMAND : OBCG_DEFAULT;
    if (OB_FAIL(arg.init(info.tenant_id_, info.ls_id_, info.status_))) {
      LOG_WARN("failed to init arg", KR(ret), K(arg));
    } else if (OB_FAIL(GCTX.srv_rpc_proxy_->to(leader)
                           .by(info.tenant_id_)
                           .timeout(timeout)
                           .group_id(group_id)
                           .check_ls_can_offline(arg))) {
      LOG_WARN("failed to check ls can offline", KR(ret), K(arg), K(info),
               K(timeout), K(leader));
    } else {
      can_offline = true;
    }
  }
  return ret;
}
452

453
// Make __all_ls_status catch up with __all_ls for every LS of the tenant
// by delegating to ObLSServiceHelper::process_status_to_steady. Only
// valid on a user tenant.
// @param tenant_schema schema of the tenant being processed
int ObPrimaryLSService::process_all_ls_status_to_steady_(const share::schema::ObTenantSchema &tenant_schema)
{
  int ret = OB_SUCCESS;
  if (OB_UNLIKELY(!is_user_tenant(tenant_id_))) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("ls recovery thread must run on user tenant", KR(ret),
             K(tenant_id_));
  } else {
    ObTenantLSInfo tenant_ls_info(GCTX.sql_proxy_, &tenant_schema, tenant_id_);
    if (OB_FAIL(ObLSServiceHelper::process_status_to_steady(false, share::NORMAL_SWITCHOVER_STATUS, tenant_ls_info))) {
      LOG_WARN("failed to process status to steady", KR(ret));
    }
  }
  // logged on both success and failure
  LOG_INFO("[PRIMARY_LS_SERVICE] finish process all ls status to steady", KR(ret), K(tenant_id_));
  return ret;
}
469

470
//the interface may reentry
// Create the initial user LS for a creating tenant: one LS group per
// active unit group, and within each group one LS per primary zone. The
// whole batch is inserted in a single transaction; the SYS LS attr row is
// read first (inside the transaction) as a guard, and the batch is
// skipped when __all_ls already holds more than the SYS LS (which makes
// re-entry safe).
int ObPrimaryLSService::create_ls_for_create_tenant()
{
  int ret = OB_SUCCESS;
  share::schema::ObTenantSchema tenant_schema;
  ObArray<ObZone> primary_zone;
  ObArray<share::ObSimpleUnitGroup> unit_group_array;
  share::ObLSAttrOperator ls_operator(tenant_id_, GCTX.sql_proxy_);
  if (OB_FAIL(get_tenant_schema(tenant_id_, tenant_schema))) {
    LOG_WARN("failed to get tenant schema", KR(ret), K(tenant_id_));
  } else if (!tenant_schema.is_creating()) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("only creating tenant can create user ls", KR(ret), K(tenant_schema));
  } else if (OB_FAIL(ObLSServiceHelper::get_primary_zone_unit_array(&tenant_schema,
          primary_zone, unit_group_array))) {
    LOG_WARN("failed to get primary zone unit array", KR(ret), K(tenant_schema));
  } else {
    // ensure __all_ls is empty
    START_TRANSACTION(GCTX.sql_proxy_, tenant_id_)
    ObArray<share::ObLSAttr> ls_array;
    share::ObLSAttr sys_ls;
    if (FAILEDx(ls_operator.get_ls_attr(SYS_LS, true, trans, sys_ls))) {
      LOG_WARN("failed to get SYS_LS attr", KR(ret));
    } else if (OB_FAIL(ls_operator.get_all_ls_by_order(ls_array))) {
      LOG_WARN("failed to get all_ls by order", KR(ret));
    } else if (ls_array.count() > 1) {
      //nothing
      // user LS already created by a previous attempt; re-entry is a no-op
    } else {
      uint64_t ls_group_id = OB_INVALID_ID;
      ObLSID ls_id;
      share::ObLSAttr new_ls;
      share::ObLSFlag flag;
      SCN create_scn;
      for (int64_t i = 0; OB_SUCC(ret) && i < unit_group_array.count(); ++i) {
        if (unit_group_array.at(i).is_active()) {
          //create ls
          // one new LS group id per active unit group
          if (OB_FAIL(ObLSServiceHelper::fetch_new_ls_group_id(GCTX.sql_proxy_, tenant_id_, ls_group_id))) {
            LOG_WARN("failed to fetch new LS group id", KR(ret), K(tenant_id_));
          }
          // one LS per primary zone inside this LS group; the creation scn
          // comes from GTS so it orders correctly with tenant commits
          for (int64_t j = 0; OB_SUCC(ret) && j < primary_zone.count(); j++) {
            if (OB_FAIL(ObLSServiceHelper::fetch_new_ls_id(GCTX.sql_proxy_, tenant_id_, ls_id))) {
              LOG_WARN("failed to fetch new LS id", KR(ret), K(tenant_id_));
            } else if (OB_FAIL(ObLSAttrOperator::get_tenant_gts(tenant_id_, create_scn))) {
              LOG_WARN("failed to get tenant gts", KR(ret), K(tenant_id_));
            } else if (OB_FAIL(new_ls.init(ls_id, ls_group_id, flag, share::OB_LS_CREATING,
                           share::OB_LS_OP_CREATE_PRE, create_scn))) {
              LOG_WARN("failed to init new operation", KR(ret), K(create_scn),
                       K(ls_id), K(ls_group_id));
            } else if (OB_FAIL(ls_operator.insert_ls(
                           new_ls, share::NORMAL_SWITCHOVER_STATUS, &trans))) {
              LOG_WARN("failed to insert new operation", KR(ret), K(new_ls));
            }
          }//end for each ls group
        }
      }//end for each unit group
    }
    END_TRANSACTION(trans)
  }
  return ret;
}
530

531
// Create one duplicate LS for the tenant: fetch a fresh LS id, take the
// creation scn from GTS, then insert a CREATING attr row into __all_ls.
// Duplicate LS always use ls_group_id 0 and carry the DUPLICATE flag.
int ObPrimaryLSService::create_duplicate_ls()
{
  int ret = OB_SUCCESS;
  share::ObLSAttrOperator ls_operator(tenant_id_, GCTX.sql_proxy_);
  share::ObLSID ls_id;
  SCN create_scn;
  // duplicate LS do not belong to any LS group
  const uint64_t ls_group_id = 0;
  share::ObLSAttr new_ls;
  ObLSFlag flag(ObLSFlag::DUPLICATE_FLAG);
  if (OB_FAIL(ObLSServiceHelper::fetch_new_ls_id(GCTX.sql_proxy_, tenant_id_, ls_id))) {
    LOG_WARN("failed to fetch new LS id", KR(ret), K(tenant_id_));
  } else if (OB_FAIL(ObLSAttrOperator::get_tenant_gts(tenant_id_, create_scn))) {
    LOG_WARN("failed to get tenant gts", KR(ret), K(tenant_id_));
  } else if (OB_FAIL(new_ls.init(ls_id, ls_group_id, flag, share::OB_LS_CREATING,
                                 share::OB_LS_OP_CREATE_PRE, create_scn))) {
    LOG_WARN("failed to init new operation", KR(ret), K(create_scn),
             K(ls_id), K(ls_group_id));
  } else if (OB_FAIL(ls_operator.insert_ls(
              new_ls, share::NORMAL_SWITCHOVER_STATUS))) {
    LOG_WARN("failed to insert new operation", KR(ret), K(new_ls));
  }
  // logged on both success and failure
  LOG_INFO("[LS_MGR] create duplicate ls", KR(ret), K(new_ls));
  return ret;
}
555
}//end of rootserver
556
}
557

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.