oceanbase

Форк
0
/
test_ob_simple_log_engine.cpp 
756 строк · 35.7 Кб
1
/**
2
 * Copyright (c) 2021 OceanBase
3
 * OceanBase CE is licensed under Mulan PubL v2.
4
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
5
 * You may obtain a copy of Mulan PubL v2 at:
6
 *          http://license.coscl.org.cn/MulanPubL-2.0
7
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
 * See the Mulan PubL v2 for more details.
11
 */
12

13
#include "lib/ob_define.h"
14
#include "lib/ob_errno.h"
15
#include "lib/oblog/ob_log.h"
16
#include "lib/time/ob_time_utility.h"
17
#include "logservice/palf/log_group_entry.h"
18
#include <cstdio>
19
#include <gtest/gtest.h>
20
#include <signal.h>
21
#include <stdexcept>
22
#define private public
23
#include "env/ob_simple_log_cluster_env.h"
24
#include "logservice/palf/log_reader_utils.h"
25
#include "logservice/palf/log_define.h"
26
#include "logservice/palf/log_group_entry_header.h"
27
#include "logservice/palf/log_io_worker.h"
28
#include "logservice/palf/log_shared_queue_thread.h"
29
#include "logservice/palf/lsn.h"
30
#include "share/scn.h"
31
#include "logservice/palf/log_io_task.h"
32
#include "logservice/palf/log_writer_utils.h"
33
#include "logservice/palf_handle_guard.h"
34
#undef private
35

36
// Name of this test suite. It is assigned to ObSimpleLogClusterTestBase::test_name_
// further down in this file and passed to SET_CASE_LOG_FILE by each test case.
const std::string TEST_NAME = "log_engine";
37

38
using namespace oceanbase::common;
39
using namespace oceanbase;
40
namespace oceanbase
41
{
42
using namespace logservice;
43
using namespace palf;
44
namespace unittest
45
{
46
class TestObSimpleLogClusterLogEngine : public ObSimpleLogClusterTestEnv
47
{
48
public:
49
  TestObSimpleLogClusterLogEngine() : ObSimpleLogClusterTestEnv()
50
  {
51
    palf_epoch_ = 0;
52
  }
53
  ~TestObSimpleLogClusterLogEngine() { destroy(); }
54
  int init()
55
  {
56
    int ret = OB_SUCCESS;
57
    int64_t leader_idx = 0;
58
    id_ = ATOMIC_AAF(&palf_id_, 1);
59
    if (OB_FAIL(create_paxos_group(id_, leader_idx, leader_))) {
60
      PALF_LOG(ERROR, "create_paxos_group failed", K(ret));
61
    } else {
62
      log_engine_ = &leader_.palf_handle_impl_->log_engine_;
63
    }
64
    return ret;
65
  }
66
  int reload(const LSN &log_tail_redo, const LSN &log_tail_meta, const LSN &base_lsn)
67
  {
68
    int ret = OB_SUCCESS;
69
    palf_epoch_ = ATOMIC_AAF(&palf_epoch_, 1);
70
    LogGroupEntryHeader entry_header;
71
    bool is_integrity = true;
72
    ObILogAllocator *alloc_mgr = log_engine_->alloc_mgr_;
73
    LogRpc *log_rpc = log_engine_->log_net_service_.log_rpc_;
74
    LogIOWorker *log_io_worker = log_engine_->log_io_worker_;
75
    LogSharedQueueTh *log_shared_queue_th = log_engine_->log_shared_queue_th_;
76
    LogPlugins *plugins = log_engine_->plugins_;
77
    LogEngine log_engine;
78
    ILogBlockPool *log_block_pool = log_engine_->log_storage_.block_mgr_.log_block_pool_;
79
    if (OB_FAIL(log_engine.load(leader_.palf_handle_impl_->palf_id_,
80
                                leader_.palf_handle_impl_->log_dir_,
81
                                alloc_mgr,
82
                                log_block_pool,
83
                                &(leader_.palf_handle_impl_->hot_cache_),
84
                                log_rpc,
85
                                log_io_worker,
86
                                log_shared_queue_th,
87
                                plugins,
88
                                entry_header,
89
                                palf_epoch_,
90
                                is_integrity,
91
                                PALF_BLOCK_SIZE,
92
                                PALF_META_BLOCK_SIZE))) {
93
      PALF_LOG(WARN, "load failed", K(ret));
94
    } else if (log_tail_redo != log_engine.log_storage_.log_tail_
95
        || log_tail_meta != log_engine.log_meta_storage_.log_tail_
96
        || base_lsn != log_engine.log_meta_.log_snapshot_meta_.base_lsn_) {
97
      ret = OB_ERR_UNEXPECTED;
98
      PALF_LOG(ERROR, "reload failed", K(ret), K(log_engine), KPC(log_engine_), K(log_tail_redo), K(log_tail_meta), K(base_lsn));
99
    } else {
100
      PALF_LOG(INFO, "reload success", K(log_engine), KPC(log_engine_));
101
    }
102
    return ret;
103
  }
104

105
  int delete_block_by_human(const block_id_t block_id)
106
  {
107
    int ret = OB_SUCCESS;
108
    char file_path[OB_MAX_FILE_NAME_LENGTH] = {'\0'};
109
    const char *log_dir = log_engine_->log_storage_.block_mgr_.log_dir_;
110
    if (OB_FAIL(convert_to_normal_block(log_dir, block_id, file_path, OB_MAX_FILE_NAME_LENGTH))) {
111
      PALF_LOG(WARN, "convert_to_normal_block failed", K(ret), K(log_dir), K(block_id));
112
    } else if (0 != unlink(file_path)){
113
      ret = convert_sys_errno();
114
      PALF_LOG(WARN, "unlink failed", K(ret), K(block_id), K(file_path));
115
    }
116
    return ret;
117
  }
118
  int write_several_blocks(const block_id_t base_block_id, const int block_count)
119
  {
120
    int64_t long_buf_len = 16383 * 128;
121
    LogWriteBuf write_buf;
122
    char *long_buf = reinterpret_cast<char *>(ob_malloc(long_buf_len, "test_log_engine"));
123
    LogGroupEntryHeader header;
124
    int64_t log_checksum;
125
    const block_id_t donot_delete_block_before_this = 3;
126
    write_buf.reset();
127
    memset(long_buf, 0, long_buf_len);
128
    EXPECT_EQ(OB_SUCCESS, write_buf.push_back(long_buf, long_buf_len));
129
    // EXPECT_EQ(32, write_buf.write_buf_.count());
130
    EXPECT_EQ(OB_SUCCESS,
131
              header.generate(false,
132
                              true,
133
                              write_buf,
134
                              long_buf_len - sizeof(LogGroupEntryHeader),
135
                              share::SCN::base_scn(),
136
                              1,
137
                              LSN(donot_delete_block_before_this * PALF_BLOCK_SIZE),
138
                              1,
139
                              log_checksum));
140
    header.update_header_checksum();
141
    int64_t pos = 0;
142
    EXPECT_EQ(OB_SUCCESS, header.serialize(long_buf, long_buf_len, pos));
143
    int ret = OB_SUCCESS;
144
    LogStorage &log_storage = leader_.palf_handle_impl_->log_engine_.log_storage_;
145
    block_id_t min_block_id = LOG_INVALID_BLOCK_ID, max_block_id = LOG_INVALID_BLOCK_ID;
146
    if (block_count == 0) {
147
      ret = OB_INVALID_ARGUMENT;
148
      return ret;
149
    }
150
    bool need_submit_log = true;
151
    if (OB_FAIL(log_storage.get_block_id_range(min_block_id, max_block_id)) && OB_ENTRY_NOT_EXIST != ret) {
152
      PALF_LOG(ERROR, "get_block_id_range failed", K(ret));
153
    } else if (OB_ENTRY_NOT_EXIST == ret) {
154
      min_block_id = base_block_id;
155
      max_block_id = base_block_id;
156
      ret = OB_SUCCESS;
157
    }
158
    block_id_t end_block_id = max_block_id + block_count;
159
    PALF_LOG(INFO, "runlin trace before", K(end_block_id), K(max_block_id));
160
    do {
161
      if (max_block_id < end_block_id) {
162
        need_submit_log = true;
163
        ret = OB_SUCCESS;
164
      } else {
165
        need_submit_log = false;
166
      }
167
      share::SCN tmp_scn;
168
      tmp_scn.convert_for_logservice(max_block_id);
169
      if (true == need_submit_log && OB_FAIL(log_storage.writev(log_storage.log_tail_, write_buf, tmp_scn))) {
170
        PALF_LOG(ERROR, "submit_log failed", K(ret));
171
      } else {
172
      }
173
      if (OB_FAIL(log_storage.get_block_id_range(min_block_id, max_block_id))) {
174
        PALF_LOG(ERROR, "get_block_id_range failed", K(ret));
175
      }
176
    } while (OB_SUCC(ret) && true == need_submit_log);
177
    PALF_LOG(INFO, "runlin trace after", K(end_block_id), K(max_block_id));
178
    return ret;
179
  }
180
  void destroy() {}
181
  int64_t id_;
182
  int64_t palf_epoch_;
183
  LogEngine *log_engine_;
184
  PalfHandleImplGuard leader_;
185
};
186

187
int64_t ObSimpleLogClusterTestBase::member_cnt_ = 1;
188
int64_t ObSimpleLogClusterTestBase::node_cnt_ = 1;
189
std::string ObSimpleLogClusterTestBase::test_name_ = TEST_NAME;
190
bool ObSimpleLogClusterTestBase::need_add_arb_server_  = false;
191

192
// 验证flashback过程中宕机重启
193
TEST_F(TestObSimpleLogClusterLogEngine, flashback_restart)
194
{
195
  SET_CASE_LOG_FILE(TEST_NAME, "flashback_restart");
196
  OB_LOGGER.set_log_level("TRACE");
197
  PALF_LOG(INFO, "begin flashback_restart");
198
  PalfHandleImplGuard leader;
199
  int64_t id_1 = ATOMIC_AAF(&palf_id_, 1);
200
  int64_t leader_idx_1 = 0;
201
  PalfEnv *palf_env = NULL;
202
  EXPECT_EQ(OB_SUCCESS, create_paxos_group(id_1, leader_idx_1, leader));
203
  EXPECT_EQ(OB_SUCCESS, get_palf_env(leader_idx_1, palf_env));
204
  EXPECT_EQ(OB_SUCCESS, submit_log(leader, 66, leader_idx_1, MAX_LOG_BODY_SIZE));
205
  EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.palf_handle_impl_->get_max_lsn()));
206
  SCN scn;
207
  LogStorage *log_storage = &leader.get_palf_handle_impl()->log_engine_.log_storage_;
208
  LSN log_tail = log_storage->log_tail_;
209
  scn = leader.get_palf_handle_impl()->get_end_scn();
210
  EXPECT_EQ(OB_SUCCESS, submit_log(leader, 33, leader_idx_1, MAX_LOG_BODY_SIZE));
211
  EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.palf_handle_impl_->get_max_lsn()));
