oceanbase

Форк
0
/
test_ob_simple_log_throttling.cpp 
410 строк · 22.0 Кб
1
/**
2
 * Copyright (c) 2021 OceanBase
3
 * OceanBase CE is licensed under Mulan PubL v2.
4
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
5
 * You may obtain a copy of Mulan PubL v2 at:
6
 *          http://license.coscl.org.cn/MulanPubL-2.0
7
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
 * See the Mulan PubL v2 for more details.
11
 */
12

13
#include <cstdio>
14
#include <gtest/gtest.h>
15
#include <signal.h>
16
#include <share/scn.h>
17
#define private public
18
#include "env/ob_simple_log_cluster_env.h"
19
#include "env/ob_simple_log_server.h"
20
#undef private
21

22
// Name of this test binary's case group; used below as the log-file prefix
// (SET_CASE_LOG_FILE), as ObSimpleLogClusterTestBase::test_name_, and as the
// argument of RUN_SIMPLE_LOG_CLUSTER_TEST in main().
const std::string TEST_NAME("log_throttling");
23

24
using namespace oceanbase::common;
25
using namespace oceanbase;
26
namespace oceanbase
27
{
28
using namespace logservice;
29
using namespace palf;
30
namespace unittest
31
{
32
class TestObSimpleLogClusterLogThrottling : public ObSimpleLogClusterTestEnv
33
{
34
public:
35
  TestObSimpleLogClusterLogThrottling() :  ObSimpleLogClusterTestEnv()
36
  {}
37
};
38

39
int64_t ObSimpleLogClusterTestBase::member_cnt_ = 1;
40
int64_t ObSimpleLogClusterTestBase::node_cnt_ = 1;
41
std::string ObSimpleLogClusterTestBase::test_name_ = TEST_NAME;
42
bool ObSimpleLogClusterTestBase::need_add_arb_server_  = false;
43

44
MockLocCB loc_cb;
45

46
// Checks that the log stream with palf id 1 (referred to as the "sys log
// stream" by this test) is not throttled: after writing a large amount of log
// under tightened disk options, the throttle must still hold default
// (invalid) options, report that no throttling is needed, and a further 2MB
// write must flush in under one second.
//
// The disk options of the shared PalfEnv are overwritten for the duration of
// the case and restored from a saved copy at the end.
TEST_F(TestObSimpleLogClusterLogThrottling, test_throttling_sys)
{
  SET_CASE_LOG_FILE(TEST_NAME, "log_throttling_sys_log_stream");
  // Kept even though it is not read here: project macros may reference `ret`.
  int ret = OB_SUCCESS;
  const int64_t id = 1;
  PALF_LOG(INFO, "begin test throttling_sy", K(id));
  int64_t leader_idx = 0;
  const int64_t CONFIG_CHANGE_TIMEOUT = 10 * 1000 * 1000L; // 10s
  PalfHandleImplGuard leader;
  PalfEnv *palf_env = NULL;
  // Setup failures must be fatal: everything below dereferences `palf_env`
  // and `leader.palf_handle_impl_`, so continuing after a failed (non-fatal)
  // EXPECT would crash the whole test binary instead of failing this case.
  ASSERT_EQ(OB_SUCCESS, create_paxos_group(id, &loc_cb, leader_idx, leader));
  ASSERT_EQ(OB_SUCCESS, get_palf_env(leader_idx, palf_env));
  ASSERT_TRUE(NULL != palf_env);
  // Snapshot of the original disk options, restored at the end of the case.
  const PalfDiskOptions disk_options = palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_recycling_blocks_;
  loc_cb.leader_ = get_cluster()[leader_idx]->get_addr();

  const int64_t MB = 1024 * 1024L;
  int64_t total_disk_size = 400 * MB ;
  int64_t utilization_limit_threshold = 95;
  int64_t throttling_percentage = 60;

  // Tighten the "stopping writing" disk options directly on the env impl.
  PalfEnvImpl &palf_env_impl = palf_env->palf_env_impl_;
  palf_env_impl.is_inited_ = true;
  palf_env_impl.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = throttling_percentage;
  palf_env_impl.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_usage_limit_size_ = total_disk_size;
  palf_env_impl.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_utilization_threshold_ = 90;
  palf_env_impl.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_utilization_limit_threshold_ = utilization_limit_threshold;
  int64_t unrecyclable_size = 0;
  palf_env_impl.disk_options_wrapper_.set_cur_unrecyclable_log_disk_size(unrecyclable_size);
  PalfThrottleOptions throttle_options;
  palf_env_impl.disk_options_wrapper_.get_throttling_options(throttle_options);
  LogIOWorker *log_io_worker = leader.palf_handle_impl_->log_engine_.log_io_worker_;
  LogWritingThrottle *throttle = log_io_worker->throttle_;
  // sys log stream no need throttling
  // A default-constructed options object is what the throttle is expected to
  // hold while throttling is not in effect.
  PalfThrottleOptions invalid_throttle_options;

  PALF_LOG(INFO, "prepare for throttling");
  leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
  // 140 entries of 2MB each.
  // NOTE(review): presumably enough to cross the throttling trigger for a
  // 400MB disk at 60% — confirm against the trigger-size computation.
  ASSERT_EQ(OB_SUCCESS, submit_log(leader, 140, id, 2 * MB));
  LSN max_lsn = leader.palf_handle_impl_->sw_.get_max_lsn();
  wait_lsn_until_flushed(max_lsn, leader);

  PALF_LOG(INFO, "test sys log stream will not been throttled");
  // Sleep one update interval before the measured write.
  usleep(LogWritingThrottle::UPDATE_INTERVAL_US);
  int64_t cur_ts = common::ObClockGenerator::getClock();
  leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
  ASSERT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 2 * MB));
  max_lsn = leader.palf_handle_impl_->sw_.get_max_lsn();
  wait_lsn_until_flushed(max_lsn, leader);
  ASSERT_EQ(invalid_throttle_options, throttle->throttling_options_);
  ASSERT_EQ(false, throttle->need_throttling_with_options_not_guarded_by_lock_());
  ASSERT_EQ(false, throttle->need_throttling_not_guarded_by_lock_(log_io_worker->need_purging_throttling_func_));
  int64_t break_ts = common::ObClockGenerator::getClock();
  // The submit + flush must complete within one second (timestamps in us).
  ASSERT_EQ(true, (break_ts - cur_ts) < 1 * 1000 * 1000);

  PALF_LOG(INFO, "end test throttling_sys_log_stream", K(id));
  leader.reset();

  // Restore the disk options saved at the start of the case.
  // NOTE(review): this cleanup is skipped if any ASSERT above fails.
  palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_recycling_blocks_ = disk_options;
  palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_stopping_writing_ = disk_options;
  delete_paxos_group(id);
}
107

