oceanbase

Форк
0
/
test_ob_simple_log_basic_func.cpp 
526 строк · 21.5 Кб
1
/**
2
 * Copyright (c) 2021 OceanBase
3
 * OceanBase CE is licensed under Mulan PubL v2.
4
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
5
 * You may obtain a copy of Mulan PubL v2 at:
6
 *          http://license.coscl.org.cn/MulanPubL-2.0
7
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
 * See the Mulan PubL v2 for more details.
11
 */
12

13
#include <gtest/gtest.h>
14
#include "io/easy_connection.h"
15
#include "lib/file/file_directory_utils.h"
16
#include "lib/ob_errno.h"
17
#include "lib/oblog/ob_log.h"
18
#include "lib/utility/utility.h"
19
#include <cstdio>
20
#include <signal.h>
21
#include "lib/utility/ob_defer.h"
22
#include "share/ob_errno.h"
23
#define private public
24
#include "logservice/palf/log_define.h"
25
#include "logservice/palf/lsn_allocator.h"
26
#include "share/scn.h"
27
#include "logservice/palf/log_rpc_processor.h"
28
#include "env/ob_simple_log_cluster_env.h"
29

30
#undef private
31

32
const std::string TEST_NAME = "basic_func";
33
using namespace oceanbase::common;
34
using namespace oceanbase;
35
namespace oceanbase
36
{
37
using namespace logservice;
38
using namespace palf;
39
namespace unittest
40
{
41
class TestObSimpleLogClusterBasicFunc : public ObSimpleLogClusterTestEnv
42
{
43
public:
44
  TestObSimpleLogClusterBasicFunc() : ObSimpleLogClusterTestEnv()
45
  {}
46
};
47

48
int64_t ObSimpleLogClusterTestBase::member_cnt_ = 3;
49
int64_t ObSimpleLogClusterTestBase::node_cnt_ = 3;
50
std::string ObSimpleLogClusterTestBase::test_name_ = TEST_NAME;
51
bool ObSimpleLogClusterTestBase::need_add_arb_server_  = false;
52

53
// Creates a paxos group with an explicit create SCN, then performs two rounds
// of submit_log() followed by read_log(), recording every step in a time guard.
TEST_F(TestObSimpleLogClusterBasicFunc, submit_log)
{
  SET_CASE_LOG_FILE(TEST_NAME, "submit_log");
  OB_LOGGER.set_log_level("TRACE");
  const int64_t id = ATOMIC_AAF(&palf_id_, 1);

  // The create SCN is derived from a fixed logservice value.
  const int64_t create_ts = 100;
  share::SCN create_scn;
  create_scn.convert_for_logservice(create_ts);

  int64_t leader_idx = 0;
  PalfHandleImplGuard leader;
  ObTimeGuard guard("submit_log", 0);
  EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, create_scn, leader_idx, leader));
  guard.click("create");

  // Each row holds the time-guard labels of one round: {after submit, after read}.
  const char *const round_labels[2][2] = {
    {"submit_log", "read_log"},
    {"submit_log2", "readlog2"},
  };
  for (const auto &labels : round_labels) {
    EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1024, id));
    guard.click(labels[0]);
    // read_log() is expected to end with OB_ITER_END.
    EXPECT_EQ(OB_ITER_END, read_log(leader));
    guard.click(labels[1]);
  }

  guard.click("delete");
  PALF_LOG(INFO, "end test submit_log", K(id), K(guard));
}
79

80
// test_max_padding_size: 测试padding entry最长的场景(2M+16K+88+4K-1B).
81
TEST_F(TestObSimpleLogClusterBasicFunc, test_max_padding_size)
82
{
83
  SET_CASE_LOG_FILE(TEST_NAME, "submit_log");
84
  //OB_LOGGER.set_log_level("TRACE");
85
  const int64_t id = ATOMIC_AAF(&palf_id_, 1);
86
  const int64_t create_ts = 100;
87
  share::SCN create_scn;
88
  create_scn.convert_for_logservice(create_ts);
89
  int64_t leader_idx = 0;
90
  PalfHandleImplGuard leader;
91
  ObTimeGuard guard("submit_log", 0);
92
  EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, create_scn, leader_idx, leader));
