oceanbase
236 строк · 6.6 Кб
1/**
2* Copyright (c) 2021 OceanBase
3* OceanBase CE is licensed under Mulan PubL v2.
4* You can use this software according to the terms and conditions of the Mulan PubL v2.
5* You may obtain a copy of Mulan PubL v2 at:
6* http://license.coscl.org.cn/MulanPubL-2.0
7* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10* See the Mulan PubL v2 for more details.
11*/
12
13#define USING_LOG_PREFIX RS_LB
14#include "ob_balance_info.h"
15
16#include "lib/container/ob_array.h"
17#include "lib/container/ob_array_iterator.h"
18#include "lib/container/ob_se_array.h"
19#include "lib/container/ob_se_array_iterator.h"
20#include "lib/hash/ob_hashset.h"
21#include "lib/profile/ob_trace_id.h"
22#include "share/ob_global_stat_proxy.h"
23#include "share/schema/ob_table_schema.h"
24#include "share/schema/ob_schema_getter_guard.h"
25#include "share/schema/ob_part_mgr_util.h"
26#include "ob_unit_manager.h"
27#include "ob_zone_manager.h"
28#include "ob_root_utils.h"
29#include "ob_root_service.h"
30#include "ob_resource_weight_parser.h"
31#include "observer/ob_server_struct.h"
32#include "observer/omt/ob_tenant_config_mgr.h"
33#include "lib/mysqlclient/ob_mysql_proxy.h"
34#include "storage/ob_file_system_router.h"
35#include "storage/tx/ob_i_ts_source.h"
36
37using namespace oceanbase::common;
38using namespace oceanbase::common::hash;
39using namespace oceanbase::rootserver;
40using namespace oceanbase::share;
41using namespace oceanbase::share::schema;
42
43////////////////
44ObStatisticsCalculator::ObStatisticsCalculator()
45:sum_(0)
46{}
47
48void ObStatisticsCalculator::reset()
49{
50values_.reset();
51sum_ = 0;
52}
53
54int ObStatisticsCalculator::add_value(double v)
55{
56sum_ += v;
57return values_.push_back(v);
58}
59
60double ObStatisticsCalculator::get_avg()
61{
62double avg = 0;
63if (values_.count() > 0) {
64avg = sum_ / static_cast<double>(values_.count());
65}
66return avg;
67}
68
69double ObStatisticsCalculator::get_standard_deviation()
70{
71double sd = 0;
72int64_t n = values_.count();
73if (n > 0) {
74double avg = get_avg();
75FOREACH(it, values_) {
76double d = (*it) - avg;
77sd += d * d;
78}
79sd = sqrt(sd/static_cast<double>(n));
80}
81return sd;
82}
83
84int ZoneUnit::assign(const ZoneUnit &other)
85{
86int ret = OB_SUCCESS;
87zone_ = other.zone_;
88active_unit_cnt_ = other.active_unit_cnt_;
89
90load_imbalance_ = other.load_imbalance_;
91cpu_imbalance_ = other.cpu_imbalance_;
92disk_imbalance_ = other.disk_imbalance_;
93iops_imbalance_ = other.iops_imbalance_;
94memory_imbalance_ = other.memory_imbalance_;
95load_avg_ = other.load_avg_;
96cpu_avg_ = other.cpu_avg_;
97disk_avg_ = other.disk_avg_;
98iops_avg_ = other.iops_avg_;
99memory_avg_ = other.memory_avg_;
100
101tg_pg_cnt_ = other.tg_pg_cnt_;
102if (OB_FAIL(copy_assign(all_unit_, other.all_unit_))) {
103LOG_WARN("failed to assign all_unit_", K(ret));
104}
105return ret;
106}
107
108bool ServerStat::can_migrate_in() const
109{
110return !blocked_ && active_ && online_;;
111}
112
113
114int UnitStat::assign(const UnitStat &other)
115{
116int ret = OB_SUCCESS;
117server_ = other.server_;
118in_pool_ = other.in_pool_;
119load_factor_ = other.load_factor_;
120capacity_ = other.capacity_;
121load_ = other.load_;
122tg_pg_cnt_ = other.tg_pg_cnt_;
123outside_replica_cnt_ = other.outside_replica_cnt_;
124inside_replica_cnt_ = other.inside_replica_cnt_;
125if (OB_FAIL(copy_assign(info_, other.info_))) {
126LOG_WARN("failed to assign info_", K(ret));
127}
128return ret;
129}
130
131UnitStat &UnitStat::operator=(const UnitStat &other)
132{
133int ret = OB_SUCCESS;
134if (this != &other) {
135if (OB_FAIL(assign(other))) {
136LOG_WARN("fail to assign", K(ret));
137}
138}
139return *this;
140}
141
142double UnitStat::get_load_if(ObResourceWeight &weights,
143const LoadFactor &load_factor, const bool plus) const
144{
145LoadFactor new_factor = load_factor_;
146if (plus) {
147new_factor += load_factor;
148} else {
149new_factor -= load_factor;
150}
151return weights.cpu_weight_ * (new_factor.get_cpu_usage()/get_cpu_limit())
152+ weights.memory_weight_ * (new_factor.get_memory_usage()/get_memory_limit())
153+ weights.disk_weight_ * (new_factor.get_disk_usage()/get_disk_limit())
154+ weights.iops_weight_ * (new_factor.get_iops_usage()/get_iops_limit());
155}
156
157double UnitStat::calc_load(ObResourceWeight &weights,
158const LoadFactor &load_factor) const
159{
160return weights.cpu_weight_ * (load_factor.get_cpu_usage()/get_cpu_limit())
161+ weights.memory_weight_ * (load_factor.get_memory_usage()/get_memory_limit())
162+ weights.disk_weight_ * (load_factor.get_disk_usage()/get_disk_limit())
163+ weights.iops_weight_ * (load_factor.get_iops_usage()/get_iops_limit());
164}
165
166int ServerReplicaCountMgr::init(const ObIArray<ObAddr> &servers)
167{
168int ret = OB_SUCCESS;
169// allow init twice
170inited_ = false;
171server_replica_counts_.reuse();
172ServerReplicaCount server_replica_count;
173FOREACH_CNT_X(server, servers, OB_SUCCESS == ret) {
174if (!server->is_valid()) {
175ret = OB_INVALID_ARGUMENT;
176LOG_WARN("invalid server", "server", *server, K(ret));
177} else {
178server_replica_count.reset();
179server_replica_count.server_ = *server;
180server_replica_count.replica_count_ = 0;
181if (OB_FAIL(server_replica_counts_.push_back(server_replica_count))) {
182LOG_WARN("push_back failed", K(ret));
183}
184}
185}
186if (OB_SUCC(ret)) {
187inited_ = true;
188}
189return ret;
190}
191
192int ServerReplicaCountMgr::accumulate(
193const ObAddr &server, const int64_t cnt)
194{
195int ret = OB_SUCCESS;
196if (!inited_) {
197ret = OB_NOT_INIT;
198LOG_WARN("not init", K(ret));
199} else if (!server.is_valid() || cnt < 0) {
200ret = OB_INVALID_ARGUMENT;
201LOG_WARN("invalid server", K(server), K(cnt), K(ret));
202} else {
203FOREACH_CNT(server_replica_count, server_replica_counts_) {
204if (server_replica_count->server_ == server) {
205server_replica_count->replica_count_ += cnt;
206break;
207}
208}
209}
210return ret;
211}
212
213int ServerReplicaCountMgr::get_replica_count(
214const ObAddr &server, int64_t &replica_count)
215{
216int ret = OB_SUCCESS;
217if (!inited_) {
218ret = OB_NOT_INIT;
219LOG_WARN("not init", K(ret));
220} else if (!server.is_valid()) {
221ret = OB_INVALID_ARGUMENT;
222LOG_WARN("invalid server", K(server), K(ret));
223} else {
224bool found = false;
225FOREACH_CNT_X(server_replica_count, server_replica_counts_, !found) {
226if (server_replica_count->server_ == server) {
227replica_count = server_replica_count->replica_count_;
228found = true;
229}
230}
231if (!found) {
232ret = OB_ENTRY_NOT_EXIST;
233}
234}
235return ret;
236}
237