108
// End-to-end scenario test for LogWritingThrottle on an ordinary log stream.
// The cases run in sequence on one paxos group and share state (throttle
// statistics, purge sequence numbers, disk usage), so statement order matters.
//
// Throughout the case the test mutates the PalfEnv disk options directly and
// sleeps LogWritingThrottle::UPDATE_INTERVAL_US before writing, then checks
// the throttle's options, statistics and the IO worker's purge sequence
// counters.
//
// NOTE(review): the concrete expected values (e.g. purge sequence 5/6/7) depend
// on behaviour outside this file and cannot be derived from here.
TEST_F(TestObSimpleLogClusterLogThrottling, test_throttling_basic)
{
  SET_CASE_LOG_FILE(TEST_NAME, "log_throttling_basic");
  // Kept even though it is not read here: project macros may reference `ret`.
  int ret = OB_SUCCESS;
  const int64_t id = ATOMIC_AAF(&palf_id_, 1);
  PALF_LOG(INFO, "begin test throttlin_basic", K(id));
  int64_t leader_idx = 0;
  const int64_t CONFIG_CHANGE_TIMEOUT = 10 * 1000 * 1000L; // 10s
  PalfHandleImplGuard leader;
  PalfEnv *palf_env = NULL;
  // Setup failures must be fatal: everything below dereferences `palf_env`
  // and `leader.palf_handle_impl_`, so continuing after a failed (non-fatal)
  // EXPECT would crash the whole test binary instead of failing this case.
  ASSERT_EQ(OB_SUCCESS, create_paxos_group(id, &loc_cb, leader_idx, leader));
  ASSERT_EQ(OB_SUCCESS, get_palf_env(leader_idx, palf_env));
  ASSERT_TRUE(NULL != palf_env);
  loc_cb.leader_ = get_cluster()[leader_idx]->get_addr();

  const int64_t MB = 1024 * 1024L;
  int64_t total_disk_size = 300 * MB ;
  int64_t utilization_limit_threshold = 95;
  int64_t throttling_percentage = 40;

  // Tighten the "stopping writing" disk options directly on the env impl.
  PalfEnvImpl &palf_env_impl = palf_env->palf_env_impl_;
  palf_env_impl.is_inited_ = true;
  palf_env_impl.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = throttling_percentage;
  palf_env_impl.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_usage_limit_size_ = total_disk_size;
  palf_env_impl.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_utilization_threshold_ = 80;
  palf_env_impl.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_utilization_limit_threshold_ = utilization_limit_threshold;
  int64_t unrecyclable_size = 0;
  palf_env_impl.disk_options_wrapper_.set_cur_unrecyclable_log_disk_size(unrecyclable_size);
  PalfThrottleOptions throttle_options;
  palf_env_impl.disk_options_wrapper_.get_throttling_options(throttle_options);
  LogIOWorker *log_io_worker = leader.palf_handle_impl_->log_engine_.log_io_worker_;
  LogWritingThrottle *throttle = log_io_worker->throttle_;
  // A default-constructed options object is what the throttle is expected to
  // hold while throttling is not in effect.
  PalfThrottleOptions invalid_throttle_options;

  PALF_LOG(INFO, "[CASE 1]: test no need throttling while unrecyclable_log_disk_size is no more than trigger_size");
  EXPECT_EQ(OB_SUCCESS, submit_log(leader, 40, id, 1 * MB));
  LSN max_lsn_1 = leader.palf_handle_impl_->sw_.get_max_lsn();
  leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
  wait_lsn_until_flushed(max_lsn_1, leader);
  ASSERT_EQ(true, throttle->last_update_ts_ != OB_INVALID_TIMESTAMP);
  ASSERT_EQ(invalid_throttle_options, throttle->throttling_options_);
  ASSERT_EQ(false, throttle->need_throttling_with_options_not_guarded_by_lock_());
  ASSERT_EQ(false, throttle->need_throttling_not_guarded_by_lock_(log_io_worker->need_purging_throttling_func_));

  PALF_LOG(INFO, "[CASE 2] no need throttling while log_disk_throttling_percentage_ is off");
  usleep(LogWritingThrottle::UPDATE_INTERVAL_US);
  // The test uses a percentage of 100 as "throttling off".
  palf_env_impl.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = 100;
  EXPECT_EQ(OB_SUCCESS, submit_log(leader, 31, id, 1 * MB));
  max_lsn_1 = leader.palf_handle_impl_->sw_.get_max_lsn();
  wait_lsn_until_flushed(max_lsn_1, leader);
  ASSERT_EQ(invalid_throttle_options, throttle->throttling_options_);
  ASSERT_EQ(false, throttle->need_throttling_with_options_not_guarded_by_lock_());
  ASSERT_EQ(false, throttle->need_throttling_not_guarded_by_lock_(log_io_worker->need_purging_throttling_func_));

  PALF_LOG(INFO, "[CASE 3] need throttling");
  LogEntryHeader log_entry_header;
  LogGroupEntryHeader group_header;
  const int64_t log_entry_header_size = log_entry_header.get_serialize_size();
  const int64_t log_group_header_size = group_header.get_serialize_size();
  const int64_t header_size =log_entry_header_size + log_group_header_size;
  palf_env_impl.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = throttling_percentage;
  usleep(LogWritingThrottle::UPDATE_INTERVAL_US);
  palf_env_impl.disk_options_wrapper_.get_throttling_options(throttle_options);
  LSN before_lsn = leader.palf_handle_impl_->sw_.get_max_lsn();
  EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 1 * MB));
  LSN end_lsn = leader.palf_handle_impl_->sw_.get_max_lsn();
  // On-disk size of one 1MB submission, measured as the LSN delta; reused
  // below as the expected per-task throttled / skipped size.
  const int64_t log_size = end_lsn - before_lsn;
  max_lsn_1 = leader.palf_handle_impl_->sw_.get_max_lsn();
  wait_lsn_until_flushed(max_lsn_1, leader);
  PALF_LOG(INFO, "case 3: after submit_log", K(before_lsn), K(end_lsn), K(max_lsn_1));
  ASSERT_EQ(throttle_options, throttle->throttling_options_);
  ASSERT_EQ(true, throttle->need_throttling_not_guarded_by_lock_(log_io_worker->need_purging_throttling_func_));
  ASSERT_EQ(true, throttle->stat_.has_ever_throttled());
  ASSERT_EQ(log_size, throttle->stat_.total_throttling_size_);
  ASSERT_EQ(1, throttle->stat_.total_throttling_task_cnt_);
  ASSERT_EQ(0, throttle->stat_.total_skipped_task_cnt_);
  ASSERT_EQ(0, throttle->stat_.total_skipped_size_);
  ASSERT_EQ(log_size, throttle->appended_log_size_cur_round_);
  ASSERT_EQ(5, log_io_worker->purge_throttling_task_submitted_seq_);
  ASSERT_EQ(5, log_io_worker->purge_throttling_task_handled_seq_);

  PALF_LOG(INFO, "[CASE 4] no need throttling because trigger_percentage is set to 100", KPC(throttle));
  palf_env_impl.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = 100;
  usleep(LogWritingThrottle::UPDATE_INTERVAL_US);
  EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 1 * MB));
  max_lsn_1 = leader.palf_handle_impl_->sw_.get_max_lsn();
  wait_lsn_until_flushed(max_lsn_1, leader);
  ASSERT_EQ(invalid_throttle_options, throttle->throttling_options_);
  ASSERT_EQ(false, throttle->need_throttling_not_guarded_by_lock_(log_io_worker->need_purging_throttling_func_));
  // Statistics from the previous throttling round are expected to remain,
  // while the per-round appended size is back to zero.
  ASSERT_EQ(true, throttle->stat_.has_ever_throttled());
  ASSERT_EQ(log_size, throttle->stat_.total_throttling_size_);
  ASSERT_EQ(1, throttle->stat_.total_throttling_task_cnt_);
  ASSERT_EQ(0, throttle->stat_.total_skipped_task_cnt_);
  ASSERT_EQ(0, throttle->stat_.total_skipped_size_);
  ASSERT_EQ(0, throttle->appended_log_size_cur_round_);
  ASSERT_EQ(5, log_io_worker->purge_throttling_task_submitted_seq_);
  ASSERT_EQ(5, log_io_worker->purge_throttling_task_handled_seq_);

  PALF_LOG(INFO, "[CASE 5] no need throttling because log_disk_size is set to 500M", KPC(throttle));
  // First re-enter throttling at 40% ...
  palf_env_impl.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = 40;
  usleep(LogWritingThrottle::UPDATE_INTERVAL_US);
  palf_env_impl.disk_options_wrapper_.get_throttling_options(throttle_options);
  EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 1 * MB));
  max_lsn_1 = leader.palf_handle_impl_->sw_.get_max_lsn();
  wait_lsn_until_flushed(max_lsn_1, leader);
  ASSERT_EQ(throttle_options, throttle->throttling_options_);
  ASSERT_EQ(true, throttle->need_throttling_not_guarded_by_lock_(log_io_worker->need_purging_throttling_func_));
  ASSERT_EQ(true, throttle->stat_.has_ever_throttled());
  PALF_LOG(INFO, "[CASE 5] no need throttling because log_disk_size is set to 500M", K(throttle));
  ASSERT_EQ(log_size, throttle->stat_.total_throttling_size_);
  ASSERT_EQ(1, throttle->stat_.total_throttling_task_cnt_);
  ASSERT_EQ(0, throttle->stat_.total_skipped_task_cnt_);
  ASSERT_EQ(0, throttle->stat_.total_skipped_size_);
  ASSERT_EQ(log_size, throttle->appended_log_size_cur_round_);
  ASSERT_EQ(5, log_io_worker->purge_throttling_task_submitted_seq_);
  ASSERT_EQ(5, log_io_worker->purge_throttling_task_handled_seq_);
  // ... then enlarge the disk limit to 500MB and expect throttling to stop.
  palf_env_impl.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_usage_limit_size_ = 500 * MB;
  usleep(LogWritingThrottle::UPDATE_INTERVAL_US);
  palf_env_impl.disk_options_wrapper_.get_throttling_options(throttle_options);
  EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 1 * MB));
  max_lsn_1 = leader.palf_handle_impl_->sw_.get_max_lsn();
  wait_lsn_until_flushed(max_lsn_1, leader);
  ASSERT_EQ(invalid_throttle_options, throttle->throttling_options_);
  ASSERT_EQ(false, throttle->need_throttling_not_guarded_by_lock_(log_io_worker->need_purging_throttling_func_));
  ASSERT_EQ(true, throttle->stat_.has_ever_throttled());
  ASSERT_EQ(true, OB_INVALID_TIMESTAMP != throttle->stat_.stop_ts_);
  ASSERT_EQ(log_size, throttle->stat_.total_throttling_size_);
  ASSERT_EQ(1, throttle->stat_.total_throttling_task_cnt_);
  ASSERT_EQ(0, throttle->stat_.total_skipped_task_cnt_);
  ASSERT_EQ(0, throttle->stat_.total_skipped_size_);
  ASSERT_EQ(0, throttle->appended_log_size_cur_round_);
  max_lsn_1 = leader.palf_handle_impl_->sw_.get_max_lsn();
  wait_lsn_until_flushed(max_lsn_1, leader);

  int64_t prev_has_batched_size = log_io_worker->batch_io_task_mgr_.has_batched_size_;
  PALF_LOG(INFO, "[CASE 6] flush meta task no need throttling", K(max_lsn_1));
  int64_t cur_ts = common::ObClockGenerator::getClock();
  palf_env_impl.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_usage_limit_size_ = 300 * MB;
  usleep(LogWritingThrottle::UPDATE_INTERVAL_US);
  LogEngine *log_engine = &leader.palf_handle_impl_->log_engine_;
  // NOTE(review): the IOTaskCond objects live on this stack frame and are
  // handed to the IO worker by pointer. If an ASSERT between submit and
  // signal fails, the function returns while the worker may still reference
  // them — confirm whether that is acceptable for this test.
  IOTaskCond io_task_cond_1(id, log_engine->palf_epoch_);
  EXPECT_EQ(OB_SUCCESS, log_io_worker->submit_io_task(&io_task_cond_1));
  ASSERT_EQ(6, log_io_worker->purge_throttling_task_submitted_seq_);
  ASSERT_EQ(5, log_io_worker->purge_throttling_task_handled_seq_);
  leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
  EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10, id, 1 * MB));
  max_lsn_1 = leader.palf_handle_impl_->sw_.get_max_lsn();
  //wait all flush log tasks pushed into queue of LogIOWorker
  wait_lsn_until_submitted(max_lsn_1, leader);
  IOTaskCond io_task_cond_2(id, log_engine->palf_epoch_);
  EXPECT_EQ(OB_SUCCESS, log_io_worker->submit_io_task(&io_task_cond_2));
  ASSERT_EQ(7, log_io_worker->purge_throttling_task_submitted_seq_);
  ASSERT_EQ(5, log_io_worker->purge_throttling_task_handled_seq_);
  // Signal the first cond task, then expect the handled sequence to reach 6.
  io_task_cond_1.cond_.signal();
  wait_lsn_until_flushed(max_lsn_1, leader);
  usleep(10 * 1000);
  ASSERT_EQ(7, log_io_worker->purge_throttling_task_submitted_seq_);
  ASSERT_EQ(6, log_io_worker->purge_throttling_task_handled_seq_);
  io_task_cond_2.cond_.signal();
  usleep(10 * 1000);
  ASSERT_EQ(7, log_io_worker->purge_throttling_task_submitted_seq_);
  ASSERT_EQ(7, log_io_worker->purge_throttling_task_handled_seq_);
  palf_env_impl.disk_options_wrapper_.get_throttling_options(throttle_options);
  ASSERT_EQ(true, throttle->need_throttling_not_guarded_by_lock_(log_io_worker->need_purging_throttling_func_));
  ASSERT_EQ(true, throttle->stat_.has_ever_throttled());
  ASSERT_EQ(true, throttle->stat_.start_ts_ > cur_ts);
  // All 10 log tasks are expected to be counted as skipped, none as throttled.
  ASSERT_EQ(0, throttle->stat_.total_throttling_size_);
  ASSERT_EQ(0, throttle->stat_.total_throttling_task_cnt_);
  ASSERT_EQ(10, throttle->stat_.total_skipped_task_cnt_);
  ASSERT_EQ(10 * (log_size), throttle->stat_.total_skipped_size_);
  int64_t cur_has_batched_size = log_io_worker->batch_io_task_mgr_.has_batched_size_;
  // no io reduce during writing throttling
  ASSERT_EQ(cur_has_batched_size, prev_has_batched_size);
  const double old_decay_factor = throttle->decay_factor_;

  PALF_LOG(INFO, "[CASE 7] defactor is will change when log_disk_throttling_maximum_duration changes", K(max_lsn_1));
  palf_env_impl.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_maximum_duration_ = 1800 * 1000 * 1000L;
  usleep(LogWritingThrottle::UPDATE_INTERVAL_US);
  EXPECT_EQ(OB_SUCCESS, submit_log(leader, 20, id, 1024));
  max_lsn_1 = leader.palf_handle_impl_->sw_.get_max_lsn();
  wait_lsn_until_flushed(max_lsn_1, leader);
  PALF_LOG(INFO, "YYY  change when log_disk_throttling_maximum_duration changes", K(old_decay_factor), KPC(throttle));
  ASSERT_EQ(true, throttle->decay_factor_ < old_decay_factor);


  PALF_LOG(INFO, "[CASE 8] need break from writing throttling while unrecyclable size fallbacks", K(max_lsn_1));
  EXPECT_EQ(OB_SUCCESS, submit_log(leader, 20, id, 1 * MB));

  cur_ts = common::ObClockGenerator::getClock();
  // Advance the GC base LSN to 70MB while the writes are pending.
  leader.get_palf_handle_impl()->log_engine_.update_base_lsn_used_for_gc(LSN(70 * MB));
  LSN max_lsn_2 = leader.palf_handle_impl_->sw_.get_max_lsn();
  wait_lsn_until_flushed(max_lsn_2, leader);
  int64_t break_ts = common::ObClockGenerator::getClock();
  EXPECT_EQ(true, (break_ts-cur_ts) < 1 * 1000 * 1000);
  ASSERT_EQ(false, throttle->need_throttling_not_guarded_by_lock_(log_io_worker->need_purging_throttling_func_));
  ASSERT_EQ(true, throttle->stat_.has_ever_throttled());


  // Reconfigure both option sets for a 400MB disk, fill it with throttling
  // off (100), then advance the GC base LSN and wait for blocks to be
  // recycled.
  palf_env_impl.disk_options_wrapper_.disk_opts_for_recycling_blocks_.log_disk_throttling_percentage_ = throttling_percentage;
  palf_env_impl.disk_options_wrapper_.disk_opts_for_recycling_blocks_.log_disk_usage_limit_size_ = 400 * MB;
  palf_env_impl.disk_options_wrapper_.disk_opts_for_recycling_blocks_.log_disk_utilization_threshold_ = 70;
  palf_env_impl.disk_options_wrapper_.disk_opts_for_recycling_blocks_.log_disk_utilization_limit_threshold_ = utilization_limit_threshold;
  palf_env_impl.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_usage_limit_size_ = 400 * MB;
  usleep(LogWritingThrottle::UPDATE_INTERVAL_US);
  palf_env_impl.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = 100;
  EXPECT_EQ(OB_SUCCESS, submit_log(leader, 85, id, 2 * MB));
  max_lsn_2 = leader.palf_handle_impl_->sw_.get_max_lsn();
  wait_lsn_until_flushed(max_lsn_2, leader);
  leader.get_palf_handle_impl()->log_engine_.update_base_lsn_used_for_gc(LSN(130 * MB));

  usleep(1000 * 1000);//wait for gc
  LSN max_lsn = leader.get_palf_handle_impl()->get_max_lsn();
  LSN begin_lsn;
  leader.get_palf_handle_impl()->get_begin_lsn(begin_lsn);
  // A begin LSN above 0 shows that the log no longer starts at LSN 0.
  ASSERT_EQ(true, begin_lsn > LSN(0));

  palf_env_impl.disk_options_wrapper_.disk_opts_for_recycling_blocks_.log_disk_utilization_threshold_ = 94;
  palf_env_impl.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_utilization_threshold_ = 94;
  //EXPECT_EQ(OB_SUCCESS, submit_log(leader, 35, id, 2 * MB));
  EXPECT_EQ(OB_SUCCESS, submit_log(leader, 20, id, 2 * MB));
  max_lsn_2 = leader.palf_handle_impl_->sw_.get_max_lsn();
  wait_lsn_until_flushed(max_lsn_2, leader);

  max_lsn = leader.get_palf_handle_impl()->get_max_lsn();
  leader.get_palf_handle_impl()->get_begin_lsn(begin_lsn);

  palf_env_impl.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = 40;
  usleep(LogWritingThrottle::UPDATE_INTERVAL_US);

  // With throttling re-enabled, a single 2MB write is expected to take MORE
  // than one second to flush.
  cur_ts = common::ObClockGenerator::getClock();
  leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
  ASSERT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 2 * MB));
  max_lsn_2 = leader.palf_handle_impl_->sw_.get_max_lsn();
  wait_lsn_until_flushed(max_lsn_2, leader);
  break_ts = common::ObClockGenerator::getClock();
  EXPECT_EQ(true, (break_ts - cur_ts) > 1 * 1000 * 1000);

  usleep(LogWritingThrottle::UPDATE_INTERVAL_US);
  PALF_LOG(INFO, "[CASE 8] need break from writing throttling while log_disk_throttling_percentage_ is off", K(throttle));
  leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
  EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 2 * MB));
  usleep(1 * 1000);//sleep to come into throttling
  cur_ts = common::ObClockGenerator::getClock();
  // Turning the percentage to 100 mid-write should end throttling quickly.
  palf_env_impl.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = 100;
  LSN max_lsn_3 = leader.palf_handle_impl_->sw_.get_max_lsn();
  wait_lsn_until_flushed(max_lsn_3, leader);
  break_ts = common::ObClockGenerator::getClock();
  EXPECT_EQ(true, (break_ts - cur_ts) < 1 * 1000 * 1000);

  palf_env_impl.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = 40;
  usleep(LogWritingThrottle::UPDATE_INTERVAL_US);
  PALF_LOG(INFO, "[CASE 9] need break from writing throttling while flush meta task is submitted", K(throttle));
  leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
  EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 2 * MB));
  usleep(1000 * 1000);
  // Submitting a cond task while throttled: the pending log is expected to
  // flush within one second afterwards.
  IOTaskCond io_task_cond_3(id, log_engine->palf_epoch_);
  EXPECT_EQ(OB_SUCCESS, log_io_worker->submit_io_task(&io_task_cond_3));
  cur_ts = common::ObClockGenerator::getClock();
  max_lsn_3 = leader.palf_handle_impl_->sw_.get_max_lsn();
  wait_lsn_until_flushed(max_lsn_3, leader);
  break_ts = common::ObClockGenerator::getClock();
  EXPECT_EQ(true, (break_ts - cur_ts) < 1 * 1000 * 1000);
  usleep(100 * 1000);
  io_task_cond_3.cond_.signal();
  usleep(100 * 1000);

  PALF_LOG(INFO, "[CASE 10] no io reduce during writing throttling", K(throttle));
  int64_t batched_size = log_io_worker->batch_io_task_mgr_.has_batched_size_;
  leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
  EXPECT_EQ(OB_SUCCESS, submit_log(leader, 100, id, 128));
  max_lsn_3 = leader.palf_handle_impl_->sw_.get_max_lsn();
  wait_lsn_until_flushed(max_lsn_3, leader);
  int64_t new_batched_size = log_io_worker->batch_io_task_mgr_.has_batched_size_;
  // The batched-size counter must not have moved while throttling.
  EXPECT_EQ(batched_size, new_batched_size);

  PALF_LOG(INFO, "[CASE 11] need break from writing throttling while flashback task is submitted", K(throttle));
  int64_t mode_version = INVALID_PROPOSAL_ID;
  // Remember the max SCN/LSN, write more log, then flash back to that SCN and
  // expect both values to return to the remembered ones.
  SCN max_scn_before_flashback = leader.palf_handle_impl_->get_max_scn();
  LSN max_lsn_before_flashback = leader.palf_handle_impl_->sw_.get_max_lsn();
  cur_ts = common::ObClockGenerator::getClock();
  const int64_t KB = 1024;
  EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10, id, 1024 * KB));
  usleep(20 * 1000);
  switch_append_to_flashback(leader, mode_version);
  constexpr int64_t timeout_ts_us = 3 * 1000 * 1000;
  EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->flashback(mode_version, max_scn_before_flashback, timeout_ts_us));
  LSN max_lsn_after_flashback = leader.palf_handle_impl_->sw_.get_max_lsn();
  SCN max_scn_after_flashback = leader.palf_handle_impl_->sw_.get_max_scn();
  ASSERT_EQ(max_lsn_after_flashback, max_lsn_before_flashback);
  ASSERT_EQ(max_scn_after_flashback, max_scn_before_flashback);

  PALF_LOG(INFO, "end test throttling_basic", K(id));
  leader.reset();
  delete_paxos_group(id);
  PALF_LOG(INFO, "destroy", K(id));
}
403

404
} // end unittest
405
} // end oceanbase
406

407
int main(int argc, char **argv)
408
{
409
  RUN_SIMPLE_LOG_CLUSTER_TEST(TEST_NAME);
410
}
411

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.