93
  guard.click("create");
94
  int64_t follower_1_idx = (leader_idx + 1) % ObSimpleLogClusterTestBase::member_cnt_;
95
  int64_t follower_2_idx = (leader_idx + 2) % ObSimpleLogClusterTestBase::member_cnt_;
96
  block_net(leader_idx, follower_1_idx);
97

98
  const int64_t group_entry_header_total_size = LogGroupEntryHeader::HEADER_SER_SIZE + LogEntryHeader::HEADER_SER_SIZE;
99
  const int64_t max_valid_group_entry_size = MAX_LOG_BODY_SIZE + group_entry_header_total_size;
100
  // padding entry size上限如下,预期不会达到该值,故最大值应该是该值减1Byte
101
  const int64_t max_padding_entry_size = max_valid_group_entry_size + CLOG_FILE_TAIL_PADDING_TRIGGER;
102
  // 测试写一个最大padding entry的场景
103
  // 首先写30条2MB的group log
104
  EXPECT_EQ(OB_SUCCESS, submit_log(leader, 30, leader_idx, (2 * 1024 * 1024 - group_entry_header_total_size)));
105
  // 接着写文件尾最后一条有效的group entry, 确保文件剩余空间触发生成最大的padding entry
106
  EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, (4 * 1024 * 1024 - MAX_INFO_BLOCK_SIZE - max_valid_group_entry_size - CLOG_FILE_TAIL_PADDING_TRIGGER - group_entry_header_total_size + 1)));
107
  // 提交一个2MB+16KB size的log buf, 触发生成padding
108
  EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, MAX_LOG_BODY_SIZE));
109
  // 预期max_lsn为单个block文件size+最大valid group entry size
110
  const LSN expected_max_lsn(PALF_BLOCK_SIZE + max_valid_group_entry_size);
111
  guard.click("submit_log for max size padding entry finish");
112
  CLOG_LOG(INFO, "leader submit log finished", K(expected_max_lsn));
113
  while (leader.palf_handle_impl_->get_end_lsn() < expected_max_lsn) {
114
    usleep(100 * 1000);
115
  }
116
  EXPECT_EQ(leader.palf_handle_impl_->get_end_lsn(), expected_max_lsn);
117
  guard.click("wait leader end_lsn finish");
118
  CLOG_LOG(INFO, "leader wait end_lsn finished", K(expected_max_lsn));
119

120
  unblock_net(leader_idx, follower_1_idx);
121
  std::vector<PalfHandleImplGuard*> palf_list;
122
  EXPECT_EQ(OB_SUCCESS, get_cluster_palf_handle_guard(id, palf_list));
123
  PalfHandleImplGuard &follower_2 = *palf_list[follower_2_idx];
124
  while (follower_2.palf_handle_impl_->get_end_lsn() < expected_max_lsn) {
125
    usleep(100 * 1000);
126
  }
127
  EXPECT_EQ(follower_2.palf_handle_impl_->get_end_lsn(), expected_max_lsn);
128
  guard.click("wait follower_2 end_lsn finish");
129
  CLOG_LOG(INFO, "follower_2 wait end_lsn finished", K(expected_max_lsn));
130

131
  PalfHandleImplGuard &follower_1 = *palf_list[follower_1_idx];
132
  // follower_1依赖fetch log追日志,但end_lsn无法推到与leader一致
133
  // 因为这里committed_end_lsn依赖周期性的keepAlive日志推进
134
  while (follower_1.palf_handle_impl_->get_max_lsn() < expected_max_lsn) {
135
    usleep(100 * 1000);
136
  }
137
  guard.click("wait follower_1 max_lsn finish");
138
  CLOG_LOG(INFO, "follower_1 wait max_lsn finished", K(expected_max_lsn));
139

140
  EXPECT_EQ(OB_SUCCESS, revert_cluster_palf_handle_guard(palf_list));
141
  guard.click("delete");
142
  PALF_LOG(INFO, "end test submit_log", K(id), K(guard));
143
}
144

145
bool check_locate_correct(const std::vector<LSN> &lsn_array,
146
                          const std::vector<SCN> &scn_array,
147
                          const share::SCN input_scn, const LSN result_lsn,
148
                          const bool later_less)
