/*
    This file is part of TON Blockchain Library.

    TON Blockchain Library is free software: you can redistribute it and/or modify
    it under the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation, either version 2 of the License, or
    (at your option) any later version.

    TON Blockchain Library is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with TON Blockchain Library.  If not, see <http://www.gnu.org/licenses/>.

    Copyright 2017-2020 Telegram Systems LLP
*/
#include "td/utils/tests.h"
21
#include "td/utils/as.h"
22
#include "td/utils/base64.h"
23
#include "td/utils/benchmark.h"
24
#include "td/utils/buffer.h"
25
#include "td/utils/crypto.h"
26
#include "td/utils/filesystem.h"
27
#include "td/utils/Slice.h"
28
#include "td/utils/Span.h"
29
#include "td/utils/misc.h"
30
#include "td/utils/overloaded.h"
31
#include "td/utils/optional.h"
32
#include "td/utils/port/FileFd.h"
33
#include "td/utils/port/path.h"
34
#include "td/utils/port/IoSlice.h"
35
#include "td/utils/UInt.h"
36
#include "td/utils/Variant.h"
37
#include "td/utils/VectorQueue.h"
39
#include "td/actor/actor.h"
41
#include "td/db/utils/StreamInterface.h"
42
#include "td/db/utils/ChainBuffer.h"
43
#include "td/db/utils/CyclicBuffer.h"
44
#include "td/db/binlog/BinlogReaderHelper.h"
46
#include "td/db/binlog/Binlog.h"
50
// Toy Binlog Implementation
52
using td::MutableSlice;
57
using RootHash = td::UInt256;
58
using FileHash = td::UInt256;
62
unsigned long long shard;
66
// Parses a fixed-layout log event by copying sizeof(T) raw bytes from `data` into `*res`.
//
// T is expected to expose a static `tag` constant and a `tag_field` member; the copied
// `tag_field` must equal `tag` for the record to be accepted.
//
// Returns:
//   * -sizeof(T)                    if `data` holds fewer than sizeof(T) bytes (`*res` is untouched);
//   * Status::Error("Tag mismatch") if the copied tag does not match T::tag;
//   * sizeof(T)                     (the number of bytes consumed) on success.
template <class T>
Result<int64> memcpy_parse(Slice data, T* res) {
  if (data.size() < sizeof(T)) {
    return -static_cast<int64>(sizeof(T));
  }
  std::memcpy(res, data.data(), sizeof(T));
  if (res->tag_field != res->tag) {
    return Status::Error("Tag mismatch");
  }
  // Callers use the positive result as the size of the record that was consumed.
  return static_cast<int64>(sizeof(T));
}
77
// Serializes a fixed-layout log event by copying its sizeof(T) raw bytes into `data`.
//
// Returns -sizeof(T) when `data` is too small (nothing is written), so that a caller can
// probe the required size with an empty slice; returns sizeof(T) once the bytes are written.
template <class T>
int64 memcpy_serialize(MutableSlice data, const T& res) {
  if (data.size() < sizeof(T)) {
    return -static_cast<int64>(sizeof(T));
  }
  std::memcpy(data.data(), &res, sizeof(T));
  return static_cast<int64>(sizeof(T));
}
86
struct LogEventCrc32C {
87
static constexpr unsigned tag = 0x473a830a;
91
LogEventCrc32C() = default;
92
LogEventCrc32C(td::uint32 crc32c) : tag_field(tag), crc32c(crc32c) {
94
static Result<int64> parse(Slice data, LogEventCrc32C* res) {
95
return memcpy_parse(data, res);
97
int64 serialize(MutableSlice data) const {
98
return memcpy_serialize(data, *this);
103
bool operator==(const LogEventCrc32C& other) const {
104
return key() == other.key();
106
bool operator!=(const LogEventCrc32C& other) const {
107
return !(*this == other);
111
struct LogEventStart {
112
static constexpr unsigned tag = 0x0442446b;
113
static constexpr unsigned log_type = 0x290100;
117
unsigned char zerostate_root_hash[32];
118
LogEventStart() = default;
119
LogEventStart(const RootHash& hash, unsigned _now = 0)
120
: tag_field(tag), type_field(log_type), created_at(_now ? _now : (unsigned)std::time(nullptr)) {
121
td::as<RootHash>(zerostate_root_hash) = hash;
123
static Result<int64> parse(Slice data, LogEventStart* res) {
124
return memcpy_parse(data, res);
126
int64 serialize(MutableSlice data) const {
127
return memcpy_serialize(data, *this);
130
return std::make_tuple(tag_field, type_field, created_at, Slice(zerostate_root_hash, 32));
132
bool operator==(const LogEventStart& other) const {
133
return key() == other.key();
135
bool operator!=(const LogEventStart& other) const {
136
return !(*this == other);
140
struct LogEventSetZeroState {
141
static constexpr unsigned tag = 0x63ab3cd9;
145
unsigned char file_hash[32];
146
unsigned char root_hash[32];
147
LogEventSetZeroState() = default;
148
LogEventSetZeroState(const RootHash& rhash, const FileHash& fhash, unsigned long long _fsize, unsigned _flags = 0)
149
: tag_field(tag), flags(_flags), file_size(_fsize) {
150
td::as<FileHash>(file_hash) = fhash;
151
td::as<RootHash>(root_hash) = rhash;
153
static Result<int64> parse(Slice data, LogEventSetZeroState* res) {
154
return memcpy_parse(data, res);
156
int64 serialize(MutableSlice data) const {
157
return memcpy_serialize(data, *this);
160
return std::make_tuple(tag_field, flags, file_size, Slice(file_hash, 32), Slice(root_hash, 32));
162
bool operator==(const LogEventSetZeroState& other) const {
163
return key() == other.key();
165
bool operator!=(const LogEventSetZeroState& other) const {
166
return !(*this == other);
170
struct LogEventNewBlock {
171
static constexpr unsigned tag = 0x19f4bc63;
173
unsigned flags; // lower 8 bits = authority
176
unsigned long long shard;
178
unsigned char file_hash[32];
179
unsigned char root_hash[32];
180
unsigned char last_bytes[8];
181
LogEventNewBlock() = default;
182
LogEventNewBlock(const BlockId& block, const RootHash& rhash, const FileHash& fhash, unsigned long long _fsize,
186
, workchain(block.workchain)
189
, file_size(_fsize) {
190
td::as<FileHash>(file_hash) = fhash;
191
td::as<RootHash>(root_hash) = rhash;
192
td::as<unsigned long long>(last_bytes) = 0;
194
static Result<int64> parse(Slice data, LogEventNewBlock* res) {
195
return memcpy_parse(data, res);
197
int64 serialize(MutableSlice data) const {
198
return memcpy_serialize(data, *this);
201
return std::make_tuple(tag_field, flags, workchain, seqno, shard, file_size, Slice(file_hash, 32),
202
Slice(root_hash, 32), Slice(last_bytes, 8));
204
bool operator==(const LogEventNewBlock& other) const {
205
return key() == other.key();
207
bool operator!=(const LogEventNewBlock& other) const {
208
return !(*this == other);
212
struct LogEventNewState {
213
static constexpr unsigned tag = 0x4190a21f;
215
unsigned flags; // lower 8 bits = authority
218
unsigned long long shard;
220
unsigned char file_hash[32];
221
unsigned char root_hash[32];
222
unsigned char last_bytes[8];
223
LogEventNewState() = default;
224
LogEventNewState(const BlockId& state, const RootHash& rhash, const FileHash& fhash, unsigned long long _fsize,
228
, workchain(state.workchain)
231
, file_size(_fsize) {
232
td::as<FileHash>(file_hash) = fhash;
233
td::as<RootHash>(root_hash) = rhash;
234
td::as<unsigned long long>(last_bytes) = 0;
236
static Result<int64> parse(Slice data, LogEventNewState* res) {
237
return memcpy_parse(data, res);
239
int64 serialize(MutableSlice data) const {
240
return memcpy_serialize(data, *this);
243
return std::make_tuple(tag_field, flags, workchain, seqno, shard, file_size, Slice(file_hash, 32),
244
Slice(root_hash, 32), Slice(last_bytes, 8));
246
bool operator==(const LogEventNewState& other) const {
247
return key() == other.key();
249
bool operator!=(const LogEventNewState& other) const {
250
return !(*this == other);
255
// Variable-length log event that carries a string payload in its `data` member.
// Wire format written by serialize(): the tag as an unsigned at offset 0, the payload length
// as an int at offset 4, then the payload bytes starting at offset 8.
// NOTE(review): several lines of this definition (the `data` member declaration, closing
// braces and some return statements) are not visible in this excerpt.
struct LogEventString {
256
static constexpr unsigned tag = 0xabcdabcd;
260
// Two string events are equal when their payloads are equal.
bool operator==(const LogEventString& other) const {
261
return data == other.data;
263
bool operator!=(const LogEventString& other) const {
264
return !(*this == other);
267
// Writes the event into `dest`. If `dest` is smaller than 8 + data.size() bytes nothing is
// written and the negated required size is returned.
int64 serialize(MutableSlice dest) const {
268
size_t need_size = 8 + data.size();
269
if (dest.size() < need_size) {
270
return -static_cast<int64>(need_size);
272
dest.truncate(need_size);
273
td::as<unsigned>(dest.data()) = unsigned(tag);
274
td::as<int>(dest.data() + 4) = td::narrow_cast<int>(data.size());
275
dest.substr(8).copy_from(data);
279
// Reads a string event from `data` into `*res`: checks the 4-byte tag (a mismatch yields a
// "tag mismatch" error), reads the 32-bit payload length and copies that many bytes.
// NOTE(review): the values returned from the three "not enough data" branches and on
// success are not visible here — confirm against the full file.
static Result<int64> parse(Slice data, LogEventString* res) {
280
if (data.size() < 4) {
283
unsigned got_tag = td::as<unsigned>(data.data());
284
if (got_tag != tag) {
285
return Status::Error(PSLICE() << "tag mismatch " << td::format::as_hex(got_tag));
287
// Skip the tag; the next 4 bytes hold the payload length.
data = data.substr(4);
288
if (data.size() < 4) {
291
td::int64 length = td::as<td::uint32>(data.data());
292
data = data.substr(4);
293
// The payload itself may still be incomplete.
if (static_cast<int64>(data.size()) < length) {
296
res->data = data.substr(0, td::narrow_cast<std::size_t>(length)).str();
302
// Storage for any one of the supported log event types; initialised with a
// default-constructed LogEventStart.
// NOTE(review): the enclosing struct header, the `switch`, the local event declarations for
// most cases and all closing braces are not visible in this excerpt.
td::Variant<LogEventCrc32C, LogEventStart, LogEventString, LogEventNewBlock, LogEventNewState, LogEventSetZeroState>
303
event_{LogEventStart{}};
305
// Equality is delegated to the variant held in event_.
bool operator==(const LogEvent& other) const {
306
return event_ == other.event_;
308
bool operator!=(const LogEvent& other) const {
309
return !(*this == other);
312
LogEvent() = default;
313
LogEvent(LogEvent&& other) = default;
315
// Wraps a concrete event by forwarding it into the variant.
LogEvent(T&& e) : event_(std::forward<T>(e)) {
318
// Serializes whichever concrete event is currently stored by calling its own serialize().
int64 serialize(MutableSlice data) const {
320
event_.visit([&](auto& e) { res = e.serialize(data); });
324
// Reads the 4-byte tag at the start of `data` and dispatches to the parse() of the event
// type with that tag; an unrecognised tag yields an "Unknown tag" error.
static Result<int64> parse(Slice data, LogEvent* res) {
325
if (data.size() < 4) {
328
//LOG(ERROR) << td::format::as_hex_dump<4>(data);
329
unsigned got_tag = td::as<unsigned>(data.data());
331
case LogEventCrc32C::tag: {
333
TRY_RESULT(x, e.parse(data, &e));
339
case LogEventStart::tag: {
341
TRY_RESULT(x, e.parse(data, &e));
347
case LogEventSetZeroState::tag: {
348
LogEventSetZeroState e;
349
TRY_RESULT(x, e.parse(data, &e));
355
case LogEventNewBlock::tag: {
357
TRY_RESULT(x, e.parse(data, &e));
363
case LogEventNewState::tag: {
365
TRY_RESULT(x, e.parse(data, &e));
371
case LogEventString::tag: {
373
TRY_RESULT(x, e.parse(data, &e));
380
return Status::Error(PSLICE() << "Unknown tag: " << td::format::as_hex(got_tag));
385
// Path of the scratch binlog file that test_binlog() destroys, writes and re-reads.
static td::CSlice test_binlog_path("test.binlog");
387
// Test implementation of td::BinlogReaderInterface that collects every parsed LogEvent in
// logevents_ and keeps a running CRC32C (crc_) of the bytes it has consumed.
// NOTE(review): accessor bodies, the CRC comparison, the crc_/suffix_ declarations and all
// closing braces are not visible in this excerpt.
class BinlogReader : public td::BinlogReaderInterface {
389
td::Span<LogEvent> logevents() const {
393
// Parses one event from the front of `data`. A LogEventCrc32C event has its crc32c value
// read for verification (failure is reported as "Crc mismatch"); parsed events are appended
// to logevents_, and the consumed bytes are fed to lazy_crc_extend().
td::Result<td::int64> parse(td::Slice data) override {
394
if (data.size() < 4) {
398
TRY_RESULT(size, res.parse(data, &res));
400
if (res.event_.get_offset() == res.event_.offset<LogEventCrc32C>()) {
401
auto crc = res.event_.get<LogEventCrc32C>().crc32c;
404
return Status::Error("Crc mismatch");
407
logevents_.emplace_back(std::move(res));
409
lazy_crc_extend(data.substr(0, td::narrow_cast<std::size_t>(size)));
414
td::uint32 crc32c() {
419
void flush() override {
424
std::vector<LogEvent> logevents_;
429
// Folds the pending suffix_ bytes into the running checksum.
crc_ = td::crc32c_extend(crc_, suffix_);
432
// Defers checksum work: a slice that starts exactly where suffix_ ends is merged into
// suffix_ instead of being hashed immediately.
void lazy_crc_extend(Slice slice) {
433
if (suffix_.empty()) {
437
if (suffix_.end() == slice.begin()) {
438
suffix_ = Slice(suffix_.begin(), slice.end());
449
// Random binlog generator used by the tests: adds 1000 random events.
// NOTE(review): the enclosing class header, the data()/logevents() bodies, the `template`
// line of add_logevent and the branch structure of create_random_logevent are not visible
// in this excerpt.
size_t logevent_count = 1000;
450
for (size_t i = 0; i < logevent_count; i++) {
451
add_logevent(create_random_logevent());
458
td::Span<LogEvent> logevents() const {
463
std::vector<LogEvent> logevents_;
467
// Serializes `event` and records it in logevents_.
void add_logevent(T event) {
468
// serialize() on an empty slice returns the negated required size.
int64 size = -event.serialize({});
469
std::string data(td::narrow_cast<std::size_t>(size), '\0');
470
int64 new_size = event.serialize(data);
471
CHECK(new_size == size);
473
logevents_.emplace_back(std::move(event));
476
// Builds one event of a randomly chosen type (`type` is drawn from [0, 4]).
LogEvent create_random_logevent() {
477
// Helper producing a value filled with secure random bytes.
auto rand_uint256 = [] {
479
td::Random::secure_bytes(as_slice(res));
482
// Helper producing a block id whose workchain, shard and seqno are each in [0, 100].
auto rand_block_id = [] {
484
res.workchain = td::Random::fast(0, 100);
485
res.shard = td::Random::fast(0, 100);
486
res.seqno = td::Random::fast(0, 100);
490
auto type = td::Random::fast(0, 4);
493
// String event with a random lowercase payload of 0..10 characters.
auto size = td::Random::fast(0, 10);
494
LogEventString event;
495
event.data = td::rand_string('a', 'z', size);
499
return LogEventStart(rand_uint256(), 12);
502
return LogEventSetZeroState(rand_uint256(), rand_uint256(), td::Random::fast(0, 1000),
503
td::Random::fast(0, 1000));
506
return LogEventNewBlock(rand_block_id(), rand_uint256(), rand_uint256(), 12, 17);
509
return LogEventNewState(rand_block_id(), rand_uint256(), rand_uint256(), 12, 17);
516
// Round-trip check for a serialized binlog.
//
// `data` is the raw binlog; `events`, when provided, is the event sequence it must decode to.
// Stages visible below:
//   1. split `data` randomly and feed the chunks to a BinlogReader via BinlogReaderHelper;
//   2. rewrite the events with td::BinlogWriter, append a CRC event, and compare;
//   3. rewrite the events with td::BinlogWriterAsync under an actor scheduler, and compare;
//   4. compare against `events` and re-serialize every event;
//   5. replay the file with td::Binlog::replay_sync, and compare;
//   6. replay the file with td::Binlog::replay_async, and compare.
// NOTE(review): the declaration of `reader`, scope braces, lambda bodies and the scheduler
// run calls are not visible in this excerpt.
void test_binlog(td::Slice data, td::optional<td::Span<LogEvent>> events = {}) {
517
auto splitted_binlog = td::rand_split(data);
519
std::string new_binlog_data;
522
// Stage 1: chunked parsing.
td::BinlogReaderHelper reader_impl;
523
for (auto& chunk : splitted_binlog) {
524
reader_impl.parse(reader, chunk).ensure();
529
// Stage 2: synchronous writer.
td::Binlog::destroy(test_binlog_path);
530
td::BinlogWriter binlog_writer(test_binlog_path.str());
531
binlog_writer.open().ensure();
533
BinlogReader new_reader;
535
for (auto& logevent : reader.logevents()) {
536
binlog_writer.write_event(logevent, &new_reader).ensure();
539
// Append a checksum event built from the checksum new_reader has accumulated.
binlog_writer.write_event(LogEvent(LogEventCrc32C(new_reader.crc32c())), &new_reader).ensure();
542
binlog_writer.sync();
543
binlog_writer.close().ensure();
545
auto file_data = read_file(test_binlog_path).move_as_ok();
546
ASSERT_TRUE(reader.logevents() == new_reader.logevents());
547
// From here on `data` refers to the rewritten binlog held in new_binlog_data.
new_binlog_data = file_data.as_slice().str();
548
data = new_binlog_data;
549
//ASSERT_EQ(data, file_data);
554
// Stage 3: asynchronous writer.
td::Binlog::destroy(test_binlog_path);
555
td::BinlogWriterAsync binlog_writer(test_binlog_path.str());
557
td::actor::Scheduler scheduler({2});
559
BinlogReader new_reader;
560
scheduler.run_in_context([&]() mutable {
561
binlog_writer.open().ensure();
562
for (auto& logevent : reader.logevents()) {
563
binlog_writer.write_event(logevent, &new_reader).ensure();
565
binlog_writer.sync([&](Result<td::Unit> res) {
567
binlog_writer.close([&](Result<td::Unit> res) {
569
td::actor::SchedulerContext::get()->stop();
577
auto file_data = read_file(test_binlog_path).move_as_ok();
578
ASSERT_TRUE(reader.logevents() == new_reader.logevents());
579
//ASSERT_EQ(data, file_data);
582
// Stage 4: expected events and re-serialization.
ASSERT_TRUE(!events || events.value() == reader.logevents());
584
std::string new_data;
585
for (auto& event : reader.logevents()) {
586
// serialize() on an empty slice returns the negated required size.
int64 size = -event.serialize({});
587
std::string event_data(td::narrow_cast<std::size_t>(size), '\0');
588
int64 new_size = event.serialize(event_data);
589
CHECK(new_size == size);
590
new_data += event_data;
592
//ASSERT_EQ(data, new_data);
596
// Stage 5: Binlog::replay_sync
td::CSlice path("test.binlog");
597
td::Binlog::destroy(path);
598
td::write_file(path, data).ensure();
600
td::Binlog binlog(path.str());
601
BinlogReader binlog_reader;
602
binlog.replay_sync(binlog_reader).ensure();
604
ASSERT_EQ(reader.logevents().size(), binlog_reader.logevents().size());
605
ASSERT_TRUE(reader.logevents() == binlog_reader.logevents());
608
// Stage 6: Binlog::replay_async
610
td::Binlog::destroy(test_binlog_path);
611
td::write_file(test_binlog_path, data).ensure();
613
td::Binlog binlog(test_binlog_path.str());
614
auto binlog_reader = std::make_shared<BinlogReader>();
616
td::actor::Scheduler scheduler({2});
617
scheduler.run_in_context([&]() mutable {
618
binlog.replay_async(binlog_reader, [](Result<td::Unit> res) {
620
td::actor::SchedulerContext::get()->stop();
627
ASSERT_EQ(reader.logevents().size(), binlog_reader->logevents().size());
628
ASSERT_TRUE(reader.logevents() == binlog_reader->logevents());
632
// Runs test_binlog() on a binlog's bytes together with the events it is expected to decode to.
// NOTE(review): the declaration of `binlog` is not visible in this excerpt.
TEST(Binlog, Reader) {
634
test_binlog(binlog.data(), binlog.logevents());
638
// Fixed binlog fixture stored as base64. Its first four decoded bytes are 6b 44 42 04,
// i.e. the little-endian LogEventStart tag 0x0442446b.
// NOTE(review): the enclosing TEST header and the statements that consume `binlog` are not
// visible in this excerpt.
std::string binlog = td::base64_decode(
639
"a0RCBAABKQCRMn1c2DaJhwrptxburpRtrWI2sjGhVbG29bFO0r8DDtAAExjZPKtjAAAAALwGAAAA"
640
"AAAAFvJq3qfzFCDWap+LUrgBI8sWFayIOQSxkBjV3CWgizHYNomHCum3Fu6ulG2tYjayMaFVsbb1"
641
"sU7SvwMO0AATGGO89BmAAAAA/////wEAAAAAAAAAAAAAgN4RAAAAAAAAa53L4ziGleZ7K+StAsBd"
642
"txMxbHHfuB9SJRFp+BMzXfnGnt8TsgFnig7j/xVRjtIsYUVw0rQZJUC0sWQROj0SHvplIkBV9vMp")
647
// Deterministic td::CyclicBuffer checks:
//   * closing the reader with an error is visible to the writer (and not vice versa);
//   * closing the writer with an error is visible to the reader (and not vice versa);
//   * chunk-by-chunk writes grow reader_size()/writer_size() in steps of chunk_size and hand
//     out write slices aligned to options.alignment;
//   * an interleaved read/write loop transfers the whole 100001-byte string in order.
// NOTE(review): scope braces, the options.count assignment and the loop headers of the
// final phase are not visible in this excerpt.
TEST(Buffers, CyclicBufferSimple) {
649
// Reader closes first.
auto reader_writer = td::CyclicBuffer::create();
650
auto reader = std::move(reader_writer.first);
651
auto writer = std::move(reader_writer.second);
653
ASSERT_TRUE(!writer.is_reader_closed());
654
reader.close_reader(td::Status::Error(2));
655
ASSERT_TRUE(!reader.is_writer_closed());
656
ASSERT_TRUE(writer.is_reader_closed());
657
ASSERT_EQ(2, writer.reader_status().code());
660
// Writer closes first.
auto reader_writer = td::CyclicBuffer::create();
661
auto reader = std::move(reader_writer.first);
662
auto writer = std::move(reader_writer.second);
664
ASSERT_TRUE(!reader.is_writer_closed());
665
writer.close_writer(td::Status::Error(2));
666
ASSERT_TRUE(!writer.is_reader_closed());
667
ASSERT_TRUE(reader.is_writer_closed());
668
ASSERT_EQ(2, reader.writer_status().code());
671
// Data transfer with a small chunk size and a non-power-of-two alignment.
td::CyclicBuffer::Options options;
672
options.chunk_size = 14;
674
options.alignment = 7;
675
auto reader_writer = td::CyclicBuffer::create(options);
676
auto reader = std::move(reader_writer.first);
677
auto writer = std::move(reader_writer.second);
679
auto data = td::rand_string('a', 'z', 100001);
680
// write_slice / read_slice track the not-yet-written / not-yet-read remainder of `data`.
td::Slice write_slice = data;
681
td::Slice read_slice = data;
682
// Write chunks without reading, checking the reported sizes before and after each write.
for (size_t i = 1; i < options.count; i++) {
683
ASSERT_EQ((i - 1) * options.chunk_size, reader.reader_size());
684
ASSERT_EQ((i - 1) * options.chunk_size, writer.writer_size());
685
auto slice = writer.prepare_write();
686
ASSERT_EQ(0u, reinterpret_cast<td::uint64>(slice.data()) % options.alignment);
687
auto to_copy = write_slice;
688
to_copy.truncate(options.chunk_size);
689
slice.copy_from(to_copy);
690
write_slice = write_slice.substr(to_copy.size());
691
writer.confirm_write(to_copy.size());
692
ASSERT_EQ(i * options.chunk_size, reader.reader_size());
693
ASSERT_EQ(i * options.chunk_size, writer.writer_size());
695
bool is_writer_closed = false;
698
// Sample the closed flag before reading, and test it together with an empty read.
bool is_closed = reader.is_writer_closed();
699
auto slice = reader.prepare_read();
700
ASSERT_EQ(read_slice.substr(0, slice.size()), slice);
701
read_slice = read_slice.substr(slice.size());
702
reader.confirm_read(slice.size());
703
if (is_closed && slice.empty()) {
708
if (!is_writer_closed) {
709
auto slice = writer.prepare_write();
710
auto to_copy = write_slice;
711
to_copy.truncate(options.chunk_size);
712
// Nothing left to write: close the writer successfully.
if (to_copy.empty()) {
713
writer.close_writer(td::Status::OK());
714
is_writer_closed = true;
716
slice.copy_from(to_copy);
717
write_slice = write_slice.substr(to_copy.size());
718
writer.confirm_write(to_copy.size());
722
ASSERT_EQ(0u, write_slice.size());
723
ASSERT_EQ(0u, read_slice.size());
727
// Randomised td::CyclicBuffer check, repeated 20 times: a 100001-byte random string is
// split into random chunks, reads and writes are interleaved by coin flips, and the bytes
// read back must equal the original string.
// NOTE(review): the declarations of `chunk_i` and `res`, the loop header, the line that
// appends to `res` and the chunk-advance logic are not visible in this excerpt.
TEST(Buffers, CyclicBuffer) {
728
for (int t = 0; t < 20; t++) {
729
td::CyclicBuffer::Options options;
730
options.chunk_size = 14;
732
options.alignment = 7;
733
auto reader_writer = td::CyclicBuffer::create(options);
734
auto reader = std::move(reader_writer.first);
735
auto writer = std::move(reader_writer.second);
736
auto data = td::rand_string('a', 'z', 100001);
737
auto chunks = td::rand_split(data);
742
// Coin flip: attempt a read.
if (td::Random::fast(0, 1) == 0) {
743
// Sample the closed flag before reading, and test it together with an empty read.
bool is_closed = reader.is_writer_closed();
744
auto slice = reader.prepare_read();
746
reader.confirm_read(slice.size());
747
if (slice.empty() && is_closed) {
748
reader.writer_status().ensure();
752
// Coin flip: attempt a write while chunks remain.
if (chunk_i < chunks.size() && td::Random::fast(0, 1) == 0) {
753
auto slice = writer.prepare_write();
754
auto from = Slice(chunks[chunk_i]);
755
// Copy only as much of the current chunk as the write slice can hold.
auto copy = from.substr(0, slice.size());
756
slice.copy_from(copy);
757
writer.confirm_write(copy.size());
758
auto left = from.substr(copy.size());
760
// Keep the unwritten tail as the current chunk.
chunks[chunk_i] = left.str();
763
// All chunks written: close the writer successfully.
if (chunk_i == chunks.size()) {
764
writer.close_writer(td::Status::OK());
769
ASSERT_EQ(data, res);
773
// Randomised td::ChainBuffer check, repeated 20 times: a 100001-byte random string is
// split into random chunks, appended via writer.append() and read back under coin-flip
// interleaving; the bytes read back must equal the original string.
// NOTE(review): the declarations of `chunk_i`, `res` and `slice`, the loop header and the
// line that appends to `res` are not visible in this excerpt.
TEST(Buffers, ChainBuffer) {
774
for (int t = 0; t < 20; t++) {
775
td::ChainBuffer::Options options;
776
options.chunk_size = 14;
777
auto reader_writer = td::ChainBuffer::create(options);
778
auto reader = std::move(reader_writer.first);
779
auto writer = std::move(reader_writer.second);
780
auto data = td::rand_string('a', 'z', 100001);
781
auto chunks = td::rand_split(data);
786
// Coin flip: attempt a read.
if (td::Random::fast(0, 1) == 0) {
787
// Sample the closed flag before reading, and test it together with an empty read.
bool is_closed = reader.is_writer_closed();
789
// prepare_read() is only called when the reader reports pending bytes.
if (reader.reader_size() != 0) {
790
slice = reader.prepare_read();
792
reader.confirm_read(slice.size());
794
if (slice.empty() && is_closed) {
795
reader.writer_status().ensure();
799
// Coin flip: append the next chunk while chunks remain.
if (chunk_i < chunks.size() && td::Random::fast(0, 1) == 0) {
800
writer.append(chunks[chunk_i]);
802
// All chunks appended: close the writer successfully.
if (chunk_i == chunks.size()) {
803
writer.close_writer(td::Status::OK());
807
ASSERT_EQ(data.size(), res.size());
808
ASSERT_EQ(data, res);