212
  int64_t mode_version;
213
  AccessMode mode;
214
  EXPECT_EQ(OB_SUCCESS, leader.get_palf_handle_impl()->get_access_mode(mode_version, mode));
215
  LSN flashback_lsn(PALF_BLOCK_SIZE*lsn_2_block(log_tail, PALF_BLOCK_SIZE));
216
  EXPECT_EQ(OB_SUCCESS, log_storage->begin_flashback(flashback_lsn));
217
  leader.reset();
218
  EXPECT_EQ(OB_SUCCESS, restart_paxos_groups());
219

220
  {
221
    PalfHandleImplGuard leader1;
222
    EXPECT_EQ(OB_SUCCESS, get_leader(id_1, leader1, leader_idx_1));
223
    LogStorage *log_storage = &leader1.get_palf_handle_impl()->log_engine_.log_storage_;
224
    EXPECT_LE(2, log_storage->block_mgr_.max_block_id_);
225
    EXPECT_EQ(OB_SUCCESS, log_storage->block_mgr_.create_tmp_block_handler(2));
226
    EXPECT_EQ(OB_SUCCESS, log_storage->update_manifest_(3));
227
    EXPECT_EQ(OB_SUCCESS, log_storage->block_mgr_.delete_block_from_back_to_front_until(2));
228
    {
229
      LogBlockMgr *block_mgr = &log_storage->block_mgr_;
230
      int block_id = 2;
231
      int ret = OB_SUCCESS;
232
      // 1. rename "block_id.tmp" to "block_id.flashback"
233
      // 2. delete "block_id", make sure each block has returned into BlockPool
234
      // 3. rename "block_id.flashback" to "block_id"
235
      // NB: for restart, the block which named 'block_id.flashback' must be renamed to 'block_id'
236
      char tmp_block_path[OB_MAX_FILE_NAME_LENGTH] = {'\0'};
237
      char block_path[OB_MAX_FILE_NAME_LENGTH] = {'\0'};
238
      char flashback_block_path[OB_MAX_FILE_NAME_LENGTH] = {'\0'};
239
      if (block_id != block_mgr->curr_writable_block_id_) {
240
        ret = OB_ERR_UNEXPECTED;
241
        PALF_LOG(ERROR, "block_id is not same as curr_writable_handler_, unexpected error",
242
            K(ret), K(block_id), KPC(block_mgr));
243
      } else if (OB_FAIL(block_id_to_string(block_id, block_path, OB_MAX_FILE_NAME_LENGTH))) {
244
	PALF_LOG(ERROR, "block_id_to_string failed", K(ret), K(block_id));
245
      } else if (OB_FAIL(block_id_to_tmp_string(block_id, tmp_block_path, OB_MAX_FILE_NAME_LENGTH))) {
246
	PALF_LOG(ERROR, "block_id_to_tmp_string failed", K(ret), K(block_id));
247
      } else if (OB_FAIL(block_id_to_flashback_string(block_id, flashback_block_path, OB_MAX_FILE_NAME_LENGTH))) {
248
	PALF_LOG(ERROR, "block_id_to_flashback_string failed", K(ret), K(block_id));
249
      } else if (OB_FAIL(block_mgr->do_rename_and_fsync_(tmp_block_path, flashback_block_path))) {
250
        PALF_LOG(ERROR, "do_rename_and_fsync_ failed", K(ret), KPC(block_mgr));
251
      } else {
252
        PALF_LOG(INFO, "rename_tmp_block_handler_to_normal success", K(ret), KPC(block_mgr));
253
      }
254
    }
255
  }