149
{
150
  bool find_match = false;
151
  for (int i = 0; i < lsn_array.size(); i++) {
152
    if (lsn_array[i].val_ - LogGroupEntryHeader::HEADER_SER_SIZE == result_lsn) {
153
      if (later_less) {
154
        EXPECT_LE(scn_array[i], input_scn);
155
      } else {
156
        EXPECT_GE(scn_array[i], input_scn);
157
      }
158
      find_match = true;
159
      break;
160
    }
161
    PALF_LOG(INFO, "check_locate_correct array", K(i), K(lsn_array[i]),
162
             K(scn_array[i]));
163
  }
164
  PALF_LOG(INFO, "check_locate_correct", K(find_match), K(result_lsn), K(input_scn));
165
  return find_match;
166
}
167

168
// Exercises locate_by_scn_coarsely():
//   1. on a group without logs (invalid SCN and "now" are both rejected),
//   2. with an SCN below the written logs (OB_ERR_OUT_OF_LOWER_BOUND),
//   3. with "now" and with a timestamp far in the future,
//   4. with random SCNs picked between the first and the last submitted log.
TEST_F(TestObSimpleLogClusterBasicFunc, test_locate_by_scn_coarsely)
{
  SET_CASE_LOG_FILE(TEST_NAME, "locate_by_scn");
  const int64_t id = ATOMIC_AAF(&palf_id_, 1);
  PALF_LOG(INFO, "test test_locate_by_scn_coarsely", K(id));
  int64_t leader_idx = 0;
  std::vector<LSN> lsn_array;
  std::vector<SCN> scn_array;
  PalfHandleImplGuard leader;
  LSN result_lsn;
  EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader));
  // no log
  share::SCN invalid_scn;
  EXPECT_EQ(OB_INVALID_ARGUMENT, leader.palf_handle_impl_->locate_by_scn_coarsely(invalid_scn, result_lsn));
  share::SCN scn_cur;
  scn_cur.convert_for_logservice(ObTimeUtility::current_time_ns());
  EXPECT_EQ(OB_ENTRY_NOT_EXIST, leader.palf_handle_impl_->locate_by_scn_coarsely(
                                    scn_cur, result_lsn));
  EXPECT_EQ(OB_SUCCESS, submit_log(leader, 256, leader_idx, lsn_array, scn_array));
  sleep(1);
  SCN ref_scn;
  ref_scn.convert_for_tx(10);
  EXPECT_EQ(OB_ERR_OUT_OF_LOWER_BOUND, leader.palf_handle_impl_->locate_by_scn_coarsely(ref_scn, result_lsn));
  // contains log
  int64_t input_ts_ns = ObTimeUtility::current_time_ns();
  share::SCN input_scn;
  input_scn.convert_for_logservice(input_ts_ns);
  EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->locate_by_scn_coarsely(input_scn, result_lsn));
  EXPECT_TRUE(
      check_locate_correct(lsn_array, scn_array, input_scn, result_lsn, true));
  // Move the probe 1000 seconds (expressed in nanoseconds) into the future.
  input_ts_ns += 1000 * 1000 * 1000 * 1000L;
  input_scn.convert_for_logservice(input_ts_ns);
  EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->locate_by_scn_coarsely(input_scn, result_lsn));
  EXPECT_TRUE(
      check_locate_correct(lsn_array, scn_array, input_scn, result_lsn, true));

  ObTimeGuard guard("mittest", 0);
  // write too much log, do not submit to farm
  EXPECT_EQ(OB_SUCCESS, submit_log(leader, 6000, leader_idx, lsn_array, scn_array));
  const int64_t min_ts = scn_array.front().get_val_for_logservice();
  const int64_t max_ts = scn_array.back().get_val_for_logservice();
  for (int i = 0; i < 10; i++) {
    const int64_t this_ts = ObRandom::rand(min_ts, max_ts);
    share::SCN this_scn;
    this_scn.convert_for_logservice(this_ts);
    guard.click();
    EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->locate_by_scn_coarsely(this_scn, result_lsn));
    guard.click();
    // Validate against the SCN that was actually probed. Comparing with
    // input_scn (far in the future) would make the "<=" check always pass.
    EXPECT_TRUE(
        check_locate_correct(lsn_array, scn_array, this_scn, result_lsn, true));
  }
  PALF_LOG(INFO, "end test_locate_by_scn_coarsely", K(id), K(guard));
}
222

