// oceanbase
// 756 lines · 35.7 KB
/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */

// NOTE: include order is significant in this file. Headers listed after
// `#define private public` are compiled with their private members exposed,
// so the existing order is kept and nothing is moved across that line.
#include "lib/ob_define.h"
#include "lib/ob_errno.h"
#include "lib/oblog/ob_log.h"
#include "lib/time/ob_time_utility.h"
#include "logservice/palf/log_group_entry.h"
#include <cstdio>
#include <cstring>
#include <gtest/gtest.h>
#include <signal.h>
#include <stdexcept>
#include <string>
#include <unistd.h>
#define private public
#include "env/ob_simple_log_cluster_env.h"
#include "logservice/palf/log_reader_utils.h"
#include "logservice/palf/log_define.h"
#include "logservice/palf/log_group_entry_header.h"
#include "logservice/palf/log_io_worker.h"
#include "logservice/palf/log_shared_queue_thread.h"
#include "logservice/palf/lsn.h"
#include "share/scn.h"
#include "logservice/palf/log_io_task.h"
#include "logservice/palf/log_writer_utils.h"
#include "logservice/palf_handle_guard.h"
#undef private

// Name of this test binary. It is assigned to
// ObSimpleLogClusterTestBase::test_name_, passed as the first argument of
// SET_CASE_LOG_FILE in every case, and handed to RUN_SIMPLE_LOG_CLUSTER_TEST
// in main().
const std::string TEST_NAME = "log_engine";

