oceanbase

Форк
0
/
ob_ls_balance_helper.cpp 
889 строк · 36.8 Кб
1
/**
2
 * Copyright (c) 2021 OceanBase
3
 * OceanBase CE is licensed under Mulan PubL v2.
4
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
5
 * You may obtain a copy of Mulan PubL v2 at:
6
 *          http://license.coscl.org.cn/MulanPubL-2.0
7
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
 * See the Mulan PubL v2 for more details.
11
 */
12
#define USING_LOG_PREFIX BALANCE
13
#include "rootserver/ob_ls_balance_helper.h"
14
#include "rootserver/ob_primary_ls_service.h"//fetch max ls id
15
#include "lib/mysqlclient/ob_mysql_transaction.h"//trans
16
#include "observer/ob_server_struct.h"//GCTX
17
#include "share/schema/ob_schema_getter_guard.h"//ObSchemaGetGuard
18
#include "share/schema/ob_multi_version_schema_service.h"//ObMultiSchemaService
19
#include "share/schema/ob_table_schema.h"//ObTableSchema
20
#include "share/ob_balance_define.h"  // ObBalanceTaskID, ObBalanceJobID
21
#include "storage/tx/ob_unique_id_service.h" // ObUniqueIDService
22
#include "storage/ob_common_id_utils.h"     // ObCommonIDUtils
23
#include "ob_ls_balance_helper.h"
24

25
#define ISTAT(fmt, args...) FLOG_INFO("[LS_BALANCE] " fmt, ##args)
26
#define WSTAT(fmt, args...) FLOG_WARN("[LS_BALANCE] " fmt, ##args)
27