223
// Exercises locate_by_lsn_coarsely(): a default-constructed LSN is rejected,
// then LSN 0 and the LSN equal to one PALF_BLOCK_SIZE are located and the
// returned SCN is cross-checked with check_locate_correct().
TEST_F(TestObSimpleLogClusterBasicFunc, test_locate_by_lsn_coarsely)
{
  SET_CASE_LOG_FILE(TEST_NAME, "locate_by_lsn");
  const int64_t id = ATOMIC_AAF(&palf_id_, 1);
  PALF_LOG(INFO, "test test_locate_by_lsn_coarsely", K(id));

  int64_t leader_idx = 0;
  int ret = OB_SUCCESS;
  std::vector<LSN> lsn_array;
  std::vector<SCN> scn_array;
  PalfHandleImplGuard leader;
  share::SCN located_scn;
  EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader));

  // Nothing has been written yet and the probe LSN is default-constructed.
  EXPECT_EQ(OB_INVALID_ARGUMENT,
            leader.palf_handle_impl_->locate_by_lsn_coarsely(LSN(), located_scn));

  EXPECT_EQ(OB_SUCCESS, submit_log(leader, 20, leader_idx, lsn_array, scn_array));
  sleep(1);

  // Probe 1: LSN 0.
  LSN probe_lsn;
  probe_lsn.val_ = 0;
  EXPECT_EQ(OB_SUCCESS,
            leader.palf_handle_impl_->locate_by_lsn_coarsely(probe_lsn, located_scn));
  EXPECT_TRUE(check_locate_correct(lsn_array, scn_array, located_scn, probe_lsn, false));

  EXPECT_EQ(OB_SUCCESS,
            submit_log(leader, 65, 1024 * 1024, leader_idx, lsn_array, scn_array));

  // Probe 2: the LSN that equals exactly one PALF_BLOCK_SIZE.
  probe_lsn.val_ = 1 * PALF_BLOCK_SIZE;
  EXPECT_EQ(OB_SUCCESS,
            leader.palf_handle_impl_->locate_by_lsn_coarsely(probe_lsn, located_scn));
  EXPECT_TRUE(check_locate_correct(lsn_array, scn_array, located_scn, probe_lsn, false));

  PALF_LOG(INFO, "end test_locate_by_lsn_coarsely", K(id));
}
254

255
// Switches leadership 0 -> 1 -> 2 -> 1. After each of the first two switches
// the new leader submits logs; after the last switch the network between
// index 1 and the other two indexes is blocked and unblocked again.
TEST_F(TestObSimpleLogClusterBasicFunc, test_switch_leader)
{
  SET_CASE_LOG_FILE(TEST_NAME, "switch_leader");
  OB_LOGGER.set_log_level("TRACE");
  const int64_t id = ATOMIC_AAF(&palf_id_, 1);
  PALF_LOG(INFO, "start test_switch_leader", K(id));

  int64_t leader_idx = 0;
  PalfHandleImplGuard leader;
  EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader));
  EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1024, id));

  // 1st switch: target index 1, then submit through the new leader.
  leader_idx = 1;
  PalfHandleImplGuard new_leader1;
  EXPECT_EQ(OB_SUCCESS, switch_leader(id, leader_idx, new_leader1));
  EXPECT_EQ(OB_SUCCESS, submit_log(new_leader1, 1024, id));

  // 2nd switch: target index 2, then submit through the new leader.
  PalfHandleImplGuard new_leader2;
  leader_idx = 2;
  EXPECT_EQ(OB_SUCCESS, switch_leader(id, leader_idx, new_leader2));
  EXPECT_EQ(OB_SUCCESS, submit_log(new_leader2, 1024, id));

  // 3rd switch: back to index 1.
  PalfHandleImplGuard new_leader3;
  leader_idx = 1;
  EXPECT_EQ(OB_SUCCESS, switch_leader(id, leader_idx, new_leader3));

  // Block index 1 from the other two indexes.
  const int other_idxs[] = {0, 2};
  for (const int other_idx : other_idxs) {
    block_net(1, other_idx);
  }
  // leader submit new logs
  // PalfHandleImplGuard new_leader4;
  // EXPECT_EQ(OB_SUCCESS, submit_log(new_leader4, 1024, id));
  for (const int other_idx : other_idxs) {
    unblock_net(1, other_idx);
  }
  PALF_LOG(INFO, "end test_switch_leader", K(id));
}
293

