oceanbase
889 строк · 36.8 Кб
1/**
2* Copyright (c) 2021 OceanBase
3* OceanBase CE is licensed under Mulan PubL v2.
4* You can use this software according to the terms and conditions of the Mulan PubL v2.
5* You may obtain a copy of Mulan PubL v2 at:
6* http://license.coscl.org.cn/MulanPubL-2.0
7* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10* See the Mulan PubL v2 for more details.
11*/
12#define USING_LOG_PREFIX BALANCE
13#include "rootserver/ob_ls_balance_helper.h"
14#include "rootserver/ob_primary_ls_service.h"//fetch max ls id
15#include "lib/mysqlclient/ob_mysql_transaction.h"//trans
16#include "observer/ob_server_struct.h"//GCTX
17#include "share/schema/ob_schema_getter_guard.h"//ObSchemaGetGuard
18#include "share/schema/ob_multi_version_schema_service.h"//ObMultiSchemaService
19#include "share/schema/ob_table_schema.h"//ObTableSchema
20#include "share/ob_balance_define.h" // ObBalanceTaskID, ObBalanceJobID
21#include "storage/tx/ob_unique_id_service.h" // ObUniqueIDService
22#include "storage/ob_common_id_utils.h" // ObCommonIDUtils
23#include "ob_ls_balance_helper.h"
24
25#define ISTAT(fmt, args...) FLOG_INFO("[LS_BALANCE] " fmt, ##args)
26#define WSTAT(fmt, args...) FLOG_WARN("[LS_BALANCE] " fmt, ##args)
27
28namespace oceanbase
29{
30using namespace share;
31namespace rootserver
32{
33//////ObUnitGroupBalanceInfo
34void ObUnitGroupBalanceInfo::reset()
35{
36primary_zone_count_ = OB_INVALID_COUNT;
37unit_group_.reset();
38redundant_ls_array_.reset();
39normal_ls_array_.reset();
40}
41
42int ObUnitGroupBalanceInfo::add_ls_status_info(const ObLSStatusInfo &ls_info)
43{
44int ret = OB_SUCCESS;
45//TODO has ls group id not match
46if (OB_UNLIKELY(!ls_info.is_valid())) {
47ret = OB_INVALID_ARGUMENT;
48LOG_WARN("invalid argument", KR(ret), K(ls_info));
49} else if (normal_ls_array_.count() >= primary_zone_count_
50|| !is_active_unit_group()) {
51if (OB_FAIL(redundant_ls_array_.push_back(ls_info))) {
52LOG_WARN("failed to push back ls info", KR(ret), K(ls_info));
53}
54} else if (OB_FAIL(normal_ls_array_.push_back(ls_info))) {
55LOG_WARN("failed to push back ls info", KR(ret), K(ls_info));
56}
57return ret;
58}
59
60int ObUnitGroupBalanceInfo::remove_redundant_ls(const int64_t &index)
61{
62int ret = OB_SUCCESS;
63if (OB_UNLIKELY(index >= redundant_ls_array_.count() || index < 0)) {
64ret = OB_INVALID_ARGUMENT;
65LOG_WARN("invalid argument", KR(ret), K(index));
66} else if (OB_FAIL(redundant_ls_array_.remove(index))) {
67LOG_WARN("failed to remove index", KR(ret), K(index));
68}
69return ret;
70}
71
72
73//////////////ObLSBalanceTaskHelper
74
75ObLSBalanceTaskHelper::ObLSBalanceTaskHelper() :
76inited_(false),
77tenant_id_(OB_INVALID_TENANT_ID),
78primary_zone_num_(0),
79unit_group_balance_array_(),
80sql_proxy_(NULL),
81job_(),
82task_array_(),
83tenant_ls_bg_info_()
84{
85}
86
87int ObLSBalanceTaskHelper::init(const uint64_t tenant_id,
88const share::ObLSStatusInfoArray &status_array,
89const ObIArray<share::ObSimpleUnitGroup> &unit_group_array,
90const int64_t primary_zone_num, ObMySQLProxy *sql_proxy)
91{
92int ret = OB_SUCCESS;
93if (OB_UNLIKELY(0 == status_array.count() || 0 == unit_group_array.count()
94|| 0 >= primary_zone_num || OB_INVALID_TENANT_ID == tenant_id)) {
95ret = OB_INVALID_ARGUMENT;
96LOG_WARN("invalid argument", KR(ret), K(status_array), K(unit_group_array),
97K(primary_zone_num), K(tenant_id));
98} else if (OB_FAIL(tenant_ls_bg_info_.init(tenant_id))) {
99LOG_WARN("init tenant LS balance group info fail", KR(ret), K(tenant_id));
100} else {
101//1. init all unit balance info
102for (int64_t i = 0; OB_SUCC(ret) && i < unit_group_array.count(); ++i) {
103ObUnitGroupBalanceInfo balance_info(unit_group_array.at(i), primary_zone_num);
104if (OB_FAIL(unit_group_balance_array_.push_back(balance_info))) {
105LOG_WARN("failed to push back balance info", KR(ret), K(balance_info), K(i));
106}
107}
108int64_t index = OB_INVALID_INDEX_INT64;
109for (int64_t i = 0; OB_SUCC(ret) && i < status_array.count(); ++i) {
110const ObLSStatusInfo &ls_status = status_array.at(i);
111if (OB_FAIL(find_unit_group_balance_index(ls_status.unit_group_id_, index))) {
112if (OB_ENTRY_NOT_EXIST == ret) {
113//normal, ls status must has target unit_group,
114//but maybe migrate unit and ls group balance concurrency
115LOG_WARN("has ls in not valid unit group", KR(ret), K(ls_status), K(unit_group_array));
116ret = OB_SUCCESS;
117index = unit_group_balance_array_.count();
118ObSimpleUnitGroup unit_group(ls_status.unit_group_id_, ObUnit::UNIT_STATUS_DELETING);
119ObUnitGroupBalanceInfo balance_info(unit_group, primary_zone_num);
120if (OB_FAIL(unit_group_balance_array_.push_back(balance_info))) {
121LOG_WARN("failed to push back balance info", KR(ret), K(balance_info));
122}
123} else {
124LOG_WARN("failed to find index", KR(ret), K(ls_status));
125}
126}
127if (FAILEDx(unit_group_balance_array_.at(index).add_ls_status_info(ls_status))) {
128LOG_WARN("failed to add ls status info", KR(ret), K(ls_status));
129}
130}
131}
132if (OB_SUCC(ret)) {
133primary_zone_num_ = primary_zone_num;
134tenant_id_ = tenant_id;
135sql_proxy_ = sql_proxy;
136job_.reset();
137task_array_.reset();
138inited_ = true;
139}
140return ret;
141}
142
143int ObLSBalanceTaskHelper::find_unit_group_balance_index(const uint64_t unit_group_id, int64_t &index)
144{
145int ret = OB_SUCCESS;
146index = OB_INVALID_INDEX_INT64;
147if (OB_UNLIKELY(OB_INVALID_ID == unit_group_id)) {
148ret = OB_INVALID_ARGUMENT;
149LOG_WARN("invalid argument", KR(ret), K(unit_group_id));
150} else {
151for (int64_t i = 0; OB_SUCC(ret) && i < unit_group_balance_array_.count(); ++i) {
152if (unit_group_id == unit_group_balance_array_.at(i).get_unit_group_id()) {
153index = i;
154break;
155}
156}
157if (OB_SUCC(ret) && OB_INVALID_INDEX_INT64 == index) {
158ret = OB_ENTRY_NOT_EXIST;
159LOG_WARN("failed to find ls unit group", KR(ret), K(unit_group_id), K(unit_group_balance_array_));
160}
161}
162return ret;
163}
164
165int ObLSBalanceTaskHelper::check_need_ls_balance(bool &need_balance)
166{
167int ret = OB_SUCCESS;
168if (OB_UNLIKELY(!inited_)) {
169ret = OB_NOT_INIT;
170LOG_WARN("not init", KR(ret));
171} else if (OB_UNLIKELY(unit_group_balance_array_.count() <= 0)) {
172ret = OB_INVALID_ARGUMENT;
173LOG_WARN("unit group balance array not expected", KR(ret));
174} else {
175need_balance = false;
176for (int64_t i = 0; OB_SUCC(ret) && i < unit_group_balance_array_.count() && !need_balance; ++i) {
177const ObUnitGroupBalanceInfo &balance_info = unit_group_balance_array_.at(i);
178if (balance_info.get_lack_ls_count() > 0 || balance_info.get_redundant_ls_array().count() > 0) {
179//has more ls or less ls
180need_balance = true;
181ISTAT("has more or less ls, need balance", K(balance_info));
182}
183}
184}
185return ret;
186}
187
188int ObLSBalanceTaskHelper::generate_ls_balance_task()
189{
190int ret = OB_SUCCESS;
191ObMultiVersionSchemaService *schema_service = GCTX.schema_service_;
192
193if (OB_UNLIKELY(!inited_)) {
194ret = OB_NOT_INIT;
195LOG_WARN("not init", KR(ret));
196} else if (OB_FAIL(generate_balance_job_())) {
197LOG_WARN("failed to generate job", KR(ret));
198} else if (OB_ISNULL(sql_proxy_) || OB_ISNULL(schema_service)) {
199ret = OB_ERR_UNEXPECTED;
200LOG_WARN("sql proxy or schema service is null", KR(ret), K(sql_proxy_), K(schema_service));
201}
202// build tenant all balance group info for ALL LS
203else if (OB_FAIL(tenant_ls_bg_info_.build("LS_BALANCE", *sql_proxy_, *schema_service))) {
204LOG_WARN("build tenant all balance group info for all LS fail", KR(ret));
205} else {
206if (0 == job_.get_balance_strategy().string().compare(share::LS_BALANCE_BY_ALTER)) {
207if (OB_FAIL(generate_alter_task_())) {
208LOG_WARN("failed to generate alter task", KR(ret));
209}
210} else if (0 == job_.get_balance_strategy().string().compare(share::LS_BALANCE_BY_MIGRATE)) {
211// 1. first migrate task
212if (OB_FAIL(generate_migrate_task_())) {
213LOG_WARN("failed to generate migrate task", KR(ret));
214}
215} else if (0 == job_.get_balance_strategy().string().compare(share::LS_BALANCE_BY_EXPAND)) {
216//2. try expand
217if (OB_FAIL(generate_expand_task_())) {
218LOG_WARN("failed to generate expand task", KR(ret));
219}
220} else if (0 == job_.get_balance_strategy().string().compare(share::LS_BALANCE_BY_SHRINK)) {
221//3. try shrink
222if (OB_FAIL(generate_shrink_task_())) {
223LOG_WARN("failed to generate expand task", KR(ret));
224}
225} else {
226ret = OB_ERR_UNEXPECTED;
227LOG_WARN("no other balance job", KR(ret), K_(job));
228}
229if (OB_SUCC(ret) && 0 == task_array_.count()) {
230ret = OB_ERR_UNEXPECTED;
231LOG_WARN("has no task", KR(ret), K(job_));
232}
233ISTAT("generate task", KR(ret), K(job_), K(task_array_));
234}
235return ret;
236}
237
238int ObLSBalanceTaskHelper::generate_balance_job_()
239{
240int ret = OB_SUCCESS;
241if (OB_UNLIKELY(!inited_)) {
242ret = OB_NOT_INIT;
243LOG_WARN("not init", KR(ret));
244} else if (OB_UNLIKELY(unit_group_balance_array_.count() <= 0)
245|| OB_ISNULL(sql_proxy_)) {
246ret = OB_ERR_UNDEFINED;
247LOG_WARN("error unexpected", KR(ret), KP(sql_proxy_), K(unit_group_balance_array_));
248} else {
249bool lack_ls = false;
250bool redundant_ls = false;
251bool need_modify_ls_group = false;
252ObBalanceJobType job_type(ObBalanceJobType::BALANCE_JOB_LS);
253ObBalanceJobStatus job_status(ObBalanceJobStatus::BALANCE_JOB_STATUS_DOING);
254int64_t unit_group_num = 0;
255ObBalanceJobID job_id;
256ObString comment;
257const char* balance_stradegy = NULL;
258for (int64_t i = 0; OB_SUCC(ret) && i < unit_group_balance_array_.count(); ++i) {
259const ObUnitGroupBalanceInfo &balance_info = unit_group_balance_array_.at(i);
260if (balance_info.is_active_unit_group()) {
261unit_group_num++;
262}
263if (balance_info.get_lack_ls_count() > 0) {
264lack_ls = true;
265ISTAT("unit group has little ls than expected", K(balance_info));
266}
267if (balance_info.get_redundant_ls_array().count() > 0) {
268redundant_ls = true;
269ISTAT("unit group has more ls than expected", K(balance_info));
270}
271uint64_t ls_group_id = OB_INVALID_ID;
272for (int64_t j = 0;
273OB_SUCC(ret) && j < balance_info.get_normal_ls_array().count() &&
274!need_modify_ls_group; ++j) {
275const ObLSStatusInfo &ls_status_info = balance_info.get_normal_ls_array().at(j);
276if (OB_INVALID_ID == ls_status_info.ls_group_id_) {
277ret = OB_ERR_UNEXPECTED;
278LOG_WARN("ls group id not expected", KR(ret), K(ls_status_info));
279} else if (OB_INVALID_ID == ls_group_id) {
280ls_group_id = ls_status_info.ls_group_id_;
281} else if (ls_group_id != ls_status_info.ls_group_id_) {
282need_modify_ls_group = true;
283ISTAT("unit group has different ls group", K(ls_group_id), K(ls_status_info), K(balance_info));
284}
285}
286}
287if (OB_SUCC(ret)) {
288if (need_modify_ls_group) {
289balance_stradegy = share::LS_BALANCE_BY_ALTER;
290} else if (lack_ls && redundant_ls) {
291balance_stradegy = share::LS_BALANCE_BY_MIGRATE;
292} else if (lack_ls) {
293balance_stradegy = share::LS_BALANCE_BY_EXPAND;
294} else if (redundant_ls) {
295balance_stradegy = share::LS_BALANCE_BY_SHRINK;
296} else {
297ret = OB_ERR_UNEXPECTED;
298LOG_WARN("must has balance job for ls", KR(ret), K(unit_group_balance_array_));
299}
300
301if (FAILEDx(ObCommonIDUtils::gen_unique_id(tenant_id_, job_id))) {
302LOG_WARN("generate unique id for balance job fail", KR(ret), K(tenant_id_));
303} else if (OB_FAIL(job_.init(tenant_id_, job_id, job_type, job_status, primary_zone_num_,
304unit_group_num, comment, ObString(balance_stradegy)))) {
305LOG_WARN("failed to init job", KR(ret), K(tenant_id_), K(job_id), K(job_type),
306K(job_status), K(primary_zone_num_), K(unit_group_num), K(balance_stradegy));
307}
308}
309}
310return ret;
311}
312
313int ObLSBalanceTaskHelper::generate_alter_task_()
314{
315int ret = OB_SUCCESS;
316if (OB_UNLIKELY(!inited_)) {
317ret = OB_NOT_INIT;
318LOG_WARN("not init", KR(ret));
319} else {
320uint64_t ls_group_id = OB_INVALID_ID;
321for (int64_t i = 0; OB_SUCC(ret) && i < unit_group_balance_array_.count(); ++i) {
322ObUnitGroupBalanceInfo &balance_info = unit_group_balance_array_.at(i);
323ls_group_id = OB_INVALID_ID;
324for (int64_t j = 0; OB_SUCC(ret) && j < balance_info.get_normal_ls_array().count(); ++j) {
325const ObLSStatusInfo &ls_status_info = balance_info.get_normal_ls_array().at(j);
326if (OB_INVALID_ID == ls_group_id) {
327ls_group_id = ls_status_info.ls_group_id_;
328} else if (ls_group_id != ls_status_info.ls_group_id_) {
329if (OB_FAIL(construct_ls_alter_task_(ls_status_info.ls_id_, ls_group_id))) {
330LOG_WARN("failed to construct ls alter task", KR(ret),
331K(ls_status_info), K(ls_group_id));
332}
333}
334}
335}
336}
337return ret;
338}
339
340int ObLSBalanceTaskHelper::generate_migrate_task_()
341{
342int ret = OB_SUCCESS;
343if (OB_UNLIKELY(!inited_)) {
344ret = OB_NOT_INIT;
345LOG_WARN("not init", KR(ret));
346} else {
347//get a redundant ls, and found one unit group less ls
348bool new_task = true;
349for (int64_t i = 0; OB_SUCC(ret) && i < unit_group_balance_array_.count() && new_task; ++i) {
350ObUnitGroupBalanceInfo &balance_info = unit_group_balance_array_.at(i);
351for (int64_t j = balance_info.get_redundant_ls_array().count() - 1; OB_SUCC(ret) && j >= 0 && new_task; --j) {
352//get one unit group, which less than primary_zone_unit_num
353const ObLSStatusInfo &ls_status = balance_info.get_redundant_ls_array().at(j);
354new_task = false;
355for (int64_t k = 0; OB_SUCC(ret) && k < unit_group_balance_array_.count(); ++k) {
356ObUnitGroupBalanceInfo &dest_balance_info = unit_group_balance_array_.at(k);
357if (dest_balance_info.get_lack_ls_count() > 0) {
358new_task = true;
359if (balance_info.get_unit_group_id() == dest_balance_info.get_unit_group_id()) {
360ret = OB_ERR_UNEXPECTED;
361LOG_WARN("ls group can not has more ls and lack ls", KR(ret),
362K(i), K(k), K(j), K(balance_info), K(dest_balance_info));
363} else if (OB_FAIL(generate_ls_alter_task_(ls_status, dest_balance_info))) {
364LOG_WARN("failed to generate ls alter task", KR(ret), K(ls_status), K(dest_balance_info));
365}
366}
367}//end for k
368if (OB_SUCC(ret) && new_task) {
369//remove ls status from the unit group
370if (OB_FAIL(balance_info.remove_redundant_ls(j))) {
371LOG_WARN("failed to remove redundant ls", KR(ret), K(j));
372}
373}
374}//end for j
375}//end for i
376}
377return ret;
378}
379
380int ObLSBalanceTaskHelper::generate_ls_alter_task_(const ObLSStatusInfo &ls_status_info, ObUnitGroupBalanceInfo &dest_unit_group)
381{
382int ret = OB_SUCCESS;
383if (OB_UNLIKELY(!inited_)) {
384ret = OB_NOT_INIT;
385LOG_WARN("not init", KR(ret));
386} else if (OB_UNLIKELY(!ls_status_info.is_valid()
387|| dest_unit_group.get_lack_ls_count() <= 0)) {
388ret = OB_INVALID_ARGUMENT;
389LOG_WARN("invalid argument", KR(ret), K(ls_status_info), K(dest_unit_group));
390} else {
391uint64_t ls_group_id = OB_INVALID_ID;
392ObLSStatusInfo dest_ls_status;
393if (dest_unit_group.get_normal_ls_array().count() > 0) {
394ls_group_id = dest_unit_group.get_normal_ls_array().at(0).ls_group_id_;
395} else if (OB_FAIL(ObLSServiceHelper::fetch_new_ls_group_id(sql_proxy_, tenant_id_, ls_group_id))) {
396LOG_WARN("failed to fetch new ls id", KR(ret), K(tenant_id_));
397}
398if (FAILEDx(construct_ls_alter_task_(ls_status_info.ls_id_, ls_group_id))) {
399LOG_WARN("failed to construct ls alter task", KR(ret), K(ls_status_info), K(ls_group_id));
400} else if (OB_FAIL(dest_ls_status.init(ls_status_info.tenant_id_,
401ls_status_info.ls_id_, ls_group_id,
402ls_status_info.status_,
403ls_status_info.unit_group_id_,
404ls_status_info.primary_zone_,
405ls_status_info.get_flag()))) {
406LOG_WARN("failed to init ls status", KR(ret), K(ls_group_id), K(ls_status_info));
407} else if (OB_FAIL(dest_unit_group.add_ls_status_info(dest_ls_status))) {
408LOG_WARN("failed to add ls status info", KR(ret), K(dest_ls_status));
409}
410}
411return ret;
412}
413
414int ObLSBalanceTaskHelper::generate_expand_task_()
415{
416int ret = OB_SUCCESS;
417if (OB_UNLIKELY(!inited_)) {
418ret = OB_NOT_INIT;
419LOG_WARN("not init", KR(ret));
420} else {
421int64_t lack_count = 0;
422ObSplitLSParamArray src_ls;
423ObArray<ObSplitLSParamArray> dest_ls;
424const double src_factor = 1;
425for (int64_t i = 0; OB_SUCC(ret) && i < unit_group_balance_array_.count(); ++i) {
426const ObUnitGroupBalanceInfo & balance_info = unit_group_balance_array_.at(i);
427for (int64_t j = 0; OB_SUCC(ret) && j < balance_info.get_normal_ls_array().count(); ++j) {
428ObSplitLSParam param(&balance_info.get_normal_ls_array().at(j), src_factor);
429if (OB_FAIL(src_ls.push_back(param))) {
430LOG_WARN("failed to push back param", KR(ret), K(param), K(i));
431}
432}
433if (OB_SUCC(ret)) {
434lack_count += balance_info.get_lack_ls_count();
435}
436}
437if (FAILEDx(construct_expand_dest_param_(lack_count, src_ls, dest_ls))) {
438LOG_WARN("failed to construct expand dest param", KR(ret), K(lack_count), K(src_ls));
439}
440int64_t dest_ls_index = 0;
441uint64_t ls_group_id = OB_INVALID_ID;
442for (int64_t i = 0; OB_SUCC(ret) && i < unit_group_balance_array_.count(); ++i) {
443ls_group_id = OB_INVALID_ID;
444const ObUnitGroupBalanceInfo &balance_info = unit_group_balance_array_.at(i);
445if (balance_info.get_normal_ls_array().count() > 0) {
446ls_group_id = balance_info.get_normal_ls_array().at(0).ls_group_id_;
447} else if (OB_FAIL(ObLSServiceHelper::fetch_new_ls_group_id(sql_proxy_, tenant_id_, ls_group_id))) {
448LOG_WARN("failed to fetch new ls group id", KR(ret), K(tenant_id_));
449}
450for (int64_t j = 0; OB_SUCC(ret) && j < balance_info.get_lack_ls_count(); ++j) {
451if (OB_UNLIKELY(dest_ls_index >= dest_ls.count())) {
452ret = OB_ERR_UNEXPECTED;
453LOG_WARN("dest ls index not expected", KR(ret), K(dest_ls_index));
454} else if (OB_FAIL(generate_balance_task_for_expand_(dest_ls.at(dest_ls_index),
455ls_group_id))) {
456LOG_WARN("failed to get balance task", KR(ret), K(i), K(j), K(ls_group_id),
457"dest_ls_param", dest_ls.at(dest_ls_index));
458} else {
459++dest_ls_index;
460}
461}
462if (OB_SUCC(ret)) {
463lack_count += balance_info.get_lack_ls_count();
464}
465}
466}
467return ret;
468}
469
470int ObLSBalanceTaskHelper::generate_shrink_task_()
471{
472int ret = OB_SUCCESS;
473if (OB_UNLIKELY(!inited_)) {
474ret = OB_NOT_INIT;
475LOG_WARN("not init", KR(ret));
476} else {
477const int64_t normal_ls_count = job_.get_primary_zone_num() * job_.get_unit_group_num();
478ObSplitLSParamArray src_ls;
479ObArray<ObSplitLSParamArray> dest_ls;
480const double src_factor = 1;
481for (int64_t i = 0; OB_SUCC(ret) && i < unit_group_balance_array_.count(); ++i) {
482const ObUnitGroupBalanceInfo & balance_info = unit_group_balance_array_.at(i);
483for (int64_t j = 0; OB_SUCC(ret) && j < balance_info.get_redundant_ls_array().count(); ++j) {
484ObSplitLSParam param(&balance_info.get_redundant_ls_array().at(j), src_factor);
485if (OB_FAIL(src_ls.push_back(param))) {
486LOG_WARN("failed to push back param", KR(ret), K(param), K(i), K(j));
487}
488}
489}
490if (FAILEDx(construct_shrink_src_param_(normal_ls_count, src_ls, dest_ls))) {
491LOG_WARN("failed to construct expand dest param", KR(ret), K(normal_ls_count), K(src_ls));
492}
493int64_t dest_index = 0;
494for (int64_t i = 0; OB_SUCC(ret) && i < unit_group_balance_array_.count(); ++i) {
495const ObUnitGroupBalanceInfo & balance_info = unit_group_balance_array_.at(i);
496for (int64_t j = 0; OB_SUCC(ret) && j < balance_info.get_normal_ls_array().count(); ++j) {
497if (OB_UNLIKELY(dest_ls.count() < dest_index)) {
498ret = OB_ERR_UNEXPECTED;
499LOG_WARN("src ls is unexpected", KR(ret), K(dest_ls), K(dest_ls));
500} else if (OB_FAIL(generate_task_for_shrink_(
501dest_ls.at(dest_index++),
502balance_info.get_normal_ls_array().at(j)))) {
503LOG_WARN("failed to generate task for shrink", KR(ret), K(dest_index), K(dest_ls), K(j), K(balance_info));
504}
505}
506}
507}
508return ret;
509}
510
511int ObLSBalanceTaskHelper::generate_task_for_shrink_(
512const ObSplitLSParamArray &src_split_param,
513const ObLSStatusInfo &ls_status_info)
514{
515int ret = OB_SUCCESS;
516if (OB_UNLIKELY(!inited_)) {
517ret = OB_NOT_INIT;
518LOG_WARN("not init", KR(ret));
519} else if (OB_UNLIKELY(!job_.is_valid() || src_split_param.count() <= 0
520|| !ls_status_info.is_valid())) {
521ret = OB_ERR_UNDEFINED;
522LOG_WARN("error unexpected", KR(ret), K(job_), K(src_split_param), K(ls_status_info));
523} else {
524for (int64_t i = 0; OB_SUCC(ret) && i < src_split_param.count(); ++i) {
525const ObSplitLSParam ¶m = src_split_param.at(i);
526ObLSID merge_ls_id;
527if (fabs(param.get_current_factor() - 1.0) < OB_DOUBLE_EPSINON) {
528//nothing
529merge_ls_id = param.get_ls_info()->ls_id_;
530} else {
531if (param.get_ls_info()->ls_group_id_ == ls_status_info.ls_group_id_) {
532//need_transfer, no need merge
533if (OB_FAIL(generate_transfer_task_(param, ls_status_info))) {
534LOG_WARN("failed to generate transfer task", KR(ret), K(param));
535}
536} else {
537// need split
538ObSplitLSParamArray tmp_split_param;
539int64_t task_index = OB_INVALID_INDEX_INT64;
540if (OB_FAIL(tmp_split_param.push_back(param))) {
541LOG_WARN("failed to push back param", KR(ret), K(param));
542} else if (OB_FAIL(generate_ls_split_task_(tmp_split_param, task_index))) {
543LOG_WARN("failed to generate ls info", KR(ret), K(tmp_split_param));
544} else {
545merge_ls_id = task_array_.at(task_index).get_dest_ls_id();
546}
547}
548}
549if (OB_SUCC(ret)) {
550if (param.get_ls_info()->ls_group_id_ != ls_status_info.ls_group_id_) {
551//need alter task
552if (OB_FAIL(construct_ls_alter_task_(merge_ls_id, ls_status_info.ls_group_id_))) {
553LOG_WARN("failed to construct ls alter task", KR(ret), K(merge_ls_id), K(ls_status_info));
554}
555}
556}
557if (OB_SUCC(ret) && merge_ls_id.is_valid()) {
558//need merge
559if (OB_FAIL(construct_ls_merge_task_(merge_ls_id, ls_status_info.ls_id_,
560ls_status_info.ls_group_id_))) {
561LOG_WARN("failed to construct ls merge task", KR(ret), K(merge_ls_id), K(ls_status_info));
562}
563}
564}//end for
565}
566return ret;
567}
568
569int ObLSBalanceTaskHelper::generate_transfer_task_(
570const ObSplitLSParam ¶m, const ObLSStatusInfo &ls_status_info)
571{
572int ret = OB_SUCCESS;
573if (OB_UNLIKELY(!inited_)) {
574ret = OB_NOT_INIT;
575LOG_WARN("not init", KR(ret));
576} else if (OB_UNLIKELY(!param.is_valid() || !ls_status_info.is_valid())) {
577ret = OB_INVALID_ARGUMENT;
578LOG_WARN("invalid argument", KR(ret), K(param), K(ls_status_info));
579} else {
580ObBalanceTaskType task_type(
581ObBalanceTaskType::BALANCE_TASK_TRANSFER); // transfer task
582ObBalanceTask task;
583ObTransferPartList part_list;
584ObBalanceTaskID task_id;
585if (OB_FAIL(ObCommonIDUtils::gen_unique_id(tenant_id_, task_id))) {
586LOG_WARN("gen_unique_id for balance task failed", KR(ret), K(task_id),
587K_(tenant_id));
588} else if (OB_FAIL(construct_ls_part_info_(param, part_list))) {
589LOG_WARN("failed to construct ls part info", KR(ret), K(param));
590} else if (OB_FAIL(task.simple_init(
591tenant_id_, job_.get_job_id(), task_id, task_type,
592ls_status_info.ls_group_id_,
593param.get_ls_info()->ls_id_, ls_status_info.ls_id_, part_list))) {
594LOG_WARN("failed to init task", KR(ret), K(tenant_id_), K(job_),
595K(task_id), K(task_type), K(part_list));
596} else if (OB_FAIL(task_array_.push_back(task))) {
597LOG_WARN("failed to push back task", KR(ret), K(task));
598}
599ISTAT("generate transfer task", KR(ret), K(task), K(job_));
600}
601return ret;
602}
603
604int ObLSBalanceTaskHelper::construct_shrink_src_param_(const int64_t target_count, ObSplitLSParamArray &src_ls,
605ObIArray<ObSplitLSParamArray> &dest_split_array)
606{
607int ret = OB_SUCCESS;
608if (OB_UNLIKELY(!inited_)) {
609ret = OB_NOT_INIT;
610LOG_WARN("not init", KR(ret));
611} else if (OB_UNLIKELY(0 == target_count || 0 == src_ls.count())) {
612ret = OB_INVALID_ARGUMENT;
613LOG_WARN("invalid argument", KR(ret), K(target_count), K(src_ls));
614} else {
615const double each_ls_target_factor = double(src_ls.count()) / (target_count);
616if (each_ls_target_factor <= OB_DOUBLE_EPSINON) {
617ret = OB_ERR_UNEXPECTED;
618LOG_WARN("too many ls", KR(ret), K(each_ls_target_factor), K(target_count), K(src_ls));
619}
620for (int64_t i = 0; OB_SUCC(ret) && i < target_count; ++i) {
621double need_factor = each_ls_target_factor;
622ObSplitLSParamArray src_array;
623for (int64_t j = 0; OB_SUCC(ret) && j < src_ls.count() && need_factor > OB_DOUBLE_EPSINON; ++j) {
624ObSplitLSParam ¶m = src_ls.at(j);
625double get_factor = param.reduce_enough_factor(need_factor);
626if (!(get_factor)) { // strictly equal to zero
627//empty
628} else if (OB_DOUBLE_EPSINON >= get_factor) {
629ret = OB_ERR_UNEXPECTED;
630LOG_WARN("factor is too small", KR(ret), K(need_factor), K(src_ls), K(src_array), K(dest_split_array));
631} else {
632need_factor -= get_factor;
633if (OB_DOUBLE_EPSINON >= param.get_current_factor()) {
634param.reduce_all();
635//for ex
636//if current ls is 3, need shrink to 2, first ls need transfer, second need merge
637get_factor = 1;
638}
639ObSplitLSParam split_param(param.get_ls_info(), get_factor);
640LOG_TRACE("split param", KR(ret), K(split_param), K(i), K(j));
641if (OB_FAIL(src_array.push_back(split_param))) {
642LOG_WARN("failed to push back split param", KR(ret), K(split_param));
643}
644}
645}//end for j
646if (FAILEDx(dest_split_array.push_back(src_array))) {
647LOG_WARN("failed to push back src array", KR(ret), K(i), K(src_array));
648}
649}
650}
651return ret;
652}
653
654int ObLSBalanceTaskHelper::construct_expand_dest_param_(const int64_t lack_ls_count, ObSplitLSParamArray &src_ls,
655ObIArray<ObSplitLSParamArray> &dest_split_array)
656{
657int ret = OB_SUCCESS;
658if (OB_UNLIKELY(!inited_)) {
659ret = OB_NOT_INIT;
660LOG_WARN("not init", KR(ret));
661} else if (OB_UNLIKELY(0 == lack_ls_count || 0 == src_ls.count())) {
662ret = OB_INVALID_ARGUMENT;
663LOG_WARN("invalid argument", KR(ret), K(lack_ls_count), K(src_ls));
664} else {
665const double each_ls_target_factor = double(src_ls.count()) / (src_ls.count() + lack_ls_count);
666if (each_ls_target_factor <= OB_DOUBLE_EPSINON) {
667ret = OB_ERR_UNEXPECTED;
668LOG_WARN("too many lack ls count", KR(ret), K(each_ls_target_factor), K(lack_ls_count), K(src_ls));
669}
670for (int64_t i = 0; OB_SUCC(ret) && i < lack_ls_count; ++i) {
671double need_factor = each_ls_target_factor;
672ObSplitLSParamArray src_array;
673for (int64_t j = 0; OB_SUCC(ret) && j < src_ls.count() && need_factor > OB_DOUBLE_EPSINON; ++j) {
674ObSplitLSParam ¶m = src_ls.at(j);
675double get_factor = param.reduce_factor_for_dest(need_factor, each_ls_target_factor);
676if (get_factor > OB_DOUBLE_EPSINON) {
677ObSplitLSParam split_param(param.get_ls_info(), get_factor);
678need_factor -= get_factor;
679if (OB_FAIL(src_array.push_back(split_param))) {
680LOG_WARN("failed to push back split param", KR(ret), K(split_param));
681}
682}
683}
684if (OB_FAIL(ret)) {
685} else if (OB_UNLIKELY(0 >= src_array.count())) {
686ret = OB_ERR_UNEXPECTED;
687LOG_WARN("src array is empty", KR(ret), K(src_ls));
688} else if (OB_FAIL(dest_split_array.push_back(src_array))) {
689LOG_WARN("failed to push back src array", KR(ret), K(i), K(src_array));
690}
691}
692}
693return ret;
694}
695
696int ObLSBalanceTaskHelper::generate_balance_task_for_expand_(
697const ObSplitLSParamArray &dest_split_param, const uint64_t ls_group_id)
698{
699int ret = OB_SUCCESS;
700if (OB_UNLIKELY(!inited_)) {
701ret = OB_NOT_INIT;
702LOG_WARN("not init", KR(ret));
703} else if (OB_UNLIKELY(!job_.is_valid() || dest_split_param.count() <= 0
704|| OB_INVALID_ID == ls_group_id)) {
705ret = OB_ERR_UNEXPECTED;
706LOG_WARN("error unexpected", KR(ret), K(job_), K(dest_split_param), K(ls_group_id));
707} else {
708//generate new ls info for split
709int64_t task_begin_index = OB_INVALID_INDEX_INT64;
710if (OB_FAIL(generate_ls_split_task_(dest_split_param, task_begin_index))) {
711LOG_WARN("failed to generate ls info", KR(ret), K(dest_split_param));
712} else if (OB_UNLIKELY(task_begin_index < 0 || task_begin_index > task_array_.count())) {
713ret = OB_ERR_UNEXPECTED;
714LOG_WARN("task_begin_index is invalid", KR(ret), K(task_begin_index));
715}
716
717for (int64_t i = task_begin_index; OB_SUCC(ret) && i < task_array_.count(); ++i) {
718if (ls_group_id != task_array_.at(i).get_ls_group_id()) {
719if (OB_FAIL(construct_ls_alter_task_(task_array_.at(i).get_dest_ls_id(), ls_group_id))) {
720LOG_WARN("failed to init task", KR(ret), K(task_array_.at(i)), K(ls_group_id));
721}
722}
723}
724if (OB_SUCC(ret)) {
725ObLSID dest_ls_id = task_array_.at(task_begin_index).get_dest_ls_id();
726for (int64_t i = task_begin_index + 1; OB_SUCC(ret) && i < task_array_.count(); ++i) {
727if (task_array_.at(i).get_task_type().is_split_task()) {
728if (OB_FAIL(construct_ls_merge_task_(task_array_.at(i).get_dest_ls_id(),
729dest_ls_id, ls_group_id))) {
730LOG_WARN("failed to construct ls merge task", KR(ret),
731K(task_array_.at(i)), K(dest_ls_id), K(ls_group_id));
732}
733}
734}
735}
736}
737return ret;
738}
739int ObLSBalanceTaskHelper::generate_ls_split_task_(const ObSplitLSParamArray &dest_split_param,
740int64_t &task_begin_index)
741{
742int ret = OB_SUCCESS;
743if (OB_UNLIKELY(!inited_)) {
744ret = OB_NOT_INIT;
745LOG_WARN("not init", KR(ret));
746} else if (OB_UNLIKELY(!job_.is_valid() || dest_split_param.count() <= 0)) {
747ret = OB_ERR_UNDEFINED;
748LOG_WARN("error unexpected", KR(ret), K(job_), K(dest_split_param));
749}
750ObBalanceTask task;
751ObBalanceTaskType task_type(ObBalanceTaskType::BALANCE_TASK_SPLIT);//split task
752ObTransferPartList part_list;//TODO
753task_begin_index = task_array_.count();
754for (int64_t i = 0; OB_SUCC(ret) && i < dest_split_param.count(); ++i) {
755// split task has equal ls group id with source
756//TODO part_list fill partition_info of task
757task.reset();
758ObLSID dest_ls_id;
759ObBalanceTaskID task_id;
760const share::ObLSStatusInfo *src_ls = dest_split_param.at(i).get_ls_info();
761if (OB_ISNULL(src_ls)) {
762ret = OB_ERR_UNEXPECTED;
763LOG_WARN("src ls is null", KR(ret), K(i), K(dest_split_param));
764} else if (OB_FAIL(construct_ls_part_info_(dest_split_param.at(i), part_list))) {
765LOG_WARN("failed to construct ls part info", KR(ret), KPC(src_ls));
766} else if (OB_FAIL(ObLSServiceHelper::fetch_new_ls_id(sql_proxy_, tenant_id_, dest_ls_id))) {
767LOG_WARN("failed to fetch new ls id", KR(ret), K(tenant_id_));
768} else if (OB_FAIL(ObCommonIDUtils::gen_unique_id(tenant_id_, task_id))) {
769LOG_WARN("failed to gen unique id", KR(ret), K(tenant_id_));
770} else if (OB_FAIL(task.simple_init(tenant_id_, job_.get_job_id(), task_id, task_type,
771src_ls->ls_group_id_, src_ls->ls_id_, dest_ls_id,
772part_list))) {
773LOG_WARN("failed to init task", KR(ret), K(tenant_id_), K(job_), K(task_id), K(task_type),
774KPC(src_ls), K(dest_ls_id), K(part_list));
775} else if (OB_FAIL(task_array_.push_back(task))) {
776LOG_WARN("failed to push back task", KR(ret), K(task));
777}
778ISTAT("generate split task", KR(ret), K(task), K(job_));
779}
780return ret;
781}
782
783int ObLSBalanceTaskHelper::construct_ls_alter_task_(const share::ObLSID &ls_id, const uint64_t ls_group_id)
784{
785int ret = OB_SUCCESS;
786if (OB_UNLIKELY(!inited_)) {
787ret = OB_NOT_INIT;
788LOG_WARN("not init", KR(ret));
789} else if (OB_UNLIKELY(!job_.is_valid() || !ls_id.is_valid()
790|| OB_INVALID_ID == ls_group_id)) {
791ret = OB_INVALID_ARGUMENT;
792LOG_WARN("invalid argument", KR(ret), K(job_), K(ls_id), K(ls_group_id));
793} else {
794//for alter
795ObBalanceTask task;
796ObBalanceTaskID task_id;
797ObBalanceTaskType task_type(ObBalanceTaskType::BALANCE_TASK_ALTER);
798ObTransferPartList part_list;
799ObLSID dest_ls_id;
800if (OB_FAIL(ObCommonIDUtils::gen_unique_id(tenant_id_, task_id))) {
801LOG_WARN("failed to gen unique id", KR(ret), K(tenant_id_));
802} else if (OB_FAIL(task.simple_init(tenant_id_, job_.get_job_id(),
803task_id, task_type, ls_group_id,
804ls_id,
805dest_ls_id,
806part_list))) {
807LOG_WARN("failed to init task", KR(ret), K(tenant_id_), K(job_),
808K(task_id), K(task_type), K(ls_id), K(part_list));
809} else if (OB_FAIL(task_array_.push_back(task))) {
810LOG_WARN("failed to push back task", KR(ret), K(task));
811}
812ISTAT("generate alter task", KR(ret), K(task), K(job_));
813}
814return ret;
815}
816
817int ObLSBalanceTaskHelper::construct_ls_merge_task_(
818const share::ObLSID &src_ls_id, const share::ObLSID &dest_ls_id,
819const uint64_t ls_group_id)
820{
821int ret = OB_SUCCESS;
822if (OB_UNLIKELY(!inited_)) {
823ret = OB_NOT_INIT;
824LOG_WARN("not init", KR(ret));
825} else if (OB_UNLIKELY(!job_.is_valid() || !src_ls_id.is_valid()
826|| OB_INVALID_ID == ls_group_id
827|| !dest_ls_id.is_valid())) {
828ret = OB_INVALID_ARGUMENT;
829LOG_WARN("invalid argument", KR(ret), K(job_), K(src_ls_id), K(ls_group_id), K(dest_ls_id));
830} else {
831//for merge
832ObBalanceTask task;
833ObBalanceTaskID task_id;
834ObBalanceTaskType task_type(ObBalanceTaskType::BALANCE_TASK_MERGE);// merge task
835ObTransferPartList part_list;
836if (OB_FAIL(ObCommonIDUtils::gen_unique_id(tenant_id_, task_id))) {
837LOG_WARN("failed to gen unique id", KR(ret), K(tenant_id_));
838} else if (OB_FAIL(task.simple_init(tenant_id_, job_.get_job_id(),
839task_id, task_type, ls_group_id,
840src_ls_id,
841dest_ls_id,
842part_list))) {
843LOG_WARN("failed to init task", KR(ret), K(tenant_id_), K(job_),
844K(task_id), K(task_type), K(dest_ls_id), K(src_ls_id), K(part_list));
845} else if (OB_FAIL(task_array_.push_back(task))) {
846LOG_WARN("failed to push back task", KR(ret), K(task));
847}
848ISTAT("generate merge task", KR(ret), K(task), K(job_));
849}
850return ret;
851}
852
853int ObLSBalanceTaskHelper::construct_ls_part_info_(const ObSplitLSParam &src_ls, ObTransferPartList &part_list)
854{
855int ret = OB_SUCCESS;
856ObLSID src_ls_id = src_ls.get_ls_id();
857const double factor = src_ls.get_current_factor();
858ObLSBalanceGroupInfo *ls_bg_info = NULL;
859
860part_list.reset();
861
862if (OB_UNLIKELY(!inited_)) {
863ret = OB_NOT_INIT;
864LOG_WARN("not init", KR(ret));
865} else if (OB_UNLIKELY(!src_ls.is_valid() || !src_ls_id.is_valid())) {
866ret = OB_INVALID_ARGUMENT;
867LOG_WARN("src ls is invalid", KR(ret), K(src_ls), K(src_ls_id));
868} else if (OB_FAIL(tenant_ls_bg_info_.get(src_ls_id, ls_bg_info))) {
869if (OB_HASH_NOT_EXIST == ret) {
870ret = OB_SUCCESS;
871ISTAT("src ls is empty, no need to transfer out", KR(ret), K(src_ls_id));
872} else {
873LOG_WARN("get src ls balance group info fail", KR(ret), K(src_ls_id), K(src_ls));
874}
875} else if (OB_ISNULL(ls_bg_info)) {
876ret = OB_ERR_UNEXPECTED;
877LOG_WARN("invalid ls balance group info", KR(ret), K(ls_bg_info), K(src_ls_id));
878} else if (OB_FAIL(ls_bg_info->transfer_out_by_factor(factor, part_list))) {
879LOG_WARN("transfer out part list from LS balance group info fail", KR(ret), K(factor),
880KPC(ls_bg_info), K(part_list));
881}
882return ret;
883}
884
885#undef ISTAT
886#undef WSTAT
887
888}
889}
890