/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */

#define USING_LOG_PREFIX RS
#include "ob_primary_ls_service.h"
#include "lib/profile/ob_trace_id.h"
#include "share/ob_errno.h"
#include "share/ls/ob_ls_creator.h" //ObLSCreator
#include "share/ls/ob_ls_life_manager.h"//ObLSLifeAgentManager
#include "share/ls/ob_ls_table_operator.h"//ls_opt
#include "share/ob_share_util.h"//ObShareUtil
#include "observer/ob_server_struct.h"//GCTX
#include "storage/tx_storage/ob_ls_service.h"
#include "storage/tx_storage/ob_ls_handle.h" //ObLSHandle
#include "logservice/palf/palf_base_info.h"//PalfBaseInfo
#include "rootserver/ob_ls_service_helper.h"//ObTenantLSInfo
#include "rootserver/ob_ls_recovery_reportor.h"//update_ls_recovery
#include "rootserver/ob_tenant_info_loader.h"

29namespace oceanbase
30{
31using namespace common;
32using namespace share;
33using namespace transaction;
34using namespace palf;
35namespace rootserver
36{
37
38//////////////ObPrimaryLSService
39int ObPrimaryLSService::init()
40{
41int ret = OB_SUCCESS;
42tenant_id_ = MTL_ID();
43if (OB_UNLIKELY(inited_)) {
44ret = OB_INIT_TWICE;
45LOG_WARN("has inited", KR(ret));
46} else if (OB_FAIL(ObTenantThreadHelper::create("PLSSer",
47lib::TGDefIDs::SimpleLSService, *this))) {
48LOG_WARN("failed to create thread", KR(ret));
49} else if (OB_FAIL(ObTenantThreadHelper::start())) {
50LOG_WARN("fail to start", KR(ret));
51} else {
52inited_ = true;
53}
54return ret;
55}
56
57void ObPrimaryLSService::destroy()
58{
59ObTenantThreadHelper::destroy();
60tenant_id_ = OB_INVALID_TENANT_ID;
61inited_ = false;
62}
63
64void ObPrimaryLSService::do_work()
65{
66int ret = OB_SUCCESS;
67if (OB_UNLIKELY(!inited_)) {
68ret = OB_NOT_INIT;
69LOG_WARN("not init", K(ret));
70} else if (OB_FAIL(wait_tenant_schema_and_version_ready_(tenant_id_, DATA_VERSION_4_1_0_0))) {
71LOG_WARN("failed to wait tenant schema version ready", KR(ret), K(tenant_id_), K(DATA_CURRENT_VERSION));
72} else {
73int64_t idle_time_us = 1000 * 1000L;
74int tmp_ret = OB_SUCCESS;
75share::schema::ObTenantSchema tenant_schema;
76while (!has_set_stop()) {
77tenant_schema.reset();
78ObCurTraceId::init(GCONF.self_addr_);
79DEBUG_SYNC(STOP_PRIMARY_LS_THREAD);
80if (OB_FAIL(get_tenant_schema(tenant_id_, tenant_schema))) {
81LOG_WARN("failed to get tenant schema", KR(ret), K(tenant_id_));
82} else {
83if (OB_TMP_FAIL(process_all_ls(tenant_schema))) {
84ret = OB_SUCC(ret) ? tmp_ret : ret;
85LOG_WARN("failed to process user tenant thread0", KR(ret),
86KR(tmp_ret), K(tenant_id_));
87}
88if (OB_TMP_FAIL(process_all_ls_status_to_steady_(tenant_schema))) {
89ret = OB_SUCC(ret) ? tmp_ret : ret;
90LOG_WARN("failed to process user tenant thread1", KR(ret), KR(tmp_ret),
91K(tenant_id_));
92}
93}
94
95LOG_INFO("[PRIMARY_LS_SERVICE] finish one round", KR(ret), K(tenant_schema));
96tenant_schema.reset();
97idle(idle_time_us);
98}// end while
99}
100}
101
102
103int ObPrimaryLSService::process_all_ls(const share::schema::ObTenantSchema &tenant_schema)
104{
105int ret = OB_SUCCESS;
106const uint64_t tenant_id = tenant_schema.get_tenant_id();
107common::ObArray<ObLSStatusMachineParameter> machine_array;
108int64_t task_cnt = 0;
109if (OB_UNLIKELY(!inited_)) {
110ret = OB_NOT_INIT;
111LOG_WARN("not init", KR(ret));
112} else if (OB_UNLIKELY(!tenant_schema.is_valid())) {
113ret = OB_INVALID_ARGUMENT;
114LOG_WARN("tenant schema is invalid", KR(ret), K(tenant_schema));
115} else if (tenant_schema.is_creating()) {
116ret = OB_SCHEMA_EAGAIN;
117LOG_WARN("tenant schema not ready, no need process", KR(ret), K(tenant_schema));
118} else if (OB_FAIL(ObLSServiceHelper::construct_ls_status_machine(false, tenant_id,
119GCTX.sql_proxy_, machine_array))) {
120LOG_WARN("failed to construct ls status machine", KR(ret), K(tenant_id));
121} else if (tenant_schema.is_dropping()) {
122//if tenant schema is in dropping
123//set the creating ls to create_abort,
124//set the normal or dropping tenant to drop_tennat_pre
125if (OB_FAIL(set_tenant_dropping_status_(machine_array, task_cnt))) {
126LOG_WARN("failed to set tenant dropping status", KR(ret), K(task_cnt), K(machine_array));
127}
128}
129if (OB_SUCC(ret) && 0 == task_cnt) {
130if (OB_FAIL(try_set_next_ls_status_(machine_array))) {
131LOG_WARN("failed to set next ls status", KR(ret), K(machine_array));
132}
133}
134
135LOG_INFO("[PRIMARY_LS_SERVICE] finish process tenant",
136KR(ret), K(tenant_id), K(task_cnt), K(machine_array), K(tenant_schema));
137return ret;
138}
139
140int ObPrimaryLSService::set_tenant_dropping_status_(
141const common::ObIArray<ObLSStatusMachineParameter> &status_machine_array, int64_t &task_cnt)
142{
143int ret = OB_SUCCESS;
144ObTenantInfoLoader *tenant_info_loader = MTL(rootserver::ObTenantInfoLoader*);
145if (OB_UNLIKELY(!inited_)) {
146ret = OB_NOT_INIT;
147LOG_WARN("not init", KR(ret));
148} else if (OB_ISNULL(tenant_info_loader)) {
149ret = OB_ERR_UNEXPECTED;
150LOG_WARN("tenant_info_loader is null", KR(ret), KP(tenant_info_loader));
151} else {
152share::ObLSAttrOperator ls_operator(MTL_ID(), GCTX.sql_proxy_);
153const ObTenantSwitchoverStatus working_sw_status = share::NORMAL_SWITCHOVER_STATUS;
154share::SCN tenant_sync_scn, sys_ls_target_scn;
155tenant_sync_scn.set_invalid();
156sys_ls_target_scn.set_invalid();
157for (int64_t i = 0; OB_SUCC(ret) && i < status_machine_array.count() && !has_set_stop(); ++i) {
158const share::ObLSAttr &attr = status_machine_array.at(i).ls_info_;
159if (attr.get_ls_id().is_sys_ls()) {
160if (attr.ls_is_normal()) {
161if (OB_FAIL(ls_operator.update_ls_status(attr.get_ls_id(),
162attr.get_ls_status(), share::OB_LS_PRE_TENANT_DROPPING, working_sw_status))) {
163LOG_WARN("failed to update ls status", KR(ret), K(attr));
164}
165task_cnt++;
166LOG_INFO("[PRIMARY_LS_SERVICE] set sys ls to pre tenant dropping", KR(ret), K(attr));
167}
168if (OB_FAIL(ret)) {
169} else if (!attr.ls_is_normal() && !attr.ls_is_pre_tenant_dropping()) {
170// if attr is normal, it means that the status has been switched to pre_tenant_dropping in this round
171// if attr is pre_tenant_dropping, it means that the status has been changed in a previous round
172// the other attr is tenant_dropping, we should skip checking
173} else if (OB_FAIL(ls_operator.get_pre_tenant_dropping_ora_rowscn(sys_ls_target_scn))) {
174LOG_WARN("fail to get sys_ls_end_scn", KR(ret), K(tenant_id_));
175}
176// find SYS LS
177break;
178}
179}//end for set sys ls change to pre tenant dropping
180
181//before check tenant_info sync scn larger than sys_ls pre tenant dropping scn
182//set creating ls to create_abort
183for (int64_t i = 0; OB_SUCC(ret) && i < status_machine_array.count() && !has_set_stop(); ++i) {
184const share::ObLSAttr &attr = status_machine_array.at(i).ls_info_;
185if (attr.ls_is_creating()) {
186task_cnt++;
187if (OB_FAIL(ls_operator.delete_ls(attr.get_ls_id(), attr.get_ls_status(), working_sw_status))) {
188LOG_WARN("failed to remove ls not normal", KR(ret), K(attr));
189}
190LOG_INFO("[PRIMARY_LS_SERVICE] tenant is dropping, delete ls in creating", KR(ret),
191K(attr));
192}
193}//end for process creating
194
195if (OB_SUCC(ret) && sys_ls_target_scn.is_valid()) {
196if (OB_FAIL(tenant_info_loader->get_sync_scn(tenant_sync_scn))) {
197LOG_WARN("get tenant_sync_scn failed", KR(ret));
198} else if (OB_UNLIKELY(!tenant_sync_scn.is_valid())) {
199ret = OB_ERR_UNEXPECTED;
200LOG_WARN("tenant_sync_scn not valid", KR(ret), K(tenant_sync_scn));
201} else if (tenant_sync_scn < sys_ls_target_scn) {
202ret = OB_NEED_WAIT;
203LOG_WARN("wait some time, tenant_sync_scn cannot be smaller than sys_ls_target_scn", KR(ret),
204K(tenant_id_), K(tenant_sync_scn), K(sys_ls_target_scn));
205}
206}
207for (int64_t i = 0; OB_SUCC(ret) && i < status_machine_array.count() && !has_set_stop(); ++i) {
208const share::ObLSAttr &attr = status_machine_array.at(i).ls_info_;
209if (OB_UNLIKELY(!attr.is_valid()) || attr.get_ls_id().is_sys_ls() || attr.ls_is_creating()) {
210// invalid attr might happens if the ls is deleted in __all_ls table but still exists in __all_ls_status table
211// no need process sys ls and creating ls
212} else if (!attr.ls_is_tenant_dropping()) {
213task_cnt++;
214//no matter the status is in normal or dropping
215//may be the status in status info is created
216if (OB_FAIL(ls_operator.update_ls_status(
217attr.get_ls_id(), attr.get_ls_status(),
218share::OB_LS_TENANT_DROPPING, working_sw_status))) {
219LOG_WARN("failed to update ls status", KR(ret), K(attr));
220}
221LOG_INFO("[PRIMARY_LS_SERVICE] set ls to tenant dropping", KR(ret), K(attr), K(i),
222K(tenant_sync_scn), K(sys_ls_target_scn));
223}
224}//end for
225}
226if (OB_SUCC(ret) && has_set_stop()) {
227ret = OB_IN_STOP_STATE;
228LOG_WARN("[PRIMARY_LS_SERVICE] thread stop", KR(ret));
229}
230return ret;
231}
232
233int ObPrimaryLSService::try_set_next_ls_status_(
234const common::ObIArray<ObLSStatusMachineParameter> &status_machine_array)
235{
236int ret = OB_SUCCESS;
237if (OB_UNLIKELY(!inited_)) {
238ret = OB_NOT_INIT;
239LOG_WARN("not init", KR(ret));
240} else {
241share::ObLSAttrOperator ls_operator(MTL_ID(), GCTX.sql_proxy_);
242const ObTenantSwitchoverStatus working_sw_status =
243share::NORMAL_SWITCHOVER_STATUS;
244for (int64_t i = 0; OB_SUCC(ret) && i < status_machine_array.count() && !has_set_stop(); ++i) {
245const ObLSStatusMachineParameter &machine = status_machine_array.at(i);
246const share::ObLSStatusInfo &status_info = machine.status_info_;
247const share::ObLSAttr &ls_info = machine.ls_info_;
248const uint64_t tenant_id = status_info.tenant_id_;
249if (OB_UNLIKELY(!machine.is_valid())) {
250ret = OB_INVALID_ARGUMENT;
251LOG_WARN("machine is invalid", KR(ret), K(machine));
252} else if (!ls_info.is_valid()) {
253if (status_info.ls_is_wait_offline()) {
254} else if (status_info.ls_is_create_abort()
255|| status_info.ls_is_creating()
256|| status_info.ls_is_created()) {
257//in switchover/failover, need create abort ls
258//in drop tenant, __all_ls will be deleted while status is creating
259} else {
260ret = OB_ERR_UNEXPECTED;
261LOG_WARN("status info is invalid", KR(ret), K(machine));
262}
263} else if (ls_info.ls_is_creating()) {
264if (status_info.ls_is_create_abort()) {
265//delete ls, the ls must is creating
266if (OB_FAIL(ls_operator.delete_ls(
267machine.ls_id_, share::OB_LS_CREATING, working_sw_status))) {
268LOG_WARN("failed to process creating info", KR(ret), K(machine));
269}
270} else if (status_info.ls_is_created()) {
271//set ls to normal
272if (OB_FAIL(ls_operator.update_ls_status(
273machine.ls_id_, ls_info.get_ls_status(), share::OB_LS_NORMAL, working_sw_status))) {
274LOG_WARN("failed to update ls status", KR(ret), K(machine));
275}
276} else if (status_info.ls_is_creating()) {
277} else {
278ret = OB_ERR_UNEXPECTED;
279LOG_WARN("status info is invalid", KR(ret), K(machine));
280}
281} else if (ls_info.ls_is_normal()) {
282if (status_info.ls_is_normal()) {
283} else if (status_info.ls_is_created()) {
284} else {
285ret = OB_ERR_UNEXPECTED;
286LOG_WARN("status info is invalid", KR(ret), K(machine));
287}
288} else if (ls_info.ls_is_dropping()) {
289if (!status_info.ls_is_dropping()) {
290} else if (OB_FAIL(try_delete_ls_(status_info))) {
291LOG_WARN("failed to try delete ls", KR(ret), K(status_info));
292}
293} else if (ls_info.ls_is_pre_tenant_dropping()) {
294if (!machine.ls_id_.is_sys_ls()) {
295ret = OB_ERR_UNEXPECTED;
296LOG_WARN("normal ls can not in pre tenant dropping status", KR(ret), K(machine));
297} else if (!status_info.ls_is_pre_tenant_dropping()) {
298} else if (OB_FAIL(sys_ls_tenant_drop_(status_info))) {
299LOG_WARN("failed to process sys ls", KR(ret), K(status_info));
300}
301} else if (ls_info.ls_is_tenant_dropping()) {
302if (!status_info.ls_is_tenant_dropping()) {
303// __all_ls_status should also be tenant_dropping to notify GC module to offline LS
304} else if (OB_FAIL(try_delete_ls_(status_info))) {
305LOG_WARN("failed to try delete ls", KR(ret), K(machine), K(status_info));
306}
307} else {
308//other status can not be in __all_ls
309//such as created, wait_offline
310ret = OB_ERR_UNEXPECTED;
311LOG_WARN("the ls not expected in all_ls", KR(ret), K(machine));
312}
313}
314}
315if (OB_SUCC(ret) && has_set_stop()) {
316ret = OB_IN_STOP_STATE;
317LOG_WARN("[PRIMARY_LS_SERVICE] thread stop", KR(ret));
318}
319return ret;
320}
321
322int ObPrimaryLSService::try_delete_ls_(const share::ObLSStatusInfo &status_info)
323{
324int ret = OB_SUCCESS;
325const int64_t start_time = ObTimeUtility::fast_current_time();
326bool can_offline = false;
327const ObTenantSwitchoverStatus working_sw_status = share::NORMAL_SWITCHOVER_STATUS;
328if (OB_UNLIKELY(!status_info.is_valid()
329|| (!status_info.ls_is_dropping() && !status_info.ls_is_tenant_dropping())
330|| (status_info.ls_id_.is_sys_ls() && !status_info.ls_is_tenant_dropping()))) {
331// SYS LS only can be in tenant_dropping, can not be in DROPPING
332ret = OB_INVALID_ARGUMENT;
333LOG_WARN("info not valid or not in dropping status or sys ls", KR(ret), K(status_info));
334} else {
335// send rpc to observer
336share::ObLSAttrOperator ls_operator(MTL_ID(), GCTX.sql_proxy_);
337if (OB_FAIL(check_ls_can_offline_by_rpc_(status_info, can_offline))) {
338LOG_WARN("failed to check ls can offline", KR(ret), K(status_info));
339} else if (can_offline) {
340// User LS should be deleted from __all_ls
341if (!status_info.ls_id_.is_sys_ls()) {
342if (OB_FAIL(ls_operator.delete_ls(status_info.ls_id_, status_info.status_, working_sw_status))) {
343LOG_WARN("failed to delete ls", KR(ret), K(status_info));
344}
345} else {
346// SYS LS can not be deleted from __all_ls, as SYS LS is blocked by GC module.
347// So, SYS LS should change __all_ls_status to WAIT_OFFLINE to end its status.
348if (OB_FAIL(ObLSServiceHelper::offline_ls(status_info.tenant_id_,
349status_info.ls_id_, status_info.status_, working_sw_status))) {
350LOG_WARN("failed to offline ls", KR(ret), K(status_info), K(working_sw_status));
351}
352}
353}
354}
355const int64_t cost = ObTimeUtility::fast_current_time() - start_time;
356LOG_INFO("[PRIMARY_LS_SERVICE] finish to try delete LS", KR(ret), K(status_info), K(cost), K(can_offline));
357return ret;
358}
359
360int ObPrimaryLSService::sys_ls_tenant_drop_(const share::ObLSStatusInfo &info)
361{
362int ret = OB_SUCCESS;
363const ObLSStatus target_status = share::OB_LS_TENANT_DROPPING;
364const ObLSStatus pre_status = share::OB_LS_PRE_TENANT_DROPPING;
365const ObTenantSwitchoverStatus working_sw_status = share::NORMAL_SWITCHOVER_STATUS;
366bool can_offline = false;
367if (OB_UNLIKELY(!info.is_valid()
368|| !info.ls_id_.is_sys_ls())) {
369ret = OB_INVALID_ARGUMENT;
370LOG_WARN("invalid argument", KR(ret), K(info));
371} else if (pre_status != info.status_) {
372ret = OB_ERR_UNEXPECTED;
373LOG_WARN("sys ls can not in other status", KR(ret), K(info));
374} else if (OB_FAIL(check_sys_ls_can_offline_(can_offline))) {
375LOG_WARN("failed to check sys ls can offline", KR(ret));
376} else if (can_offline) {
377share::ObLSAttrOperator ls_operator(MTL_ID(), GCTX.sql_proxy_);
378if (OB_FAIL(ls_operator.update_ls_status(info.ls_id_, pre_status, target_status, working_sw_status))) {
379LOG_WARN("failed to update ls status", KR(ret), K(info), K(pre_status), K(target_status));
380}
381}
382LOG_INFO("[PRIMARY_LS_SERVICE] set sys ls tenant dropping", KR(ret), K(info), K(can_offline));
383return ret;
384}
385
386int ObPrimaryLSService::check_sys_ls_can_offline_(bool &can_offline)
387{
388int ret = OB_SUCCESS;
389share::ObLSStatusInfoArray status_info_array;
390can_offline = true;
391const uint64_t tenant_id = MTL_ID();
392share::ObLSStatusOperator status_operator;
393if (OB_ISNULL(GCTX.sql_proxy_)) {
394ret = OB_ERR_UNEXPECTED;
395LOG_WARN("sql proxy is null", KR(ret));
396} else if (OB_FAIL(status_operator.get_all_ls_status_by_order(
397tenant_id, status_info_array, *GCTX.sql_proxy_))) {
398LOG_WARN("failed to get all ls status", KR(ret), K(tenant_id));
399} else if (0 == status_info_array.count()) {
400//sys ls not exist
401can_offline = true;
402}
403for (int64_t i = 0; OB_SUCC(ret) && i < status_info_array.count() && can_offline; ++i) {
404const share::ObLSStatusInfo &status_info = status_info_array.at(i);
405if (status_info.ls_id_.is_sys_ls()) {
406} else {
407can_offline = false;
408LOG_INFO("[PRIMARY_LS_SERVICE] sys ls can not offline", K(status_info));
409}
410}
411if (OB_SUCC(ret) && can_offline) {
412LOG_INFO("[PRIMARY_LS_SERVICE] sys ls can offline", K(status_info_array));
413}
414return ret;
415}
416
417int ObPrimaryLSService::check_ls_can_offline_by_rpc_(const share::ObLSStatusInfo &info, bool &can_offline)
418{
419int ret = OB_SUCCESS;
420ObAddr leader;
421if (OB_UNLIKELY(!info.is_valid())) {
422ret = OB_INVALID_ARGUMENT;
423LOG_WARN("info not valid", KR(ret), K(info));
424} else if (OB_ISNULL(GCTX.location_service_) || OB_ISNULL(GCTX.srv_rpc_proxy_)) {
425ret = OB_ERR_UNEXPECTED;
426LOG_WARN("location service or proxy is null", KR(ret), KP(GCTX.location_service_),
427KP(GCTX.srv_rpc_proxy_));
428} else if (OB_FAIL(GCTX.location_service_->get_leader(GCONF.cluster_id, info.tenant_id_,
429info.ls_id_, false, leader))) {
430LOG_WARN("failed to get ls leader", KR(ret), K(info));
431} else {
432const int64_t timeout = GCONF.rpc_timeout;
433obrpc::ObCheckLSCanOfflineArg arg;
434can_offline = false;
435const uint64_t group_id = info.ls_is_tenant_dropping() ? OBCG_DBA_COMMAND : OBCG_DEFAULT;
436if (OB_FAIL(arg.init(info.tenant_id_, info.ls_id_, info.status_))) {
437LOG_WARN("failed to init arg", KR(ret), K(arg));
438} else if (OB_FAIL(GCTX.srv_rpc_proxy_->to(leader)
439.by(info.tenant_id_)
440.timeout(timeout)
441.group_id(group_id)
442.check_ls_can_offline(arg))) {
443can_offline = false;
444LOG_WARN("failed to check ls can offline", KR(ret), K(arg), K(info),
445K(timeout), K(leader));
446} else {
447can_offline = true;
448}
449}
450return ret;
451}
452
453int ObPrimaryLSService::process_all_ls_status_to_steady_(const share::schema::ObTenantSchema &tenant_schema)
454{
455int ret = OB_SUCCESS;
456if (!is_user_tenant(tenant_id_)) {
457ret = OB_ERR_UNEXPECTED;
458LOG_WARN("ls recovery thread must run on user tenant", KR(ret),
459K(tenant_id_));
460} else {
461ObTenantLSInfo tenant_info(GCTX.sql_proxy_, &tenant_schema, tenant_id_);
462if (OB_FAIL(ObLSServiceHelper::process_status_to_steady(false, share::NORMAL_SWITCHOVER_STATUS, tenant_info))) {
463LOG_WARN("failed to process status to steady", KR(ret));
464}
465}
466LOG_INFO("[PRIMARY_LS_SERVICE] finish process all ls status to steady", KR(ret), K(tenant_id_));
467return ret;
468}
469
470//the interface may reentry
471int ObPrimaryLSService::create_ls_for_create_tenant()
472{
473int ret = OB_SUCCESS;
474share::schema::ObTenantSchema tenant_schema;
475ObArray<ObZone> primary_zone;
476ObArray<share::ObSimpleUnitGroup> unit_group_array;
477share::ObLSAttrOperator ls_operator(tenant_id_, GCTX.sql_proxy_);
478if (OB_FAIL(get_tenant_schema(tenant_id_, tenant_schema))) {
479LOG_WARN("failed to get tenant schema", KR(ret), K(tenant_id_));
480} else if (!tenant_schema.is_creating()) {
481ret = OB_ERR_UNEXPECTED;
482LOG_WARN("only creating tenant can create user ls", KR(ret), K(tenant_schema));
483} else if (OB_FAIL(ObLSServiceHelper::get_primary_zone_unit_array(&tenant_schema,
484primary_zone, unit_group_array))) {
485LOG_WARN("failed to get primary zone unit array", KR(ret), K(tenant_schema));
486} else {
487// ensure __all_ls is emptry
488START_TRANSACTION(GCTX.sql_proxy_, tenant_id_)
489ObArray<share::ObLSAttr> ls_array;
490share::ObLSAttr sys_ls;
491if (FAILEDx(ls_operator.get_ls_attr(SYS_LS, true, trans, sys_ls))) {
492LOG_WARN("failed to get SYS_LS attr", KR(ret));
493} else if (OB_FAIL(ls_operator.get_all_ls_by_order(ls_array))) {
494LOG_WARN("failed to get all_ls by order", KR(ret));
495} else if (ls_array.count() > 1) {
496//nothing
497} else {
498uint64_t ls_group_id = OB_INVALID_ID;
499ObLSID ls_id;
500share::ObLSAttr new_ls;
501share::ObLSFlag flag;
502SCN create_scn;
503for (int64_t i = 0; OB_SUCC(ret) && i < unit_group_array.count(); ++i) {
504if (unit_group_array.at(i).is_active()) {
505//create ls
506if (OB_FAIL(ObLSServiceHelper::fetch_new_ls_group_id(GCTX.sql_proxy_, tenant_id_, ls_group_id))) {
507LOG_WARN("failed to fetch new LS group id", KR(ret), K(tenant_id_));
508}
509for (int64_t j = 0; OB_SUCC(ret) && j < primary_zone.count(); j++) {
510if (OB_FAIL(ObLSServiceHelper::fetch_new_ls_id(GCTX.sql_proxy_, tenant_id_, ls_id))) {
511LOG_WARN("failed to fetch new LS id", KR(ret), K(tenant_id_));
512} else if (OB_FAIL(ObLSAttrOperator::get_tenant_gts(tenant_id_, create_scn))) {
513LOG_WARN("failed to get tenant gts", KR(ret), K(tenant_id_));
514} else if (OB_FAIL(new_ls.init(ls_id, ls_group_id, flag, share::OB_LS_CREATING,
515share::OB_LS_OP_CREATE_PRE, create_scn))) {
516LOG_WARN("failed to init new operation", KR(ret), K(create_scn),
517K(ls_id), K(ls_group_id));
518} else if (OB_FAIL(ls_operator.insert_ls(
519new_ls, share::NORMAL_SWITCHOVER_STATUS, &trans))) {
520LOG_WARN("failed to insert new operation", KR(ret), K(new_ls));
521}
522}//end for each ls group
523}
524}//end for each unit group
525}
526END_TRANSACTION(trans)
527}
528return ret;
529}
530
531int ObPrimaryLSService::create_duplicate_ls()
532{
533int ret = OB_SUCCESS;
534share::ObLSAttrOperator ls_operator(tenant_id_, GCTX.sql_proxy_);
535share::ObLSID ls_id;
536SCN create_scn;
537const uint64_t ls_group_id = 0;
538share::ObLSAttr new_ls;
539ObLSFlag flag(ObLSFlag::DUPLICATE_FLAG);
540if (OB_FAIL(ObLSServiceHelper::fetch_new_ls_id(GCTX.sql_proxy_, tenant_id_, ls_id))) {
541LOG_WARN("failed to fetch new LS id", KR(ret), K(tenant_id_));
542} else if (OB_FAIL(ObLSAttrOperator::get_tenant_gts(tenant_id_, create_scn))) {
543LOG_WARN("failed to get tenant gts", KR(ret), K(tenant_id_));
544} else if (OB_FAIL(new_ls.init(ls_id, ls_group_id, flag, share::OB_LS_CREATING,
545share::OB_LS_OP_CREATE_PRE, create_scn))) {
546LOG_WARN("failed to init new operation", KR(ret), K(create_scn),
547K(ls_id), K(ls_group_id));
548} else if (OB_FAIL(ls_operator.insert_ls(
549new_ls, share::NORMAL_SWITCHOVER_STATUS))) {
550LOG_WARN("failed to insert new operation", KR(ret), K(new_ls));
551}
552LOG_INFO("[LS_MGR] create duplicate ls", KR(ret), K(new_ls));
553return ret;
554}
}  // end of namespace rootserver
}  // end of namespace oceanbase