294
// Creates a paxos group, calls truncate_prefix_blocks(LSN(0)) on its log
// engine, submits logs and finally calls set_base_lsn(LSN(0)); every call is
// expected to return OB_SUCCESS.
TEST_F(TestObSimpleLogClusterBasicFunc, advance_base_lsn)
{
  SET_CASE_LOG_FILE(TEST_NAME, "advance_base_lsn");
  ObTimeGuard guard("advance_base_lsn", 0);
  OB_LOGGER.set_log_level("INFO");
  const int64_t id = ATOMIC_AAF(&palf_id_, 1);
  PALF_LOG(INFO, "start advance_base_lsn", K(id));

  // Both the truncate and the base-lsn call below use LSN 0.
  const LSN zero_lsn(0);
  int64_t leader_idx = 0;
  PalfHandleImplGuard leader;

  EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader));
  EXPECT_EQ(OB_SUCCESS,
            leader.palf_handle_impl_->log_engine_.truncate_prefix_blocks(zero_lsn));
  guard.click("create");

  EXPECT_EQ(OB_SUCCESS, submit_log(leader, 100, id));
  guard.click("submit");
  guard.click("get_leader");

  EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->set_base_lsn(zero_lsn));
  guard.click("advance_base_lsn");

  PALF_LOG(INFO, "end test advance_base_lsn", K(id), K(guard));
}
315

316
// Appends a deliberately damaged group entry directly through the log engine
// and checks that an iterator positioned on it reports OB_INVALID_DATA.
//
// Steps:
//   1. create a group, submit logs and read back the first group entry,
//   2. serialize that entry into a scratch buffer and overwrite one byte of it,
//   3. append the buffer at max_lsn via log_engine_.append_log(),
//   4. iterate from the previous log tail and expect OB_INVALID_DATA.
TEST_F(TestObSimpleLogClusterBasicFunc, data_corrupted)
{
  SET_CASE_LOG_FILE(TEST_NAME, "data_corrupted");
  ObTimeGuard guard("data_corrupted", 0);
  OB_LOGGER.set_log_level("TRACE");
  const int64_t id = ATOMIC_AAF(&palf_id_, 1);
  PALF_LOG(INFO, "start data_corrupted", K(id));
  int64_t leader_idx = 0;

  PalfHandleImplGuard leader;
  EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader));
  guard.click("create");
  EXPECT_EQ(OB_SUCCESS, submit_log(leader, 100, id));
  sleep(3);
  guard.click("submit");
  guard.click("get_leader");
  PalfGroupBufferIterator iterator;
  EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->alloc_palf_group_buffer_iterator(LSN(0), iterator));
  LogGroupEntry entry;
  LSN lsn;
  EXPECT_EQ(OB_SUCCESS, iterator.next());
  EXPECT_EQ(OB_SUCCESS, iterator.get_entry(entry, lsn));
  char *buf = static_cast<char*>(ob_malloc_align(LOG_DIO_ALIGN_SIZE, MAX_LOG_BUFFER_SIZE,
                                                 ObMemAttr(OB_SERVER_TENANT_ID, ObNewModIds::TEST)));
  // The buffer is written to right below; stop the case if allocation failed.
  ASSERT_TRUE(NULL != buf);
  int64_t pos = 0;
  LogWriteBuf write_buf;
  write_buf.push_back(buf, MAX_LOG_BUFFER_SIZE);
  EXPECT_EQ(OB_SUCCESS, entry.serialize(buf, MAX_LOG_BUFFER_SIZE, pos));
  // Damage the serialized entry by overwriting the byte at offset 4.
  char *group_size_buf = buf + 4;
  *group_size_buf = 1;
  // Remember the log tail before the damaged entry is appended; the second
  // iterator starts from here.
  LSN origin = leader.palf_handle_impl_->log_engine_.log_storage_.log_tail_;
  share::SCN new_scn;
  new_scn.convert_for_logservice(leader.palf_handle_impl_->get_max_scn().get_val_for_logservice() + 1);
  EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->log_engine_.
      append_log(leader.palf_handle_impl_->get_max_lsn(), write_buf, new_scn));
  PalfGroupBufferIterator iterator1;
  auto get_file_end_lsn = [](){
    return LSN(LOG_MAX_LSN_VAL);
  };
  EXPECT_EQ(OB_SUCCESS, iterator1.init(origin, get_file_end_lsn, &leader.palf_handle_impl_->log_engine_.log_storage_));
  EXPECT_EQ(OB_INVALID_DATA, iterator1.next());
  // Release the scratch buffer obtained from ob_malloc_align().
  ob_free_align(buf);
  PALF_LOG(INFO, "end data_corrupted", K(id), K(guard));
}
359