256
  EXPECT_EQ(OB_SUCCESS, restart_paxos_groups());
257
  EXPECT_EQ(OB_SUCCESS, restart_paxos_groups());
258
}
259

260
// Walks LogEngine/LogStorage through argument-validation failures and then
// through truncate, truncate_prefix, emptied-directory, reload and
// manually-deleted-block scenarios, checking the block id range and log tail
// after each step.
TEST_F(TestObSimpleLogClusterLogEngine, exception_path)
{
  SET_CASE_LOG_FILE(TEST_NAME, "exception_path");
  // Everything below dereferences log_engine_, which only init() sets, so a
  // failed init() must stop the test instead of merely being recorded.
  ASSERT_EQ(OB_SUCCESS, init());
  OB_LOGGER.set_log_level("TRACE");
  // TODO: to be reopened by runlin.
  ObTenantMutilAllocator *allocator =
      dynamic_cast<ObTenantMutilAllocator *>(log_engine_->alloc_mgr_);
  OB_ASSERT(NULL != allocator);
  // With this limit in place the flush-task submission below is expected to
  // report OB_ALLOCATE_MEMORY_FAILED.
  allocator->set_limit(32);
  FlushLogCbCtx flush_ctx;
  LogWriteBuf write_buf;
  const char *buf = "hello";
  EXPECT_FALSE(flush_ctx.is_valid());
  EXPECT_FALSE(write_buf.is_valid());
  EXPECT_EQ(OB_INVALID_ARGUMENT, log_engine_->submit_flush_log_task(flush_ctx, write_buf));
  flush_ctx.lsn_ = LSN(1);
  flush_ctx.scn_ = share::SCN::base_scn();
  EXPECT_EQ(OB_INVALID_ARGUMENT, write_buf.push_back(NULL, strlen(buf)));
  EXPECT_EQ(OB_SUCCESS, write_buf.push_back(buf, strlen(buf)));
  EXPECT_EQ(OB_ALLOCATE_MEMORY_FAILED, log_engine_->submit_flush_log_task(flush_ctx, write_buf));
  write_buf.reset();

  // Test LogStorage
  LogStorage *log_storage = &log_engine_->log_storage_;
  LogStorage *meta_storage = &log_engine_->log_meta_storage_;
  block_id_t min_block_id = LOG_INVALID_BLOCK_ID;
  block_id_t max_block_id = LOG_INVALID_BLOCK_ID;
  share::SCN tmp_scn;
  EXPECT_EQ(OB_INVALID_ARGUMENT,
            log_engine_->append_log(LSN(LOG_INVALID_LSN_VAL), write_buf, tmp_scn));
  EXPECT_EQ(OB_INVALID_ARGUMENT, log_storage->writev(LSN(LOG_INVALID_LSN_VAL), write_buf, tmp_scn));
  EXPECT_EQ(OB_ENTRY_NOT_EXIST, log_engine_->get_block_id_range(min_block_id, max_block_id));
  EXPECT_EQ(LSN(0), log_engine_->get_begin_lsn());
  EXPECT_EQ(OB_ENTRY_NOT_EXIST, log_storage->get_block_id_range(min_block_id, max_block_id));
  EXPECT_EQ(LSN(0), log_storage->get_begin_lsn());
  EXPECT_EQ(OB_SUCCESS, log_storage->truncate_prefix_blocks(LSN(0)));
  EXPECT_EQ(true, log_storage->need_append_block_header_);
  EXPECT_EQ(true, log_storage->need_switch_block_());
  EXPECT_EQ(OB_INVALID_ARGUMENT, log_storage->truncate(LSN(100000000)));
  // no block id 1
  EXPECT_EQ(OB_ERR_UNEXPECTED, log_storage->delete_block(1));
  EXPECT_EQ(OB_INVALID_ARGUMENT, meta_storage->append_meta(buf, 10000000));

  // Lift the allocator limit again before writing real data.
  allocator->set_limit(1*1024*1024*1024);

  EXPECT_EQ(OB_SUCCESS, write_several_blocks(0, 11));
  PALF_LOG(INFO, "after write_several_blocks 11");

  EXPECT_EQ(OB_SUCCESS, log_storage->get_block_id_range(min_block_id, max_block_id));
  EXPECT_EQ(0, min_block_id);
  EXPECT_EQ(11, max_block_id);

  // Test the truncate scenario.
  block_id_t truncate_block_id = max_block_id - 2;
  EXPECT_EQ(OB_SUCCESS, log_storage->truncate(LSN(truncate_block_id * PALF_BLOCK_SIZE)));
  EXPECT_EQ(OB_SUCCESS, log_storage->get_block_id_range(min_block_id, max_block_id));
  // At this point the last block is empty.
  EXPECT_EQ(log_storage->log_tail_, LSN(truncate_block_id * PALF_BLOCK_SIZE));
  EXPECT_EQ(truncate_block_id, max_block_id);
  EXPECT_EQ(lsn_2_block(log_engine_->log_meta_storage_.log_block_header_.min_lsn_, PALF_BLOCK_SIZE), truncate_block_id + 1);

  // Move base_lsn to block 1 and persist it before deleting block 0.
  LogSnapshotMeta snapshot_meta;
  EXPECT_EQ(OB_SUCCESS, snapshot_meta.generate(LSN(1 * PALF_BLOCK_SIZE)));
  EXPECT_EQ(OB_SUCCESS, log_engine_->log_meta_.update_log_snapshot_meta(snapshot_meta));
  EXPECT_EQ(OB_SUCCESS, log_engine_->append_log_meta_(log_engine_->log_meta_));
  EXPECT_EQ(OB_SUCCESS, log_storage->delete_block(0));
  EXPECT_EQ(OB_SUCCESS, log_storage->get_block_id_range(min_block_id, max_block_id));
  EXPECT_EQ(1, min_block_id);
  EXPECT_EQ(LSN(max_block_id * PALF_BLOCK_SIZE), log_storage->log_tail_);

  log_storage = log_engine_->get_log_storage();
  LogBlockHeader block_header;
  share::SCN scn_0;
  share::SCN scn_11;
  EXPECT_EQ(OB_ERR_OUT_OF_LOWER_BOUND, log_storage->get_block_min_scn(0, scn_0));
  EXPECT_EQ(OB_ERR_OUT_OF_LOWER_BOUND, log_storage->read_block_header_(0, block_header));
  EXPECT_EQ(OB_ERR_OUT_OF_UPPER_BOUND,
            log_storage->get_block_min_scn(truncate_block_id, scn_11));
  share::SCN ts_origin = scn_11;
  PALF_LOG(INFO, "after second write_several_blocks 1", K(truncate_block_id), K(max_block_id));
  // Because the last file is empty after truncate, max_block_id == truncate_block_id.
  EXPECT_EQ(OB_SUCCESS, write_several_blocks(0, 1));
  EXPECT_EQ(OB_SUCCESS, log_storage->get_block_min_scn(truncate_block_id, scn_11));
  EXPECT_NE(scn_11, ts_origin);

  // Test the restart scenario.
  EXPECT_EQ(OB_SUCCESS, reload(log_engine_->log_storage_.log_tail_, log_engine_->log_meta_storage_.log_tail_, log_engine_->log_meta_.log_snapshot_meta_.base_lsn_));
  PALF_LOG(INFO, "after reload1");

  // Test the truncate_prefix scenario.
  block_id_t truncate_prefix_block_id = 4;
  LogInfo prev_log_info;
  prev_log_info.lsn_ = LSN(truncate_prefix_block_id*PALF_BLOCK_SIZE);
  prev_log_info.log_id_ = 0;
  prev_log_info.log_proposal_id_ = 0;
  prev_log_info.scn_ = share::SCN::min_scn();
  prev_log_info.accum_checksum_ = 0;
  EXPECT_EQ(OB_SUCCESS, snapshot_meta.generate(prev_log_info.lsn_, prev_log_info));
  EXPECT_EQ(OB_SUCCESS, log_engine_->log_meta_.update_log_snapshot_meta(snapshot_meta));
  EXPECT_EQ(OB_SUCCESS, log_engine_->append_log_meta_(log_engine_->log_meta_));
  EXPECT_EQ(OB_SUCCESS,
            log_storage->truncate_prefix_blocks(LSN(truncate_prefix_block_id * PALF_BLOCK_SIZE)));
  // After truncate_prefix, keep writing one more block. The result is checked
  // like every other write_several_blocks() call in this test.
  EXPECT_EQ(OB_SUCCESS, write_several_blocks(0, 1));
  EXPECT_EQ(OB_SUCCESS, log_storage->get_block_id_range(min_block_id, max_block_id));
  EXPECT_EQ(truncate_prefix_block_id, min_block_id);
  EXPECT_EQ(truncate_block_id+2, max_block_id);

  // Test the scenario where the directory is emptied: log_tail should then be
  // at truncate_prefix_block_id.
  // After the directory is emptied, log_tail is reset.
  truncate_prefix_block_id = max_block_id + 2;
  prev_log_info.lsn_ = LSN(truncate_prefix_block_id*PALF_BLOCK_SIZE);
  prev_log_info.log_id_ = 0;
  prev_log_info.log_proposal_id_ = 0;
  prev_log_info.scn_ = share::SCN::min_scn();
  prev_log_info.accum_checksum_ = 0;
  EXPECT_EQ(OB_SUCCESS, snapshot_meta.generate(prev_log_info.lsn_, prev_log_info));
  EXPECT_EQ(OB_SUCCESS, log_engine_->log_meta_.update_log_snapshot_meta(snapshot_meta));
  EXPECT_EQ(OB_SUCCESS, log_engine_->append_log_meta_(log_engine_->log_meta_));
  EXPECT_EQ(OB_SUCCESS, log_engine_->truncate_prefix_blocks(prev_log_info.lsn_));
  EXPECT_EQ(OB_ENTRY_NOT_EXIST, log_storage->get_block_id_range(min_block_id, max_block_id));
  // truncate_prefix_block_id equals the block id that prev_lsn maps to.
  EXPECT_EQ(log_storage->log_tail_, LSN(truncate_prefix_block_id * PALF_BLOCK_SIZE));

  // After the directory is emptied, reads must report the proper errors.
  ReadBufGuard buf_guard("dummy", 100);
  int64_t out_read_size = 0;
  EXPECT_EQ(OB_ERR_OUT_OF_UPPER_BOUND,
            log_storage->pread(LSN((truncate_prefix_block_id + 1) * PALF_BLOCK_SIZE),
                               100,
                               buf_guard.read_buf_,
                               out_read_size));
  EXPECT_EQ(OB_ERR_OUT_OF_LOWER_BOUND,
            log_storage->pread(LSN((truncate_prefix_block_id - 1) * PALF_BLOCK_SIZE),
                               100,
                               buf_guard.read_buf_,
                               out_read_size));
  // After the directory is emptied, a restart must still work.
  EXPECT_EQ(OB_SUCCESS, reload(log_engine_->log_storage_.log_tail_, log_engine_->log_meta_storage_.log_tail_, log_engine_->log_meta_.log_snapshot_meta_.base_lsn_));

  PALF_LOG(INFO, "directory is empty");
  // After the directory is emptied, writes must still work.
  // log_tail is now at the head of truncate_prefix_block_id.
  const block_id_t expected_min_block_id = lsn_2_block(log_storage->log_tail_, log_storage->logical_block_size_);
  EXPECT_EQ(OB_SUCCESS, write_several_blocks(expected_min_block_id, 3));
  EXPECT_EQ(OB_SUCCESS, log_storage->get_block_id_range(min_block_id, max_block_id));
  EXPECT_EQ(expected_min_block_id, min_block_id);
  EXPECT_EQ(expected_min_block_id+3, max_block_id);
  share::SCN scn_cur;
  EXPECT_EQ(OB_SUCCESS, log_engine_->get_block_min_scn(max_block_id, scn_cur));

  // Test restarting after block files were deleted by hand.
  EXPECT_EQ(OB_SUCCESS, log_engine_->get_block_id_range(min_block_id, max_block_id));
  EXPECT_EQ(OB_SUCCESS, delete_block_by_human(max_block_id));
  EXPECT_EQ(OB_ERR_UNEXPECTED, reload(log_engine_->log_storage_.log_tail_, log_engine_->log_meta_storage_.log_tail_, log_engine_->log_meta_.log_snapshot_meta_.base_lsn_));
  EXPECT_EQ(OB_SUCCESS, delete_block_by_human(min_block_id));
  EXPECT_EQ(OB_ERR_UNEXPECTED, reload(log_engine_->log_storage_.log_tail_, log_engine_->log_meta_storage_.log_tail_, log_engine_->log_meta_.log_snapshot_meta_.base_lsn_));

  leader_.reset();
  PALF_LOG(INFO, "end exception_path");
}
436

