oceanbase
410 строк · 22.0 Кб
1/**
2* Copyright (c) 2021 OceanBase
3* OceanBase CE is licensed under Mulan PubL v2.
4* You can use this software according to the terms and conditions of the Mulan PubL v2.
5* You may obtain a copy of Mulan PubL v2 at:
6* http://license.coscl.org.cn/MulanPubL-2.0
7* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10* See the Mulan PubL v2 for more details.
11*/
12
13#include <cstdio>
14#include <gtest/gtest.h>
15#include <signal.h>
16#include <share/scn.h>
17#define private public
18#include "env/ob_simple_log_cluster_env.h"
19#include "env/ob_simple_log_server.h"
20#undef private
21
22const std::string TEST_NAME = "log_throttling";
23
24using namespace oceanbase::common;
25using namespace oceanbase;
26namespace oceanbase
27{
28using namespace logservice;
29using namespace palf;
30namespace unittest
31{
32class TestObSimpleLogClusterLogThrottling : public ObSimpleLogClusterTestEnv
33{
34public:
35TestObSimpleLogClusterLogThrottling() : ObSimpleLogClusterTestEnv()
36{}
37};
38
39int64_t ObSimpleLogClusterTestBase::member_cnt_ = 1;
40int64_t ObSimpleLogClusterTestBase::node_cnt_ = 1;
41std::string ObSimpleLogClusterTestBase::test_name_ = TEST_NAME;
42bool ObSimpleLogClusterTestBase::need_add_arb_server_ = false;
43
44MockLocCB loc_cb;
45
46TEST_F(TestObSimpleLogClusterLogThrottling, test_throttling_sys)
47{
48SET_CASE_LOG_FILE(TEST_NAME, "log_throttling_sys_log_stream");
49int ret = OB_SUCCESS;
50const int64_t id = 1;
51PALF_LOG(INFO, "begin test throttling_sy", K(id));
52int64_t leader_idx = 0;
53const int64_t CONFIG_CHANGE_TIMEOUT = 10 * 1000 * 1000L; // 10s
54PalfHandleImplGuard leader;
55PalfEnv *palf_env = NULL;
56EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, &loc_cb, leader_idx, leader));
57EXPECT_EQ(OB_SUCCESS, get_palf_env(leader_idx, palf_env));
58const PalfDiskOptions disk_options = palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_recycling_blocks_;
59loc_cb.leader_ = get_cluster()[leader_idx]->get_addr();
60
61const int64_t MB = 1024 * 1024L;
62int64_t total_disk_size = 400 * MB ;
63int64_t utilization_limit_threshold = 95;
64int64_t throttling_percentage = 60;
65
66PalfEnvImpl &palf_env_impl = palf_env->palf_env_impl_;
67palf_env_impl.is_inited_ = true;
68palf_env_impl.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = throttling_percentage;
69palf_env_impl.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_usage_limit_size_ = total_disk_size;
70palf_env_impl.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_utilization_threshold_ = 90;
71palf_env_impl.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_utilization_limit_threshold_ = utilization_limit_threshold;
72int64_t unrecyclable_size = 0;
73palf_env_impl.disk_options_wrapper_.set_cur_unrecyclable_log_disk_size(unrecyclable_size);
74PalfThrottleOptions throttle_options;
75palf_env_impl.disk_options_wrapper_.get_throttling_options(throttle_options);
76LogIOWorker *log_io_worker = leader.palf_handle_impl_->log_engine_.log_io_worker_;
77LogWritingThrottle *throttle = log_io_worker->throttle_;
78// sys log stream no need throttling
79PalfThrottleOptions invalid_throttle_options;
80
81PALF_LOG(INFO, "prepare for throttling");
82leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
83ASSERT_EQ(OB_SUCCESS, submit_log(leader, 140, id, 2 * MB));
84LSN max_lsn = leader.palf_handle_impl_->sw_.get_max_lsn();
85wait_lsn_until_flushed(max_lsn, leader);
86
87PALF_LOG(INFO, "test sys log stream will not been throttled");
88usleep(LogWritingThrottle::UPDATE_INTERVAL_US);
89int64_t cur_ts = common::ObClockGenerator::getClock();
90leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
91ASSERT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 2 * MB));
92max_lsn = leader.palf_handle_impl_->sw_.get_max_lsn();
93wait_lsn_until_flushed(max_lsn, leader);
94ASSERT_EQ(invalid_throttle_options, throttle->throttling_options_);
95ASSERT_EQ(false, throttle->need_throttling_with_options_not_guarded_by_lock_());
96ASSERT_EQ(false, throttle->need_throttling_not_guarded_by_lock_(log_io_worker->need_purging_throttling_func_));
97int64_t break_ts = common::ObClockGenerator::getClock();
98ASSERT_EQ(true, (break_ts - cur_ts) < 1 * 1000 * 1000);
99
100PALF_LOG(INFO, "end test throttling_sys_log_stream", K(id));
101leader.reset();
102
103palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_recycling_blocks_ = disk_options;
104palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_stopping_writing_ = disk_options;
105delete_paxos_group(id);
106}
107
108TEST_F(TestObSimpleLogClusterLogThrottling, test_throttling_basic)
109{
110SET_CASE_LOG_FILE(TEST_NAME, "log_throttling_basic");
111int ret = OB_SUCCESS;
112const int64_t id = ATOMIC_AAF(&palf_id_, 1);
113PALF_LOG(INFO, "begin test throttlin_basic", K(id));
114int64_t leader_idx = 0;
115const int64_t CONFIG_CHANGE_TIMEOUT = 10 * 1000 * 1000L; // 10s
116PalfHandleImplGuard leader;
117PalfEnv *palf_env = NULL;
118EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, &loc_cb, leader_idx, leader));
119EXPECT_EQ(OB_SUCCESS, get_palf_env(leader_idx, palf_env));
120loc_cb.leader_ = get_cluster()[leader_idx]->get_addr();
121
122const int64_t MB = 1024 * 1024L;
123int64_t total_disk_size = 300 * MB ;
124int64_t utilization_limit_threshold = 95;
125int64_t throttling_percentage = 40;
126
127PalfEnvImpl &palf_env_impl = palf_env->palf_env_impl_;
128palf_env_impl.is_inited_ = true;
129palf_env_impl.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = throttling_percentage;
130palf_env_impl.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_usage_limit_size_ = total_disk_size;
131palf_env_impl.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_utilization_threshold_ = 80;
132palf_env_impl.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_utilization_limit_threshold_ = utilization_limit_threshold;
133int64_t unrecyclable_size = 0;
134palf_env_impl.disk_options_wrapper_.set_cur_unrecyclable_log_disk_size(unrecyclable_size);
135PalfThrottleOptions throttle_options;
136palf_env_impl.disk_options_wrapper_.get_throttling_options(throttle_options);
137LogIOWorker *log_io_worker = leader.palf_handle_impl_->log_engine_.log_io_worker_;
138LogWritingThrottle *throttle = log_io_worker->throttle_;
139PalfThrottleOptions invalid_throttle_options;
140
141PALF_LOG(INFO, "[CASE 1]: test no need throttling while unrecyclable_log_disk_size is no more than trigger_size");
142EXPECT_EQ(OB_SUCCESS, submit_log(leader, 40, id, 1 * MB));
143LSN max_lsn_1 = leader.palf_handle_impl_->sw_.get_max_lsn();
144leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
145wait_lsn_until_flushed(max_lsn_1, leader);
146ASSERT_EQ(true, throttle->last_update_ts_ != OB_INVALID_TIMESTAMP);
147ASSERT_EQ(invalid_throttle_options, throttle->throttling_options_);
148ASSERT_EQ(false, throttle->need_throttling_with_options_not_guarded_by_lock_());
149ASSERT_EQ(false, throttle->need_throttling_not_guarded_by_lock_(log_io_worker->need_purging_throttling_func_));
150
151PALF_LOG(INFO, "[CASE 2] no need throttling while log_disk_throttling_percentage_ is off");
152usleep(LogWritingThrottle::UPDATE_INTERVAL_US);
153palf_env_impl.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = 100;
154EXPECT_EQ(OB_SUCCESS, submit_log(leader, 31, id, 1 * MB));
155max_lsn_1 = leader.palf_handle_impl_->sw_.get_max_lsn();
156wait_lsn_until_flushed(max_lsn_1, leader);
157ASSERT_EQ(invalid_throttle_options, throttle->throttling_options_);
158ASSERT_EQ(false, throttle->need_throttling_with_options_not_guarded_by_lock_());
159ASSERT_EQ(false, throttle->need_throttling_not_guarded_by_lock_(log_io_worker->need_purging_throttling_func_));
160
161PALF_LOG(INFO, "[CASE 3] need throttling");
162LogEntryHeader log_entry_header;
163LogGroupEntryHeader group_header;
164const int64_t log_entry_header_size = log_entry_header.get_serialize_size();
165const int64_t log_group_header_size = group_header.get_serialize_size();
166const int64_t header_size =log_entry_header_size + log_group_header_size;
167palf_env_impl.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = throttling_percentage;
168usleep(LogWritingThrottle::UPDATE_INTERVAL_US);
169palf_env_impl.disk_options_wrapper_.get_throttling_options(throttle_options);
170LSN before_lsn = leader.palf_handle_impl_->sw_.get_max_lsn();
171EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 1 * MB));
172LSN end_lsn = leader.palf_handle_impl_->sw_.get_max_lsn();
173const int64_t log_size = end_lsn - before_lsn;
174max_lsn_1 = leader.palf_handle_impl_->sw_.get_max_lsn();
175wait_lsn_until_flushed(max_lsn_1, leader);
176PALF_LOG(INFO, "case 3: after submit_log", K(before_lsn), K(end_lsn), K(max_lsn_1));
177ASSERT_EQ(throttle_options, throttle->throttling_options_);
178ASSERT_EQ(true, throttle->need_throttling_not_guarded_by_lock_(log_io_worker->need_purging_throttling_func_));
179ASSERT_EQ(true, throttle->stat_.has_ever_throttled());
180ASSERT_EQ(log_size, throttle->stat_.total_throttling_size_);
181ASSERT_EQ(1, throttle->stat_.total_throttling_task_cnt_);
182ASSERT_EQ(0, throttle->stat_.total_skipped_task_cnt_);
183ASSERT_EQ(0, throttle->stat_.total_skipped_size_);
184ASSERT_EQ(log_size, throttle->appended_log_size_cur_round_);
185ASSERT_EQ(5, log_io_worker->purge_throttling_task_submitted_seq_);
186ASSERT_EQ(5, log_io_worker->purge_throttling_task_handled_seq_);
187
188PALF_LOG(INFO, "[CASE 4] no need throttling because trigger_percentage is set to 100", KPC(throttle));
189palf_env_impl.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = 100;
190usleep(LogWritingThrottle::UPDATE_INTERVAL_US);
191EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 1 * MB));
192max_lsn_1 = leader.palf_handle_impl_->sw_.get_max_lsn();
193wait_lsn_until_flushed(max_lsn_1, leader);
194ASSERT_EQ(invalid_throttle_options, throttle->throttling_options_);
195ASSERT_EQ(false, throttle->need_throttling_not_guarded_by_lock_(log_io_worker->need_purging_throttling_func_));
196ASSERT_EQ(true, throttle->stat_.has_ever_throttled());
197ASSERT_EQ(log_size, throttle->stat_.total_throttling_size_);
198ASSERT_EQ(1, throttle->stat_.total_throttling_task_cnt_);
199ASSERT_EQ(0, throttle->stat_.total_skipped_task_cnt_);
200ASSERT_EQ(0, throttle->stat_.total_skipped_size_);
201ASSERT_EQ(0, throttle->appended_log_size_cur_round_);
202ASSERT_EQ(5, log_io_worker->purge_throttling_task_submitted_seq_);
203ASSERT_EQ(5, log_io_worker->purge_throttling_task_handled_seq_);
204
205PALF_LOG(INFO, "[CASE 5] no need throttling because log_disk_size is set to 500M", KPC(throttle));
206palf_env_impl.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = 40;
207usleep(LogWritingThrottle::UPDATE_INTERVAL_US);
208palf_env_impl.disk_options_wrapper_.get_throttling_options(throttle_options);
209EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 1 * MB));
210max_lsn_1 = leader.palf_handle_impl_->sw_.get_max_lsn();
211wait_lsn_until_flushed(max_lsn_1, leader);
212ASSERT_EQ(throttle_options, throttle->throttling_options_);
213ASSERT_EQ(true, throttle->need_throttling_not_guarded_by_lock_(log_io_worker->need_purging_throttling_func_));
214ASSERT_EQ(true, throttle->stat_.has_ever_throttled());
215PALF_LOG(INFO, "[CASE 5] no need throttling because log_disk_size is set to 500M", K(throttle));
216ASSERT_EQ(log_size, throttle->stat_.total_throttling_size_);
217ASSERT_EQ(1, throttle->stat_.total_throttling_task_cnt_);
218ASSERT_EQ(0, throttle->stat_.total_skipped_task_cnt_);
219ASSERT_EQ(0, throttle->stat_.total_skipped_size_);
220ASSERT_EQ(log_size, throttle->appended_log_size_cur_round_);
221ASSERT_EQ(5, log_io_worker->purge_throttling_task_submitted_seq_);
222ASSERT_EQ(5, log_io_worker->purge_throttling_task_handled_seq_);
223palf_env_impl.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_usage_limit_size_ = 500 * MB;
224usleep(LogWritingThrottle::UPDATE_INTERVAL_US);
225palf_env_impl.disk_options_wrapper_.get_throttling_options(throttle_options);
226EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 1 * MB));
227max_lsn_1 = leader.palf_handle_impl_->sw_.get_max_lsn();
228wait_lsn_until_flushed(max_lsn_1, leader);
229ASSERT_EQ(invalid_throttle_options, throttle->throttling_options_);
230ASSERT_EQ(false, throttle->need_throttling_not_guarded_by_lock_(log_io_worker->need_purging_throttling_func_));
231ASSERT_EQ(true, throttle->stat_.has_ever_throttled());
232ASSERT_EQ(true, OB_INVALID_TIMESTAMP != throttle->stat_.stop_ts_);
233ASSERT_EQ(log_size, throttle->stat_.total_throttling_size_);
234ASSERT_EQ(1, throttle->stat_.total_throttling_task_cnt_);
235ASSERT_EQ(0, throttle->stat_.total_skipped_task_cnt_);
236ASSERT_EQ(0, throttle->stat_.total_skipped_size_);
237ASSERT_EQ(0, throttle->appended_log_size_cur_round_);
238max_lsn_1 = leader.palf_handle_impl_->sw_.get_max_lsn();
239wait_lsn_until_flushed(max_lsn_1, leader);
240
241int64_t prev_has_batched_size = log_io_worker->batch_io_task_mgr_.has_batched_size_;
242PALF_LOG(INFO, "[CASE 6] flush meta task no need throttling", K(max_lsn_1));
243int64_t cur_ts = common::ObClockGenerator::getClock();
244palf_env_impl.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_usage_limit_size_ = 300 * MB;
245usleep(LogWritingThrottle::UPDATE_INTERVAL_US);
246LogEngine *log_engine = &leader.palf_handle_impl_->log_engine_;
247IOTaskCond io_task_cond_1(id, log_engine->palf_epoch_);
248EXPECT_EQ(OB_SUCCESS, log_io_worker->submit_io_task(&io_task_cond_1));
249ASSERT_EQ(6, log_io_worker->purge_throttling_task_submitted_seq_);
250ASSERT_EQ(5, log_io_worker->purge_throttling_task_handled_seq_);
251leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
252EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10, id, 1 * MB));
253max_lsn_1 = leader.palf_handle_impl_->sw_.get_max_lsn();
254//wait all flush log tasks pushed into queue of LogIOWorker
255wait_lsn_until_submitted(max_lsn_1, leader);
256IOTaskCond io_task_cond_2(id, log_engine->palf_epoch_);
257EXPECT_EQ(OB_SUCCESS, log_io_worker->submit_io_task(&io_task_cond_2));
258ASSERT_EQ(7, log_io_worker->purge_throttling_task_submitted_seq_);
259ASSERT_EQ(5, log_io_worker->purge_throttling_task_handled_seq_);
260io_task_cond_1.cond_.signal();
261wait_lsn_until_flushed(max_lsn_1, leader);
262usleep(10 * 1000);
263ASSERT_EQ(7, log_io_worker->purge_throttling_task_submitted_seq_);
264ASSERT_EQ(6, log_io_worker->purge_throttling_task_handled_seq_);
265io_task_cond_2.cond_.signal();
266usleep(10 * 1000);
267ASSERT_EQ(7, log_io_worker->purge_throttling_task_submitted_seq_);
268ASSERT_EQ(7, log_io_worker->purge_throttling_task_handled_seq_);
269palf_env_impl.disk_options_wrapper_.get_throttling_options(throttle_options);
270ASSERT_EQ(true, throttle->need_throttling_not_guarded_by_lock_(log_io_worker->need_purging_throttling_func_));
271ASSERT_EQ(true, throttle->stat_.has_ever_throttled());
272ASSERT_EQ(true, throttle->stat_.start_ts_ > cur_ts);
273ASSERT_EQ(0, throttle->stat_.total_throttling_size_);
274ASSERT_EQ(0, throttle->stat_.total_throttling_task_cnt_);
275ASSERT_EQ(10, throttle->stat_.total_skipped_task_cnt_);
276ASSERT_EQ(10 * (log_size), throttle->stat_.total_skipped_size_);
277int64_t cur_has_batched_size = log_io_worker->batch_io_task_mgr_.has_batched_size_;
278// no io reduce during writing throttling
279ASSERT_EQ(cur_has_batched_size, prev_has_batched_size);
280const double old_decay_factor = throttle->decay_factor_;
281
282PALF_LOG(INFO, "[CASE 7] defactor is will change when log_disk_throttling_maximum_duration changes", K(max_lsn_1));
283palf_env_impl.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_maximum_duration_ = 1800 * 1000 * 1000L;
284usleep(LogWritingThrottle::UPDATE_INTERVAL_US);
285EXPECT_EQ(OB_SUCCESS, submit_log(leader, 20, id, 1024));
286max_lsn_1 = leader.palf_handle_impl_->sw_.get_max_lsn();
287wait_lsn_until_flushed(max_lsn_1, leader);
288PALF_LOG(INFO, "YYY change when log_disk_throttling_maximum_duration changes", K(old_decay_factor), KPC(throttle));
289ASSERT_EQ(true, throttle->decay_factor_ < old_decay_factor);
290
291
292PALF_LOG(INFO, "[CASE 8] need break from writing throttling while unrecyclable size fallbacks", K(max_lsn_1));
293EXPECT_EQ(OB_SUCCESS, submit_log(leader, 20, id, 1 * MB));
294
295cur_ts = common::ObClockGenerator::getClock();
296leader.get_palf_handle_impl()->log_engine_.update_base_lsn_used_for_gc(LSN(70 * MB));
297LSN max_lsn_2 = leader.palf_handle_impl_->sw_.get_max_lsn();
298wait_lsn_until_flushed(max_lsn_2, leader);
299int64_t break_ts = common::ObClockGenerator::getClock();
300EXPECT_EQ(true, (break_ts-cur_ts) < 1 * 1000 * 1000);
301ASSERT_EQ(false, throttle->need_throttling_not_guarded_by_lock_(log_io_worker->need_purging_throttling_func_));
302ASSERT_EQ(true, throttle->stat_.has_ever_throttled());
303
304
305palf_env_impl.disk_options_wrapper_.disk_opts_for_recycling_blocks_.log_disk_throttling_percentage_ = throttling_percentage;
306palf_env_impl.disk_options_wrapper_.disk_opts_for_recycling_blocks_.log_disk_usage_limit_size_ = 400 * MB;
307palf_env_impl.disk_options_wrapper_.disk_opts_for_recycling_blocks_.log_disk_utilization_threshold_ = 70;
308palf_env_impl.disk_options_wrapper_.disk_opts_for_recycling_blocks_.log_disk_utilization_limit_threshold_ = utilization_limit_threshold;
309palf_env_impl.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_usage_limit_size_ = 400 * MB;
310usleep(LogWritingThrottle::UPDATE_INTERVAL_US);
311palf_env_impl.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = 100;
312EXPECT_EQ(OB_SUCCESS, submit_log(leader, 85, id, 2 * MB));
313max_lsn_2 = leader.palf_handle_impl_->sw_.get_max_lsn();
314wait_lsn_until_flushed(max_lsn_2, leader);
315leader.get_palf_handle_impl()->log_engine_.update_base_lsn_used_for_gc(LSN(130 * MB));
316
317usleep(1000 * 1000);//wait for gc
318LSN max_lsn = leader.get_palf_handle_impl()->get_max_lsn();
319LSN begin_lsn;
320leader.get_palf_handle_impl()->get_begin_lsn(begin_lsn);
321ASSERT_EQ(true, begin_lsn > LSN(0));
322
323palf_env_impl.disk_options_wrapper_.disk_opts_for_recycling_blocks_.log_disk_utilization_threshold_ = 94;
324palf_env_impl.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_utilization_threshold_ = 94;
325//EXPECT_EQ(OB_SUCCESS, submit_log(leader, 35, id, 2 * MB));
326EXPECT_EQ(OB_SUCCESS, submit_log(leader, 20, id, 2 * MB));
327max_lsn_2 = leader.palf_handle_impl_->sw_.get_max_lsn();
328wait_lsn_until_flushed(max_lsn_2, leader);
329
330max_lsn = leader.get_palf_handle_impl()->get_max_lsn();
331leader.get_palf_handle_impl()->get_begin_lsn(begin_lsn);
332
333palf_env_impl.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = 40;
334usleep(LogWritingThrottle::UPDATE_INTERVAL_US);
335
336cur_ts = common::ObClockGenerator::getClock();
337leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
338ASSERT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 2 * MB));
339max_lsn_2 = leader.palf_handle_impl_->sw_.get_max_lsn();
340wait_lsn_until_flushed(max_lsn_2, leader);
341break_ts = common::ObClockGenerator::getClock();
342EXPECT_EQ(true, (break_ts - cur_ts) > 1 * 1000 * 1000);
343
344usleep(LogWritingThrottle::UPDATE_INTERVAL_US);
345PALF_LOG(INFO, "[CASE 8] need break from writing throttling while log_disk_throttling_percentage_ is off", K(throttle));
346leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
347EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 2 * MB));
348usleep(1 * 1000);//sleep to come into throttling
349cur_ts = common::ObClockGenerator::getClock();
350palf_env_impl.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = 100;
351LSN max_lsn_3 = leader.palf_handle_impl_->sw_.get_max_lsn();
352wait_lsn_until_flushed(max_lsn_3, leader);
353break_ts = common::ObClockGenerator::getClock();
354EXPECT_EQ(true, (break_ts - cur_ts) < 1 * 1000 * 1000);
355
356palf_env_impl.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = 40;
357usleep(LogWritingThrottle::UPDATE_INTERVAL_US);
358PALF_LOG(INFO, "[CASE 9] need break from writing throttling while flush meta task is submitted", K(throttle));
359leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
360EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 2 * MB));
361usleep(1000 * 1000);
362IOTaskCond io_task_cond_3(id, log_engine->palf_epoch_);
363EXPECT_EQ(OB_SUCCESS, log_io_worker->submit_io_task(&io_task_cond_3));
364cur_ts = common::ObClockGenerator::getClock();
365max_lsn_3 = leader.palf_handle_impl_->sw_.get_max_lsn();
366wait_lsn_until_flushed(max_lsn_3, leader);
367break_ts = common::ObClockGenerator::getClock();
368EXPECT_EQ(true, (break_ts - cur_ts) < 1 * 1000 * 1000);
369usleep(100 * 1000);
370io_task_cond_3.cond_.signal();
371usleep(100 * 1000);
372
373PALF_LOG(INFO, "[CASE 10] no io reduce during writing throttling", K(throttle));
374int64_t batched_size = log_io_worker->batch_io_task_mgr_.has_batched_size_;
375leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
376EXPECT_EQ(OB_SUCCESS, submit_log(leader, 100, id, 128));
377max_lsn_3 = leader.palf_handle_impl_->sw_.get_max_lsn();
378wait_lsn_until_flushed(max_lsn_3, leader);
379int64_t new_batched_size = log_io_worker->batch_io_task_mgr_.has_batched_size_;
380EXPECT_EQ(batched_size, new_batched_size);
381
382PALF_LOG(INFO, "[CASE 11] need break from writing throttling while flashback task is submitted", K(throttle));
383int64_t mode_version = INVALID_PROPOSAL_ID;
384SCN max_scn_before_flashback = leader.palf_handle_impl_->get_max_scn();
385LSN max_lsn_before_flashback = leader.palf_handle_impl_->sw_.get_max_lsn();
386cur_ts = common::ObClockGenerator::getClock();
387const int64_t KB = 1024;
388EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10, id, 1024 * KB));
389usleep(20 * 1000);
390switch_append_to_flashback(leader, mode_version);
391constexpr int64_t timeout_ts_us = 3 * 1000 * 1000;
392EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->flashback(mode_version, max_scn_before_flashback, timeout_ts_us));
393LSN max_lsn_after_flashback = leader.palf_handle_impl_->sw_.get_max_lsn();
394SCN max_scn_after_flashback = leader.palf_handle_impl_->sw_.get_max_scn();
395ASSERT_EQ(max_lsn_after_flashback, max_lsn_before_flashback);
396ASSERT_EQ(max_scn_after_flashback, max_scn_before_flashback);
397
398PALF_LOG(INFO, "end test throttling_basic", K(id));
399leader.reset();
400delete_paxos_group(id);
401PALF_LOG(INFO, "destroy", K(id));
402}
403
404} // end unittest
405} // end oceanbase
406
407int main(int argc, char **argv)
408{
409RUN_SIMPLE_LOG_CLUSTER_TEST(TEST_NAME);
410}
411