oceanbase

Форк
0
/
test_ob_simple_log_throttling_arb.cpp 
547 строк · 25.9 Кб
1
// Copyright (c) 2021 OceanBase
2
// OceanBase is licensed under Mulan PubL v2.
3
// You can use this software according to the terms and conditions of the Mulan PubL v2.
4
// You may obtain a copy of Mulan PubL v2 at:
5
//          http://license.coscl.org.cn/MulanPubL-2.0
6
// THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
7
// EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
8
// MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
9
// See the Mulan PubL v2 for more details.
10
#include <cstdio>
11
#include <gtest/gtest.h>
12
#include <signal.h>
13
#define private public
14
#include "env/ob_simple_log_cluster_env.h"
15
#undef private
16

17
// Name of this test suite; used below as the cluster test name, as the first
// argument of SET_CASE_LOG_FILE in every case, and as the argument of
// RUN_SIMPLE_LOG_CLUSTER_TEST in main().
const std::string TEST_NAME = "throttling_arb";
18

19
using namespace oceanbase::common;
20
using namespace oceanbase;
21
namespace oceanbase
22
{
23
using namespace logservice;
24

25
namespace logservice
26
{
27

28
void ObArbitrationService::update_arb_timeout_()
29
{
30
  arb_timeout_us_ = 2 * 1000 * 1000L;
31
  if (REACH_TIME_INTERVAL(2 * 1000 * 1000)) {
32
    CLOG_LOG_RET(WARN, OB_ERR_UNEXPECTED, "update_arb_timeout_", K_(self), K_(arb_timeout_us));
33
  }
34
}
35
}
36

37
namespace unittest
38
{
39

40
class TestObSimpleLogThrottleArb : public ObSimpleLogClusterTestEnv
41
{
42
public:
43
  TestObSimpleLogThrottleArb() :  ObSimpleLogClusterTestEnv()
44
  {}
45

46
  void set_palf_disk_options(PalfEnvImpl &palf_env_impl, const PalfDiskOptions &disk_options)
47
  {
48
    palf_env_impl.disk_options_wrapper_.disk_opts_for_recycling_blocks_ = disk_options;
49
    palf_env_impl.disk_options_wrapper_.disk_opts_for_stopping_writing_ = disk_options;
50
  }
51
};
52

53
int64_t ObSimpleLogClusterTestBase::member_cnt_ = 3;
54
int64_t ObSimpleLogClusterTestBase::node_cnt_ = 5;
55
bool ObSimpleLogClusterTestBase::need_add_arb_server_ = true;
56
std::string ObSimpleLogClusterTestBase::test_name_ = TEST_NAME;
57

58
MockLocCB loc_cb;
59

60
const int64_t KB = 1024L;
61
const int64_t MB = 1024 * 1024L;
62
// 2F1A group, throttling enabled on the leader, the other member (another_f)
// and the replacement candidate (follower D, node leader_idx + 3).
// Steps, all performed while log-disk throttling is set to 60%:
//   1.1 block leader<->another_f and expect another_f to be degraded in < 2s
//   1.2 unblock and expect the upgrade in < 5s
//   1.3 replace another_f with follower D (OB_TIMEOUT or OB_SUCCESS accepted)
//   1.4 switch leader to whichever follower is in the member list afterwards
// Finally the original disk options are restored on all three nodes.
TEST_F(TestObSimpleLogThrottleArb, test_2f1a_throttling_major)
{
  oceanbase::common::ObClusterVersion::get_instance().cluster_version_ = CLUSTER_VERSION_4_1_0_0;
  SET_CASE_LOG_FILE(TEST_NAME, "arb_throttling_major");
  int ret = OB_SUCCESS;
  PALF_LOG(INFO, "begin test_2f1a_throttling_major");
  int64_t leader_idx = 0;
  int64_t arb_replica_idx = -1;
  PalfHandleImplGuard leader;
  std::vector<PalfHandleImplGuard*> palf_list;
  const int64_t CONFIG_CHANGE_TIMEOUT = 10 * 1000 * 1000L; // 10s
  const int64_t id = ATOMIC_AAF(&palf_id_, 1);
  ASSERT_EQ(OB_SUCCESS, create_paxos_group_with_arb(id, &loc_cb, arb_replica_idx, leader_idx, false, leader));
  ASSERT_EQ(OB_SUCCESS, get_cluster_palf_handle_guard(id, palf_list));
  loc_cb.leader_ = leader.palf_handle_impl_->self_;

  // Remember the leader's original disk options so they can be restored on
  // every node at the end of the case.
  PalfEnv *palf_env = NULL;
  ASSERT_EQ(OB_SUCCESS, get_palf_env(leader_idx, palf_env));
  const PalfDiskOptions disk_options = palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_recycling_blocks_;

  const int64_t another_f_idx = (leader_idx + 1) % 3;
  const int64_t follower_D_idx = (leader_idx + 3);

  set_disk_options_for_throttling(palf_env->palf_env_impl_);
  ASSERT_EQ(OB_SUCCESS, get_palf_env(another_f_idx, palf_env));
  set_disk_options_for_throttling(palf_env->palf_env_impl_);
  ASSERT_EQ(OB_SUCCESS, get_palf_env(follower_D_idx, palf_env));
  set_disk_options_for_throttling(palf_env->palf_env_impl_);

  PALF_LOG(INFO, "[CASE 1]: prepare for throttling");
  leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
  ASSERT_EQ(OB_SUCCESS, submit_log(leader, 140, id, 2 * MB));
  LSN max_lsn = leader.palf_handle_impl_->sw_.get_max_lsn();
  wait_lsn_until_flushed(max_lsn, leader);

  // Turn throttling on (60%) for the leader, another_f and follower D.
  const int64_t throttling_percentage = 60;
  ASSERT_EQ(OB_SUCCESS, get_palf_env(leader_idx, palf_env));
  palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = throttling_percentage;

  ASSERT_EQ(OB_SUCCESS, get_palf_env(another_f_idx, palf_env));
  palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = throttling_percentage;

  ASSERT_EQ(OB_SUCCESS, get_palf_env(follower_D_idx, palf_env));
  palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = throttling_percentage;

  PALF_LOG(INFO, "[CASE 1.1]: MAJOR degrade");
  block_net(leader_idx, another_f_idx);
  sleep(1);

  // NOTE(review): the previous comment here said the submit result is not
  // checked because OB_NOT_MASTER may be returned while a member is being
  // degraded, yet the code asserts OB_SUCCESS. Confirm which one is intended.
  leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
  palf_list[another_f_idx]->palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
  ASSERT_EQ(OB_SUCCESS, submit_log(leader, 5, id, 128 * KB));
  int64_t begin_ts = common::ObClockGenerator::getClock();
  EXPECT_TRUE(is_degraded(leader, another_f_idx));
  int64_t end_ts = common::ObClockGenerator::getClock();
  int64_t used_time = end_ts - begin_ts;
  ASSERT_LT(used_time, 2 * 1000 * 1000L);
  PALF_LOG(INFO, " MAJOR degrade", K(used_time));
  ASSERT_EQ(OB_SUCCESS, submit_log(leader, 5, id, 128));

  // Submit some log with throttling temporarily lifted (100%).
  // NOTE(review): palf_env still points at follower D's env here (last
  // get_palf_env call above), not the leader's -- confirm this is intended.
  palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = 100;
  usleep(LogWritingThrottle::UPDATE_INTERVAL_US);
  ASSERT_EQ(OB_SUCCESS, submit_log(leader, 5, id, 1 * MB));

  palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = throttling_percentage;
  usleep(LogWritingThrottle::UPDATE_INTERVAL_US);
  PALF_LOG(INFO, "[CASE 1.2] MAJOR upgrade");
  leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
  palf_list[another_f_idx]->palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
  ASSERT_EQ(OB_SUCCESS, submit_log(leader, 2, id, 512 * KB));
  unblock_net(leader_idx, another_f_idx);
  begin_ts = common::ObClockGenerator::getClock();
  // used_time logged here is still the degrade duration measured above.
  PALF_LOG(INFO, "[CASE 1.2] begin MAJOR upgrade", K(used_time));
  ASSERT_TRUE(is_upgraded(leader, id));
  end_ts = common::ObClockGenerator::getClock();
  used_time = end_ts - begin_ts;
  PALF_LOG(INFO, "[CASE 1.2] end MAJOR upgrade", K(used_time));
  ASSERT_LT(used_time, 5 * 1000 * 1000L);
  ASSERT_EQ(OB_SUCCESS, submit_log(leader, 5, id, 128));

  PALF_LOG(INFO, "[CASE 1.3]: MAJOR replace_member(OB_TIMEOUT)");
  leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
  palf_list[another_f_idx]->palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
  palf_list[follower_D_idx]->palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
  ASSERT_EQ(OB_SUCCESS, submit_log(leader, 10, id, 128 * KB));

  LogConfigVersion config_version;
  ASSERT_EQ(OB_SUCCESS, leader.palf_handle_impl_->get_config_version(config_version));
  ret = leader.palf_handle_impl_->replace_member(ObMember(get_cluster()[follower_D_idx]->get_addr(), 1),
                                                 ObMember(get_cluster()[another_f_idx]->get_addr(), 1),
                                                 config_version,
                                                 CONFIG_CHANGE_TIMEOUT);
  // A timeout is tolerated: the added member may be unable to flush the new
  // meta while its previous logs are being throttled.
  // NOTE(review): the original comment read "can flush"; presumably "cannot".
  ASSERT_TRUE(OB_TIMEOUT == ret || OB_SUCCESS == ret);
  // On timeout another_f is used as the switch target, otherwise follower D.
  int64_t new_leader_idx = OB_TIMEOUT == ret ? another_f_idx : follower_D_idx;
  ASSERT_EQ(OB_SUCCESS, submit_log(leader, 5, id, 128));

  PALF_LOG(INFO, "[CASE 1.4]: MAJOR switch_leader");
  PalfHandleImplGuard new_leader;
  leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
  palf_list[another_f_idx]->palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
  palf_list[follower_D_idx]->palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
  ASSERT_EQ(OB_SUCCESS, submit_log(leader, 5, id, 128 * KB));
  int64_t switch_start_ts = common::ObClockGenerator::getClock();
  ASSERT_EQ(OB_SUCCESS, switch_leader(id, new_leader_idx, new_leader));
  int64_t switch_end_ts = common::ObClockGenerator::getClock();
  used_time = switch_end_ts - switch_start_ts;
  PALF_LOG(INFO, "[CASE 1.4 end ] end switch_leader", K(used_time));
  // The switch duration is only logged; the upper-bound check is disabled:
  // ASSERT_EQ(true, used_time < 2 * 1000 * 1000);

  new_leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
  ASSERT_EQ(OB_SUCCESS, submit_log(new_leader, 1, id, 1 * KB));
  max_lsn = new_leader.palf_handle_impl_->sw_.get_max_lsn();
  wait_lsn_until_flushed(max_lsn, new_leader);

  // Restore the original disk options on every node touched by this case.
  ASSERT_EQ(OB_SUCCESS, get_palf_env(leader_idx, palf_env));
  set_palf_disk_options(palf_env->palf_env_impl_, disk_options);

  ASSERT_EQ(OB_SUCCESS, get_palf_env(another_f_idx, palf_env));
  set_palf_disk_options(palf_env->palf_env_impl_, disk_options);

  ASSERT_EQ(OB_SUCCESS, get_palf_env(follower_D_idx, palf_env));
  set_palf_disk_options(palf_env->palf_env_impl_, disk_options);

  revert_cluster_palf_handle_guard(palf_list);
  leader.reset();
  new_leader.reset();
  delete_paxos_group(id);
  PALF_LOG(INFO, "end test_2f1a_throttling_major", K(id));
}
195

196
// 2F1A group where only the leader's log-disk throttling percentage is set
// to 60%.
// Steps:
//   2.1 block leader<->another_f and expect another_f to be degraded in < 2s
//   2.2 unblock and expect the upgrade in < 5s
//   2.3 (disabled) replace_member
//   2.4 switch leader to another_f
// Finally the original disk options are restored on all three nodes.
TEST_F(TestObSimpleLogThrottleArb, test_2f1a_throttling_minor_leader)
{
  oceanbase::common::ObClusterVersion::get_instance().cluster_version_ = CLUSTER_VERSION_4_1_0_0;
  SET_CASE_LOG_FILE(TEST_NAME, "arb_throttling_minor_leader");
  // Never assigned in this case; kept in scope because the logging macros may
  // reference `ret` implicitly -- TODO confirm before removing.
  int ret = OB_SUCCESS;
  PALF_LOG(INFO, "begin arb_throttling_minor_leader");
  int64_t leader_idx = 0;
  int64_t arb_replica_idx = -1;
  PalfHandleImplGuard leader;
  std::vector<PalfHandleImplGuard*> palf_list;
  const int64_t id = ATOMIC_AAF(&palf_id_, 1);
  ASSERT_EQ(OB_SUCCESS, create_paxos_group_with_arb(id, &loc_cb, arb_replica_idx, leader_idx, false, leader));
  ASSERT_EQ(OB_SUCCESS, get_cluster_palf_handle_guard(id, palf_list));
  loc_cb.leader_ = leader.palf_handle_impl_->self_;

  // Remember the leader's original disk options so they can be restored on
  // every node at the end of the case.
  PalfEnv *palf_env = NULL;
  ASSERT_EQ(OB_SUCCESS, get_palf_env(leader_idx, palf_env));
  const PalfDiskOptions disk_options = palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_recycling_blocks_;

  const int64_t another_f_idx = (leader_idx + 1) % 3;
  const int64_t follower_D_idx = (leader_idx + 3);

  set_disk_options_for_throttling(palf_env->palf_env_impl_);
  ASSERT_EQ(OB_SUCCESS, get_palf_env(another_f_idx, palf_env));
  set_disk_options_for_throttling(palf_env->palf_env_impl_);
  ASSERT_EQ(OB_SUCCESS, get_palf_env(follower_D_idx, palf_env));
  set_disk_options_for_throttling(palf_env->palf_env_impl_);

  PALF_LOG(INFO, "[CASE 2]: prepare for throttling");
  leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
  ASSERT_EQ(OB_SUCCESS, submit_log(leader, 140, id, 2 * MB));
  LSN max_lsn = leader.palf_handle_impl_->sw_.get_max_lsn();
  wait_lsn_until_flushed(max_lsn, leader);

  // Only the leader's throttling percentage is changed in this case; palf_env
  // keeps pointing at the leader's env until the teardown section.
  const int64_t throttling_percentage = 60;
  ASSERT_EQ(OB_SUCCESS, get_palf_env(leader_idx, palf_env));
  palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = throttling_percentage;

  PALF_LOG(INFO, "[CASE 2.1]: MINOR_LEADER degrade");
  block_net(leader_idx, another_f_idx);
  sleep(1);

  // NOTE(review): the previous comment here said the submit result is not
  // checked because OB_NOT_MASTER may be returned while a member is being
  // degraded, yet the code asserts OB_SUCCESS. Confirm which one is intended.
  leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
  ASSERT_EQ(OB_SUCCESS, submit_log(leader, 5, id, 128 * KB));
  int64_t begin_ts = common::ObClockGenerator::getClock();
  EXPECT_TRUE(is_degraded(leader, another_f_idx));
  int64_t end_ts = common::ObClockGenerator::getClock();
  int64_t used_time = end_ts - begin_ts;
  ASSERT_LT(used_time, 2 * 1000 * 1000L);
  PALF_LOG(INFO, " MINOR_LEADER degrade", K(used_time));
  ASSERT_EQ(OB_SUCCESS, submit_log(leader, 5, id, 128));

  // Submit some log with the leader's throttling temporarily lifted (100%).
  palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = 100;
  usleep(LogWritingThrottle::UPDATE_INTERVAL_US);
  ASSERT_EQ(OB_SUCCESS, submit_log(leader, 5, id, 1 * MB));

  palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = throttling_percentage;
  usleep(LogWritingThrottle::UPDATE_INTERVAL_US);
  PALF_LOG(INFO, "[CASE 2.2] MINOR_LEADER upgrade");
  // The upgrade may take a long time because the target replica has not
  // fetched the log yet.
  leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
  ASSERT_EQ(OB_SUCCESS, submit_log(leader, 5, id, 128 * KB));
  unblock_net(leader_idx, another_f_idx);
  begin_ts = common::ObClockGenerator::getClock();
  // used_time logged here is still the degrade duration measured above.
  PALF_LOG(INFO, "[CASE 2.2] begin MINOR_LEADER upgrade", K(used_time));
  ASSERT_TRUE(is_upgraded(leader, id));
  end_ts = common::ObClockGenerator::getClock();
  used_time = end_ts - begin_ts;
  PALF_LOG(INFO, "[CASE 2.2] end MINOR_LEADER upgrade", K(used_time));
  ASSERT_LT(used_time, 5 * 1000 * 1000L);
  ASSERT_EQ(OB_SUCCESS, submit_log(leader, 5, id, 128));

  // [CASE 2.3] is disabled; kept for reference.
//  PALF_LOG(INFO, "[CASE 2.3]: MINOR_LEADER replace_member");
//  leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
//
//  ASSERT_EQ(OB_SUCCESS, submit_log(leader, 5, id, 128 * KB));
//
//  LogConfigVersion config_version;
//
//  const int64_t CONFIG_CHANGE_TIMEOUT_NEW = 20 * 1000 * 1000L; // 20s
//  ASSERT_EQ(OB_SUCCESS, leader.palf_handle_impl_->get_config_version(config_version));
//  ASSERT_EQ(OB_SUCCESS, leader.palf_handle_impl_->replace_member(ObMember(get_cluster()[follower_D_idx]->get_addr(), 1),
//                                                                 ObMember(get_cluster()[another_f_idx]->get_addr(), 1),
//                                                                 config_version,
//                                                                 CONFIG_CHANGE_TIMEOUT_NEW));
//  ASSERT_EQ(OB_SUCCESS, submit_log(leader, 5, id, 128));

  PALF_LOG(INFO, "[CASE 2.4]: MINOR_LEADER switch_leader");
  int64_t new_leader_idx = another_f_idx;
  PalfHandleImplGuard new_leader;
  leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
  ASSERT_EQ(OB_SUCCESS, submit_log(leader, 5, id, 128 * KB));
  int64_t switch_start_ts = common::ObClockGenerator::getClock();
  ASSERT_EQ(OB_SUCCESS, switch_leader(id, new_leader_idx, new_leader));
  int64_t switch_end_ts = common::ObClockGenerator::getClock();
  used_time = switch_end_ts - switch_start_ts;
  PALF_LOG(INFO, "[CASE 2.4 end ] end switch_leader", K(used_time));
  // The switch duration is only logged; the upper-bound check is disabled:
  // ASSERT_EQ(true, used_time < 2 * 1000 * 1000);

  new_leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
  ASSERT_EQ(OB_SUCCESS, submit_log(new_leader, 1, id, 1 * KB));
  max_lsn = new_leader.palf_handle_impl_->sw_.get_max_lsn();
  wait_lsn_until_flushed(max_lsn, new_leader);

  // Restore the original disk options on every node touched by this case.
  ASSERT_EQ(OB_SUCCESS, get_palf_env(leader_idx, palf_env));
  set_palf_disk_options(palf_env->palf_env_impl_, disk_options);

  ASSERT_EQ(OB_SUCCESS, get_palf_env(another_f_idx, palf_env));
  set_palf_disk_options(palf_env->palf_env_impl_, disk_options);

  ASSERT_EQ(OB_SUCCESS, get_palf_env(follower_D_idx, palf_env));
  set_palf_disk_options(palf_env->palf_env_impl_, disk_options);

  revert_cluster_palf_handle_guard(palf_list);
  leader.reset();
  new_leader.reset();
  delete_paxos_group(id);
  PALF_LOG(INFO, "end test_2f1a_throttling_minor_leader", K(id));
}
319

320
// 2F1A group where the follower (another_f) has its log-disk throttling
// percentage set to 60%; the leader's percentage is not changed.
// Steps:
//   3.1 block leader<->another_f and expect another_f to be degraded in < 2s
//   3.2 unblock and expect the upgrade in < 3s
//   3.3 replace another_f with follower D (must succeed)
//   3.4 switch leader to follower D
// Finally the original disk options are restored on all three nodes.
TEST_F(TestObSimpleLogThrottleArb, test_2f1a_throttling_minor_follower)
{
  oceanbase::common::ObClusterVersion::get_instance().cluster_version_ = CLUSTER_VERSION_4_1_0_0;
  SET_CASE_LOG_FILE(TEST_NAME, "arb_throttling_minor_follower");
  // Never assigned in this case; kept in scope because the logging macros may
  // reference `ret` implicitly -- TODO confirm before removing.
  int ret = OB_SUCCESS;
  PALF_LOG(INFO, "begin arb_throttling_minor_follower");
  int64_t leader_idx = 0;
  int64_t arb_replica_idx = -1;
  PalfHandleImplGuard leader;
  std::vector<PalfHandleImplGuard*> palf_list;
  const int64_t CONFIG_CHANGE_TIMEOUT = 10 * 1000 * 1000L; // 10s
  const int64_t id = ATOMIC_AAF(&palf_id_, 1);
  ASSERT_EQ(OB_SUCCESS, create_paxos_group_with_arb(id, &loc_cb, arb_replica_idx, leader_idx, false, leader));
  ASSERT_EQ(OB_SUCCESS, get_cluster_palf_handle_guard(id, palf_list));
  loc_cb.leader_ = leader.palf_handle_impl_->self_;

  // Remember the leader's original disk options so they can be restored on
  // every node at the end of the case.
  PalfEnv *palf_env = NULL;
  ASSERT_EQ(OB_SUCCESS, get_palf_env(leader_idx, palf_env));
  const PalfDiskOptions disk_options = palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_recycling_blocks_;

  const int64_t another_f_idx = (leader_idx + 1) % 3;
  const int64_t follower_D_idx = (leader_idx + 3);

  set_disk_options_for_throttling(palf_env->palf_env_impl_);
  ASSERT_EQ(OB_SUCCESS, get_palf_env(another_f_idx, palf_env));
  set_disk_options_for_throttling(palf_env->palf_env_impl_);
  ASSERT_EQ(OB_SUCCESS, get_palf_env(follower_D_idx, palf_env));
  set_disk_options_for_throttling(palf_env->palf_env_impl_);

  PALF_LOG(INFO, "[CASE 3]: prepare for throttling");
  leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
  palf_list[another_f_idx]->palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
  palf_list[follower_D_idx]->palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
  ASSERT_EQ(OB_SUCCESS, submit_log(leader, 140, id, 2 * MB));
  LSN max_lsn = leader.palf_handle_impl_->sw_.get_max_lsn();
  wait_lsn_until_flushed(max_lsn, leader);

  // Only another_f is throttled at this point; palf_env keeps pointing at
  // another_f's env until the next get_palf_env call.
  const int64_t throttling_percentage = 60;
  ASSERT_EQ(OB_SUCCESS, get_palf_env(another_f_idx, palf_env));
  palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = throttling_percentage;

  PALF_LOG(INFO, "[CASE 3.1]: MINOR_FOLLOWER degrade");
  block_net(leader_idx, another_f_idx);
  sleep(1);

  // NOTE(review): the previous comment here said the submit result is not
  // checked because OB_NOT_MASTER may be returned while a member is being
  // degraded, yet the code asserts OB_SUCCESS. Confirm which one is intended.
  leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
  palf_list[another_f_idx]->palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
  palf_list[follower_D_idx]->palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
  ASSERT_EQ(OB_SUCCESS, submit_log(leader, 5, id, 128 * KB));
  int64_t begin_ts = common::ObClockGenerator::getClock();
  EXPECT_TRUE(is_degraded(leader, another_f_idx));
  int64_t end_ts = common::ObClockGenerator::getClock();
  int64_t used_time = end_ts - begin_ts;
  ASSERT_LT(used_time, 2 * 1000 * 1000L);
  PALF_LOG(INFO, " MINOR_FOLLOWER degrade", K(used_time));
  ASSERT_EQ(OB_SUCCESS, submit_log(leader, 5, id, 128));

  // Submit some log with another_f's throttling lifted (100%).
  palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = 100;
  usleep(LogWritingThrottle::UPDATE_INTERVAL_US);
  ASSERT_EQ(OB_SUCCESS, submit_log(leader, 5, id, 1 * MB));

  // Re-enabling throttling before the upgrade is disabled in this case; it is
  // turned back on for another_f in CASE 3.3 instead:
  // palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = throttling_percentage;
  // usleep(LogWritingThrottle::UPDATE_INTERVAL_US);
  PALF_LOG(INFO, "[CASE 3.2] MINOR_FOLLOWER upgrade");
  leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
  palf_list[another_f_idx]->palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
  palf_list[follower_D_idx]->palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
  ASSERT_EQ(OB_SUCCESS, submit_log(leader, 5, id, 128 * KB));
  unblock_net(leader_idx, another_f_idx);
  begin_ts = common::ObClockGenerator::getClock();
  // used_time logged here is still the degrade duration measured above.
  PALF_LOG(INFO, "[CASE 3.2] begin MINOR_FOLLOWER upgrade", K(used_time));
  ASSERT_TRUE(is_upgraded(leader, id));
  end_ts = common::ObClockGenerator::getClock();
  used_time = end_ts - begin_ts;
  PALF_LOG(INFO, "[CASE 3.2] end MINOR_FOLLOWER upgrade", K(used_time));
  ASSERT_LT(used_time, 3 * 1000 * 1000L);
  ASSERT_EQ(OB_SUCCESS, submit_log(leader, 5, id, 128));

  PALF_LOG(INFO, "[CASE 3.3]: MINOR_FOLLOWER replace_member");
  usleep(LogWritingThrottle::UPDATE_INTERVAL_US);
  leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
  palf_list[another_f_idx]->palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
  palf_list[follower_D_idx]->palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
  ASSERT_EQ(OB_SUCCESS, submit_log(leader, 5, id, 128 * KB));
  // Throttle another_f again before it is replaced.
  ASSERT_EQ(OB_SUCCESS, get_palf_env(another_f_idx, palf_env));
  palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = throttling_percentage;

  LogConfigVersion config_version;
  ASSERT_EQ(OB_SUCCESS, leader.palf_handle_impl_->get_config_version(config_version));
  ASSERT_EQ(OB_SUCCESS, leader.palf_handle_impl_->replace_member(ObMember(get_cluster()[follower_D_idx]->get_addr(), 1),
                                                                 ObMember(get_cluster()[another_f_idx]->get_addr(), 1),
                                                                 config_version,
                                                                 CONFIG_CHANGE_TIMEOUT));
  ASSERT_EQ(OB_SUCCESS, submit_log(leader, 5, id, 128));
  // Follower D is now the follower in the member list; throttle it as well.
  ASSERT_EQ(OB_SUCCESS, get_palf_env(follower_D_idx, palf_env));
  palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = throttling_percentage;
  usleep(LogWritingThrottle::UPDATE_INTERVAL_US);

  PALF_LOG(INFO, "[CASE 3.4]: MINOR_FOLLOWER switch_leader");
  int64_t new_leader_idx = follower_D_idx;
  PalfHandleImplGuard new_leader;
  leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
  palf_list[another_f_idx]->palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
  palf_list[follower_D_idx]->palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
  ASSERT_EQ(OB_SUCCESS, submit_log(leader, 5, id, 128 * KB));
  int64_t switch_start_ts = common::ObClockGenerator::getClock();
  ASSERT_EQ(OB_SUCCESS, switch_leader(id, new_leader_idx, new_leader));
  int64_t switch_end_ts = common::ObClockGenerator::getClock();
  used_time = switch_end_ts - switch_start_ts;
  PALF_LOG(INFO, "[CASE 3.4 end ] end switch_leader", K(used_time));
  // The switch duration is only logged; the upper-bound check is disabled:
  // ASSERT_EQ(true, used_time < 2 * 1000 * 1000);

  new_leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
  ASSERT_EQ(OB_SUCCESS, submit_log(new_leader, 1, id, 1 * KB));
  max_lsn = new_leader.palf_handle_impl_->sw_.get_max_lsn();
  wait_lsn_until_flushed(max_lsn, new_leader);

  // Restore the original disk options on every node touched by this case.
  ASSERT_EQ(OB_SUCCESS, get_palf_env(leader_idx, palf_env));
  set_palf_disk_options(palf_env->palf_env_impl_, disk_options);

  ASSERT_EQ(OB_SUCCESS, get_palf_env(another_f_idx, palf_env));
  set_palf_disk_options(palf_env->palf_env_impl_, disk_options);

  ASSERT_EQ(OB_SUCCESS, get_palf_env(follower_D_idx, palf_env));
  set_palf_disk_options(palf_env->palf_env_impl_, disk_options);

  revert_cluster_palf_handle_guard(palf_list);
  leader.reset();
  new_leader.reset();
  delete_paxos_group(id);
  PALF_LOG(INFO, "end test_2f1a_throttling_minor_follower", K(id));
}
455

456

457
// 4F1A group: the 2F1A group is grown by adding nodes 3 and 4, the leader's
// log-disk throttling percentage is set to 60%, then two followers are cut off
// from the network and are expected to be degraded. After the network is
// restored the group is expected to be upgraded again.
// The original disk options are restored on the four full replicas at the end,
// matching what the 2F1A cases in this file do.
TEST_F(TestObSimpleLogThrottleArb, test_4f1a_degrade_upgrade)
{
  SET_CASE_LOG_FILE(TEST_NAME, "arb_4f1a_throttling");
  // Never assigned in this case; kept in scope because the logging macros may
  // reference `ret` implicitly -- TODO confirm before removing.
  int ret = OB_SUCCESS;
  PALF_LOG(INFO, "begin test_4f1a_throttling");
  int64_t leader_idx = 0;
  int64_t arb_replica_idx = -1;
  PalfHandleImplGuard leader;
  const int64_t CONFIG_CHANGE_TIMEOUT = 10 * 1000 * 1000L; // 10s
  const int64_t id = ATOMIC_AAF(&palf_id_, 1);
  std::vector<PalfHandleImplGuard*> palf_list;
  ASSERT_EQ(OB_SUCCESS, create_paxos_group_with_arb(id, &loc_cb, arb_replica_idx, leader_idx, false, leader));

  // Grow the group to four full replicas by adding nodes 3 and 4.
  LogConfigVersion config_version;
  ASSERT_EQ(OB_SUCCESS, leader.palf_handle_impl_->get_config_version(config_version));
  ASSERT_EQ(OB_SUCCESS, leader.palf_handle_impl_->add_member(ObMember(get_cluster()[3]->get_addr(), 1), 3, config_version, CONFIG_CHANGE_TIMEOUT));

  ASSERT_EQ(OB_SUCCESS, leader.palf_handle_impl_->get_config_version(config_version));
  ASSERT_EQ(OB_SUCCESS, leader.palf_handle_impl_->add_member(ObMember(get_cluster()[4]->get_addr(), 1), 4, config_version, CONFIG_CHANGE_TIMEOUT));
  ASSERT_EQ(OB_SUCCESS, get_cluster_palf_handle_guard(id, palf_list));
  loc_cb.leader_ = leader.palf_handle_impl_->self_;

  const int64_t another_f1_idx = (leader_idx + 1) % 5;
  const int64_t another_f2_idx = (leader_idx + 3) % 5;
  const int64_t another_f3_idx = (leader_idx + 4) % 5;

  // Remember the leader's original disk options so they can be restored on
  // every node at the end of the case.
  PalfEnv *palf_env = NULL;
  ASSERT_EQ(OB_SUCCESS, get_palf_env(leader_idx, palf_env));
  const PalfDiskOptions disk_options = palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_recycling_blocks_;
  set_disk_options_for_throttling(palf_env->palf_env_impl_);
  ASSERT_EQ(OB_SUCCESS, get_palf_env(another_f1_idx, palf_env));
  set_disk_options_for_throttling(palf_env->palf_env_impl_);
  ASSERT_EQ(OB_SUCCESS, get_palf_env(another_f2_idx, palf_env));
  set_disk_options_for_throttling(palf_env->palf_env_impl_);
  ASSERT_EQ(OB_SUCCESS, get_palf_env(another_f3_idx, palf_env));
  set_disk_options_for_throttling(palf_env->palf_env_impl_);

  PALF_LOG(INFO, "[CASE 4]: prepare for throttling");
  leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
  palf_list[another_f1_idx]->palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
  palf_list[another_f2_idx]->palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
  palf_list[another_f3_idx]->palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
  ASSERT_EQ(OB_SUCCESS, submit_log(leader, 140, id, 2 * MB));
  LSN max_lsn = leader.palf_handle_impl_->sw_.get_max_lsn();
  wait_lsn_until_flushed(max_lsn, leader);

  // NOTE(review): the log line below says "B is throttling while A is leader",
  // but the percentage is set on the leader's own env -- confirm the intent.
  const int64_t throttling_percentage = 60;
  ASSERT_EQ(OB_SUCCESS, get_palf_env(leader_idx, palf_env));
  palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = throttling_percentage;
  sleep(2);
  PALF_LOG(INFO, "[CASE 4.1]: 4f1A   B is throttling while A is leader, degrade C & D");
  block_all_net(another_f2_idx);
  block_all_net(another_f3_idx);

  int64_t begin_ts = common::ObClockGenerator::getClock();
  leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
  palf_list[another_f1_idx]->palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
  palf_list[another_f2_idx]->palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
  palf_list[another_f3_idx]->palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
  ASSERT_EQ(OB_SUCCESS, submit_log(leader, 10, id, 256 * KB));
  usleep(10 * 1000);
  ASSERT_TRUE(is_degraded(leader, another_f2_idx));
  ASSERT_TRUE(is_degraded(leader, another_f3_idx));
  int64_t end_ts = common::ObClockGenerator::getClock();
  // The degrade duration is only logged, not asserted.
  int64_t used_time = end_ts - begin_ts;
  PALF_LOG(INFO, " [CASE] 4f1a degrade", K(used_time));

  // Make sure the lease has expired, then verify that loc_cb can still locate
  // the leader to fetch logs from.
  sleep(5);
  unblock_all_net(another_f2_idx);
  unblock_all_net(another_f3_idx);
  ASSERT_EQ(OB_SUCCESS, submit_log(leader, 10, id, 256));

  PALF_LOG(INFO, " [CASE] 4f1a before upgrade", K(used_time));
  ASSERT_TRUE(is_upgraded(leader, id));
  PALF_LOG(INFO, " [CASE] end upgrade", K(used_time));

  // Restore the original disk options on every node touched by this case.
  ASSERT_EQ(OB_SUCCESS, get_palf_env(leader_idx, palf_env));
  set_palf_disk_options(palf_env->palf_env_impl_, disk_options);

  ASSERT_EQ(OB_SUCCESS, get_palf_env(another_f1_idx, palf_env));
  set_palf_disk_options(palf_env->palf_env_impl_, disk_options);

  ASSERT_EQ(OB_SUCCESS, get_palf_env(another_f2_idx, palf_env));
  set_palf_disk_options(palf_env->palf_env_impl_, disk_options);

  ASSERT_EQ(OB_SUCCESS, get_palf_env(another_f3_idx, palf_env));
  set_palf_disk_options(palf_env->palf_env_impl_, disk_options);

  revert_cluster_palf_handle_guard(palf_list);
  leader.reset();
  delete_paxos_group(id);
  PALF_LOG(INFO, "end test_4f1a_degrade_upgrade", K(id));
}
540

541
} // end unittest
542
} // end oceanbase
543

544
int main(int argc, char **argv)
545
{
546
  RUN_SIMPLE_LOG_CLUSTER_TEST(TEST_NAME);
547
}
548

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.