437

438
TEST_F(TestObSimpleLogClusterLogEngine, io_reducer_basic_func)
439
{
440
  SET_CASE_LOG_FILE(TEST_NAME, "io_reducer_func");
441
  update_server_log_disk(4*1024*1024*1024ul);
442
  update_disk_options(4*1024*1024*1024ul/palf::PALF_PHY_BLOCK_SIZE);
443
  OB_LOGGER.set_log_level("TRACE");
444
  PALF_LOG(INFO, "begin io_reducer_basic_func");
445
  PalfHandleImplGuard leader_1;
446
  int64_t id_1 = ATOMIC_AAF(&palf_id_, 1);
447
  int64_t leader_idx_1 = 0;
448
  PalfEnv *palf_env = NULL;
449
  EXPECT_EQ(OB_SUCCESS, create_paxos_group(id_1, leader_idx_1, leader_1));
450
  EXPECT_EQ(OB_SUCCESS, get_palf_env(leader_idx_1, palf_env));
451

452
  LogIOWorker *log_io_worker = leader_1.palf_handle_impl_->log_engine_.log_io_worker_;
453

454
  int64_t prev_log_id_1 = 0;
455
  int64_t prev_has_batched_size = 0;
456
	LogEngine *log_engine = &leader_1.palf_handle_impl_->log_engine_;
457
	IOTaskCond io_task_cond_1(id_1, log_engine->palf_epoch_);
458
  IOTaskVerify io_task_verify_1(id_1, log_engine->palf_epoch_);
459
  // 单日志流场景
460
  // 卡住log_io_worker的处理
461
  {
462
    EXPECT_EQ(OB_SUCCESS, log_io_worker->submit_io_task(&io_task_cond_1));
463
    EXPECT_EQ(OB_SUCCESS, submit_log(leader_1, 1024, id_1, 110));
464
    const int64_t log_id = leader_1.palf_handle_impl_->sw_.get_max_log_id();
465
    LSN max_lsn = leader_1.palf_handle_impl_->sw_.get_max_lsn();
466
    io_task_cond_1.cond_.signal();
467
    wait_lsn_until_flushed(max_lsn, leader_1);
468
    EXPECT_EQ(OB_ITER_END, read_log(leader_1));
469
    // sw内部做了自适应freeze之后这个等式可能不成立, 因为上层可能基于写盘反馈触发提交下一个io_task
470
//    EXPECT_EQ(log_id, log_io_worker->batch_io_task_mgr_.has_batched_size_);
471
    prev_log_id_1 = log_id;
472
    prev_has_batched_size = log_io_worker->batch_io_task_mgr_.has_batched_size_;
473
  }
474
  // 单日志流场景
475
  // 当聚合度为1的时候,应该走正常的提交流程,目前暂未实现,先通过has_batched_size不计算绕过
476
  {
477
    // 聚合度为1的忽略
478
    EXPECT_EQ(OB_SUCCESS, log_io_worker->submit_io_task(&io_task_cond_1));
479
    EXPECT_EQ(OB_SUCCESS, submit_log(leader_1, 1, id_1, 110));
480
    sleep(1);
481
    EXPECT_EQ(OB_SUCCESS, log_io_worker->submit_io_task(&io_task_verify_1));
482
    EXPECT_EQ(OB_SUCCESS, log_io_worker->submit_io_task(&io_task_verify_1));
483
    EXPECT_EQ(OB_SUCCESS, submit_log(leader_1, 1024, id_1, 110));
484
    const int64_t log_id = leader_1.palf_handle_impl_->sw_.get_max_log_id();
485
    LSN max_lsn = leader_1.palf_handle_impl_->sw_.get_max_lsn();
486
    io_task_cond_1.cond_.signal();
487
    wait_lsn_until_flushed(max_lsn, leader_1);
488
//    EXPECT_EQ(log_id - 1, log_io_worker->batch_io_task_mgr_.has_batched_size_);
489
    EXPECT_EQ(2, io_task_verify_1.count_);
490
//    EXPECT_EQ(log_io_worker->batch_io_task_mgr_.has_batched_size_ - prev_has_batched_size,
491
//              log_id - prev_log_id_1 - 1);
492
    prev_log_id_1 = log_id;
493
    prev_has_batched_size = log_io_worker->batch_io_task_mgr_.has_batched_size_;
494
  }
495

496
  // 多日志流场景
497
  int64_t id_2 = ATOMIC_AAF(&palf_id_, 1);
498
  int64_t prev_log_id_2 = 0;
499
  int64_t leader_idx_2 = 0;
500
  PalfHandleImplGuard leader_2;
501
  EXPECT_EQ(OB_SUCCESS, create_paxos_group(id_2, leader_idx_2, leader_2));
502
	IOTaskCond io_task_cond_2(id_2, leader_2.get_palf_handle_impl()->log_engine_.palf_epoch_);
503
  IOTaskVerify io_task_verify_2(id_2, leader_2.get_palf_handle_impl()->log_engine_.palf_epoch_);
504
  {
505
    LogIOWorker *log_io_worker = leader_2.palf_handle_impl_->log_engine_.log_io_worker_;
506
    // 聚合度为1的忽略
507
    EXPECT_EQ(OB_SUCCESS, log_io_worker->submit_io_task(&io_task_cond_2));
508
    EXPECT_EQ(OB_SUCCESS, submit_log(leader_1, 1, id_1, 110));
509
    EXPECT_EQ(OB_SUCCESS, submit_log(leader_2, 1, id_2, 110));
510
    sleep(1);
511
    EXPECT_EQ(OB_SUCCESS, log_io_worker->submit_io_task(&io_task_verify_2));
512
    EXPECT_EQ(OB_SUCCESS, log_io_worker->submit_io_task(&io_task_verify_1));
513

514
    EXPECT_EQ(OB_SUCCESS, submit_log(leader_1, 1024, id_1, 110));
515
    EXPECT_EQ(OB_SUCCESS, submit_log(leader_2, 1024, id_2, 110));
516

517
    const int64_t log_id_1 = leader_1.palf_handle_impl_->sw_.get_max_log_id();
518
    LSN max_lsn_1 = leader_1.palf_handle_impl_->sw_.get_max_lsn();
519
    const int64_t log_id_2 = leader_2.palf_handle_impl_->sw_.get_max_log_id();
520
    LSN max_lsn_2 = leader_2.palf_handle_impl_->sw_.get_max_lsn();
521
    sleep(1);
522
    io_task_cond_2.cond_.signal();
523
    wait_lsn_until_flushed(max_lsn_1, leader_1);
524
    wait_lsn_until_flushed(max_lsn_2, leader_2);
525
    EXPECT_EQ(3, io_task_verify_1.count_);
526
    EXPECT_EQ(1, io_task_verify_2.count_);
527

528
    // ls1已经有个一个log_id被忽略聚合了
529
//    EXPECT_EQ(log_io_worker->batch_io_task_mgr_.has_batched_size_ - prev_has_batched_size,
530
//              log_id_1 - 1 + log_id_2 -1 - prev_log_id_1);
531
    prev_has_batched_size = log_io_worker->batch_io_task_mgr_.has_batched_size_;
532
    prev_log_id_2 = log_id_2;
533
    prev_log_id_1 = log_id_1;
534
  }
535

536
  // 三个日志流,stripe为2
537
  // 目前不支持可配的LogIOWorkerConfig,此测试暂时不打开,但结果是对的
538
  // int64_t id_3 = ATOMIC_AAF(&palf_id_, 1);
539
  // int64_t leader_idx_3 = 0;
540
  // int64_t prev_log_id_3 = 0;
541
  // PalfHandleImplGuard leader_3;
542
  // IOTaskCond io_task_cond_3;
543
  // IOTaskVerify io_task_verify_3;
544
  // io_task_cond_3.init(id_3);
545
  // io_task_verify_3.init(id_3);
546
  // EXPECT_EQ(OB_SUCCESS, create_paxos_group(id_3, leader_idx_3, leader_3));
547
  // {
548
  //   EXPECT_EQ(OB_SUCCESS, log_io_worker->submit_io_task(&io_task_cond_3));
549
  //   EXPECT_EQ(OB_SUCCESS, submit_log(leader_1, 1, id_1, 110));
550
  //   EXPECT_EQ(OB_SUCCESS, submit_log(leader_2, 1, id_2, 110));
551
  //   EXPECT_EQ(OB_SUCCESS, submit_log(leader_3, 1, id_3, 110));
552
  //   EXPECT_EQ(OB_SUCCESS, submit_log(leader_2, 1, id_2, 110));
553
  //   sleep(1);
554
  //   io_task_cond_3.cond_.signal();
555
  //   const int64_t log_id_1 = leader_1.palf_handle_impl_->sw_.get_max_log_id();
556
  //   LSN max_lsn_1 = leader_1.palf_handle_impl_->sw_.get_max_lsn();
557
  //   const int64_t log_id_2 = leader_2.palf_handle_impl_->sw_.get_max_log_id();
558
  //   LSN max_lsn_2 = leader_2.palf_handle_impl_->sw_.get_max_lsn();
559
  //   const int64_t log_id_3 = leader_3.palf_handle_impl_->sw_.get_max_log_id();
560
  //   LSN max_lsn_3 = leader_3.palf_handle_impl_->sw_.get_max_lsn();
561
  //   wait_lsn_until_flushed(max_lsn_1, leader_1);
562
  //   wait_lsn_until_flushed(max_lsn_2, leader_2);
563
  //   wait_lsn_until_flushed(max_lsn_3, leader_3);
564
  //   EXPECT_EQ(log_io_worker->batch_io_task_mgr_.has_batched_size_ - prev_has_batched_size, 0);
565
  // }
566
  // 验证切文件场景
567
  int64_t id_3 = ATOMIC_AAF(&palf_id_, 1);
568
  int64_t leader_idx_3 = 0;
569
  int64_t prev_log_id_3 = 0;
570
  PalfHandleImplGuard leader_3;
571
  EXPECT_EQ(OB_SUCCESS, create_paxos_group(id_3, leader_idx_3, leader_3));
572
	IOTaskCond io_task_cond_3(id_3, leader_3.get_palf_handle_impl()->log_engine_.palf_epoch_);
573
  IOTaskVerify io_task_verify_3(id_3, leader_3.get_palf_handle_impl()->log_engine_.palf_epoch_);
574
  {
575
    LogIOWorker *log_io_worker = leader_3.palf_handle_impl_->log_engine_.log_io_worker_;
576
    EXPECT_EQ(OB_SUCCESS, log_io_worker->submit_io_task(&io_task_cond_3));
577
    EXPECT_EQ(OB_SUCCESS, submit_log(leader_1, 1, id_1, 110));
578
    EXPECT_EQ(OB_SUCCESS, submit_log(leader_2, 1, id_2, 110));
579
    EXPECT_EQ(OB_SUCCESS, submit_log(leader_3, 1, id_3, 110));
580
    sleep(1);
581
    EXPECT_EQ(OB_SUCCESS, submit_log(leader_2, 1, id_2, 110));
582
    sleep(1);
583
    io_task_cond_3.cond_.signal();
584
    const int64_t log_id_1 = leader_1.palf_handle_impl_->sw_.get_max_log_id();
585
    LSN max_lsn_1 = leader_1.palf_handle_impl_->sw_.get_max_lsn();
586
    const int64_t log_id_2 = leader_2.palf_handle_impl_->sw_.get_max_log_id();
587
    LSN max_lsn_2 = leader_2.palf_handle_impl_->sw_.get_max_lsn();
588
    const int64_t log_id_3 = leader_3.palf_handle_impl_->sw_.get_max_log_id();
589
    LSN max_lsn_3 = leader_3.palf_handle_impl_->sw_.get_max_lsn();
590
    wait_lsn_until_flushed(max_lsn_1, leader_1);
591
    wait_lsn_until_flushed(max_lsn_2, leader_2);
592
    wait_lsn_until_flushed(max_lsn_3, leader_3);
593
//  EXPECT_EQ(log_io_worker->batch_io_task_mgr_.has_batched_size_ - prev_has_batched_size, 2);
594
    EXPECT_EQ(OB_SUCCESS, submit_log(leader_1, 31, leader_idx_1, MAX_LOG_BODY_SIZE));
595
    EXPECT_EQ(OB_SUCCESS, submit_log(leader_1, 2, leader_idx_1, 900 *1024));
596
    max_lsn_1 = leader_1.palf_handle_impl_->get_max_lsn();
597
    wait_lsn_until_flushed(max_lsn_1, leader_1);
598

599
    PALF_LOG(INFO, "current log_tail", K(leader_1.palf_handle_impl_->get_max_lsn()));
600
    EXPECT_EQ(0, leader_1.palf_handle_impl_->log_engine_.log_storage_.block_mgr_.min_block_id_);
601

602
    EXPECT_EQ(OB_SUCCESS, submit_log(leader_1, 1024, leader_idx_1, 300));
603
    max_lsn_1 = leader_1.palf_handle_impl_->get_max_lsn();
604
    wait_lsn_until_flushed(max_lsn_1, leader_1);
605
    EXPECT_EQ(2, leader_1.palf_handle_impl_->log_engine_.log_storage_.block_mgr_.max_block_id_);
606

607
    EXPECT_EQ(OB_SUCCESS, submit_log(leader_1, 1024, leader_idx_1, 300));
608
    max_lsn_1 = leader_1.palf_handle_impl_->get_max_lsn();
609
    wait_lsn_until_flushed(max_lsn_1, leader_1);
610
    EXPECT_EQ(OB_ITER_END, read_log(leader_1));
611
  }
612

613
  // 测试epoch change
614
  PALF_LOG(INFO, "begin test epoch change");
615
  int64_t id_4 = ATOMIC_AAF(&palf_id_, 1);
616
  int64_t leader_idx_4 = 0;
617
  int64_t prev_log_id_4 = 0;
618
  PalfHandleImplGuard leader_4;
619
  EXPECT_EQ(OB_SUCCESS, create_paxos_group(id_4, leader_idx_4, leader_4));
620
	IOTaskCond io_task_cond_4(id_4, leader_4.get_palf_handle_impl()->log_engine_.palf_epoch_);
621
	IOTaskVerify io_task_verify_4(id_4, leader_4.get_palf_handle_impl()->log_engine_.palf_epoch_);
622
  {
623
    LogIOWorker *log_io_worker = leader_4.palf_handle_impl_->log_engine_.log_io_worker_;
624
    EXPECT_EQ(OB_SUCCESS, log_io_worker->submit_io_task(&io_task_cond_4));
625
    EXPECT_EQ(OB_SUCCESS, submit_log(leader_4, 10, id_4, 110));
626
    sleep(1);
627
    LSN max_lsn = leader_4.palf_handle_impl_->sw_.get_max_lsn();
628
    io_task_cond_4.cond_.signal();
629
    PALF_LOG(INFO, "after signal");
630
    // signal之后需要sleep一会等前面的日志都提交给io_worker,
631
    // 否则在反馈模式下, 这批日志可能会延迟submit, 排在下一个cond task后面
632
    sleep(1);
633
    wait_lsn_until_flushed(max_lsn, leader_4);
634
    EXPECT_EQ(OB_SUCCESS, log_io_worker->submit_io_task(&io_task_cond_4));
635
    EXPECT_EQ(OB_SUCCESS, submit_log(leader_4, 10, id_4, 110));
636
    sleep(1);
637
    leader_4.palf_handle_impl_->log_engine_.palf_epoch_++;
638
    io_task_cond_4.cond_.signal();
639
    LSN log_tail = leader_4.palf_handle_impl_->log_engine_.log_storage_.log_tail_;
640
    PALF_LOG(INFO, "after signal", K(max_lsn), K(log_tail));
641
    sleep(1);
642
    log_tail = leader_4.palf_handle_impl_->log_engine_.log_storage_.log_tail_;
643
    PALF_LOG(INFO, "after flused case 4", K(max_lsn), K(log_tail));
644
    EXPECT_EQ(max_lsn, log_tail);
645
  }
646

647
  // 测试truncate
648
  PALF_LOG(INFO, "begin test truncate");
649
  int64_t id_5 = ATOMIC_AAF(&palf_id_, 1);
650
  int64_t leader_idx_5 = 0;
651
  int64_t prev_log_id_5 = 0;
652
  PalfHandleImplGuard leader_5;
653
  IOTaskCond io_task_cond_5(id_5, log_engine->palf_epoch_);
654
  IOTaskVerify io_task_verify_5(id_5, log_engine->palf_epoch_);
655
  TruncateLogCbCtx ctx(LSN(0));
656
  EXPECT_EQ(OB_SUCCESS, create_paxos_group(id_5, leader_idx_5, leader_5));
657
  {
658
    LogIOWorker *log_io_worker = leader_5.palf_handle_impl_->log_engine_.log_io_worker_;
659
    EXPECT_EQ(OB_SUCCESS, log_io_worker->submit_io_task(&io_task_cond_5));
660
    EXPECT_EQ(OB_SUCCESS, submit_log(leader_5, 10, id_5, 110));
661
    LSN max_lsn = leader_5.palf_handle_impl_->sw_.get_max_lsn();
662
    sleep(2);
663
    // 在提交truncate log task之前需先等待之前的日志提交写盘
664
    io_task_cond_5.cond_.signal();
665
    wait_lsn_until_flushed(max_lsn, leader_5);
666
    EXPECT_EQ(OB_SUCCESS, leader_5.palf_handle_impl_->log_engine_.submit_truncate_log_task(ctx));
667
    sleep(1);
668
    EXPECT_EQ(OB_SUCCESS, log_io_worker->submit_io_task(&io_task_cond_5));
669
    sleep(1);
670
    io_task_cond_5.cond_.signal();
671
    // wait_lsn_until_flushed(max_lsn, leader_5);
672
    EXPECT_EQ(0, leader_5.palf_handle_impl_->log_engine_.log_storage_.log_tail_);
673
  }
674

675
  PALF_LOG(INFO, "begin test sw full case");
676
   // 测试滑动窗口满场景
677
   // 聚合的两条日志分别在头尾部
678
   int64_t id_6 = ATOMIC_AAF(&palf_id_, 1);
679
   int64_t leader_idx_6 = 0;
680
   int64_t prev_log_id_6 = 0;
681
   PalfHandleImplGuard leader_6;
682
   IOTaskCond io_task_cond_6(id_6, log_engine->palf_epoch_);
683
   IOTaskVerify io_task_verify_6(id_6, log_engine->palf_epoch_);
684
   EXPECT_EQ(OB_SUCCESS, create_paxos_group(id_6, leader_idx_6, leader_6));
685
   {
686
      LogIOWorker *log_io_worker = leader_6.palf_handle_impl_->log_engine_.log_io_worker_;
687
     {
688
       EXPECT_EQ(OB_SUCCESS, submit_log(leader_6, 15, id_6, MAX_LOG_BODY_SIZE));
689
       sleep(2);
690
       LSN max_lsn = leader_6.palf_handle_impl_->sw_.get_max_lsn();
691
       wait_lsn_until_flushed(max_lsn, leader_6);
692
       EXPECT_EQ(OB_SUCCESS, log_io_worker->submit_io_task(&io_task_cond_6));
693
       EXPECT_EQ(OB_SUCCESS, submit_log(leader_6, 1, id_6, 10*1024));
694
       sleep(1);
695
       LSN max_lsn1 = leader_6.palf_handle_impl_->sw_.get_max_lsn();
696
       int64_t remain_size = LEADER_DEFAULT_GROUP_BUFFER_SIZE - max_lsn1.val_ - LogGroupEntryHeader::HEADER_SER_SIZE - LogEntryHeader::HEADER_SER_SIZE;
697
       EXPECT_EQ(OB_SUCCESS, submit_log(leader_6, 1, id_6, remain_size));
698
       sleep(1);
699
       LSN max_lsn2 = leader_6.palf_handle_impl_->sw_.get_max_lsn();
700
       PALF_LOG_RET(ERROR, OB_SUCCESS, "runlin trace", K(max_lsn2), K(max_lsn1), K(remain_size), K(max_lsn));
701
       EXPECT_EQ(max_lsn2, LSN(LEADER_DEFAULT_GROUP_BUFFER_SIZE));
702
       io_task_cond_6.cond_.signal();
703
       wait_lsn_until_flushed(max_lsn2, leader_6);
704
     }
705
     EXPECT_EQ(OB_SUCCESS, submit_log(leader_6, 3, id_6, MAX_LOG_BODY_SIZE));
706
     sleep(2);
707
     LSN max_lsn = leader_6.palf_handle_impl_->sw_.get_max_lsn();
708
     wait_lsn_until_flushed(max_lsn, leader_6);
709
     EXPECT_EQ(OB_SUCCESS, log_io_worker->submit_io_task(&io_task_cond_6));
710
     EXPECT_EQ(OB_SUCCESS, submit_log(leader_6, 1, id_6, 10*1024));
711
     sleep(1);
712
     LSN max_lsn1 = leader_6.palf_handle_impl_->sw_.get_max_lsn();
713
     int64_t remain_size = FOLLOWER_DEFAULT_GROUP_BUFFER_SIZE - max_lsn1.val_ - LogGroupEntryHeader::HEADER_SER_SIZE - LogEntryHeader::HEADER_SER_SIZE;
714
     EXPECT_EQ(OB_SUCCESS, submit_log(leader_6, 1, id_6, remain_size));
715
     sleep(1);
716
     LSN max_lsn2 = leader_6.palf_handle_impl_->sw_.get_max_lsn();
717
     PALF_LOG_RET(ERROR, OB_SUCCESS, "runlin trace", K(max_lsn2), K(max_lsn1), K(remain_size), K(max_lsn));
718
     EXPECT_EQ(max_lsn2, LSN(FOLLOWER_DEFAULT_GROUP_BUFFER_SIZE));
719
     EXPECT_EQ(OB_SUCCESS, submit_log(leader_6, 1, id_6, 100));
720
     sleep(1);
721
     LSN max_lsn3 = leader_6.palf_handle_impl_->sw_.get_max_lsn();
722
     io_task_cond_6.cond_.signal();
723
     //EXPECT_EQ(max_lsn, leader_6.palf_handle_.palf_handle_impl_->log_engine_.log_storage_.log_tail_);
724
     wait_lsn_until_flushed(max_lsn3, leader_6);
725
     LSN log_tail = leader_6.palf_handle_impl_->log_engine_.log_storage_.log_tail_;
726
     EXPECT_EQ(max_lsn3, log_tail);
727
   }
728

729
  PALF_LOG(INFO, "end io_reducer_basic_func");
730
}
731