using namespace oceanbase::common;
using namespace oceanbase;
namespace oceanbase
{
using namespace logservice;
using namespace palf;
namespace unittest
{
46class TestObSimpleLogClusterLogEngine : public ObSimpleLogClusterTestEnv47{
48public:49TestObSimpleLogClusterLogEngine() : ObSimpleLogClusterTestEnv()50{51palf_epoch_ = 0;52}53~TestObSimpleLogClusterLogEngine() { destroy(); }54int init()55{56int ret = OB_SUCCESS;57int64_t leader_idx = 0;58id_ = ATOMIC_AAF(&palf_id_, 1);59if (OB_FAIL(create_paxos_group(id_, leader_idx, leader_))) {60PALF_LOG(ERROR, "create_paxos_group failed", K(ret));61} else {62log_engine_ = &leader_.palf_handle_impl_->log_engine_;63}64return ret;65}66int reload(const LSN &log_tail_redo, const LSN &log_tail_meta, const LSN &base_lsn)67{68int ret = OB_SUCCESS;69palf_epoch_ = ATOMIC_AAF(&palf_epoch_, 1);70LogGroupEntryHeader entry_header;71bool is_integrity = true;72ObILogAllocator *alloc_mgr = log_engine_->alloc_mgr_;73LogRpc *log_rpc = log_engine_->log_net_service_.log_rpc_;74LogIOWorker *log_io_worker = log_engine_->log_io_worker_;75LogSharedQueueTh *log_shared_queue_th = log_engine_->log_shared_queue_th_;76LogPlugins *plugins = log_engine_->plugins_;77LogEngine log_engine;78ILogBlockPool *log_block_pool = log_engine_->log_storage_.block_mgr_.log_block_pool_;79if (OB_FAIL(log_engine.load(leader_.palf_handle_impl_->palf_id_,80leader_.palf_handle_impl_->log_dir_,81alloc_mgr,82log_block_pool,83&(leader_.palf_handle_impl_->hot_cache_),84log_rpc,85log_io_worker,86log_shared_queue_th,87plugins,88entry_header,89palf_epoch_,90is_integrity,91PALF_BLOCK_SIZE,92PALF_META_BLOCK_SIZE))) {93PALF_LOG(WARN, "load failed", K(ret));94} else if (log_tail_redo != log_engine.log_storage_.log_tail_95|| log_tail_meta != log_engine.log_meta_storage_.log_tail_96|| base_lsn != log_engine.log_meta_.log_snapshot_meta_.base_lsn_) {97ret = OB_ERR_UNEXPECTED;98PALF_LOG(ERROR, "reload failed", K(ret), K(log_engine), KPC(log_engine_), K(log_tail_redo), K(log_tail_meta), K(base_lsn));99} else {100PALF_LOG(INFO, "reload success", K(log_engine), KPC(log_engine_));101}102return ret;103}104
105int delete_block_by_human(const block_id_t block_id)106{107int ret = OB_SUCCESS;108char file_path[OB_MAX_FILE_NAME_LENGTH] = {'\0'};109const char *log_dir = log_engine_->log_storage_.block_mgr_.log_dir_;110if (OB_FAIL(convert_to_normal_block(log_dir, block_id, file_path, OB_MAX_FILE_NAME_LENGTH))) {111PALF_LOG(WARN, "convert_to_normal_block failed", K(ret), K(log_dir), K(block_id));112} else if (0 != unlink(file_path)){113ret = convert_sys_errno();114PALF_LOG(WARN, "unlink failed", K(ret), K(block_id), K(file_path));115}116return ret;117}118int write_several_blocks(const block_id_t base_block_id, const int block_count)119{120int64_t long_buf_len = 16383 * 128;121LogWriteBuf write_buf;122char *long_buf = reinterpret_cast<char *>(ob_malloc(long_buf_len, "test_log_engine"));123LogGroupEntryHeader header;124int64_t log_checksum;125const block_id_t donot_delete_block_before_this = 3;126write_buf.reset();127memset(long_buf, 0, long_buf_len);128EXPECT_EQ(OB_SUCCESS, write_buf.push_back(long_buf, long_buf_len));129// EXPECT_EQ(32, write_buf.write_buf_.count());130EXPECT_EQ(OB_SUCCESS,131header.generate(false,132true,133write_buf,134long_buf_len - sizeof(LogGroupEntryHeader),135share::SCN::base_scn(),1361,137LSN(donot_delete_block_before_this * PALF_BLOCK_SIZE),1381,139log_checksum));140header.update_header_checksum();141int64_t pos = 0;142EXPECT_EQ(OB_SUCCESS, header.serialize(long_buf, long_buf_len, pos));143int ret = OB_SUCCESS;144LogStorage &log_storage = leader_.palf_handle_impl_->log_engine_.log_storage_;145block_id_t min_block_id = LOG_INVALID_BLOCK_ID, max_block_id = LOG_INVALID_BLOCK_ID;146if (block_count == 0) {147ret = OB_INVALID_ARGUMENT;148return ret;149}150bool need_submit_log = true;151if (OB_FAIL(log_storage.get_block_id_range(min_block_id, max_block_id)) && OB_ENTRY_NOT_EXIST != ret) {152PALF_LOG(ERROR, "get_block_id_range failed", K(ret));153} else if (OB_ENTRY_NOT_EXIST == ret) {154min_block_id = base_block_id;155max_block_id = base_block_id;156ret = 
OB_SUCCESS;157}158block_id_t end_block_id = max_block_id + block_count;159PALF_LOG(INFO, "runlin trace before", K(end_block_id), K(max_block_id));160do {161if (max_block_id < end_block_id) {162need_submit_log = true;163ret = OB_SUCCESS;164} else {165need_submit_log = false;166}167share::SCN tmp_scn;168tmp_scn.convert_for_logservice(max_block_id);169if (true == need_submit_log && OB_FAIL(log_storage.writev(log_storage.log_tail_, write_buf, tmp_scn))) {170PALF_LOG(ERROR, "submit_log failed", K(ret));171} else {172}173if (OB_FAIL(log_storage.get_block_id_range(min_block_id, max_block_id))) {174PALF_LOG(ERROR, "get_block_id_range failed", K(ret));175}176} while (OB_SUCC(ret) && true == need_submit_log);177PALF_LOG(INFO, "runlin trace after", K(end_block_id), K(max_block_id));178return ret;179}180void destroy() {}181int64_t id_;182int64_t palf_epoch_;183LogEngine *log_engine_;184PalfHandleImplGuard leader_;185};186
// Definitions of the static members of ObSimpleLogClusterTestBase for this
// test binary: member count 1, node count 1, test name "log_engine", and
// need_add_arb_server_ switched off.
// NOTE(review): how the base class consumes these values is not visible in
// this file — confirm in env/ob_simple_log_cluster_env.h.
int64_t ObSimpleLogClusterTestBase::member_cnt_ = 1;
int64_t ObSimpleLogClusterTestBase::node_cnt_ = 1;
std::string ObSimpleLogClusterTestBase::test_name_ = TEST_NAME;
bool ObSimpleLogClusterTestBase::need_add_arb_server_ = false;

192// 验证flashback过程中宕机重启
193TEST_F(TestObSimpleLogClusterLogEngine, flashback_restart)194{
195SET_CASE_LOG_FILE(TEST_NAME, "flashback_restart");196OB_LOGGER.set_log_level("TRACE");197PALF_LOG(INFO, "begin flashback_restart");198PalfHandleImplGuard leader;199int64_t id_1 = ATOMIC_AAF(&palf_id_, 1);200int64_t leader_idx_1 = 0;201PalfEnv *palf_env = NULL;202EXPECT_EQ(OB_SUCCESS, create_paxos_group(id_1, leader_idx_1, leader));203EXPECT_EQ(OB_SUCCESS, get_palf_env(leader_idx_1, palf_env));204EXPECT_EQ(OB_SUCCESS, submit_log(leader, 66, leader_idx_1, MAX_LOG_BODY_SIZE));205EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.palf_handle_impl_->get_max_lsn()));206SCN scn;207LogStorage *log_storage = &leader.get_palf_handle_impl()->log_engine_.log_storage_;208LSN log_tail = log_storage->log_tail_;209scn = leader.get_palf_handle_impl()->get_end_scn();210EXPECT_EQ(OB_SUCCESS, submit_log(leader, 33, leader_idx_1, MAX_LOG_BODY_SIZE));211EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.palf_handle_impl_->get_max_lsn()));212int64_t mode_version;213AccessMode mode;214EXPECT_EQ(OB_SUCCESS, leader.get_palf_handle_impl()->get_access_mode(mode_version, mode));215LSN flashback_lsn(PALF_BLOCK_SIZE*lsn_2_block(log_tail, PALF_BLOCK_SIZE));216EXPECT_EQ(OB_SUCCESS, log_storage->begin_flashback(flashback_lsn));217leader.reset();218EXPECT_EQ(OB_SUCCESS, restart_paxos_groups());219
220{221PalfHandleImplGuard leader1;222EXPECT_EQ(OB_SUCCESS, get_leader(id_1, leader1, leader_idx_1));223LogStorage *log_storage = &leader1.get_palf_handle_impl()->log_engine_.log_storage_;224EXPECT_LE(2, log_storage->block_mgr_.max_block_id_);225EXPECT_EQ(OB_SUCCESS, log_storage->block_mgr_.create_tmp_block_handler(2));226EXPECT_EQ(OB_SUCCESS, log_storage->update_manifest_(3));227EXPECT_EQ(OB_SUCCESS, log_storage->block_mgr_.delete_block_from_back_to_front_until(2));228{229LogBlockMgr *block_mgr = &log_storage->block_mgr_;230int block_id = 2;231int ret = OB_SUCCESS;232// 1. rename "block_id.tmp" to "block_id.flashback"233// 2. delete "block_id", make sure each block has returned into BlockPool234// 3. rename "block_id.flashback" to "block_id"235// NB: for restart, the block which named 'block_id.flashback' must be renamed to 'block_id'236char tmp_block_path[OB_MAX_FILE_NAME_LENGTH] = {'\0'};237char block_path[OB_MAX_FILE_NAME_LENGTH] = {'\0'};238char flashback_block_path[OB_MAX_FILE_NAME_LENGTH] = {'\0'};239if (block_id != block_mgr->curr_writable_block_id_) {240ret = OB_ERR_UNEXPECTED;241PALF_LOG(ERROR, "block_id is not same as curr_writable_handler_, unexpected error",242K(ret), K(block_id), KPC(block_mgr));243} else if (OB_FAIL(block_id_to_string(block_id, block_path, OB_MAX_FILE_NAME_LENGTH))) {244PALF_LOG(ERROR, "block_id_to_string failed", K(ret), K(block_id));245} else if (OB_FAIL(block_id_to_tmp_string(block_id, tmp_block_path, OB_MAX_FILE_NAME_LENGTH))) {246PALF_LOG(ERROR, "block_id_to_tmp_string failed", K(ret), K(block_id));247} else if (OB_FAIL(block_id_to_flashback_string(block_id, flashback_block_path, OB_MAX_FILE_NAME_LENGTH))) {248PALF_LOG(ERROR, "block_id_to_flashback_string failed", K(ret), K(block_id));249} else if (OB_FAIL(block_mgr->do_rename_and_fsync_(tmp_block_path, flashback_block_path))) {250PALF_LOG(ERROR, "do_rename_and_fsync_ failed", K(ret), KPC(block_mgr));251} else {252PALF_LOG(INFO, "rename_tmp_block_handler_to_normal success", 
K(ret), KPC(block_mgr));253}254}255}256EXPECT_EQ(OB_SUCCESS, restart_paxos_groups());257EXPECT_EQ(OB_SUCCESS, restart_paxos_groups());258}
259
260TEST_F(TestObSimpleLogClusterLogEngine, exception_path)261{
262SET_CASE_LOG_FILE(TEST_NAME, "exception_path");263EXPECT_EQ(OB_SUCCESS, init());264OB_LOGGER.set_log_level("TRACE");265// TODO: to be reopened by runlin.266ObTenantMutilAllocator *allocator =267dynamic_cast<ObTenantMutilAllocator *>(log_engine_->alloc_mgr_);268OB_ASSERT(NULL != allocator);269allocator->set_limit(32);270FlushLogCbCtx flush_ctx;271LogWriteBuf write_buf;272const char *buf = "hello";273EXPECT_FALSE(flush_ctx.is_valid());274EXPECT_FALSE(write_buf.is_valid());275EXPECT_EQ(OB_INVALID_ARGUMENT, log_engine_->submit_flush_log_task(flush_ctx, write_buf));276flush_ctx.lsn_ = LSN(1);277flush_ctx.scn_ = share::SCN::base_scn();278EXPECT_EQ(OB_INVALID_ARGUMENT, write_buf.push_back(NULL, strlen(buf)));279EXPECT_EQ(OB_SUCCESS, write_buf.push_back(buf, strlen(buf)));280EXPECT_EQ(OB_ALLOCATE_MEMORY_FAILED, log_engine_->submit_flush_log_task(flush_ctx, write_buf));281write_buf.reset();282const int64_t long_buf_len = MAX_LOG_BODY_SIZE;283char *long_buf = reinterpret_cast<char *>(ob_malloc(long_buf_len, "test_log_engine"));284LogGroupEntryHeader header;285int64_t log_checksum;286const block_id_t donot_delete_block_before_this = 3;287write_buf.reset();288memset(long_buf, 0, long_buf_len);289
290// Test LogStorage291LogStorage *log_storage = &log_engine_->log_storage_;292LogStorage *meta_storage = &log_engine_->log_meta_storage_;293block_id_t min_block_id, max_block_id;294share::SCN tmp_scn;295EXPECT_EQ(OB_INVALID_ARGUMENT,296log_engine_->append_log(LSN(LOG_INVALID_LSN_VAL), write_buf, tmp_scn));297EXPECT_EQ(OB_INVALID_ARGUMENT, log_storage->writev(LSN(LOG_INVALID_LSN_VAL), write_buf, tmp_scn));298EXPECT_EQ(OB_ENTRY_NOT_EXIST, log_engine_->get_block_id_range(min_block_id, max_block_id));299EXPECT_EQ(LSN(0), log_engine_->get_begin_lsn());300EXPECT_EQ(OB_ENTRY_NOT_EXIST, log_storage->get_block_id_range(min_block_id, max_block_id));301EXPECT_EQ(LSN(0), log_storage->get_begin_lsn());302EXPECT_EQ(OB_SUCCESS, log_storage->truncate_prefix_blocks(LSN(0)));303EXPECT_EQ(true, log_storage->need_append_block_header_);304EXPECT_EQ(true, log_storage->need_switch_block_());305EXPECT_EQ(OB_INVALID_ARGUMENT, log_storage->truncate(LSN(100000000)));306// no block id 1307EXPECT_EQ(OB_ERR_UNEXPECTED, log_storage->delete_block(1));308EXPECT_EQ(OB_INVALID_ARGUMENT, meta_storage->append_meta(buf, 10000000));309
310int64_t log_id = 1;311share::SCN scn = share::SCN::base_scn();312LSN truncate_lsn;313allocator->set_limit(1*1024*1024*1024);314
315EXPECT_EQ(OB_SUCCESS, write_several_blocks(0, 11));316PALF_LOG(INFO, "after write_several_blocks 11");317
318EXPECT_EQ(OB_SUCCESS, log_storage->get_block_id_range(min_block_id, max_block_id));319EXPECT_EQ(0, min_block_id);320EXPECT_EQ(11, max_block_id);321
322// 测试truncate场景323block_id_t truncate_block_id = max_block_id - 2;324EXPECT_EQ(OB_SUCCESS, log_storage->truncate(LSN(truncate_block_id * PALF_BLOCK_SIZE)));325EXPECT_EQ(OB_SUCCESS, log_storage->get_block_id_range(min_block_id, max_block_id));326// 此时最后一个block是空的327EXPECT_EQ(log_storage->log_tail_, LSN(truncate_block_id * PALF_BLOCK_SIZE));328EXPECT_EQ(truncate_block_id, max_block_id);329EXPECT_EQ(lsn_2_block(log_engine_->log_meta_storage_.log_block_header_.min_lsn_, PALF_BLOCK_SIZE), truncate_block_id + 1);330
331LogSnapshotMeta snapshot_meta;332EXPECT_EQ(OB_SUCCESS, snapshot_meta.generate(LSN(1 * PALF_BLOCK_SIZE)));333EXPECT_EQ(OB_SUCCESS, log_engine_->log_meta_.update_log_snapshot_meta(snapshot_meta));334EXPECT_EQ(OB_SUCCESS, log_engine_->append_log_meta_(log_engine_->log_meta_));335EXPECT_EQ(OB_SUCCESS, log_storage->delete_block(0));336EXPECT_EQ(OB_SUCCESS, log_storage->get_block_id_range(min_block_id, max_block_id));337EXPECT_EQ(1, min_block_id);338EXPECT_EQ(LSN(max_block_id * PALF_BLOCK_SIZE), log_storage->log_tail_);339
340log_storage = log_engine_->get_log_storage();341LogBlockHeader block_header;342share::SCN scn_0;343share::SCN scn_11;344EXPECT_EQ(OB_ERR_OUT_OF_LOWER_BOUND, log_storage->get_block_min_scn(0, scn_0));345EXPECT_EQ(OB_ERR_OUT_OF_LOWER_BOUND, log_storage->read_block_header_(0, block_header));346EXPECT_EQ(OB_ERR_OUT_OF_UPPER_BOUND,347log_storage->get_block_min_scn(truncate_block_id, scn_11));348LSN log_tail = log_engine_->log_storage_.log_tail_;349share::SCN ts_origin = scn_11;350PALF_LOG(INFO, "after second write_several_blocks 1", K(truncate_block_id), K(max_block_id));351// 由于truncate之后,最后一个文件是空的,因此max_block_id = truncate_block_id352EXPECT_EQ(OB_SUCCESS, write_several_blocks(0, 1));353EXPECT_EQ(OB_SUCCESS, log_storage->get_block_min_scn(truncate_block_id, scn_11));354EXPECT_NE(scn_11, ts_origin);355
356// 测试重启场景357EXPECT_EQ(OB_SUCCESS, reload(log_engine_->log_storage_.log_tail_, log_engine_->log_meta_storage_.log_tail_, log_engine_->log_meta_.log_snapshot_meta_.base_lsn_));358PALF_LOG(INFO, "after reload1");359
360//测试truncate_prefix 场景361block_id_t truncate_prefix_block_id = 4;362LogInfo prev_log_info;363prev_log_info.lsn_ = LSN(truncate_prefix_block_id*PALF_BLOCK_SIZE);364prev_log_info.log_id_ = 0;365prev_log_info.log_proposal_id_ = 0;366prev_log_info.scn_ = share::SCN::min_scn();367prev_log_info.accum_checksum_ = 0;368EXPECT_EQ(OB_SUCCESS, snapshot_meta.generate(prev_log_info.lsn_, prev_log_info));369EXPECT_EQ(OB_SUCCESS, log_engine_->log_meta_.update_log_snapshot_meta(snapshot_meta));370EXPECT_EQ(OB_SUCCESS, log_engine_->append_log_meta_(log_engine_->log_meta_));371EXPECT_EQ(OB_SUCCESS,372log_storage->truncate_prefix_blocks(LSN(truncate_prefix_block_id * PALF_BLOCK_SIZE)));373// 测试truncate_prefix后,继续写一个block374write_several_blocks(0, 1);375EXPECT_EQ(OB_SUCCESS, log_storage->get_block_id_range(min_block_id, max_block_id));376EXPECT_EQ(truncate_prefix_block_id, min_block_id);377EXPECT_EQ(truncate_block_id+2, max_block_id);378
379// 测试目录清空场景,此时log_tail应该为truncate_prefix_block_id380// 目录清空之后,会重置log_tail381truncate_prefix_block_id = max_block_id + 2;382prev_log_info.lsn_ = LSN(truncate_prefix_block_id*PALF_BLOCK_SIZE);383prev_log_info.log_id_ = 0;384prev_log_info.log_proposal_id_ = 0;385prev_log_info.scn_ =SCN::min_scn();386prev_log_info.accum_checksum_ = 0;387EXPECT_EQ(OB_SUCCESS, snapshot_meta.generate(prev_log_info.lsn_, prev_log_info));388EXPECT_EQ(OB_SUCCESS, log_engine_->log_meta_.update_log_snapshot_meta(snapshot_meta));389EXPECT_EQ(OB_SUCCESS, log_engine_->append_log_meta_(log_engine_->log_meta_));390const LSN old_log_tail = log_engine_->log_storage_.log_tail_;391EXPECT_EQ(OB_SUCCESS, log_engine_->truncate_prefix_blocks(prev_log_info.lsn_));392EXPECT_EQ(OB_ENTRY_NOT_EXIST, log_storage->get_block_id_range(min_block_id, max_block_id));393// truncate_prefix_block_id 和 prev_lsn对应的block_id一样394EXPECT_EQ(log_storage->log_tail_, LSN(truncate_prefix_block_id * PALF_BLOCK_SIZE));395
396// 测试目录清空后,读数据是否正常报错397ReadBufGuard buf_guard("dummy", 100);398int64_t out_read_size;399EXPECT_EQ(OB_ERR_OUT_OF_UPPER_BOUND,400log_storage->pread(LSN((truncate_prefix_block_id + 1) * PALF_BLOCK_SIZE),401100,402buf_guard.read_buf_,403out_read_size));404EXPECT_EQ(OB_ERR_OUT_OF_LOWER_BOUND,405log_storage->pread(LSN((truncate_prefix_block_id - 1) * PALF_BLOCK_SIZE),406100,407buf_guard.read_buf_,408out_read_size));409// 测试目录清空后,重启是否正常410EXPECT_EQ(OB_SUCCESS, reload(log_engine_->log_storage_.log_tail_, log_engine_->log_meta_storage_.log_tail_, log_engine_->log_meta_.log_snapshot_meta_.base_lsn_));411
412PALF_LOG(INFO, "directory is empty");413// 测试目录清空后,写数据是否正常414// 此时log_tail为truncate_prefix_block_id的头部415const block_id_t expected_min_block_id = lsn_2_block(log_storage->log_tail_, log_storage->logical_block_size_);416EXPECT_EQ(OB_SUCCESS, write_several_blocks(expected_min_block_id, 3));417EXPECT_EQ(OB_SUCCESS, log_storage->get_block_id_range(min_block_id, max_block_id));418EXPECT_EQ(expected_min_block_id, min_block_id);419EXPECT_EQ(expected_min_block_id+3, max_block_id);420share::SCN scn_cur;421EXPECT_EQ(OB_SUCCESS, log_engine_->get_block_min_scn(max_block_id, scn_cur));422
423// 测试人为删除文件的重启场景424EXPECT_EQ(OB_SUCCESS, log_engine_->get_block_id_range(min_block_id, max_block_id));425EXPECT_EQ(OB_SUCCESS, delete_block_by_human(max_block_id));426EXPECT_EQ(OB_ERR_UNEXPECTED, reload(log_engine_->log_storage_.log_tail_, log_engine_->log_meta_storage_.log_tail_, log_engine_->log_meta_.log_snapshot_meta_.base_lsn_));427EXPECT_EQ(OB_SUCCESS, delete_block_by_human(min_block_id));428EXPECT_EQ(OB_ERR_UNEXPECTED, reload(log_engine_->log_storage_.log_tail_, log_engine_->log_meta_storage_.log_tail_, log_engine_->log_meta_.log_snapshot_meta_.base_lsn_));429
430if (OB_NOT_NULL(long_buf)) {431ob_free(long_buf);432}433leader_.reset();434PALF_LOG(INFO, "end exception_path");435}
436
437
// Exercises the LogIOWorker batching ("io reducer") path across several
// scenarios, each in its own scope: a single log stream, multiple log
// streams, block switching, palf epoch change, truncate, and a full sliding
// window.
//
// Pattern used throughout: an IOTaskCond is submitted to the worker first,
// logs are submitted behind it, and the cond task is released with
// cond_.signal(). Per the original comments the cond task stalls the worker
// so that the logs queue up behind it.
// NOTE(review): IOTaskCond / IOTaskVerify are defined outside this file;
// IOTaskVerify::count_ presumably counts how often the task ran — confirm.
// The sleep() calls and statement order are timing-sensitive; do not reorder.
TEST_F(TestObSimpleLogClusterLogEngine, io_reducer_basic_func)
{
  SET_CASE_LOG_FILE(TEST_NAME, "io_reducer_func");
  update_server_log_disk(4*1024*1024*1024ul);
  update_disk_options(4*1024*1024*1024ul/palf::PALF_PHY_BLOCK_SIZE);
  OB_LOGGER.set_log_level("TRACE");
  PALF_LOG(INFO, "begin io_reducer_basic_func");
  PalfHandleImplGuard leader_1;
  int64_t id_1 = ATOMIC_AAF(&palf_id_, 1);
  int64_t leader_idx_1 = 0;
  PalfEnv *palf_env = NULL;
  EXPECT_EQ(OB_SUCCESS, create_paxos_group(id_1, leader_idx_1, leader_1));
  EXPECT_EQ(OB_SUCCESS, get_palf_env(leader_idx_1, palf_env));

  LogIOWorker *log_io_worker = leader_1.palf_handle_impl_->log_engine_.log_io_worker_;

  // prev_* values are only consumed by the assertions that are currently
  // commented out below.
  int64_t prev_log_id_1 = 0;
  int64_t prev_has_batched_size = 0;
  LogEngine *log_engine = &leader_1.palf_handle_impl_->log_engine_;
  IOTaskCond io_task_cond_1(id_1, log_engine->palf_epoch_);
  IOTaskVerify io_task_verify_1(id_1, log_engine->palf_epoch_);
  // Single log stream scenario.
  // Block the log_io_worker from processing.
  {
    EXPECT_EQ(OB_SUCCESS, log_io_worker->submit_io_task(&io_task_cond_1));
    EXPECT_EQ(OB_SUCCESS, submit_log(leader_1, 1024, id_1, 110));
    const int64_t log_id = leader_1.palf_handle_impl_->sw_.get_max_log_id();
    LSN max_lsn = leader_1.palf_handle_impl_->sw_.get_max_lsn();
    io_task_cond_1.cond_.signal();
    wait_lsn_until_flushed(max_lsn, leader_1);
    EXPECT_EQ(OB_ITER_END, read_log(leader_1));
    // Since sw does adaptive freeze internally, this equality may not hold:
    // the upper layer may submit the next io_task based on flush feedback.
    // EXPECT_EQ(log_id, log_io_worker->batch_io_task_mgr_.has_batched_size_);
    prev_log_id_1 = log_id;
    prev_has_batched_size = log_io_worker->batch_io_task_mgr_.has_batched_size_;
  }
  // Single log stream scenario.
  // When the aggregation degree is 1 the normal submit path should be taken;
  // this is not implemented yet and is worked around for now by not counting
  // it in has_batched_size.
  {
    // Aggregation degree 1 is ignored.
    EXPECT_EQ(OB_SUCCESS, log_io_worker->submit_io_task(&io_task_cond_1));
    EXPECT_EQ(OB_SUCCESS, submit_log(leader_1, 1, id_1, 110));
    sleep(1);
    EXPECT_EQ(OB_SUCCESS, log_io_worker->submit_io_task(&io_task_verify_1));
    EXPECT_EQ(OB_SUCCESS, log_io_worker->submit_io_task(&io_task_verify_1));
    EXPECT_EQ(OB_SUCCESS, submit_log(leader_1, 1024, id_1, 110));
    const int64_t log_id = leader_1.palf_handle_impl_->sw_.get_max_log_id();
    LSN max_lsn = leader_1.palf_handle_impl_->sw_.get_max_lsn();
    io_task_cond_1.cond_.signal();
    wait_lsn_until_flushed(max_lsn, leader_1);
    // EXPECT_EQ(log_id - 1, log_io_worker->batch_io_task_mgr_.has_batched_size_);
    // The verify task was submitted twice above.
    EXPECT_EQ(2, io_task_verify_1.count_);
    // EXPECT_EQ(log_io_worker->batch_io_task_mgr_.has_batched_size_ - prev_has_batched_size,
    //           log_id - prev_log_id_1 - 1);
    prev_log_id_1 = log_id;
    prev_has_batched_size = log_io_worker->batch_io_task_mgr_.has_batched_size_;
  }

  // Multiple log streams scenario.
  int64_t id_2 = ATOMIC_AAF(&palf_id_, 1);
  int64_t prev_log_id_2 = 0;
  int64_t leader_idx_2 = 0;
  PalfHandleImplGuard leader_2;
  EXPECT_EQ(OB_SUCCESS, create_paxos_group(id_2, leader_idx_2, leader_2));
  IOTaskCond io_task_cond_2(id_2, leader_2.get_palf_handle_impl()->log_engine_.palf_epoch_);
  IOTaskVerify io_task_verify_2(id_2, leader_2.get_palf_handle_impl()->log_engine_.palf_epoch_);
  {
    // Shadows the outer log_io_worker with the worker of leader_2.
    LogIOWorker *log_io_worker = leader_2.palf_handle_impl_->log_engine_.log_io_worker_;
    // Aggregation degree 1 is ignored.
    EXPECT_EQ(OB_SUCCESS, log_io_worker->submit_io_task(&io_task_cond_2));
    EXPECT_EQ(OB_SUCCESS, submit_log(leader_1, 1, id_1, 110));
    EXPECT_EQ(OB_SUCCESS, submit_log(leader_2, 1, id_2, 110));
    sleep(1);
    EXPECT_EQ(OB_SUCCESS, log_io_worker->submit_io_task(&io_task_verify_2));
    EXPECT_EQ(OB_SUCCESS, log_io_worker->submit_io_task(&io_task_verify_1));

    EXPECT_EQ(OB_SUCCESS, submit_log(leader_1, 1024, id_1, 110));
    EXPECT_EQ(OB_SUCCESS, submit_log(leader_2, 1024, id_2, 110));

    const int64_t log_id_1 = leader_1.palf_handle_impl_->sw_.get_max_log_id();
    LSN max_lsn_1 = leader_1.palf_handle_impl_->sw_.get_max_lsn();
    const int64_t log_id_2 = leader_2.palf_handle_impl_->sw_.get_max_log_id();
    LSN max_lsn_2 = leader_2.palf_handle_impl_->sw_.get_max_lsn();
    sleep(1);
    io_task_cond_2.cond_.signal();
    wait_lsn_until_flushed(max_lsn_1, leader_1);
    wait_lsn_until_flushed(max_lsn_2, leader_2);
    // io_task_verify_1: two submissions in the previous scope plus one here.
    EXPECT_EQ(3, io_task_verify_1.count_);
    EXPECT_EQ(1, io_task_verify_2.count_);

    // ls1 already has one log_id that was skipped by aggregation.
    // EXPECT_EQ(log_io_worker->batch_io_task_mgr_.has_batched_size_ - prev_has_batched_size,
    //           log_id_1 - 1 + log_id_2 -1 - prev_log_id_1);
    prev_has_batched_size = log_io_worker->batch_io_task_mgr_.has_batched_size_;
    prev_log_id_2 = log_id_2;
    prev_log_id_1 = log_id_1;
  }

  // Three log streams, stripe is 2.
  // A configurable LogIOWorkerConfig is not supported yet, so this test stays
  // disabled for now, but its result is correct.
  // int64_t id_3 = ATOMIC_AAF(&palf_id_, 1);
  // int64_t leader_idx_3 = 0;
  // int64_t prev_log_id_3 = 0;
  // PalfHandleImplGuard leader_3;
  // IOTaskCond io_task_cond_3;
  // IOTaskVerify io_task_verify_3;
  // io_task_cond_3.init(id_3);
  // io_task_verify_3.init(id_3);
  // EXPECT_EQ(OB_SUCCESS, create_paxos_group(id_3, leader_idx_3, leader_3));
  // {
  //   EXPECT_EQ(OB_SUCCESS, log_io_worker->submit_io_task(&io_task_cond_3));
  //   EXPECT_EQ(OB_SUCCESS, submit_log(leader_1, 1, id_1, 110));
  //   EXPECT_EQ(OB_SUCCESS, submit_log(leader_2, 1, id_2, 110));
  //   EXPECT_EQ(OB_SUCCESS, submit_log(leader_3, 1, id_3, 110));
  //   EXPECT_EQ(OB_SUCCESS, submit_log(leader_2, 1, id_2, 110));
  //   sleep(1);
  //   io_task_cond_3.cond_.signal();
  //   const int64_t log_id_1 = leader_1.palf_handle_impl_->sw_.get_max_log_id();
  //   LSN max_lsn_1 = leader_1.palf_handle_impl_->sw_.get_max_lsn();
  //   const int64_t log_id_2 = leader_2.palf_handle_impl_->sw_.get_max_log_id();
  //   LSN max_lsn_2 = leader_2.palf_handle_impl_->sw_.get_max_lsn();
  //   const int64_t log_id_3 = leader_3.palf_handle_impl_->sw_.get_max_log_id();
  //   LSN max_lsn_3 = leader_3.palf_handle_impl_->sw_.get_max_lsn();
  //   wait_lsn_until_flushed(max_lsn_1, leader_1);
  //   wait_lsn_until_flushed(max_lsn_2, leader_2);
  //   wait_lsn_until_flushed(max_lsn_3, leader_3);
  //   EXPECT_EQ(log_io_worker->batch_io_task_mgr_.has_batched_size_ - prev_has_batched_size, 0);
  // }
  // Verify the block switch scenario.
  int64_t id_3 = ATOMIC_AAF(&palf_id_, 1);
  int64_t leader_idx_3 = 0;
  int64_t prev_log_id_3 = 0;
  PalfHandleImplGuard leader_3;
  EXPECT_EQ(OB_SUCCESS, create_paxos_group(id_3, leader_idx_3, leader_3));
  IOTaskCond io_task_cond_3(id_3, leader_3.get_palf_handle_impl()->log_engine_.palf_epoch_);
  IOTaskVerify io_task_verify_3(id_3, leader_3.get_palf_handle_impl()->log_engine_.palf_epoch_);
  {
    LogIOWorker *log_io_worker = leader_3.palf_handle_impl_->log_engine_.log_io_worker_;
    EXPECT_EQ(OB_SUCCESS, log_io_worker->submit_io_task(&io_task_cond_3));
    EXPECT_EQ(OB_SUCCESS, submit_log(leader_1, 1, id_1, 110));
    EXPECT_EQ(OB_SUCCESS, submit_log(leader_2, 1, id_2, 110));
    EXPECT_EQ(OB_SUCCESS, submit_log(leader_3, 1, id_3, 110));
    sleep(1);
    EXPECT_EQ(OB_SUCCESS, submit_log(leader_2, 1, id_2, 110));
    sleep(1);
    io_task_cond_3.cond_.signal();
    const int64_t log_id_1 = leader_1.palf_handle_impl_->sw_.get_max_log_id();
    LSN max_lsn_1 = leader_1.palf_handle_impl_->sw_.get_max_lsn();
    const int64_t log_id_2 = leader_2.palf_handle_impl_->sw_.get_max_log_id();
    LSN max_lsn_2 = leader_2.palf_handle_impl_->sw_.get_max_lsn();
    const int64_t log_id_3 = leader_3.palf_handle_impl_->sw_.get_max_log_id();
    LSN max_lsn_3 = leader_3.palf_handle_impl_->sw_.get_max_lsn();
    wait_lsn_until_flushed(max_lsn_1, leader_1);
    wait_lsn_until_flushed(max_lsn_2, leader_2);
    wait_lsn_until_flushed(max_lsn_3, leader_3);
    // EXPECT_EQ(log_io_worker->batch_io_task_mgr_.has_batched_size_ - prev_has_batched_size, 2);
    // Large bodies on leader_1; the block id assertions below check the
    // resulting min/max block ids.
    EXPECT_EQ(OB_SUCCESS, submit_log(leader_1, 31, leader_idx_1, MAX_LOG_BODY_SIZE));
    EXPECT_EQ(OB_SUCCESS, submit_log(leader_1, 2, leader_idx_1, 900 *1024));
    max_lsn_1 = leader_1.palf_handle_impl_->get_max_lsn();
    wait_lsn_until_flushed(max_lsn_1, leader_1);

    PALF_LOG(INFO, "current log_tail", K(leader_1.palf_handle_impl_->get_max_lsn()));
    EXPECT_EQ(0, leader_1.palf_handle_impl_->log_engine_.log_storage_.block_mgr_.min_block_id_);

    EXPECT_EQ(OB_SUCCESS, submit_log(leader_1, 1024, leader_idx_1, 300));
    max_lsn_1 = leader_1.palf_handle_impl_->get_max_lsn();
    wait_lsn_until_flushed(max_lsn_1, leader_1);
    EXPECT_EQ(2, leader_1.palf_handle_impl_->log_engine_.log_storage_.block_mgr_.max_block_id_);

    EXPECT_EQ(OB_SUCCESS, submit_log(leader_1, 1024, leader_idx_1, 300));
    max_lsn_1 = leader_1.palf_handle_impl_->get_max_lsn();
    wait_lsn_until_flushed(max_lsn_1, leader_1);
    EXPECT_EQ(OB_ITER_END, read_log(leader_1));
  }

  // Test epoch change.
  PALF_LOG(INFO, "begin test epoch change");
  int64_t id_4 = ATOMIC_AAF(&palf_id_, 1);
  int64_t leader_idx_4 = 0;
  int64_t prev_log_id_4 = 0;
  PalfHandleImplGuard leader_4;
  EXPECT_EQ(OB_SUCCESS, create_paxos_group(id_4, leader_idx_4, leader_4));
  IOTaskCond io_task_cond_4(id_4, leader_4.get_palf_handle_impl()->log_engine_.palf_epoch_);
  IOTaskVerify io_task_verify_4(id_4, leader_4.get_palf_handle_impl()->log_engine_.palf_epoch_);
  {
    LogIOWorker *log_io_worker = leader_4.palf_handle_impl_->log_engine_.log_io_worker_;
    EXPECT_EQ(OB_SUCCESS, log_io_worker->submit_io_task(&io_task_cond_4));
    EXPECT_EQ(OB_SUCCESS, submit_log(leader_4, 10, id_4, 110));
    sleep(1);
    LSN max_lsn = leader_4.palf_handle_impl_->sw_.get_max_lsn();
    io_task_cond_4.cond_.signal();
    PALF_LOG(INFO, "after signal");
    // After signal we need to sleep for a while so that all earlier logs get
    // submitted to io_worker; otherwise, in feedback mode, this batch may be
    // submitted late and queue behind the next cond task.
    sleep(1);
    wait_lsn_until_flushed(max_lsn, leader_4);
    EXPECT_EQ(OB_SUCCESS, log_io_worker->submit_io_task(&io_task_cond_4));
    EXPECT_EQ(OB_SUCCESS, submit_log(leader_4, 10, id_4, 110));
    sleep(1);
    // Bump the engine's epoch while the second batch is still queued; the
    // assertion below expects log_tail to stay at the first batch's max_lsn.
    leader_4.palf_handle_impl_->log_engine_.palf_epoch_++;
    io_task_cond_4.cond_.signal();
    LSN log_tail = leader_4.palf_handle_impl_->log_engine_.log_storage_.log_tail_;
    PALF_LOG(INFO, "after signal", K(max_lsn), K(log_tail));
    sleep(1);
    log_tail = leader_4.palf_handle_impl_->log_engine_.log_storage_.log_tail_;
    PALF_LOG(INFO, "after flused case 4", K(max_lsn), K(log_tail));
    EXPECT_EQ(max_lsn, log_tail);
  }

  // Test truncate.
  PALF_LOG(INFO, "begin test truncate");
  int64_t id_5 = ATOMIC_AAF(&palf_id_, 1);
  int64_t leader_idx_5 = 0;
  int64_t prev_log_id_5 = 0;
  PalfHandleImplGuard leader_5;
  // NOTE(review): unlike scenarios 2-4, the epoch here is taken from
  // leader_1's engine (log_engine), not from leader_5 — confirm intended.
  IOTaskCond io_task_cond_5(id_5, log_engine->palf_epoch_);
  IOTaskVerify io_task_verify_5(id_5, log_engine->palf_epoch_);
  TruncateLogCbCtx ctx(LSN(0));
  EXPECT_EQ(OB_SUCCESS, create_paxos_group(id_5, leader_idx_5, leader_5));
  {
    LogIOWorker *log_io_worker = leader_5.palf_handle_impl_->log_engine_.log_io_worker_;
    EXPECT_EQ(OB_SUCCESS, log_io_worker->submit_io_task(&io_task_cond_5));
    EXPECT_EQ(OB_SUCCESS, submit_log(leader_5, 10, id_5, 110));
    LSN max_lsn = leader_5.palf_handle_impl_->sw_.get_max_lsn();
    sleep(2);
    // Before submitting the truncate log task, wait for the earlier logs to
    // be flushed.
    io_task_cond_5.cond_.signal();
    wait_lsn_until_flushed(max_lsn, leader_5);
    EXPECT_EQ(OB_SUCCESS, leader_5.palf_handle_impl_->log_engine_.submit_truncate_log_task(ctx));
    sleep(1);
    EXPECT_EQ(OB_SUCCESS, log_io_worker->submit_io_task(&io_task_cond_5));
    sleep(1);
    io_task_cond_5.cond_.signal();
    // wait_lsn_until_flushed(max_lsn, leader_5);
    EXPECT_EQ(0, leader_5.palf_handle_impl_->log_engine_.log_storage_.log_tail_);
  }

  PALF_LOG(INFO, "begin test sw full case");
  // Test the sliding-window-full scenario.
  // The two aggregated logs sit at the head and at the tail respectively.
  int64_t id_6 = ATOMIC_AAF(&palf_id_, 1);
  int64_t leader_idx_6 = 0;
  int64_t prev_log_id_6 = 0;
  PalfHandleImplGuard leader_6;
  // NOTE(review): epoch again taken from leader_1's engine — confirm intended.
  IOTaskCond io_task_cond_6(id_6, log_engine->palf_epoch_);
  IOTaskVerify io_task_verify_6(id_6, log_engine->palf_epoch_);
  EXPECT_EQ(OB_SUCCESS, create_paxos_group(id_6, leader_idx_6, leader_6));
  {
    LogIOWorker *log_io_worker = leader_6.palf_handle_impl_->log_engine_.log_io_worker_;
    {
      EXPECT_EQ(OB_SUCCESS, submit_log(leader_6, 15, id_6, MAX_LOG_BODY_SIZE));
      sleep(2);
      LSN max_lsn = leader_6.palf_handle_impl_->sw_.get_max_lsn();
      wait_lsn_until_flushed(max_lsn, leader_6);
      EXPECT_EQ(OB_SUCCESS, log_io_worker->submit_io_task(&io_task_cond_6));
      EXPECT_EQ(OB_SUCCESS, submit_log(leader_6, 1, id_6, 10*1024));
      sleep(1);
      LSN max_lsn1 = leader_6.palf_handle_impl_->sw_.get_max_lsn();
      // Size the next body so that, with both headers, max_lsn lands exactly
      // on LEADER_DEFAULT_GROUP_BUFFER_SIZE (asserted below).
      int64_t remain_size = LEADER_DEFAULT_GROUP_BUFFER_SIZE - max_lsn1.val_ - LogGroupEntryHeader::HEADER_SER_SIZE - LogEntryHeader::HEADER_SER_SIZE;
      EXPECT_EQ(OB_SUCCESS, submit_log(leader_6, 1, id_6, remain_size));
      sleep(1);
      LSN max_lsn2 = leader_6.palf_handle_impl_->sw_.get_max_lsn();
      PALF_LOG_RET(ERROR, OB_SUCCESS, "runlin trace", K(max_lsn2), K(max_lsn1), K(remain_size), K(max_lsn));
      EXPECT_EQ(max_lsn2, LSN(LEADER_DEFAULT_GROUP_BUFFER_SIZE));
      io_task_cond_6.cond_.signal();
      wait_lsn_until_flushed(max_lsn2, leader_6);
    }
    EXPECT_EQ(OB_SUCCESS, submit_log(leader_6, 3, id_6, MAX_LOG_BODY_SIZE));
    sleep(2);
    LSN max_lsn = leader_6.palf_handle_impl_->sw_.get_max_lsn();
    wait_lsn_until_flushed(max_lsn, leader_6);
    EXPECT_EQ(OB_SUCCESS, log_io_worker->submit_io_task(&io_task_cond_6));
    EXPECT_EQ(OB_SUCCESS, submit_log(leader_6, 1, id_6, 10*1024));
    sleep(1);
    LSN max_lsn1 = leader_6.palf_handle_impl_->sw_.get_max_lsn();
    // Same trick, this time targeting FOLLOWER_DEFAULT_GROUP_BUFFER_SIZE.
    int64_t remain_size = FOLLOWER_DEFAULT_GROUP_BUFFER_SIZE - max_lsn1.val_ - LogGroupEntryHeader::HEADER_SER_SIZE - LogEntryHeader::HEADER_SER_SIZE;
    EXPECT_EQ(OB_SUCCESS, submit_log(leader_6, 1, id_6, remain_size));
    sleep(1);
    LSN max_lsn2 = leader_6.palf_handle_impl_->sw_.get_max_lsn();
    PALF_LOG_RET(ERROR, OB_SUCCESS, "runlin trace", K(max_lsn2), K(max_lsn1), K(remain_size), K(max_lsn));
    EXPECT_EQ(max_lsn2, LSN(FOLLOWER_DEFAULT_GROUP_BUFFER_SIZE));
    EXPECT_EQ(OB_SUCCESS, submit_log(leader_6, 1, id_6, 100));
    sleep(1);
    LSN max_lsn3 = leader_6.palf_handle_impl_->sw_.get_max_lsn();
    io_task_cond_6.cond_.signal();
    //EXPECT_EQ(max_lsn, leader_6.palf_handle_.palf_handle_impl_->log_engine_.log_storage_.log_tail_);
    wait_lsn_until_flushed(max_lsn3, leader_6);
    LSN log_tail = leader_6.palf_handle_impl_->log_engine_.log_storage_.log_tail_;
    EXPECT_EQ(max_lsn3, log_tail);
  }

  PALF_LOG(INFO, "end io_reducer_basic_func");
}