360
// Limits the log disk of server 0 to 2 * MIN_DISK_SIZE_PER_PALF_INSTANCE and
// checks that two paxos groups can be created while the third creation fails
// with OB_LOG_OUTOF_DISK_SPACE. The first group is deleted afterwards.
TEST_F(TestObSimpleLogClusterBasicFunc, limit_palf_instances)
{
  SET_CASE_LOG_FILE(TEST_NAME, "limit_palf_instances");
  OB_LOGGER.set_log_level("INFO");
  int64_t id1 = ATOMIC_AAF(&palf_id_, 1);
  int64_t id2 = ATOMIC_AAF(&palf_id_, 1);
  int server_idx = 0;
  PalfEnv *palf_env = NULL;
  int64_t leader_idx = 0;
  share::SCN create_scn = share::SCN::base_scn();
  {
    PalfHandleImplGuard leader1;
    PalfHandleImplGuard leader2;
    PalfHandleImplGuard leader3;
    // palf_env is dereferenced right below, so failing to obtain it must stop
    // the case instead of leading to a NULL dereference.
    ASSERT_EQ(OB_SUCCESS, get_palf_env(server_idx, palf_env));
    ASSERT_TRUE(NULL != palf_env);
    // id3 is only used for the creation that is expected to fail, so palf_id_
    // itself is not advanced for it.
    int64_t id3 = palf_id_ + 1;
    palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_recycling_blocks_.log_disk_usage_limit_size_ = 2 * MIN_DISK_SIZE_PER_PALF_INSTANCE;
    EXPECT_EQ(OB_SUCCESS, create_paxos_group(id1, create_scn, leader_idx, leader1));
    EXPECT_EQ(OB_SUCCESS, create_paxos_group(id2, create_scn, leader_idx, leader2));
    EXPECT_EQ(OB_LOG_OUTOF_DISK_SPACE, create_paxos_group(id3, create_scn, leader_idx, leader3));
  }
  // The handle guards above are out of scope at this point.
  EXPECT_EQ(OB_SUCCESS, delete_paxos_group(id1));
}
383

384
// Uses two paxos groups: logs are submitted to the first one, the second one
// is switched to raw-write access mode, and read_and_submit_group_log() is
// then run from the first group to the second.
TEST_F(TestObSimpleLogClusterBasicFunc, submit_group_log)
{
  SET_CASE_LOG_FILE(TEST_NAME, "submit_group_log");
  ObTimeGuard guard("submit_group_log", 0);
  const int64_t id = ATOMIC_AAF(&palf_id_, 1);
  const int64_t id_raw_write = ATOMIC_AAF(&palf_id_, 1);
  PALF_LOG(INFO, "start submit_group_log", K(id));

  int64_t leader_idx = 0;
  PalfHandleImplGuard leader;
  PalfHandleImplGuard leader_raw_write;
  EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader));
  EXPECT_EQ(OB_SUCCESS, create_paxos_group(id_raw_write, leader_idx, leader_raw_write));

  // Fill the source group and wait until the given LSN has been committed.
  EXPECT_EQ(OB_SUCCESS, submit_log(leader, 100, id, 1024 * 1024));
  EXPECT_EQ(OB_ITER_END, read_log(leader));
  const LSN committed_target(512*1024*100);
  EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, committed_target));

  // Switch the target group to raw write and feed it from the source group.
  EXPECT_EQ(OB_SUCCESS, change_access_mode_to_raw_write(leader_raw_write));
  EXPECT_EQ(OB_ITER_END, read_and_submit_group_log(leader, leader_raw_write));
  EXPECT_EQ(OB_ITER_END, read_log_from_memory(leader));

  PALF_LOG(INFO, "end submit_group_log advance_base_lsn", K(id), K(guard));
}
405