732
//TEST_F(TestObSimpleLogClusterLogEngine, io_reducer_performance)
733
//{
734
//  SET_CASE_LOG_FILE(TEST_NAME, "io_reducer_performance");
735
//
736
//  OB_LOGGER.set_log_level("ERROR");
737
//  int64_t id = ATOMIC_AAF(&palf_id_, 1);
738
//  int64_t leader_idx = 0;
739
//  PalfHandleImplGuard leader;
740
//  EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader));
741
//  leader.palf_env_impl_->log_io_worker_.batch_io_task_mgr_.has_batched_size_ = 0;
742
//  leader.palf_env_impl_->log_io_worker_.batch_io_task_mgr_.handle_count_ = 0;
743
//  int64_t start_ts = ObTimeUtility::current_time();
744
//  EXPECT_EQ(OB_SUCCESS, submit_log(leader, 40 * 10000, leader_idx, 100));
745
//  const LSN max_lsn = leader.palf_handle_impl_->get_max_lsn();
746
//  wait_lsn_until_flushed(max_lsn, leader);
747
//  const int64_t has_batched_size = leader.palf_env_impl_->log_io_worker_.batch_io_task_mgr_.has_batched_size_;
748
//  const int64_t handle_count = leader.palf_env_impl_->log_io_worker_.batch_io_task_mgr_.handle_count_;
749
//  const int64_t log_id = leader.palf_handle_impl_->sw_.get_max_log_id();
750
//  int64_t cost_ts = ObTimeUtility::current_time() - start_ts;
751
//  PALF_LOG(ERROR, "runlin trace performance", K(cost_ts), K(log_id), K(max_lsn), K(has_batched_size), K(handle_count));
752
//}
753
} // namespace unittest
754
} // namespace oceanbase
755

756
// Entry point of the test binary. All of the work is delegated to the
// RUN_SIMPLE_LOG_CLUSTER_TEST macro, which is invoked with this file's
// TEST_NAME. The macro is defined outside this file.
// NOTE(review): the macro presumably consumes argc/argv and supplies the
// return value of main -- confirm against its definition before renaming
// the parameters or adding statements after it.
int main(int argc, char **argv)
{
  RUN_SIMPLE_LOG_CLUSTER_TEST(TEST_NAME);
}
757

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.