//TEST_F(TestObSimpleLogClusterLogEngine, io_reducer_performance)
//{
//  SET_CASE_LOG_FILE(TEST_NAME, "io_reducer_performance");
//
//  OB_LOGGER.set_log_level("ERROR");
//  int64_t id = ATOMIC_AAF(&palf_id_, 1);
//  int64_t leader_idx = 0;
//  PalfHandleImplGuard leader;
//  EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader));
//  leader.palf_env_impl_->log_io_worker_.batch_io_task_mgr_.has_batched_size_ = 0;
//  leader.palf_env_impl_->log_io_worker_.batch_io_task_mgr_.handle_count_ = 0;
//  int64_t start_ts = ObTimeUtility::current_time();
//  EXPECT_EQ(OB_SUCCESS, submit_log(leader, 40 * 10000, leader_idx, 100));
//  const LSN max_lsn = leader.palf_handle_impl_->get_max_lsn();
//  wait_lsn_until_flushed(max_lsn, leader);
//  const int64_t has_batched_size = leader.palf_env_impl_->log_io_worker_.batch_io_task_mgr_.has_batched_size_;
//  const int64_t handle_count = leader.palf_env_impl_->log_io_worker_.batch_io_task_mgr_.handle_count_;
//  const int64_t log_id = leader.palf_handle_impl_->sw_.get_max_log_id();
//  int64_t cost_ts = ObTimeUtility::current_time() - start_ts;
//  PALF_LOG(ERROR, "runlin trace performance", K(cost_ts), K(log_id), K(max_lsn), K(has_batched_size), K(handle_count));
//}
} // namespace unittest
} // namespace oceanbase

// Entry point of the test binary: hands TEST_NAME to
// RUN_SIMPLE_LOG_CLUSTER_TEST.
// NOTE(review): argc/argv are not referenced here directly; presumably the
// macro uses them and supplies the return statement — confirm in its definition.
int main(int argc, char **argv) { RUN_SIMPLE_LOG_CLUSTER_TEST(TEST_NAME); }