406
// Collects the batch counters (has_batched_size_, handle_count_) of the log IO
// worker on the leader and on a follower that was cut off from the leader
// while the logs were written and had to catch up afterwards. The counters
// are only logged, not asserted.
TEST_F(TestObSimpleLogClusterBasicFunc, io_reducer_basic)
{
  SET_CASE_LOG_FILE(TEST_NAME, "io_reducer_basic");

  OB_LOGGER.set_log_level("INFO");
  int64_t id = ATOMIC_AAF(&palf_id_, 1);
  int64_t leader_idx = 0;
  PalfHandleImplGuard leader;
  EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader));
  LogIOWorker *iow = leader.palf_handle_impl_->log_engine_.log_io_worker_;

  // Reset the leader's counters so that only this case's logs are counted.
  iow->batch_io_task_mgr_.has_batched_size_ = 0;
  iow->batch_io_task_mgr_.handle_count_ = 0;
  std::vector<PalfHandleImplGuard*> palf_list;
  // Fatal on failure: palf_list is indexed right below, and indexing a vector
  // that was not filled would be out of bounds.
  ASSERT_EQ(OB_SUCCESS, get_cluster_palf_handle_guard(id, palf_list));
  int64_t lag_follower_idx = (leader_idx + 1) % node_cnt_;
  PalfHandleImplGuard &lag_follower = *palf_list[lag_follower_idx];
  // Cut the follower off from the leader in both directions.
  block_net(leader_idx, lag_follower_idx);
  block_net(lag_follower_idx, leader_idx);

  EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10000, leader_idx, 120));
  const LSN max_lsn = leader.palf_handle_impl_->get_max_lsn();
  wait_lsn_until_flushed(max_lsn, leader);
  const int64_t has_batched_size = iow->batch_io_task_mgr_.has_batched_size_;
  const int64_t handle_count = iow->batch_io_task_mgr_.handle_count_;
  const int64_t log_id = leader.palf_handle_impl_->sw_.get_max_log_id();
  PALF_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "batched_size", K(has_batched_size), K(log_id));

  unblock_net(leader_idx, lag_follower_idx);
  unblock_net(lag_follower_idx, leader_idx);

  // Poll once per second until the follower has flushed up to the leader's max_lsn.
  int64_t start_ts = ObTimeUtility::current_time();
  LSN lag_follower_max_lsn = lag_follower.palf_handle_impl_->sw_.max_flushed_end_lsn_;
  while (lag_follower_max_lsn < max_lsn) {
    sleep(1);
    PALF_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "follower is lagged", K(max_lsn), K(lag_follower_max_lsn));
    lag_follower_max_lsn = lag_follower.palf_handle_impl_->sw_.max_flushed_end_lsn_;
  }
  LogIOWorker *iow_follower = lag_follower.palf_handle_impl_->log_engine_.log_io_worker_;
  const int64_t follower_has_batched_size = iow_follower->batch_io_task_mgr_.has_batched_size_;
  const int64_t follower_handle_count = iow_follower->batch_io_task_mgr_.handle_count_;
  EXPECT_EQ(OB_SUCCESS, revert_cluster_palf_handle_guard(palf_list));

  int64_t cost_ts = ObTimeUtility::current_time() - start_ts;
  PALF_LOG_RET(ERROR, OB_SUCCESS, "runlin trace performance", K(cost_ts), K(log_id), K(max_lsn), K(has_batched_size), K(handle_count),
      K(follower_has_batched_size), K(follower_handle_count));
}
453