28
namespace oceanbase
29
{
30
using namespace share;
31
namespace rootserver
32
{
33
//////ObUnitGroupBalanceInfo
34
void ObUnitGroupBalanceInfo::reset()
35
{
36
  primary_zone_count_ = OB_INVALID_COUNT;
37
  unit_group_.reset();
38
  redundant_ls_array_.reset();
39
  normal_ls_array_.reset();
40
}
41

42
int ObUnitGroupBalanceInfo::add_ls_status_info(const ObLSStatusInfo &ls_info)
43
{
44
  int ret = OB_SUCCESS;
45
  //TODO has ls group id not match
46
  if (OB_UNLIKELY(!ls_info.is_valid())) {
47
    ret = OB_INVALID_ARGUMENT;
48
    LOG_WARN("invalid argument", KR(ret), K(ls_info));
49
  } else if (normal_ls_array_.count() >= primary_zone_count_
50
             || !is_active_unit_group()) {
51
    if (OB_FAIL(redundant_ls_array_.push_back(ls_info))) {
52
      LOG_WARN("failed to push back ls info", KR(ret), K(ls_info));
53
    }
54
  } else if (OB_FAIL(normal_ls_array_.push_back(ls_info))) {
55
    LOG_WARN("failed to push back ls info", KR(ret), K(ls_info));
56
  }
57
  return ret;
58
}
59

60
int ObUnitGroupBalanceInfo::remove_redundant_ls(const int64_t &index)
61
{
62
  int ret = OB_SUCCESS;
63
  if (OB_UNLIKELY(index >= redundant_ls_array_.count() || index < 0)) {
64
    ret = OB_INVALID_ARGUMENT;
65
    LOG_WARN("invalid argument", KR(ret), K(index));
66
  } else if (OB_FAIL(redundant_ls_array_.remove(index))) {
67
    LOG_WARN("failed to remove index", KR(ret), K(index));
68
  }
69
  return ret;
70
}
71

72

73
//////////////ObLSBalanceTaskHelper
74

75
ObLSBalanceTaskHelper::ObLSBalanceTaskHelper() :
76
    inited_(false),
77
    tenant_id_(OB_INVALID_TENANT_ID),
78
    primary_zone_num_(0),
79
    unit_group_balance_array_(),
80
    sql_proxy_(NULL),
81
    job_(),
82
    task_array_(),
83
    tenant_ls_bg_info_()
84
{
85
}
86

87
int ObLSBalanceTaskHelper::init(const uint64_t tenant_id,
88
           const share::ObLSStatusInfoArray &status_array,
89
           const ObIArray<share::ObSimpleUnitGroup> &unit_group_array,
90
           const int64_t primary_zone_num, ObMySQLProxy *sql_proxy)
91
{
92
  int ret = OB_SUCCESS;
93
  if (OB_UNLIKELY(0 == status_array.count() || 0 == unit_group_array.count()
94
                  || 0 >= primary_zone_num || OB_INVALID_TENANT_ID == tenant_id)) {
95
    ret = OB_INVALID_ARGUMENT;
96
    LOG_WARN("invalid argument", KR(ret), K(status_array), K(unit_group_array),
97
                                 K(primary_zone_num), K(tenant_id));
98
  } else if (OB_FAIL(tenant_ls_bg_info_.init(tenant_id))) {
99
    LOG_WARN("init tenant LS balance group info fail", KR(ret), K(tenant_id));
100
  } else {
101
    //1. init all unit balance info
102
    for (int64_t i = 0; OB_SUCC(ret) && i < unit_group_array.count(); ++i) {
103
      ObUnitGroupBalanceInfo balance_info(unit_group_array.at(i), primary_zone_num);
104
      if (OB_FAIL(unit_group_balance_array_.push_back(balance_info))) {
105
        LOG_WARN("failed to push back balance info", KR(ret), K(balance_info), K(i));
106
      }
107
    }
108
    int64_t index = OB_INVALID_INDEX_INT64;
109
    for (int64_t i = 0; OB_SUCC(ret) && i < status_array.count(); ++i) {
110
      const ObLSStatusInfo &ls_status = status_array.at(i);
111
      if (OB_FAIL(find_unit_group_balance_index(ls_status.unit_group_id_, index))) {
112
        if (OB_ENTRY_NOT_EXIST == ret) {
113
          //normal, ls status must has target unit_group,
114
          //but maybe migrate unit and ls group balance concurrency
115
          LOG_WARN("has ls in not valid unit group", KR(ret), K(ls_status), K(unit_group_array));
116
          ret = OB_SUCCESS;
117
          index = unit_group_balance_array_.count();
118
          ObSimpleUnitGroup unit_group(ls_status.unit_group_id_, ObUnit::UNIT_STATUS_DELETING);
119
          ObUnitGroupBalanceInfo balance_info(unit_group, primary_zone_num);
120
          if (OB_FAIL(unit_group_balance_array_.push_back(balance_info))) {
121
            LOG_WARN("failed to push back balance info", KR(ret), K(balance_info));
122
          }
123
        } else {
124
          LOG_WARN("failed to find index", KR(ret), K(ls_status));
125
        }
126
      }
127
      if (FAILEDx(unit_group_balance_array_.at(index).add_ls_status_info(ls_status))) {
128
        LOG_WARN("failed to add ls status info", KR(ret), K(ls_status));
129
      }
130
    }
131
  }
132
  if (OB_SUCC(ret)) {
133
    primary_zone_num_ = primary_zone_num;
134
    tenant_id_ = tenant_id;
135
    sql_proxy_ = sql_proxy;
136
    job_.reset();
137
    task_array_.reset();
138
    inited_ = true;
139
  }
140
  return ret;
141
}
142

143
int ObLSBalanceTaskHelper::find_unit_group_balance_index(const uint64_t unit_group_id, int64_t &index)
144
{
145
  int ret = OB_SUCCESS;
146
  index = OB_INVALID_INDEX_INT64;
147
  if (OB_UNLIKELY(OB_INVALID_ID == unit_group_id)) {
148
    ret = OB_INVALID_ARGUMENT;
149
    LOG_WARN("invalid argument", KR(ret), K(unit_group_id));
150
  } else {
151
    for (int64_t i = 0; OB_SUCC(ret) && i < unit_group_balance_array_.count(); ++i) {
152
      if (unit_group_id == unit_group_balance_array_.at(i).get_unit_group_id()) {
153
        index = i;
154
        break;
155
      }
156
    }
157
    if (OB_SUCC(ret) && OB_INVALID_INDEX_INT64 == index) {
158
      ret = OB_ENTRY_NOT_EXIST;
159
      LOG_WARN("failed to find ls unit group", KR(ret), K(unit_group_id), K(unit_group_balance_array_));
160
    }
161
  }
162
  return ret;
163
}
164

165
int ObLSBalanceTaskHelper::check_need_ls_balance(bool &need_balance)
166
{
167
  int ret = OB_SUCCESS;
168
  if (OB_UNLIKELY(!inited_)) {
169
    ret = OB_NOT_INIT;
170
    LOG_WARN("not init", KR(ret));
171
  } else if (OB_UNLIKELY(unit_group_balance_array_.count() <= 0)) {
172
    ret = OB_INVALID_ARGUMENT;
173
    LOG_WARN("unit group balance array not expected", KR(ret));
174
  } else {
175
    need_balance = false;
176
    for (int64_t i = 0; OB_SUCC(ret) && i < unit_group_balance_array_.count() && !need_balance; ++i) {
177
      const ObUnitGroupBalanceInfo &balance_info = unit_group_balance_array_.at(i);
178
      if (balance_info.get_lack_ls_count() > 0 || balance_info.get_redundant_ls_array().count() > 0) {
179
        //has more ls or less ls
180
        need_balance = true;
181
        ISTAT("has more or less ls, need balance", K(balance_info));
182
      }
183
    }
184
  }
185
  return ret;
186
}
187

188
int ObLSBalanceTaskHelper::generate_ls_balance_task()
189
{
190
  int ret = OB_SUCCESS;
191
  ObMultiVersionSchemaService *schema_service = GCTX.schema_service_;
192

193
  if (OB_UNLIKELY(!inited_)) {
194
    ret = OB_NOT_INIT;
195
    LOG_WARN("not init", KR(ret));
196
  } else if (OB_FAIL(generate_balance_job_())) {
197
    LOG_WARN("failed to generate job", KR(ret));
198
  } else if (OB_ISNULL(sql_proxy_) || OB_ISNULL(schema_service)) {
199
    ret = OB_ERR_UNEXPECTED;
200
    LOG_WARN("sql proxy or schema service is null", KR(ret), K(sql_proxy_), K(schema_service));
201
  }
202
  // build tenant all balance group info for ALL LS
203
  else if (OB_FAIL(tenant_ls_bg_info_.build("LS_BALANCE", *sql_proxy_, *schema_service))) {
204
    LOG_WARN("build tenant all balance group info for all LS fail", KR(ret));
205
  } else {
206
    if (0 == job_.get_balance_strategy().string().compare(share::LS_BALANCE_BY_ALTER)) {
207
      if (OB_FAIL(generate_alter_task_())) {
208
        LOG_WARN("failed to generate alter task", KR(ret));
209
      }
210
    } else if (0 == job_.get_balance_strategy().string().compare(share::LS_BALANCE_BY_MIGRATE)) {
211
      // 1. first migrate task
212
      if (OB_FAIL(generate_migrate_task_())) {
213
        LOG_WARN("failed to generate migrate task", KR(ret));
214
      }
215
    } else if (0 == job_.get_balance_strategy().string().compare(share::LS_BALANCE_BY_EXPAND)) {
216
    //2. try expand
217
      if (OB_FAIL(generate_expand_task_())) {
218
        LOG_WARN("failed to generate expand task", KR(ret));
219
      }
220
    } else if (0 == job_.get_balance_strategy().string().compare(share::LS_BALANCE_BY_SHRINK)) {
221
    //3. try shrink
222
      if (OB_FAIL(generate_shrink_task_())) {
223
        LOG_WARN("failed to generate expand task", KR(ret));
224
      }
225
    } else {
226
      ret = OB_ERR_UNEXPECTED;
227
      LOG_WARN("no other balance job", KR(ret), K_(job));
228
    }
229
    if (OB_SUCC(ret) && 0 == task_array_.count()) {
230
      ret = OB_ERR_UNEXPECTED;
231
      LOG_WARN("has no task", KR(ret), K(job_));
232
    }
233
    ISTAT("generate task", KR(ret), K(job_), K(task_array_));
234
  }
235
  return ret;
236
}
237

238
int ObLSBalanceTaskHelper::generate_balance_job_()
239
{
240
  int ret = OB_SUCCESS;
241
  if (OB_UNLIKELY(!inited_)) {
242
    ret = OB_NOT_INIT;
243
    LOG_WARN("not init", KR(ret));
244
  } else if (OB_UNLIKELY(unit_group_balance_array_.count() <= 0)
245
      || OB_ISNULL(sql_proxy_)) {
246
    ret = OB_ERR_UNDEFINED;
247
    LOG_WARN("error unexpected", KR(ret), KP(sql_proxy_), K(unit_group_balance_array_));
248
  } else {
249
    bool lack_ls = false;
250
    bool redundant_ls = false;
251
    bool need_modify_ls_group = false;
252
    ObBalanceJobType job_type(ObBalanceJobType::BALANCE_JOB_LS);
253
    ObBalanceJobStatus job_status(ObBalanceJobStatus::BALANCE_JOB_STATUS_DOING);
254
    int64_t unit_group_num = 0;
255
    ObBalanceJobID job_id;
256
    ObString comment;
257
    const char* balance_stradegy = NULL;
258
    for (int64_t i = 0; OB_SUCC(ret) && i < unit_group_balance_array_.count(); ++i) {
259
      const ObUnitGroupBalanceInfo &balance_info = unit_group_balance_array_.at(i);
260
      if (balance_info.is_active_unit_group()) {
261
        unit_group_num++;
262
      }
263
      if (balance_info.get_lack_ls_count() > 0) {
264
        lack_ls = true;
265
        ISTAT("unit group has little ls than expected", K(balance_info));
266
      }
267
      if (balance_info.get_redundant_ls_array().count() > 0) {
268
        redundant_ls = true;
269
        ISTAT("unit group has more ls than expected", K(balance_info));
270
      }
271
      uint64_t ls_group_id = OB_INVALID_ID;
272
      for (int64_t j = 0;
273
           OB_SUCC(ret) && j < balance_info.get_normal_ls_array().count() &&
274
           !need_modify_ls_group; ++j) {
275
        const ObLSStatusInfo &ls_status_info = balance_info.get_normal_ls_array().at(j);
276
        if (OB_INVALID_ID == ls_status_info.ls_group_id_) {
277
          ret = OB_ERR_UNEXPECTED;
278
          LOG_WARN("ls group id not expected", KR(ret), K(ls_status_info));
279
        } else if (OB_INVALID_ID == ls_group_id) {
280
          ls_group_id = ls_status_info.ls_group_id_;
281
        } else if (ls_group_id != ls_status_info.ls_group_id_) {
282
          need_modify_ls_group = true;
283
          ISTAT("unit group has different ls group", K(ls_group_id), K(ls_status_info), K(balance_info));
284
        }
285
      }
286
    }
287
    if (OB_SUCC(ret)) {
288
      if (need_modify_ls_group) {
289
        balance_stradegy = share::LS_BALANCE_BY_ALTER;
290
      } else if (lack_ls && redundant_ls) {
291
        balance_stradegy = share::LS_BALANCE_BY_MIGRATE;
292
      } else if (lack_ls) {
293
        balance_stradegy = share::LS_BALANCE_BY_EXPAND;
294
      } else if (redundant_ls) {
295
        balance_stradegy = share::LS_BALANCE_BY_SHRINK;
296
      } else {
297
        ret = OB_ERR_UNEXPECTED;
298
        LOG_WARN("must has balance job for ls", KR(ret), K(unit_group_balance_array_));
299
      }
300

301
      if (FAILEDx(ObCommonIDUtils::gen_unique_id(tenant_id_, job_id))) {
302
        LOG_WARN("generate unique id for balance job fail", KR(ret), K(tenant_id_));
303
      } else if (OB_FAIL(job_.init(tenant_id_, job_id, job_type, job_status, primary_zone_num_,
304
              unit_group_num, comment, ObString(balance_stradegy)))) {
305
        LOG_WARN("failed to init job", KR(ret), K(tenant_id_), K(job_id), K(job_type),
306
            K(job_status), K(primary_zone_num_), K(unit_group_num), K(balance_stradegy));
307
      }
308
    }
309
  }
310
  return ret;
311
}
312

313
int ObLSBalanceTaskHelper::generate_alter_task_()
314
{
315
  int ret = OB_SUCCESS;
316
  if (OB_UNLIKELY(!inited_)) {
317
    ret = OB_NOT_INIT;
318
    LOG_WARN("not init", KR(ret));
319
  } else {
320
    uint64_t ls_group_id = OB_INVALID_ID;
321
    for (int64_t i = 0; OB_SUCC(ret) && i < unit_group_balance_array_.count(); ++i) {
322
      ObUnitGroupBalanceInfo &balance_info = unit_group_balance_array_.at(i);
323
      ls_group_id = OB_INVALID_ID;
324
      for (int64_t j = 0; OB_SUCC(ret) && j < balance_info.get_normal_ls_array().count(); ++j) {
325
        const ObLSStatusInfo &ls_status_info = balance_info.get_normal_ls_array().at(j);
326
        if (OB_INVALID_ID == ls_group_id) {
327
          ls_group_id = ls_status_info.ls_group_id_;
328
        } else if (ls_group_id != ls_status_info.ls_group_id_) {
329
          if (OB_FAIL(construct_ls_alter_task_(ls_status_info.ls_id_, ls_group_id))) {
330
            LOG_WARN("failed to construct ls alter task", KR(ret),
331
                     K(ls_status_info), K(ls_group_id));
332
          }
333
        }
334
      }
335
    }
336
  }
337
  return ret;
338
}
339

340
int ObLSBalanceTaskHelper::generate_migrate_task_()
341
{
342
  int ret = OB_SUCCESS;
343
  if (OB_UNLIKELY(!inited_)) {
344
    ret = OB_NOT_INIT;
345
    LOG_WARN("not init", KR(ret));
346
  } else {
347
    //get a redundant ls, and found one unit group less ls
348
    bool new_task = true;
349
    for (int64_t i = 0; OB_SUCC(ret) && i < unit_group_balance_array_.count() && new_task; ++i) {
350
      ObUnitGroupBalanceInfo &balance_info = unit_group_balance_array_.at(i);
351
      for (int64_t j = balance_info.get_redundant_ls_array().count() - 1; OB_SUCC(ret) && j >= 0 && new_task; --j) {
352
        //get one unit group, which less than primary_zone_unit_num
353
        const ObLSStatusInfo &ls_status = balance_info.get_redundant_ls_array().at(j);
354
        new_task = false;
355
        for (int64_t k = 0; OB_SUCC(ret) && k < unit_group_balance_array_.count(); ++k) {
356
          ObUnitGroupBalanceInfo &dest_balance_info = unit_group_balance_array_.at(k);
357
          if (dest_balance_info.get_lack_ls_count() > 0) {
358
            new_task = true;
359
            if (balance_info.get_unit_group_id() == dest_balance_info.get_unit_group_id()) {
360
              ret = OB_ERR_UNEXPECTED;
361
              LOG_WARN("ls group can not has more ls and lack ls", KR(ret),
362
                       K(i), K(k), K(j), K(balance_info), K(dest_balance_info));
363
            } else if (OB_FAIL(generate_ls_alter_task_(ls_status, dest_balance_info))) {
364
              LOG_WARN("failed to generate ls alter task", KR(ret), K(ls_status), K(dest_balance_info));
365
            }
366
          }
367
        }//end for k
368
        if (OB_SUCC(ret) && new_task) {
369
          //remove ls status from the unit group
370
          if (OB_FAIL(balance_info.remove_redundant_ls(j))) {
371
            LOG_WARN("failed to remove redundant ls", KR(ret), K(j));
372
          }
373
        }
374
      }//end for j
375
    }//end for i
376
  }
377
  return ret;
378
}
379

380
int ObLSBalanceTaskHelper::generate_ls_alter_task_(const ObLSStatusInfo &ls_status_info, ObUnitGroupBalanceInfo &dest_unit_group)
381
{
382
  int ret = OB_SUCCESS;
383
  if (OB_UNLIKELY(!inited_)) {
384
    ret = OB_NOT_INIT;
385
    LOG_WARN("not init", KR(ret));
386
  } else if (OB_UNLIKELY(!ls_status_info.is_valid()
387
                      || dest_unit_group.get_lack_ls_count() <= 0)) {
388
    ret = OB_INVALID_ARGUMENT;
389
    LOG_WARN("invalid argument", KR(ret), K(ls_status_info), K(dest_unit_group));
390
  } else {
391
    uint64_t ls_group_id = OB_INVALID_ID;
392
    ObLSStatusInfo dest_ls_status;
393
    if (dest_unit_group.get_normal_ls_array().count() > 0) {
394
      ls_group_id = dest_unit_group.get_normal_ls_array().at(0).ls_group_id_;
395
    } else if (OB_FAIL(ObLSServiceHelper::fetch_new_ls_group_id(sql_proxy_, tenant_id_, ls_group_id))) {
396
      LOG_WARN("failed to fetch new ls id", KR(ret), K(tenant_id_));
397
    }
398
    if (FAILEDx(construct_ls_alter_task_(ls_status_info.ls_id_, ls_group_id))) {
399
      LOG_WARN("failed to construct ls alter task", KR(ret), K(ls_status_info), K(ls_group_id));
400
    } else if (OB_FAIL(dest_ls_status.init(ls_status_info.tenant_id_,
401
                                           ls_status_info.ls_id_, ls_group_id,
402
                                           ls_status_info.status_,
403
                                           ls_status_info.unit_group_id_,
404
                                           ls_status_info.primary_zone_,
405
                                           ls_status_info.get_flag()))) {
406
      LOG_WARN("failed to init ls status", KR(ret), K(ls_group_id), K(ls_status_info));
407
    } else if (OB_FAIL(dest_unit_group.add_ls_status_info(dest_ls_status))) {
408
      LOG_WARN("failed to add ls status info", KR(ret), K(dest_ls_status));
409
    }
410
  }
411
  return ret;
412
}
413

414
int ObLSBalanceTaskHelper::generate_expand_task_()
415
{
416
  int ret = OB_SUCCESS;
417
  if (OB_UNLIKELY(!inited_)) {
418
    ret = OB_NOT_INIT;
419
    LOG_WARN("not init", KR(ret));
420
  } else {
421
    int64_t lack_count = 0;
422
    ObSplitLSParamArray src_ls;
423
    ObArray<ObSplitLSParamArray> dest_ls;
424
    const double src_factor = 1;
425
    for (int64_t i = 0; OB_SUCC(ret) && i < unit_group_balance_array_.count(); ++i) {
426
      const ObUnitGroupBalanceInfo & balance_info = unit_group_balance_array_.at(i);
427
      for (int64_t j = 0; OB_SUCC(ret) && j < balance_info.get_normal_ls_array().count(); ++j) {
428
        ObSplitLSParam param(&balance_info.get_normal_ls_array().at(j), src_factor);
429
        if (OB_FAIL(src_ls.push_back(param))) {
430
          LOG_WARN("failed to push back param", KR(ret), K(param), K(i));
431
        }
432
      }
433
      if (OB_SUCC(ret)) {
434
        lack_count += balance_info.get_lack_ls_count();
435
      }
436
    }
437
    if (FAILEDx(construct_expand_dest_param_(lack_count, src_ls, dest_ls))) {
438
      LOG_WARN("failed to construct expand dest param", KR(ret), K(lack_count), K(src_ls));
439
    }
440
    int64_t dest_ls_index = 0;
441
    uint64_t ls_group_id = OB_INVALID_ID;
442
    for (int64_t i = 0; OB_SUCC(ret) && i < unit_group_balance_array_.count(); ++i) {
443
      ls_group_id = OB_INVALID_ID;
444
      const ObUnitGroupBalanceInfo &balance_info = unit_group_balance_array_.at(i);
445
      if (balance_info.get_normal_ls_array().count() > 0) {
446
        ls_group_id = balance_info.get_normal_ls_array().at(0).ls_group_id_;
447
      } else if (OB_FAIL(ObLSServiceHelper::fetch_new_ls_group_id(sql_proxy_, tenant_id_, ls_group_id))) {
448
        LOG_WARN("failed to fetch new ls group id", KR(ret), K(tenant_id_));
449
      }
450
      for (int64_t j = 0; OB_SUCC(ret) && j < balance_info.get_lack_ls_count(); ++j) {
451
        if (OB_UNLIKELY(dest_ls_index >= dest_ls.count())) {
452
          ret = OB_ERR_UNEXPECTED;
453
          LOG_WARN("dest ls index not expected", KR(ret), K(dest_ls_index));
454
        } else if (OB_FAIL(generate_balance_task_for_expand_(dest_ls.at(dest_ls_index),
455
                                                      ls_group_id))) {
456
          LOG_WARN("failed to get balance task", KR(ret), K(i), K(j), K(ls_group_id),
457
                   "dest_ls_param", dest_ls.at(dest_ls_index));
458
        } else {
459
          ++dest_ls_index;
460
        }
461
      }
462
      if (OB_SUCC(ret)) {
463
        lack_count += balance_info.get_lack_ls_count();
464
      }
465
    }
466
  }
467
  return ret;
468
}
469

470
int ObLSBalanceTaskHelper::generate_shrink_task_()
471
{
472
  int ret = OB_SUCCESS;
473
  if (OB_UNLIKELY(!inited_)) {
474
    ret = OB_NOT_INIT;
475
    LOG_WARN("not init", KR(ret));
476
  } else {
477
    const int64_t normal_ls_count = job_.get_primary_zone_num() * job_.get_unit_group_num();
478
    ObSplitLSParamArray src_ls;
479
    ObArray<ObSplitLSParamArray> dest_ls;
480
    const double src_factor = 1;
481
    for (int64_t i = 0; OB_SUCC(ret) && i < unit_group_balance_array_.count(); ++i) {
482
      const ObUnitGroupBalanceInfo & balance_info = unit_group_balance_array_.at(i);
483
      for (int64_t j = 0; OB_SUCC(ret) && j < balance_info.get_redundant_ls_array().count(); ++j) {
484
        ObSplitLSParam param(&balance_info.get_redundant_ls_array().at(j), src_factor);
485
        if (OB_FAIL(src_ls.push_back(param))) {
486
          LOG_WARN("failed to push back param", KR(ret), K(param), K(i), K(j));
487
        }
488
      }
489
    }
490
    if (FAILEDx(construct_shrink_src_param_(normal_ls_count, src_ls, dest_ls))) {
491
      LOG_WARN("failed to construct expand dest param", KR(ret), K(normal_ls_count), K(src_ls));
492
    }
493
    int64_t dest_index = 0;
494
    for (int64_t i = 0; OB_SUCC(ret) && i < unit_group_balance_array_.count(); ++i) {
495
      const ObUnitGroupBalanceInfo & balance_info = unit_group_balance_array_.at(i);
496
      for (int64_t j = 0; OB_SUCC(ret) && j < balance_info.get_normal_ls_array().count(); ++j) {
497
        if (OB_UNLIKELY(dest_ls.count() < dest_index)) {
498
          ret = OB_ERR_UNEXPECTED;
499
          LOG_WARN("src ls is unexpected", KR(ret), K(dest_ls), K(dest_ls));
500
        } else if (OB_FAIL(generate_task_for_shrink_(
501
                       dest_ls.at(dest_index++),
502
                       balance_info.get_normal_ls_array().at(j)))) {
503
          LOG_WARN("failed to generate task for shrink", KR(ret), K(dest_index), K(dest_ls), K(j), K(balance_info));
504
        }
505
      }
506
    }
507
  }
508
  return ret;
509
}
510

511
int ObLSBalanceTaskHelper::generate_task_for_shrink_(
512
    const ObSplitLSParamArray &src_split_param,
513
    const ObLSStatusInfo &ls_status_info)
514
{
515
  int ret = OB_SUCCESS;
516
  if (OB_UNLIKELY(!inited_)) {
517
    ret = OB_NOT_INIT;
518
    LOG_WARN("not init", KR(ret));
519
  } else if (OB_UNLIKELY(!job_.is_valid() || src_split_param.count() <= 0
520
                         || !ls_status_info.is_valid())) {
521
    ret = OB_ERR_UNDEFINED;
522
    LOG_WARN("error unexpected", KR(ret), K(job_), K(src_split_param), K(ls_status_info));
523
  } else {
524
    for (int64_t i = 0; OB_SUCC(ret) && i < src_split_param.count(); ++i) {
525
      const ObSplitLSParam &param = src_split_param.at(i);
526
      ObLSID merge_ls_id;
527
      if (fabs(param.get_current_factor() - 1.0) < OB_DOUBLE_EPSINON) {
528
        //nothing
529
        merge_ls_id = param.get_ls_info()->ls_id_;
530
      } else {
531
        if (param.get_ls_info()->ls_group_id_ == ls_status_info.ls_group_id_) {
532
          //need_transfer, no need merge
533
          if (OB_FAIL(generate_transfer_task_(param, ls_status_info))) {
534
            LOG_WARN("failed to generate transfer task", KR(ret), K(param));
535
          }
536
        } else {
537
          // need split
538
          ObSplitLSParamArray tmp_split_param;
539
          int64_t task_index = OB_INVALID_INDEX_INT64;
540
          if (OB_FAIL(tmp_split_param.push_back(param))) {
541
            LOG_WARN("failed to push back param", KR(ret), K(param));
542
          } else if (OB_FAIL(generate_ls_split_task_(tmp_split_param, task_index))) {
543
            LOG_WARN("failed to generate ls info", KR(ret), K(tmp_split_param));
544
          } else {
545
            merge_ls_id = task_array_.at(task_index).get_dest_ls_id();
546
          }
547
        }
548
      }
549
      if (OB_SUCC(ret)) {
550
        if (param.get_ls_info()->ls_group_id_ != ls_status_info.ls_group_id_) {
551
          //need alter task
552
          if (OB_FAIL(construct_ls_alter_task_(merge_ls_id, ls_status_info.ls_group_id_))) {
553
            LOG_WARN("failed to construct ls alter task", KR(ret), K(merge_ls_id), K(ls_status_info));
554
          }
555
        }
556
      }
557
      if (OB_SUCC(ret) && merge_ls_id.is_valid()) {
558
        //need merge
559
        if (OB_FAIL(construct_ls_merge_task_(merge_ls_id, ls_status_info.ls_id_,
560
                                            ls_status_info.ls_group_id_))) {
561
          LOG_WARN("failed to construct ls merge task", KR(ret), K(merge_ls_id), K(ls_status_info));
562
        }
563
      }
564
    }//end for
565
  }
566
  return ret;
567
}
568

569
int ObLSBalanceTaskHelper::generate_transfer_task_(
570
    const ObSplitLSParam &param, const ObLSStatusInfo &ls_status_info)
571
{
572
   int ret = OB_SUCCESS;
573
  if (OB_UNLIKELY(!inited_)) {
574
    ret = OB_NOT_INIT;
575
    LOG_WARN("not init", KR(ret));
576
  } else if (OB_UNLIKELY(!param.is_valid() || !ls_status_info.is_valid())) {
577
    ret = OB_INVALID_ARGUMENT;
578
    LOG_WARN("invalid argument", KR(ret), K(param), K(ls_status_info));
579
  } else {
580
    ObBalanceTaskType task_type(
581
        ObBalanceTaskType::BALANCE_TASK_TRANSFER);  // transfer task
582
    ObBalanceTask task;
583
    ObTransferPartList part_list;
584
    ObBalanceTaskID task_id;
585
    if (OB_FAIL(ObCommonIDUtils::gen_unique_id(tenant_id_, task_id))) {
586
      LOG_WARN("gen_unique_id for balance task failed", KR(ret), K(task_id),
587
               K_(tenant_id));
588
    } else if (OB_FAIL(construct_ls_part_info_(param, part_list))) {
589
      LOG_WARN("failed to construct ls part info", KR(ret), K(param));
590
    } else if (OB_FAIL(task.simple_init(
591
                   tenant_id_, job_.get_job_id(), task_id, task_type,
592
                   ls_status_info.ls_group_id_,
593
                   param.get_ls_info()->ls_id_, ls_status_info.ls_id_, part_list))) {
594
      LOG_WARN("failed to init task", KR(ret), K(tenant_id_), K(job_),
595
               K(task_id), K(task_type), K(part_list));
596
    } else if (OB_FAIL(task_array_.push_back(task))) {
597
      LOG_WARN("failed to push back task", KR(ret), K(task));
598
    }
599
    ISTAT("generate transfer task", KR(ret), K(task), K(job_));
600
  }
601
  return ret;
602
}
603

604
int ObLSBalanceTaskHelper::construct_shrink_src_param_(const int64_t target_count, ObSplitLSParamArray &src_ls,
605
      ObIArray<ObSplitLSParamArray> &dest_split_array)
606
{
607
  int ret = OB_SUCCESS;
608
  if (OB_UNLIKELY(!inited_)) {
609
    ret = OB_NOT_INIT;
610
    LOG_WARN("not init", KR(ret));
611
  } else if (OB_UNLIKELY(0 == target_count || 0 == src_ls.count())) {
612
    ret = OB_INVALID_ARGUMENT;
613
    LOG_WARN("invalid argument", KR(ret), K(target_count), K(src_ls));
614
  } else {
615
    const double each_ls_target_factor = double(src_ls.count()) / (target_count);
616
    if (each_ls_target_factor <= OB_DOUBLE_EPSINON) {
617
      ret = OB_ERR_UNEXPECTED;
618
      LOG_WARN("too many ls", KR(ret), K(each_ls_target_factor), K(target_count), K(src_ls));
619
    }
620
    for (int64_t i = 0; OB_SUCC(ret) && i < target_count; ++i) {
621
      double need_factor = each_ls_target_factor;
622
      ObSplitLSParamArray src_array;
623
      for (int64_t j = 0; OB_SUCC(ret) && j < src_ls.count() && need_factor > OB_DOUBLE_EPSINON; ++j) {
624
        ObSplitLSParam &param = src_ls.at(j);
625
        double get_factor = param.reduce_enough_factor(need_factor);
626
        if (!(get_factor)) { // strictly equal to zero
627
          //empty
628
        } else if (OB_DOUBLE_EPSINON >= get_factor) {
629
          ret = OB_ERR_UNEXPECTED;
630
          LOG_WARN("factor is too small", KR(ret), K(need_factor), K(src_ls), K(src_array), K(dest_split_array));
631
        } else {
632
          need_factor -= get_factor;
633
          if (OB_DOUBLE_EPSINON >= param.get_current_factor()) {
634
            param.reduce_all();
635
            //for ex
636
            //if current ls is 3, need shrink to 2, first ls need transfer, second need merge
637
            get_factor = 1;
638
          }
639
          ObSplitLSParam split_param(param.get_ls_info(), get_factor);
640
          LOG_TRACE("split param", KR(ret), K(split_param), K(i), K(j));
641
          if (OB_FAIL(src_array.push_back(split_param))) {
642
            LOG_WARN("failed to push back split param", KR(ret), K(split_param));
643
          }
644
        }
645
      }//end for j
646
      if (FAILEDx(dest_split_array.push_back(src_array))) {
647
        LOG_WARN("failed to push back src array", KR(ret), K(i), K(src_array));
648
      }
649
    }
650
  }
651
  return ret;
652
}
653

654
int ObLSBalanceTaskHelper::construct_expand_dest_param_(const int64_t lack_ls_count, ObSplitLSParamArray &src_ls,
655
      ObIArray<ObSplitLSParamArray> &dest_split_array)
656
{
657
  int ret = OB_SUCCESS;
658
  if (OB_UNLIKELY(!inited_)) {
659
    ret = OB_NOT_INIT;
660
    LOG_WARN("not init", KR(ret));
661
  } else if (OB_UNLIKELY(0 == lack_ls_count || 0 == src_ls.count())) {
662
    ret = OB_INVALID_ARGUMENT;
663
    LOG_WARN("invalid argument", KR(ret), K(lack_ls_count), K(src_ls));
664
  } else {
665
    const double each_ls_target_factor = double(src_ls.count()) / (src_ls.count() + lack_ls_count);
666
    if (each_ls_target_factor <= OB_DOUBLE_EPSINON) {
667
      ret = OB_ERR_UNEXPECTED;
668
      LOG_WARN("too many lack ls count", KR(ret), K(each_ls_target_factor), K(lack_ls_count), K(src_ls));
669
    }
670
    for (int64_t i = 0; OB_SUCC(ret) && i < lack_ls_count; ++i) {
671
      double need_factor = each_ls_target_factor;
672
      ObSplitLSParamArray src_array;
673
      for (int64_t j = 0; OB_SUCC(ret) && j < src_ls.count() && need_factor > OB_DOUBLE_EPSINON; ++j) {
674
        ObSplitLSParam &param = src_ls.at(j);
675
        double get_factor = param.reduce_factor_for_dest(need_factor, each_ls_target_factor);
676
        if (get_factor > OB_DOUBLE_EPSINON) {
677
          ObSplitLSParam split_param(param.get_ls_info(), get_factor);
678
          need_factor -= get_factor;
679
          if (OB_FAIL(src_array.push_back(split_param))) {
680
            LOG_WARN("failed to push back split param", KR(ret), K(split_param));
681
          }
682
        }
683
      }
684
      if (OB_FAIL(ret)) {
685
      } else if (OB_UNLIKELY(0 >= src_array.count())) {
686
        ret = OB_ERR_UNEXPECTED;
687
        LOG_WARN("src array is empty", KR(ret), K(src_ls));
688
      } else if (OB_FAIL(dest_split_array.push_back(src_array))) {
689
        LOG_WARN("failed to push back src array", KR(ret), K(i), K(src_array));
690
      }
691
    }
692
  }
693
  return ret;
694
}
695

696
int ObLSBalanceTaskHelper::generate_balance_task_for_expand_(
697
    const ObSplitLSParamArray &dest_split_param, const uint64_t ls_group_id)
698
{
699
  int ret = OB_SUCCESS;
700
  if (OB_UNLIKELY(!inited_)) {
701
    ret = OB_NOT_INIT;
702
    LOG_WARN("not init", KR(ret));
703
  } else if (OB_UNLIKELY(!job_.is_valid() || dest_split_param.count() <= 0
704
                         || OB_INVALID_ID == ls_group_id)) {
705
    ret = OB_ERR_UNEXPECTED;
706
    LOG_WARN("error unexpected", KR(ret), K(job_), K(dest_split_param), K(ls_group_id));
707
  } else {
708
    //generate new ls info for split
709
    int64_t task_begin_index = OB_INVALID_INDEX_INT64;
710
    if (OB_FAIL(generate_ls_split_task_(dest_split_param, task_begin_index))) {
711
      LOG_WARN("failed to generate ls info", KR(ret), K(dest_split_param));
712
    } else if (OB_UNLIKELY(task_begin_index < 0 || task_begin_index > task_array_.count())) {
713
      ret = OB_ERR_UNEXPECTED;
714
      LOG_WARN("task_begin_index is invalid", KR(ret), K(task_begin_index));
715
    }
716

717
    for (int64_t i = task_begin_index; OB_SUCC(ret) && i < task_array_.count(); ++i) {
718
      if (ls_group_id != task_array_.at(i).get_ls_group_id()) {
719
        if (OB_FAIL(construct_ls_alter_task_(task_array_.at(i).get_dest_ls_id(), ls_group_id))) {
720
          LOG_WARN("failed to init task", KR(ret), K(task_array_.at(i)), K(ls_group_id));
721
        }
722
      }
723
    }
724
    if (OB_SUCC(ret)) {
725
      ObLSID dest_ls_id = task_array_.at(task_begin_index).get_dest_ls_id();
726
      for (int64_t i = task_begin_index + 1; OB_SUCC(ret) && i < task_array_.count(); ++i) {
727
        if (task_array_.at(i).get_task_type().is_split_task()) {
728
          if (OB_FAIL(construct_ls_merge_task_(task_array_.at(i).get_dest_ls_id(),
729
                  dest_ls_id, ls_group_id))) {
730
            LOG_WARN("failed to construct ls merge task", KR(ret),
731
                K(task_array_.at(i)), K(dest_ls_id), K(ls_group_id));
732
          }
733
        }
734
      }
735
    }
736
  }
737
  return ret;
738
}
739
int ObLSBalanceTaskHelper::generate_ls_split_task_(const ObSplitLSParamArray &dest_split_param,
740
                                                       int64_t &task_begin_index)
741
{
742
  int ret = OB_SUCCESS;
743
  if (OB_UNLIKELY(!inited_)) {
744
    ret = OB_NOT_INIT;
745
    LOG_WARN("not init", KR(ret));
746
  } else if (OB_UNLIKELY(!job_.is_valid() || dest_split_param.count() <= 0)) {
747
    ret = OB_ERR_UNDEFINED;
748
    LOG_WARN("error unexpected", KR(ret), K(job_), K(dest_split_param));
749
  }
750
  ObBalanceTask task;
751
  ObBalanceTaskType task_type(ObBalanceTaskType::BALANCE_TASK_SPLIT);//split task
752
  ObTransferPartList part_list;//TODO
753
  task_begin_index = task_array_.count();
754
  for (int64_t i = 0; OB_SUCC(ret) && i < dest_split_param.count(); ++i) {
755
    // split task has equal ls group id with source
756
    //TODO part_list fill partition_info of task
757
    task.reset();
758
    ObLSID dest_ls_id;
759
    ObBalanceTaskID task_id;
760
    const share::ObLSStatusInfo *src_ls = dest_split_param.at(i).get_ls_info();
761
    if (OB_ISNULL(src_ls)) {
762
      ret = OB_ERR_UNEXPECTED;
763
      LOG_WARN("src ls is null", KR(ret), K(i), K(dest_split_param));
764
    } else if (OB_FAIL(construct_ls_part_info_(dest_split_param.at(i), part_list))) {
765
      LOG_WARN("failed to construct ls part info", KR(ret), KPC(src_ls));
766
    } else if (OB_FAIL(ObLSServiceHelper::fetch_new_ls_id(sql_proxy_, tenant_id_, dest_ls_id))) {
767
      LOG_WARN("failed to fetch new ls id", KR(ret), K(tenant_id_));
768
    } else if (OB_FAIL(ObCommonIDUtils::gen_unique_id(tenant_id_, task_id))) {
769
      LOG_WARN("failed to gen unique id", KR(ret), K(tenant_id_));
770
    } else if (OB_FAIL(task.simple_init(tenant_id_, job_.get_job_id(), task_id, task_type,
771
                          src_ls->ls_group_id_, src_ls->ls_id_, dest_ls_id,
772
                          part_list))) {
773
      LOG_WARN("failed to init task", KR(ret), K(tenant_id_), K(job_), K(task_id), K(task_type),
774
               KPC(src_ls), K(dest_ls_id), K(part_list));
775
    } else if (OB_FAIL(task_array_.push_back(task))) {
776
      LOG_WARN("failed to push back task", KR(ret), K(task));
777
    }
778
    ISTAT("generate split task", KR(ret), K(task), K(job_));
779
  }
780
  return ret;
781
}
782

783
int ObLSBalanceTaskHelper::construct_ls_alter_task_(const share::ObLSID &ls_id, const uint64_t ls_group_id)
784
{
785
  int ret = OB_SUCCESS;
786
  if (OB_UNLIKELY(!inited_)) {
787
    ret = OB_NOT_INIT;
788
    LOG_WARN("not init", KR(ret));
789
  } else if (OB_UNLIKELY(!job_.is_valid() || !ls_id.is_valid()
790
                         || OB_INVALID_ID == ls_group_id)) {
791
    ret = OB_INVALID_ARGUMENT;
792
    LOG_WARN("invalid argument", KR(ret), K(job_), K(ls_id), K(ls_group_id));
793
  } else {
794
    //for alter
795
    ObBalanceTask task;
796
    ObBalanceTaskID task_id;
797
    ObBalanceTaskType task_type(ObBalanceTaskType::BALANCE_TASK_ALTER);
798
    ObTransferPartList part_list;
799
    ObLSID dest_ls_id;
800
    if (OB_FAIL(ObCommonIDUtils::gen_unique_id(tenant_id_, task_id))) {
801
      LOG_WARN("failed to gen unique id", KR(ret), K(tenant_id_));
802
    } else if (OB_FAIL(task.simple_init(tenant_id_, job_.get_job_id(),
803
                                            task_id, task_type, ls_group_id,
804
                                            ls_id,
805
                                            dest_ls_id,
806
                                            part_list))) {
807
      LOG_WARN("failed to init task", KR(ret), K(tenant_id_), K(job_),
808
                   K(task_id), K(task_type), K(ls_id), K(part_list));
809
    } else if (OB_FAIL(task_array_.push_back(task))) {
810
      LOG_WARN("failed to push back task", KR(ret), K(task));
811
    }
812
    ISTAT("generate alter task", KR(ret), K(task), K(job_));
813
  }
814
  return ret;
815
}
816

817
int ObLSBalanceTaskHelper::construct_ls_merge_task_(
818
    const share::ObLSID &src_ls_id, const share::ObLSID &dest_ls_id,
819
    const uint64_t ls_group_id)
820
{
821
  int ret = OB_SUCCESS;
822
  if (OB_UNLIKELY(!inited_)) {
823
    ret = OB_NOT_INIT;
824
    LOG_WARN("not init", KR(ret));
825
  } else if (OB_UNLIKELY(!job_.is_valid() || !src_ls_id.is_valid()
826
                         || OB_INVALID_ID == ls_group_id
827
                         || !dest_ls_id.is_valid())) {
828
    ret = OB_INVALID_ARGUMENT;
829
    LOG_WARN("invalid argument", KR(ret), K(job_), K(src_ls_id), K(ls_group_id), K(dest_ls_id));
830
  } else {
831
    //for merge
832
    ObBalanceTask task;
833
    ObBalanceTaskID task_id;
834
    ObBalanceTaskType task_type(ObBalanceTaskType::BALANCE_TASK_MERGE);// merge task
835
    ObTransferPartList part_list;
836
    if (OB_FAIL(ObCommonIDUtils::gen_unique_id(tenant_id_, task_id))) {
837
      LOG_WARN("failed to gen unique id", KR(ret), K(tenant_id_));
838
    } else if (OB_FAIL(task.simple_init(tenant_id_, job_.get_job_id(),
839
                                            task_id, task_type, ls_group_id,
840
                                            src_ls_id,
841
                                            dest_ls_id,
842
                                            part_list))) {
843
      LOG_WARN("failed to init task", KR(ret), K(tenant_id_), K(job_),
844
                   K(task_id), K(task_type), K(dest_ls_id), K(src_ls_id), K(part_list));
845
    } else if (OB_FAIL(task_array_.push_back(task))) {
846
      LOG_WARN("failed to push back task", KR(ret), K(task));
847
    }
848
    ISTAT("generate merge task", KR(ret), K(task), K(job_));
849
  }
850
  return ret;
851
}
852

853
int ObLSBalanceTaskHelper::construct_ls_part_info_(const ObSplitLSParam &src_ls, ObTransferPartList &part_list)
854
{
855
  int ret = OB_SUCCESS;
856
  ObLSID src_ls_id = src_ls.get_ls_id();
857
  const double factor = src_ls.get_current_factor();
858
  ObLSBalanceGroupInfo *ls_bg_info = NULL;
859

860
  part_list.reset();
861

862
  if (OB_UNLIKELY(!inited_)) {
863
    ret = OB_NOT_INIT;
864
    LOG_WARN("not init", KR(ret));
865
  } else if (OB_UNLIKELY(!src_ls.is_valid() || !src_ls_id.is_valid())) {
866
    ret = OB_INVALID_ARGUMENT;
867
    LOG_WARN("src ls is invalid", KR(ret), K(src_ls), K(src_ls_id));
868
  } else if (OB_FAIL(tenant_ls_bg_info_.get(src_ls_id, ls_bg_info))) {
869
    if (OB_HASH_NOT_EXIST == ret) {
870
      ret = OB_SUCCESS;
871
      ISTAT("src ls is empty, no need to transfer out", KR(ret), K(src_ls_id));
872
    } else {
873
      LOG_WARN("get src ls balance group info fail", KR(ret), K(src_ls_id), K(src_ls));
874
    }
875
  } else if (OB_ISNULL(ls_bg_info)) {
876
    ret = OB_ERR_UNEXPECTED;
877
    LOG_WARN("invalid ls balance group info", KR(ret), K(ls_bg_info), K(src_ls_id));
878
  } else if (OB_FAIL(ls_bg_info->transfer_out_by_factor(factor, part_list))) {
879
    LOG_WARN("transfer out part list from LS balance group info fail", KR(ret), K(factor),
880
        KPC(ls_bg_info), K(part_list));
881
  }
882
  return ret;
883
}
884

885
#undef ISTAT
886
#undef WSTAT
887

888
}
889
}
890

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.