// oceanbase (547 lines, 25.9 KB)
// Copyright (c) 2021 OceanBase
// OceanBase is licensed under Mulan PubL v2.
// You can use this software according to the terms and conditions of the Mulan PubL v2.
// You may obtain a copy of Mulan PubL v2 at:
// http://license.coscl.org.cn/MulanPubL-2.0
// THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
// EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
// MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
// See the Mulan PubL v2 for more details.
10#include <cstdio>
11#include <gtest/gtest.h>
12#include <signal.h>
13#define private public
14#include "env/ob_simple_log_cluster_env.h"
15#undef private
16
17const std::string TEST_NAME = "throttling_arb";
18
19using namespace oceanbase::common;
20using namespace oceanbase;
21namespace oceanbase
22{
23using namespace logservice;
24
25namespace logservice
26{
27
28void ObArbitrationService::update_arb_timeout_()
29{
30arb_timeout_us_ = 2 * 1000 * 1000L;
31if (REACH_TIME_INTERVAL(2 * 1000 * 1000)) {
32CLOG_LOG_RET(WARN, OB_ERR_UNEXPECTED, "update_arb_timeout_", K_(self), K_(arb_timeout_us));
33}
34}
35}
36
37namespace unittest
38{
39
40class TestObSimpleLogThrottleArb : public ObSimpleLogClusterTestEnv
41{
42public:
43TestObSimpleLogThrottleArb() : ObSimpleLogClusterTestEnv()
44{}
45
46void set_palf_disk_options(PalfEnvImpl &palf_env_impl, const PalfDiskOptions &disk_options)
47{
48palf_env_impl.disk_options_wrapper_.disk_opts_for_recycling_blocks_ = disk_options;
49palf_env_impl.disk_options_wrapper_.disk_opts_for_stopping_writing_ = disk_options;
50}
51};
52
53int64_t ObSimpleLogClusterTestBase::member_cnt_ = 3;
54int64_t ObSimpleLogClusterTestBase::node_cnt_ = 5;
55bool ObSimpleLogClusterTestBase::need_add_arb_server_ = true;
56std::string ObSimpleLogClusterTestBase::test_name_ = TEST_NAME;
57
58MockLocCB loc_cb;
59
// Byte-size units; used for the last argument of submit_log() in the tests below.
const int64_t KB = 1024L;
const int64_t MB = 1024 * 1024L;
62TEST_F(TestObSimpleLogThrottleArb, test_2f1a_throttling_major)
63{
64oceanbase::common::ObClusterVersion::get_instance().cluster_version_ = CLUSTER_VERSION_4_1_0_0;
65SET_CASE_LOG_FILE(TEST_NAME, "arb_throttling_major");
66int ret = OB_SUCCESS;
67PALF_LOG(INFO, "begin test_2f1a_throttling_major");
68int64_t leader_idx = 0;
69int64_t arb_replica_idx = -1;
70PalfHandleImplGuard leader;
71std::vector<PalfHandleImplGuard*> palf_list;
72const int64_t CONFIG_CHANGE_TIMEOUT = 10 * 1000 * 1000L; // 10s
73const int64_t id = ATOMIC_AAF(&palf_id_, 1);
74common::ObMember dummy_member;
75ASSERT_EQ(OB_SUCCESS, create_paxos_group_with_arb(id, &loc_cb, arb_replica_idx, leader_idx, false, leader));
76ASSERT_EQ(OB_SUCCESS, get_cluster_palf_handle_guard(id, palf_list));
77loc_cb.leader_ = leader.palf_handle_impl_->self_;
78
79PalfEnv *palf_env = NULL;
80ASSERT_EQ(OB_SUCCESS, get_palf_env(leader_idx, palf_env));
81const PalfDiskOptions disk_options = palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_recycling_blocks_;
82
83const int64_t another_f_idx = (leader_idx+1)%3;
84const int64_t follower_D_idx = (leader_idx + 3);
85
86set_disk_options_for_throttling(palf_env->palf_env_impl_);
87ASSERT_EQ(OB_SUCCESS, get_palf_env(another_f_idx, palf_env));
88set_disk_options_for_throttling(palf_env->palf_env_impl_);
89ASSERT_EQ(OB_SUCCESS, get_palf_env(follower_D_idx, palf_env));
90set_disk_options_for_throttling(palf_env->palf_env_impl_);
91
92PALF_LOG(INFO, "[CASE 1]: prepare for throttling");
93leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
94ASSERT_EQ(OB_SUCCESS, submit_log(leader, 140, id, 2 * MB));
95LSN max_lsn = leader.palf_handle_impl_->sw_.get_max_lsn();
96wait_lsn_until_flushed(max_lsn, leader);
97
98int64_t throttling_percentage = 60;
99ASSERT_EQ(OB_SUCCESS, get_palf_env(leader_idx, palf_env));
100palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = throttling_percentage;
101
102ASSERT_EQ(OB_SUCCESS, get_palf_env(another_f_idx, palf_env));
103palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = throttling_percentage;
104
105ASSERT_EQ(OB_SUCCESS, get_palf_env(follower_D_idx, palf_env));
106palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = throttling_percentage;
107
108PALF_LOG(INFO, "[CASE 1.1]: MAJOR degrade");
109block_net(leader_idx, another_f_idx);
110sleep(1);
111
112// do not check OB_SUCCESS, may return OB_NOT_MASTER during degrading member
113leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
114palf_list[another_f_idx]->palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
115ASSERT_EQ(OB_SUCCESS, submit_log(leader, 5, id, 128 * KB));
116int64_t begin_ts = common::ObClockGenerator::getClock();
117EXPECT_TRUE(is_degraded(leader, another_f_idx));
118int64_t end_ts = common::ObClockGenerator::getClock();
119int64_t used_time = end_ts - begin_ts;
120ASSERT_TRUE(used_time < 2 * 1000 * 1000L);
121PALF_LOG(INFO, " MAJOR degrade", K(used_time));
122ASSERT_EQ(OB_SUCCESS, submit_log(leader, 5, id, 128));
123
124//submit some log
125palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = 100;
126usleep(LogWritingThrottle::UPDATE_INTERVAL_US);
127ASSERT_EQ(OB_SUCCESS, submit_log(leader, 5, id, 1 * MB));
128
129palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = throttling_percentage;
130usleep(LogWritingThrottle::UPDATE_INTERVAL_US);
131PALF_LOG(INFO, "[CASE 1.2] MAJOR upgrade");
132leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
133palf_list[another_f_idx]->palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
134ASSERT_EQ(OB_SUCCESS, submit_log(leader, 2, id, 512 * KB));
135unblock_net(leader_idx, another_f_idx);
136begin_ts = common::ObClockGenerator::getClock();
137PALF_LOG(INFO, "[CASE 1.2] begin MAJOR upgrade", K(used_time));
138ASSERT_TRUE(is_upgraded(leader, id));
139end_ts = common::ObClockGenerator::getClock();
140used_time = end_ts - begin_ts;
141PALF_LOG(INFO, "[CASE 1.2] end MAJOR upgrade", K(used_time));
142ASSERT_TRUE(used_time < 5 * 1000 * 1000L);
143ASSERT_EQ(OB_SUCCESS, submit_log(leader, 5, id, 128));
144
145PALF_LOG(INFO, "[CASE 1.3]: MAJOR replace_member(OB_TIMEOUT)");
146leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
147palf_list[another_f_idx]->palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
148palf_list[follower_D_idx]->palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
149ASSERT_EQ(OB_SUCCESS, submit_log(leader, 10, id, 128 * KB));
150
151LogConfigVersion config_version;
152ASSERT_EQ(OB_SUCCESS, leader.palf_handle_impl_->get_config_version(config_version));
153ret = leader.palf_handle_impl_->replace_member(ObMember(get_cluster()[follower_D_idx]->get_addr(), 1),
154ObMember(get_cluster()[another_f_idx]->get_addr(), 1),
155config_version,
156CONFIG_CHANGE_TIMEOUT);
157//timeout because added member can flush new meta when prev log is throttling
158ASSERT_TRUE(OB_TIMEOUT == ret || OB_SUCCESS == ret);
159int64_t new_leader_idx = OB_TIMEOUT == ret ? another_f_idx : follower_D_idx;
160ASSERT_EQ(OB_SUCCESS, submit_log(leader, 5, id, 128));
161
162PALF_LOG(INFO, "[CASE 1.4]: MAJOR switch_leader");
163PalfHandleImplGuard new_leader;
164leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
165palf_list[another_f_idx]->palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
166palf_list[follower_D_idx]->palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
167ASSERT_EQ(OB_SUCCESS, submit_log(leader, 5, id, 128 * KB));
168int64_t switch_start_ts = common::ObClockGenerator::getClock();
169ASSERT_EQ(OB_SUCCESS, switch_leader(id, new_leader_idx, new_leader));
170int64_t switch_end_ts = common::ObClockGenerator::getClock();
171used_time = switch_end_ts - switch_start_ts;
172PALF_LOG(INFO, "[CASE 1.4 end ] end switch_leader", K(used_time));
173// ASSERT_EQ(true, used_time < 2 * 1000 * 1000);
174
175new_leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
176ASSERT_EQ(OB_SUCCESS, submit_log(new_leader, 1, id, 1 * KB));
177max_lsn = new_leader.palf_handle_impl_->sw_.get_max_lsn();
178wait_lsn_until_flushed(max_lsn, new_leader);
179
180ASSERT_EQ(OB_SUCCESS, get_palf_env(leader_idx, palf_env));
181set_palf_disk_options(palf_env->palf_env_impl_, disk_options);
182
183ASSERT_EQ(OB_SUCCESS, get_palf_env(another_f_idx, palf_env));
184set_palf_disk_options(palf_env->palf_env_impl_, disk_options);
185
186ASSERT_EQ(OB_SUCCESS, get_palf_env(follower_D_idx, palf_env));
187set_palf_disk_options(palf_env->palf_env_impl_, disk_options);
188
189revert_cluster_palf_handle_guard(palf_list);
190leader.reset();
191new_leader.reset();
192delete_paxos_group(id);
193PALF_LOG(INFO, "end test_2f1a_degrade_upgrade", K(id));
194}
195
196TEST_F(TestObSimpleLogThrottleArb, test_2f1a_throttling_minor_leader)
197{
198oceanbase::common::ObClusterVersion::get_instance().cluster_version_ = CLUSTER_VERSION_4_1_0_0;
199SET_CASE_LOG_FILE(TEST_NAME, "arb_throttling_minor_leader");
200int ret = OB_SUCCESS;
201PALF_LOG(INFO, "begin arb_throttling_minor_leader");
202int64_t leader_idx = 0;
203int64_t arb_replica_idx = -1;
204PalfHandleImplGuard leader;
205std::vector<PalfHandleImplGuard*> palf_list;
206const int64_t CONFIG_CHANGE_TIMEOUT = 10 * 1000 * 1000L; // 10s
207const int64_t id = ATOMIC_AAF(&palf_id_, 1);
208common::ObMember dummy_member;
209ASSERT_EQ(OB_SUCCESS, create_paxos_group_with_arb(id, &loc_cb, arb_replica_idx, leader_idx, false, leader));
210ASSERT_EQ(OB_SUCCESS, get_cluster_palf_handle_guard(id, palf_list));
211loc_cb.leader_ = leader.palf_handle_impl_->self_;
212
213PalfEnv *palf_env = NULL;
214ASSERT_EQ(OB_SUCCESS, get_palf_env(leader_idx, palf_env));
215const PalfDiskOptions disk_options = palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_recycling_blocks_;
216
217const int64_t another_f_idx = (leader_idx+1)%3;
218const int64_t follower_D_idx = (leader_idx + 3);
219
220set_disk_options_for_throttling(palf_env->palf_env_impl_);
221ASSERT_EQ(OB_SUCCESS, get_palf_env(another_f_idx, palf_env));
222set_disk_options_for_throttling(palf_env->palf_env_impl_);
223ASSERT_EQ(OB_SUCCESS, get_palf_env(follower_D_idx, palf_env));
224set_disk_options_for_throttling(palf_env->palf_env_impl_);
225
226PALF_LOG(INFO, "[CASE 2]: prepare for throttling");
227leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
228ASSERT_EQ(OB_SUCCESS, submit_log(leader, 140, id, 2 * MB));
229LSN max_lsn = leader.palf_handle_impl_->sw_.get_max_lsn();
230wait_lsn_until_flushed(max_lsn, leader);
231
232int64_t throttling_percentage = 60;
233ASSERT_EQ(OB_SUCCESS, get_palf_env(leader_idx, palf_env));
234palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = throttling_percentage;
235
236PALF_LOG(INFO, "[CASE 2.1]: MONOR_LEADER degrade");
237block_net(leader_idx, another_f_idx);
238sleep(1);
239
240// do not check OB_SUCCESS, may return OB_NOT_MASTER during degrading member
241leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
242ASSERT_EQ(OB_SUCCESS, submit_log(leader, 5, id, 128 * KB));
243int64_t begin_ts = common::ObClockGenerator::getClock();
244EXPECT_TRUE(is_degraded(leader, another_f_idx));
245int64_t end_ts = common::ObClockGenerator::getClock();
246int64_t used_time = end_ts - begin_ts;
247ASSERT_TRUE(used_time < 2 * 1000 * 1000L);
248PALF_LOG(INFO, " MINOR_LEADER degrade", K(used_time));
249ASSERT_EQ(OB_SUCCESS, submit_log(leader, 5, id, 128));
250
251//submit some log
252palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = 100;
253usleep(LogWritingThrottle::UPDATE_INTERVAL_US);
254ASSERT_EQ(OB_SUCCESS, submit_log(leader, 5, id, 1 * MB));
255
256palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = throttling_percentage;
257usleep(LogWritingThrottle::UPDATE_INTERVAL_US);
258PALF_LOG(INFO, "[CASE 2.2] MINOR_LEADER upgrade");
259//upgrade may need many time because of target replica has not fetch log
260leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
261ASSERT_EQ(OB_SUCCESS, submit_log(leader, 5, id, 128 * KB));
262unblock_net(leader_idx, another_f_idx);
263begin_ts = common::ObClockGenerator::getClock();
264PALF_LOG(INFO, "[CASE 2.2] begin MINOR_LEADER upgrade", K(used_time));
265ASSERT_TRUE(is_upgraded(leader, id));
266end_ts = common::ObClockGenerator::getClock();
267used_time = end_ts - begin_ts;
268PALF_LOG(INFO, "[CASE 2.2] end MINOR_LEADER upgrade", K(used_time));
269ASSERT_TRUE(used_time < 5 * 1000 * 1000L);
270ASSERT_EQ(OB_SUCCESS, submit_log(leader, 5, id, 128));
271
272// PALF_LOG(INFO, "[CASE 2.3]: MINOR_LEADER replace_member");
273// leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
274//
275// ASSERT_EQ(OB_SUCCESS, submit_log(leader, 5, id, 128 * KB));
276//
277// LogConfigVersion config_version;
278//
279// const int64_t CONFIG_CHANGE_TIMEOUT_NEW = 20 * 1000 * 1000L; // 10s
280// ASSERT_EQ(OB_SUCCESS, leader.palf_handle_impl_->get_config_version(config_version));
281// ASSERT_EQ(OB_SUCCESS, leader.palf_handle_impl_->replace_member(ObMember(get_cluster()[follower_D_idx]->get_addr(), 1),
282// ObMember(get_cluster()[another_f_idx]->get_addr(), 1),
283// config_version,
284// CONFIG_CHANGE_TIMEOUT_NEW));
285// ASSERT_EQ(OB_SUCCESS, submit_log(leader, 5, id, 128));
286
287PALF_LOG(INFO, "[CASE 2.4]: MINOR_LEADER switch_leader");
288int64_t new_leader_idx = another_f_idx;
289PalfHandleImplGuard new_leader;
290leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
291ASSERT_EQ(OB_SUCCESS, submit_log(leader, 5, id, 128 * KB));
292int64_t switch_start_ts = common::ObClockGenerator::getClock();
293ASSERT_EQ(OB_SUCCESS, switch_leader(id, new_leader_idx, new_leader));
294int64_t switch_end_ts = common::ObClockGenerator::getClock();
295used_time = switch_end_ts - switch_start_ts;
296PALF_LOG(INFO, "[CASE 2.4 end ] end switch_leader", K(used_time));
297// ASSERT_EQ(true, used_time < 2 * 1000 * 1000);
298
299new_leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
300ASSERT_EQ(OB_SUCCESS, submit_log(new_leader, 1, id, 1 * KB));
301max_lsn = new_leader.palf_handle_impl_->sw_.get_max_lsn();
302wait_lsn_until_flushed(max_lsn, new_leader);
303
304ASSERT_EQ(OB_SUCCESS, get_palf_env(leader_idx, palf_env));
305set_palf_disk_options(palf_env->palf_env_impl_, disk_options);
306
307ASSERT_EQ(OB_SUCCESS, get_palf_env(another_f_idx, palf_env));
308set_palf_disk_options(palf_env->palf_env_impl_, disk_options);
309
310ASSERT_EQ(OB_SUCCESS, get_palf_env(follower_D_idx, palf_env));
311set_palf_disk_options(palf_env->palf_env_impl_, disk_options);
312
313revert_cluster_palf_handle_guard(palf_list);
314leader.reset();
315new_leader.reset();
316delete_paxos_group(id);
317PALF_LOG(INFO, "end test_2f1a_degrade_upgrade", K(id));
318}
319
320TEST_F(TestObSimpleLogThrottleArb, test_2f1a_throttling_minor_follower)
321{
322oceanbase::common::ObClusterVersion::get_instance().cluster_version_ = CLUSTER_VERSION_4_1_0_0;
323SET_CASE_LOG_FILE(TEST_NAME, "arb_throttling_minor_follower");
324int ret = OB_SUCCESS;
325PALF_LOG(INFO, "begin arb_throttling_minor_follower");
326int64_t leader_idx = 0;
327int64_t arb_replica_idx = -1;
328PalfHandleImplGuard leader;
329std::vector<PalfHandleImplGuard*> palf_list;
330const int64_t CONFIG_CHANGE_TIMEOUT = 10 * 1000 * 1000L; // 10s
331const int64_t id = ATOMIC_AAF(&palf_id_, 1);
332common::ObMember dummy_member;
333ASSERT_EQ(OB_SUCCESS, create_paxos_group_with_arb(id, &loc_cb, arb_replica_idx, leader_idx, false, leader));
334ASSERT_EQ(OB_SUCCESS, get_cluster_palf_handle_guard(id, palf_list));
335loc_cb.leader_ = leader.palf_handle_impl_->self_;
336
337PalfEnv *palf_env = NULL;
338ASSERT_EQ(OB_SUCCESS, get_palf_env(leader_idx, palf_env));
339const PalfDiskOptions disk_options = palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_recycling_blocks_;
340
341const int64_t another_f_idx = (leader_idx+1)%3;
342const int64_t follower_D_idx = (leader_idx + 3);
343
344set_disk_options_for_throttling(palf_env->palf_env_impl_);
345ASSERT_EQ(OB_SUCCESS, get_palf_env(another_f_idx, palf_env));
346set_disk_options_for_throttling(palf_env->palf_env_impl_);
347ASSERT_EQ(OB_SUCCESS, get_palf_env(follower_D_idx, palf_env));
348set_disk_options_for_throttling(palf_env->palf_env_impl_);
349
350PALF_LOG(INFO, "[CASE 3]: prepare for throttling");
351leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
352palf_list[another_f_idx]->palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
353palf_list[follower_D_idx]->palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
354ASSERT_EQ(OB_SUCCESS, submit_log(leader, 140, id, 2 * MB));
355LSN max_lsn = leader.palf_handle_impl_->sw_.get_max_lsn();
356wait_lsn_until_flushed(max_lsn, leader);
357
358int64_t throttling_percentage = 60;
359ASSERT_EQ(OB_SUCCESS, get_palf_env(another_f_idx, palf_env));
360palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = throttling_percentage;
361
362PALF_LOG(INFO, "[CASE 3.1]: MONOR_LEADER degrade");
363block_net(leader_idx, another_f_idx);
364sleep(1);
365
366// do not check OB_SUCCESS, may return OB_NOT_MASTER during degrading member
367leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
368palf_list[another_f_idx]->palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
369palf_list[follower_D_idx]->palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
370ASSERT_EQ(OB_SUCCESS, submit_log(leader, 5, id, 128 * KB));
371int64_t begin_ts = common::ObClockGenerator::getClock();
372EXPECT_TRUE(is_degraded(leader, another_f_idx));
373int64_t end_ts = common::ObClockGenerator::getClock();
374int64_t used_time = end_ts - begin_ts;
375ASSERT_TRUE(used_time < 2 * 1000 * 1000L);
376PALF_LOG(INFO, " MINOR_FOLLOWER degrade", K(used_time));
377ASSERT_EQ(OB_SUCCESS, submit_log(leader, 5, id, 128));
378
379//submit some log
380palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = 100;
381usleep(LogWritingThrottle::UPDATE_INTERVAL_US);
382ASSERT_EQ(OB_SUCCESS, submit_log(leader, 5, id, 1 * MB));
383
384//palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = throttling_percentage;
385// usleep(LogWritingThrottle::UPDATE_INTERVAL_US);
386PALF_LOG(INFO, "[CASE 3.2] MINOR_FOLLOWER upgrade");
387leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
388palf_list[another_f_idx]->palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
389palf_list[follower_D_idx]->palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
390ASSERT_EQ(OB_SUCCESS, submit_log(leader, 5, id, 128 * KB));
391unblock_net(leader_idx, another_f_idx);
392begin_ts = common::ObClockGenerator::getClock();
393PALF_LOG(INFO, "[CASE 3.2] begin MINOR_FOLLOWER upgrade", K(used_time));
394ASSERT_TRUE(is_upgraded(leader, id));
395end_ts = common::ObClockGenerator::getClock();
396used_time = end_ts - begin_ts;
397PALF_LOG(INFO, "[CASE 3.2] end MINOR_FOLLOWER upgrade", K(used_time));
398ASSERT_TRUE(used_time < 3 * 1000 * 1000L);
399ASSERT_EQ(OB_SUCCESS, submit_log(leader, 5, id, 128));
400
401PALF_LOG(INFO, "[CASE 3.3]: MINOR_FOLLOWER replace_member");
402usleep(LogWritingThrottle::UPDATE_INTERVAL_US);
403leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
404palf_list[another_f_idx]->palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
405palf_list[follower_D_idx]->palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
406ASSERT_EQ(OB_SUCCESS, submit_log(leader, 5, id, 128 * KB));
407ASSERT_EQ(OB_SUCCESS, get_palf_env(another_f_idx, palf_env));
408palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = throttling_percentage;
409
410LogConfigVersion config_version;
411ASSERT_EQ(OB_SUCCESS, leader.palf_handle_impl_->get_config_version(config_version));
412ASSERT_EQ(OB_SUCCESS, leader.palf_handle_impl_->replace_member(ObMember(get_cluster()[follower_D_idx]->get_addr(), 1),
413ObMember(get_cluster()[another_f_idx]->get_addr(), 1),
414config_version,
415CONFIG_CHANGE_TIMEOUT));
416ASSERT_EQ(OB_SUCCESS, submit_log(leader, 5, id, 128));
417ASSERT_EQ(OB_SUCCESS, get_palf_env(follower_D_idx, palf_env));
418palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = throttling_percentage;
419usleep(LogWritingThrottle::UPDATE_INTERVAL_US);
420
421PALF_LOG(INFO, "[CASE 3.4]: MINOR_FOLLOWER switch_leader");
422int64_t new_leader_idx = follower_D_idx;
423PalfHandleImplGuard new_leader;
424leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
425palf_list[another_f_idx]->palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
426palf_list[follower_D_idx]->palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
427ASSERT_EQ(OB_SUCCESS, submit_log(leader, 5, id, 128 * KB));
428int64_t switch_start_ts = common::ObClockGenerator::getClock();
429ASSERT_EQ(OB_SUCCESS, switch_leader(id, new_leader_idx, new_leader));
430int64_t switch_end_ts = common::ObClockGenerator::getClock();
431used_time = switch_end_ts - switch_start_ts;
432PALF_LOG(INFO, "[CASE 3.4 end ] end switch_leader", K(used_time));
433// ASSERT_EQ(true, used_time < 2 * 1000 * 1000);
434
435new_leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
436ASSERT_EQ(OB_SUCCESS, submit_log(new_leader, 1, id, 1 * KB));
437max_lsn = new_leader.palf_handle_impl_->sw_.get_max_lsn();
438wait_lsn_until_flushed(max_lsn, new_leader);
439
440ASSERT_EQ(OB_SUCCESS, get_palf_env(leader_idx, palf_env));
441set_palf_disk_options(palf_env->palf_env_impl_, disk_options);
442
443ASSERT_EQ(OB_SUCCESS, get_palf_env(another_f_idx, palf_env));
444set_palf_disk_options(palf_env->palf_env_impl_, disk_options);
445
446ASSERT_EQ(OB_SUCCESS, get_palf_env(follower_D_idx, palf_env));
447set_palf_disk_options(palf_env->palf_env_impl_, disk_options);
448
449revert_cluster_palf_handle_guard(palf_list);
450leader.reset();
451new_leader.reset();
452delete_paxos_group(id);
453PALF_LOG(INFO, "end test_2f1a_degrade_upgrade", K(id));
454}
455
456
457TEST_F(TestObSimpleLogThrottleArb, test_4f1a_degrade_upgrade)
458{
459SET_CASE_LOG_FILE(TEST_NAME, "arb_4f1a_throttling");
460int ret = OB_SUCCESS;
461PALF_LOG(INFO, "begin test_4f1a_throttling");
462int64_t leader_idx = 0;
463int64_t arb_replica_idx = -1;
464PalfHandleImplGuard leader;
465const int64_t CONFIG_CHANGE_TIMEOUT = 10 * 1000 * 1000L; // 10s
466const int64_t id = ATOMIC_AAF(&palf_id_, 1);
467common::ObMember dummy_member;
468std::vector<PalfHandleImplGuard*> palf_list;
469ASSERT_EQ(OB_SUCCESS, create_paxos_group_with_arb(id, &loc_cb, arb_replica_idx, leader_idx, false, leader));
470
471LogConfigVersion config_version;
472ASSERT_EQ(OB_SUCCESS, leader.palf_handle_impl_->get_config_version(config_version));
473ASSERT_EQ(OB_SUCCESS, leader.palf_handle_impl_->add_member(ObMember(get_cluster()[3]->get_addr(), 1), 3, config_version, CONFIG_CHANGE_TIMEOUT));
474
475ASSERT_EQ(OB_SUCCESS, leader.palf_handle_impl_->get_config_version(config_version));
476ASSERT_EQ(OB_SUCCESS, leader.palf_handle_impl_->add_member(ObMember(get_cluster()[4]->get_addr(), 1), 4, config_version, CONFIG_CHANGE_TIMEOUT));
477ASSERT_EQ(OB_SUCCESS, get_cluster_palf_handle_guard(id, palf_list));
478loc_cb.leader_ = leader.palf_handle_impl_->self_;
479
480const int64_t another_f1_idx = (leader_idx+1)%5;
481const int64_t another_f2_idx = (leader_idx+3)%5;
482const int64_t another_f3_idx = (leader_idx+4)%5;
483
484PalfEnv *palf_env = NULL;
485ASSERT_EQ(OB_SUCCESS, get_palf_env(leader_idx, palf_env));
486set_disk_options_for_throttling(palf_env->palf_env_impl_);
487ASSERT_EQ(OB_SUCCESS, get_palf_env(another_f1_idx, palf_env));
488set_disk_options_for_throttling(palf_env->palf_env_impl_);
489ASSERT_EQ(OB_SUCCESS, get_palf_env(another_f2_idx, palf_env));
490set_disk_options_for_throttling(palf_env->palf_env_impl_);
491ASSERT_EQ(OB_SUCCESS, get_palf_env(another_f3_idx, palf_env));
492set_disk_options_for_throttling(palf_env->palf_env_impl_);
493
494
495PALF_LOG(INFO, "[CASE 4]: prepare for throttling");
496leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
497palf_list[another_f1_idx]->palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
498palf_list[another_f2_idx]->palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
499palf_list[another_f3_idx]->palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
500ASSERT_EQ(OB_SUCCESS, submit_log(leader, 140, id, 2 * MB));
501LSN max_lsn = leader.palf_handle_impl_->sw_.get_max_lsn();
502wait_lsn_until_flushed(max_lsn, leader);
503
504int64_t throttling_percentage = 60;
505ASSERT_EQ(OB_SUCCESS, get_palf_env(leader_idx, palf_env));
506palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = throttling_percentage;
507sleep(2);
508PALF_LOG(INFO, "[CASE 4.1]: 4f1A B is throttling while A is leader, degrade C & D");
509block_all_net(another_f2_idx);
510block_all_net(another_f3_idx);
511
512int64_t begin_ts = common::ObClockGenerator::getClock();
513leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
514palf_list[another_f1_idx]->palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
515palf_list[another_f2_idx]->palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
516palf_list[another_f3_idx]->palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
517ASSERT_EQ(OB_SUCCESS, submit_log(leader, 10, id, 256 * KB));
518usleep(10 * 1000);
519ASSERT_TRUE(is_degraded(leader, another_f2_idx));
520ASSERT_TRUE(is_degraded(leader, another_f3_idx));
521int64_t end_ts = common::ObClockGenerator::getClock();
522int64_t used_time = end_ts - begin_ts;
523PALF_LOG(INFO, " [CASE] 4f1a degrade", K(used_time));
524
525// 确保lease过期,验证loc_cb是否可以找到leader拉日志
526sleep(5);
527unblock_all_net(another_f2_idx);
528unblock_all_net(another_f3_idx);
529ASSERT_EQ(OB_SUCCESS, submit_log(leader, 10, id, 256));
530
531PALF_LOG(INFO, " [CASE] 4f1a before upgrade", K(used_time));
532ASSERT_TRUE(is_upgraded(leader, id));
533PALF_LOG(INFO, " [CASE] end upgrade", K(used_time));
534
535revert_cluster_palf_handle_guard(palf_list);
536leader.reset();
537delete_paxos_group(id);
538PALF_LOG(INFO, "end test_4f1a_degrade_upgrade", K(id));
539}
540
} // end unittest
} // end oceanbase

544int main(int argc, char **argv)
545{
546RUN_SIMPLE_LOG_CLUSTER_TEST(TEST_NAME);
547}
548