454
// Creates a paxos group from a PalfBaseInfo whose current LSN is one
// PALF_BLOCK_SIZE (i.e. not 0), then restarts the groups twice and checks that
// the group keeps accepting logs and that its begin LSN/SCN and proposal id
// are consistent with the base info.
TEST_F(TestObSimpleLogClusterBasicFunc, create_palf_via_middle_lsn)
{
  SET_CASE_LOG_FILE(TEST_NAME, "create_palf_via_middle_lsn");
  const int64_t prev_log_id = 1000;
  const LSN mid_lsn(palf::PALF_BLOCK_SIZE);
  const LSN prev_lsn(palf::PALF_BLOCK_SIZE - 100);
  const int64_t prev_log_ts = common::ObTimeUtility::current_time_ns();
  share::SCN prev_scn;
  prev_scn.convert_for_logservice(prev_log_ts);
  const int64_t prev_log_pid = 100;
  PalfBaseInfo palf_base_info;
  palf_base_info.curr_lsn_ = mid_lsn;
  palf_base_info.prev_log_info_.log_id_ = prev_log_id;
  palf_base_info.prev_log_info_.lsn_ = prev_lsn;
  palf_base_info.prev_log_info_.scn_ = prev_scn;
  palf_base_info.prev_log_info_.log_proposal_id_ = prev_log_pid;
  palf_base_info.prev_log_info_.accum_checksum_ = 1;
  int64_t id = ATOMIC_AAF(&palf_id_, 1);
  // leader_idx is handed to create_paxos_group()/get_leader() and reused for
  // submit_log(). A single initialised variable is used for all phases so it
  // never holds an indeterminate value.
  int64_t leader_idx = 0;
  {
    PalfHandleImplGuard leader;
    EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, palf_base_info, leader_idx, leader));
  }

  EXPECT_EQ(OB_SUCCESS, restart_paxos_groups());

  {
    PalfHandleImplGuard leader;
    EXPECT_EQ(OB_SUCCESS, get_leader(id, leader, leader_idx));
    EXPECT_EQ(OB_SUCCESS, submit_log(leader, 200, leader_idx, 1024*1024));
    const LSN max_lsn = leader.palf_handle_impl_->get_max_lsn();
    const int64_t proposal_id = leader.palf_handle_impl_->state_mgr_.get_proposal_id();
    const LSN begin_lsn = leader.palf_handle_impl_->log_engine_.get_begin_lsn();
    share::SCN begin_scn;
    EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->get_begin_scn(begin_scn));
    // check LogInfo
    EXPECT_GT(proposal_id, prev_log_pid);
    EXPECT_GT(begin_scn, prev_scn);
    EXPECT_EQ(begin_lsn, mid_lsn);
    // wait_lsn_until_flushed(max_lsn, leader);
    // The result is checked, as it is for the other wait_until_has_committed()
    // call in this file.
    EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, max_lsn));
    EXPECT_EQ(OB_SUCCESS, submit_log(leader, 100, leader_idx, 10*1024));
  }

  EXPECT_EQ(OB_SUCCESS, restart_paxos_groups());

  {
    PalfHandleImplGuard leader;
    EXPECT_EQ(OB_SUCCESS, get_leader(id, leader, leader_idx));
    EXPECT_EQ(OB_SUCCESS, submit_log(leader, 200, leader_idx, 1024*1024));
    const LSN max_lsn = leader.palf_handle_impl_->get_max_lsn();
    wait_lsn_until_flushed(max_lsn, leader);
    // Read from mid_lsn, the LSN the group was created at.
    EXPECT_EQ(OB_ITER_END, read_log(leader, mid_lsn));
  }
}
512
} // end unittest
513
} // end oceanbase
514

515
// Notes: How to write a new module integration test case in logservice?
// 1. cp test_ob_simple_log_basic_func.cpp test_ob_simple_log_xxx.cpp
// 2. modify const string TEST_NAME, the class name and the log file name in
//    test_ob_simple_log_xxx.cpp
// 3. add an ob_unittest_clog() item and set the label for test_ob_simple_log_xxx in
//    unittest/cluster/CMakeLists.txt
// 4. write new TEST_F
522

523
int main(int argc, char **argv)
524
{
525
  RUN_SIMPLE_LOG_CLUSTER_TEST(TEST_NAME);
526
}
527

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.