Ton

Форк
0
/
binlog.cpp 
810 строк · 23.3 Кб
/*
    This file is part of TON Blockchain Library.

    TON Blockchain Library is free software: you can redistribute it and/or modify
    it under the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation, either version 2 of the License, or
    (at your option) any later version.

    TON Blockchain Library is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with TON Blockchain Library.  If not, see <http://www.gnu.org/licenses/>.

    Copyright 2017-2020 Telegram Systems LLP
*/
19
#include "td/utils/tests.h"
20

21
#include "td/utils/as.h"
22
#include "td/utils/base64.h"
23
#include "td/utils/benchmark.h"
24
#include "td/utils/buffer.h"
25
#include "td/utils/crypto.h"
26
#include "td/utils/filesystem.h"
27
#include "td/utils/Slice.h"
28
#include "td/utils/Span.h"
29
#include "td/utils/misc.h"
30
#include "td/utils/overloaded.h"
31
#include "td/utils/optional.h"
32
#include "td/utils/port/FileFd.h"
33
#include "td/utils/port/path.h"
34
#include "td/utils/port/IoSlice.h"
35
#include "td/utils/UInt.h"
36
#include "td/utils/Variant.h"
37
#include "td/utils/VectorQueue.h"
38

39
#include "td/actor/actor.h"
40

41
#include "td/db/utils/StreamInterface.h"
42
#include "td/db/utils/ChainBuffer.h"
43
#include "td/db/utils/CyclicBuffer.h"
44
#include "td/db/binlog/BinlogReaderHelper.h"
45

46
#include "td/db/binlog/Binlog.h"
47

48
#include <ctime>
49

50
// Toy Binlog Implementation
51
using td::int64;
52
using td::MutableSlice;
53
using td::Result;
54
using td::Slice;
55
using td::Status;
56

57
using RootHash = td::UInt256;
58
using FileHash = td::UInt256;
59
// Plain identifier of a block: workchain, sequence number and shard.
// LogEventNewBlock and LogEventNewState copy these three fields verbatim.
// Every member is zero-initialized by default so that a default-constructed
// BlockId never exposes indeterminate values.
struct BlockId {
  int workchain{0};
  unsigned seqno{0};
  unsigned long long shard{0};
};
64

65
template <class T>
66
Result<int64> memcpy_parse(Slice data, T* res) {
67
  if (data.size() < sizeof(T)) {
68
    return -static_cast<int64>(sizeof(T));
69
  }
70
  std::memcpy(res, data.data(), sizeof(T));
71
  if (res->tag_field != res->tag) {
72
    return Status::Error("Tag mismatch");
73
  }
74
  return sizeof(T);
75
}
76
template <class T>
77
int64 memcpy_serialize(MutableSlice data, const T& res) {
78
  if (data.size() < sizeof(T)) {
79
    return -static_cast<int64>(sizeof(T));
80
  }
81
  std::memcpy(data.data(), &res, sizeof(T));
82
  return sizeof(T);
83
}
84

85
#pragma pack(push, 4)
86
struct LogEventCrc32C {
87
  static constexpr unsigned tag = 0x473a830a;
88

89
  unsigned tag_field;
90
  td::uint32 crc32c;
91
  LogEventCrc32C() = default;
92
  LogEventCrc32C(td::uint32 crc32c) : tag_field(tag), crc32c(crc32c) {
93
  }
94
  static Result<int64> parse(Slice data, LogEventCrc32C* res) {
95
    return memcpy_parse(data, res);
96
  }
97
  int64 serialize(MutableSlice data) const {
98
    return memcpy_serialize(data, *this);
99
  }
100
  auto key() const {
101
    return crc32c;
102
  }
103
  bool operator==(const LogEventCrc32C& other) const {
104
    return key() == other.key();
105
  }
106
  bool operator!=(const LogEventCrc32C& other) const {
107
    return !(*this == other);
108
  }
109
};
110

111
struct LogEventStart {
112
  static constexpr unsigned tag = 0x0442446b;
113
  static constexpr unsigned log_type = 0x290100;
114
  unsigned tag_field;
115
  unsigned type_field;
116
  unsigned created_at;
117
  unsigned char zerostate_root_hash[32];
118
  LogEventStart() = default;
119
  LogEventStart(const RootHash& hash, unsigned _now = 0)
120
      : tag_field(tag), type_field(log_type), created_at(_now ? _now : (unsigned)std::time(nullptr)) {
121
    td::as<RootHash>(zerostate_root_hash) = hash;
122
  }
123
  static Result<int64> parse(Slice data, LogEventStart* res) {
124
    return memcpy_parse(data, res);
125
  }
126
  int64 serialize(MutableSlice data) const {
127
    return memcpy_serialize(data, *this);
128
  }
129
  auto key() const {
130
    return std::make_tuple(tag_field, type_field, created_at, Slice(zerostate_root_hash, 32));
131
  }
132
  bool operator==(const LogEventStart& other) const {
133
    return key() == other.key();
134
  }
135
  bool operator!=(const LogEventStart& other) const {
136
    return !(*this == other);
137
  }
138
};
139

140
struct LogEventSetZeroState {
141
  static constexpr unsigned tag = 0x63ab3cd9;
142
  unsigned tag_field;
143
  unsigned flags;
144
  long long file_size;
145
  unsigned char file_hash[32];
146
  unsigned char root_hash[32];
147
  LogEventSetZeroState() = default;
148
  LogEventSetZeroState(const RootHash& rhash, const FileHash& fhash, unsigned long long _fsize, unsigned _flags = 0)
149
      : tag_field(tag), flags(_flags), file_size(_fsize) {
150
    td::as<FileHash>(file_hash) = fhash;
151
    td::as<RootHash>(root_hash) = rhash;
152
  }
153
  static Result<int64> parse(Slice data, LogEventSetZeroState* res) {
154
    return memcpy_parse(data, res);
155
  }
156
  int64 serialize(MutableSlice data) const {
157
    return memcpy_serialize(data, *this);
158
  }
159
  auto key() const {
160
    return std::make_tuple(tag_field, flags, file_size, Slice(file_hash, 32), Slice(root_hash, 32));
161
  }
162
  bool operator==(const LogEventSetZeroState& other) const {
163
    return key() == other.key();
164
  }
165
  bool operator!=(const LogEventSetZeroState& other) const {
166
    return !(*this == other);
167
  }
168
};
169

170
struct LogEventNewBlock {
171
  static constexpr unsigned tag = 0x19f4bc63;
172
  unsigned tag_field;
173
  unsigned flags;  // lower 8 bits = authority
174
  int workchain;
175
  unsigned seqno;
176
  unsigned long long shard;
177
  long long file_size;
178
  unsigned char file_hash[32];
179
  unsigned char root_hash[32];
180
  unsigned char last_bytes[8];
181
  LogEventNewBlock() = default;
182
  LogEventNewBlock(const BlockId& block, const RootHash& rhash, const FileHash& fhash, unsigned long long _fsize,
183
                   unsigned _flags)
184
      : tag_field(tag)
185
      , flags(_flags)
186
      , workchain(block.workchain)
187
      , seqno(block.seqno)
188
      , shard(block.shard)
189
      , file_size(_fsize) {
190
    td::as<FileHash>(file_hash) = fhash;
191
    td::as<RootHash>(root_hash) = rhash;
192
    td::as<unsigned long long>(last_bytes) = 0;
193
  }
194
  static Result<int64> parse(Slice data, LogEventNewBlock* res) {
195
    return memcpy_parse(data, res);
196
  }
197
  int64 serialize(MutableSlice data) const {
198
    return memcpy_serialize(data, *this);
199
  }
200
  auto key() const {
201
    return std::make_tuple(tag_field, flags, workchain, seqno, shard, file_size, Slice(file_hash, 32),
202
                           Slice(root_hash, 32), Slice(last_bytes, 8));
203
  }
204
  bool operator==(const LogEventNewBlock& other) const {
205
    return key() == other.key();
206
  }
207
  bool operator!=(const LogEventNewBlock& other) const {
208
    return !(*this == other);
209
  }
210
};
211

212
struct LogEventNewState {
213
  static constexpr unsigned tag = 0x4190a21f;
214
  unsigned tag_field;
215
  unsigned flags;  // lower 8 bits = authority
216
  int workchain;
217
  unsigned seqno;
218
  unsigned long long shard;
219
  long long file_size;
220
  unsigned char file_hash[32];
221
  unsigned char root_hash[32];
222
  unsigned char last_bytes[8];
223
  LogEventNewState() = default;
224
  LogEventNewState(const BlockId& state, const RootHash& rhash, const FileHash& fhash, unsigned long long _fsize,
225
                   unsigned _flags)
226
      : tag_field(tag)
227
      , flags(_flags)
228
      , workchain(state.workchain)
229
      , seqno(state.seqno)
230
      , shard(state.shard)
231
      , file_size(_fsize) {
232
    td::as<FileHash>(file_hash) = fhash;
233
    td::as<RootHash>(root_hash) = rhash;
234
    td::as<unsigned long long>(last_bytes) = 0;
235
  }
236
  static Result<int64> parse(Slice data, LogEventNewState* res) {
237
    return memcpy_parse(data, res);
238
  }
239
  int64 serialize(MutableSlice data) const {
240
    return memcpy_serialize(data, *this);
241
  }
242
  auto key() const {
243
    return std::make_tuple(tag_field, flags, workchain, seqno, shard, file_size, Slice(file_hash, 32),
244
                           Slice(root_hash, 32), Slice(last_bytes, 8));
245
  }
246
  bool operator==(const LogEventNewState& other) const {
247
    return key() == other.key();
248
  }
249
  bool operator!=(const LogEventNewState& other) const {
250
    return !(*this == other);
251
  }
252
};
253
#pragma pack(pop)
254

255
struct LogEventString {
256
  static constexpr unsigned tag = 0xabcdabcd;
257

258
  std::string data;
259

260
  bool operator==(const LogEventString& other) const {
261
    return data == other.data;
262
  }
263
  bool operator!=(const LogEventString& other) const {
264
    return !(*this == other);
265
  }
266

267
  int64 serialize(MutableSlice dest) const {
268
    size_t need_size = 8 + data.size();
269
    if (dest.size() < need_size) {
270
      return -static_cast<int64>(need_size);
271
    }
272
    dest.truncate(need_size);
273
    td::as<unsigned>(dest.data()) = unsigned(tag);
274
    td::as<int>(dest.data() + 4) = td::narrow_cast<int>(data.size());
275
    dest.substr(8).copy_from(data);
276
    return dest.size();
277
  }
278

279
  static Result<int64> parse(Slice data, LogEventString* res) {
280
    if (data.size() < 4) {
281
      return -4;
282
    }
283
    unsigned got_tag = td::as<unsigned>(data.data());
284
    if (got_tag != tag) {
285
      return Status::Error(PSLICE() << "tag mismatch " << td::format::as_hex(got_tag));
286
    }
287
    data = data.substr(4);
288
    if (data.size() < 4) {
289
      return -8;
290
    }
291
    td::int64 length = td::as<td::uint32>(data.data());
292
    data = data.substr(4);
293
    if (static_cast<int64>(data.size()) < length) {
294
      return -length - 8;
295
    }
296
    res->data = data.substr(0, td::narrow_cast<std::size_t>(length)).str();
297
    return length + 8;
298
  }
299
};
300

301
struct LogEvent {
302
  td::Variant<LogEventCrc32C, LogEventStart, LogEventString, LogEventNewBlock, LogEventNewState, LogEventSetZeroState>
303
      event_{LogEventStart{}};
304

305
  bool operator==(const LogEvent& other) const {
306
    return event_ == other.event_;
307
  }
308
  bool operator!=(const LogEvent& other) const {
309
    return !(*this == other);
310
  }
311

312
  LogEvent() = default;
313
  LogEvent(LogEvent&& other) = default;
314
  template <class T>
315
  LogEvent(T&& e) : event_(std::forward<T>(e)) {
316
  }
317

318
  int64 serialize(MutableSlice data) const {
319
    int64 res;
320
    event_.visit([&](auto& e) { res = e.serialize(data); });
321
    return res;
322
  }
323

324
  static Result<int64> parse(Slice data, LogEvent* res) {
325
    if (data.size() < 4) {
326
      return -4;
327
    }
328
    //LOG(ERROR) << td::format::as_hex_dump<4>(data);
329
    unsigned got_tag = td::as<unsigned>(data.data());
330
    switch (got_tag) {
331
      case LogEventCrc32C::tag: {
332
        LogEventCrc32C e;
333
        TRY_RESULT(x, e.parse(data, &e));
334
        if (x >= 0) {
335
          res->event_ = e;
336
        }
337
        return x;
338
      }
339
      case LogEventStart::tag: {
340
        LogEventStart e;
341
        TRY_RESULT(x, e.parse(data, &e));
342
        if (x >= 0) {
343
          res->event_ = e;
344
        }
345
        return x;
346
      }
347
      case LogEventSetZeroState::tag: {
348
        LogEventSetZeroState e;
349
        TRY_RESULT(x, e.parse(data, &e));
350
        if (x >= 0) {
351
          res->event_ = e;
352
        }
353
        return x;
354
      }
355
      case LogEventNewBlock::tag: {
356
        LogEventNewBlock e;
357
        TRY_RESULT(x, e.parse(data, &e));
358
        if (x >= 0) {
359
          res->event_ = e;
360
        }
361
        return x;
362
      }
363
      case LogEventNewState::tag: {
364
        LogEventNewState e;
365
        TRY_RESULT(x, e.parse(data, &e));
366
        if (x >= 0) {
367
          res->event_ = e;
368
        }
369
        return x;
370
      }
371
      case LogEventString::tag: {
372
        LogEventString e;
373
        TRY_RESULT(x, e.parse(data, &e));
374
        if (x >= 0) {
375
          res->event_ = e;
376
        }
377
        return x;
378
      }
379
      default:
380
        return Status::Error(PSLICE() << "Unknown tag: " << td::format::as_hex(got_tag));
381
    }
382
  }
383
};
384

385
static td::CSlice test_binlog_path("test.binlog");
386

387
class BinlogReader : public td::BinlogReaderInterface {
388
 public:
389
  td::Span<LogEvent> logevents() const {
390
    return logevents_;
391
  }
392

393
  td::Result<td::int64> parse(td::Slice data) override {
394
    if (data.size() < 4) {
395
      return -4;
396
    }
397
    LogEvent res;
398
    TRY_RESULT(size, res.parse(data, &res));
399
    if (size > 0) {
400
      if (res.event_.get_offset() == res.event_.offset<LogEventCrc32C>()) {
401
        auto crc = res.event_.get<LogEventCrc32C>().crc32c;
402
        flush_crc();
403
        if (crc != crc_) {
404
          return Status::Error("Crc mismatch");
405
        }
406
      } else {
407
        logevents_.emplace_back(std::move(res));
408
      }
409
      lazy_crc_extend(data.substr(0, td::narrow_cast<std::size_t>(size)));
410
    }
411
    return size;
412
  }
413

414
  td::uint32 crc32c() {
415
    flush_crc();
416
    return crc_;
417
  }
418

419
  void flush() override {
420
    flush_crc();
421
  }
422

423
 private:
424
  std::vector<LogEvent> logevents_;
425
  td::uint32 crc_{0};
426
  td::Slice suffix_;
427

428
  void flush_crc() {
429
    crc_ = td::crc32c_extend(crc_, suffix_);
430
    suffix_ = Slice();
431
  }
432
  void lazy_crc_extend(Slice slice) {
433
    if (suffix_.empty()) {
434
      suffix_ = slice;
435
      return;
436
    }
437
    if (suffix_.end() == slice.begin()) {
438
      suffix_ = Slice(suffix_.begin(), slice.end());
439
      return;
440
    }
441
    flush_crc();
442
    suffix_ = slice;
443
  }
444
};
445

446
class RandomBinlog {
447
 public:
448
  RandomBinlog() {
449
    size_t logevent_count = 1000;
450
    for (size_t i = 0; i < logevent_count; i++) {
451
      add_logevent(create_random_logevent());
452
    }
453
  }
454

455
  Slice data() const {
456
    return data_;
457
  }
458
  td::Span<LogEvent> logevents() const {
459
    return logevents_;
460
  }
461

462
 private:
463
  std::vector<LogEvent> logevents_;
464
  std::string data_;
465

466
  template <class T>
467
  void add_logevent(T event) {
468
    int64 size = -event.serialize({});
469
    std::string data(td::narrow_cast<std::size_t>(size), '\0');
470
    int64 new_size = event.serialize(data);
471
    CHECK(new_size == size);
472
    data_ += data;
473
    logevents_.emplace_back(std::move(event));
474
  }
475

476
  LogEvent create_random_logevent() {
477
    auto rand_uint256 = [] {
478
      td::UInt256 res;
479
      td::Random::secure_bytes(as_slice(res));
480
      return res;
481
    };
482
    auto rand_block_id = [] {
483
      BlockId res;
484
      res.workchain = td::Random::fast(0, 100);
485
      res.shard = td::Random::fast(0, 100);
486
      res.seqno = td::Random::fast(0, 100);
487
      return res;
488
    };
489

490
    auto type = td::Random::fast(0, 4);
491
    switch (type) {
492
      case 0: {
493
        auto size = td::Random::fast(0, 10);
494
        LogEventString event;
495
        event.data = td::rand_string('a', 'z', size);
496
        return event;
497
      }
498
      case 1: {
499
        return LogEventStart(rand_uint256(), 12);
500
      }
501
      case 2: {
502
        return LogEventSetZeroState(rand_uint256(), rand_uint256(), td::Random::fast(0, 1000),
503
                                    td::Random::fast(0, 1000));
504
      }
505
      case 3: {
506
        return LogEventNewBlock(rand_block_id(), rand_uint256(), rand_uint256(), 12, 17);
507
      }
508
      case 4: {
509
        return LogEventNewState(rand_block_id(), rand_uint256(), rand_uint256(), 12, 17);
510
      }
511
    }
512
    UNREACHABLE();
513
  }
514
};
515

516
// Round-trips a serialized binlog through every reader/writer flavour.
//
//  1. Parses `data` in random chunks with BinlogReaderHelper + BinlogReader.
//  2. Rewrites the parsed events with the sync writer (inserting a CRC event
//     after every 10th event); the resulting file replaces `data`.
//  3. Rewrites the parsed events with the async writer.
//  4. Checks the parsed events against `events`, when given.
//  5. Replays the file written in step 2 synchronously and asynchronously and
//     checks that the same events come out.
//
// All file sections use the shared `test_binlog_path`.
void test_binlog(td::Slice data, td::optional<td::Span<LogEvent>> events = {}) {
  // Kept alive for the whole function: the reader may still reference chunk bytes.
  auto splitted_binlog = td::rand_split(data);

  std::string new_binlog_data;

  BinlogReader reader;
  td::BinlogReaderHelper reader_impl;
  for (auto& chunk : splitted_binlog) {
    reader_impl.parse(reader, chunk).ensure();
  }

  //Binlog write sync
  {
    td::Binlog::destroy(test_binlog_path);
    td::BinlogWriter binlog_writer(test_binlog_path.str());
    binlog_writer.open().ensure();

    BinlogReader new_reader;
    size_t written_events = 0;
    for (auto& logevent : reader.logevents()) {
      binlog_writer.write_event(logevent, &new_reader).ensure();
      written_events++;
      // Checkpoint the running checksum after every 10th event.
      if (written_events % 10 == 0) {
        binlog_writer.write_event(LogEvent(LogEventCrc32C(new_reader.crc32c())), &new_reader).ensure();
      }
    }
    binlog_writer.sync();
    binlog_writer.close().ensure();

    auto file_data = read_file(test_binlog_path).move_as_ok();
    ASSERT_TRUE(reader.logevents() == new_reader.logevents());
    // From here on `data` is the rewritten log, which also contains CRC events.
    new_binlog_data = file_data.as_slice().str();
    data = new_binlog_data;
    //ASSERT_EQ(data, file_data);
  }

  //Binlog write async
  {
    td::Binlog::destroy(test_binlog_path);
    td::BinlogWriterAsync binlog_writer(test_binlog_path.str());

    td::actor::Scheduler scheduler({2});

    BinlogReader new_reader;
    scheduler.run_in_context([&]() mutable {
      binlog_writer.open().ensure();
      for (auto& logevent : reader.logevents()) {
        binlog_writer.write_event(logevent, &new_reader).ensure();
      }
      // sync -> close -> stop the scheduler, each step chained from the previous callback.
      binlog_writer.sync([&](Result<td::Unit> sync_result) {
        sync_result.ensure();
        binlog_writer.close([&](Result<td::Unit> close_result) {
          close_result.ensure();
          td::actor::SchedulerContext::get()->stop();
        });
      });
    });

    scheduler.run();
    scheduler.stop();

    auto file_data = read_file(test_binlog_path).move_as_ok();
    ASSERT_TRUE(reader.logevents() == new_reader.logevents());
    //ASSERT_EQ(data, file_data);
  }

  ASSERT_TRUE(!events || events.value() == reader.logevents());

  // Re-serialize every parsed event; each one must report a consistent size.
  std::string new_data;
  for (auto& event : reader.logevents()) {
    int64 size = -event.serialize({});
    std::string event_data(td::narrow_cast<std::size_t>(size), '\0');
    int64 new_size = event.serialize(event_data);
    CHECK(new_size == size);
    new_data += event_data;
  }
  //ASSERT_EQ(data, new_data);

  // Binlog::read_sync
  {
    td::Binlog::destroy(test_binlog_path);
    td::write_file(test_binlog_path, data).ensure();

    td::Binlog binlog(test_binlog_path.str());
    BinlogReader binlog_reader;
    binlog.replay_sync(binlog_reader).ensure();

    ASSERT_EQ(reader.logevents().size(), binlog_reader.logevents().size());
    ASSERT_TRUE(reader.logevents() == binlog_reader.logevents());
  }

  // Binlog::read_async
  {
    td::Binlog::destroy(test_binlog_path);
    td::write_file(test_binlog_path, data).ensure();

    td::Binlog binlog(test_binlog_path.str());
    auto binlog_reader = std::make_shared<BinlogReader>();

    td::actor::Scheduler scheduler({2});
    scheduler.run_in_context([&]() mutable {
      binlog.replay_async(binlog_reader, [](Result<td::Unit> res) {
        res.ensure();
        td::actor::SchedulerContext::get()->stop();
      });
    });

    scheduler.run();
    scheduler.stop();

    ASSERT_EQ(reader.logevents().size(), binlog_reader->logevents().size());
    ASSERT_TRUE(reader.logevents() == binlog_reader->logevents());
  }
}
631

632
// Round-trips a randomly generated binlog and checks that exactly the
// generated events are read back.
TEST(Binlog, Reader) {
  RandomBinlog random_binlog;
  auto expected_events = random_binlog.logevents();
  test_binlog(random_binlog.data(), expected_events);
}
636

637
// Round-trips a fixed, base64-encoded binlog sample (no expected event list).
TEST(Binlog, Hands) {
  auto decoded = td::base64_decode(
      "a0RCBAABKQCRMn1c2DaJhwrptxburpRtrWI2sjGhVbG29bFO0r8DDtAAExjZPKtjAAAAALwGAAAA"
      "AAAAFvJq3qfzFCDWap+LUrgBI8sWFayIOQSxkBjV3CWgizHYNomHCum3Fu6ulG2tYjayMaFVsbb1"
      "sU7SvwMO0AATGGO89BmAAAAA/////wEAAAAAAAAAAAAAgN4RAAAAAAAAa53L4ziGleZ7K+StAsBd"
      "txMxbHHfuB9SJRFp+BMzXfnGnt8TsgFnig7j/xVRjtIsYUVw0rQZJUC0sWQROj0SHvplIkBV9vMp");
  std::string binlog = decoded.move_as_ok();
  test_binlog(binlog);
}
646

647
// Deterministic CyclicBuffer checks: close propagation in both directions,
// then a full transfer of 100001 bytes through a 10 x 14-byte buffer.
TEST(Buffers, CyclicBufferSimple) {
  // Closing the reader is visible to the writer, together with its status.
  {
    auto pipe = td::CyclicBuffer::create();
    auto reader = std::move(pipe.first);
    auto writer = std::move(pipe.second);

    ASSERT_TRUE(!writer.is_reader_closed());
    reader.close_reader(td::Status::Error(2));
    ASSERT_TRUE(!reader.is_writer_closed());
    ASSERT_TRUE(writer.is_reader_closed());
    ASSERT_EQ(2, writer.reader_status().code());
  }
  // Closing the writer is visible to the reader, together with its status.
  {
    auto pipe = td::CyclicBuffer::create();
    auto reader = std::move(pipe.first);
    auto writer = std::move(pipe.second);

    ASSERT_TRUE(!reader.is_writer_closed());
    writer.close_writer(td::Status::Error(2));
    ASSERT_TRUE(!writer.is_reader_closed());
    ASSERT_TRUE(reader.is_writer_closed());
    ASSERT_EQ(2, reader.writer_status().code());
  }
  // Data transfer with explicit chunk size, chunk count and alignment.
  {
    td::CyclicBuffer::Options options;
    options.chunk_size = 14;
    options.count = 10;
    options.alignment = 7;
    auto pipe = td::CyclicBuffer::create(options);
    auto reader = std::move(pipe.first);
    auto writer = std::move(pipe.second);

    auto data = td::rand_string('a', 'z', 100001);
    td::Slice unwritten = data;
    td::Slice unread = data;

    // Pre-fill count - 1 chunks, checking the sizes reported on both ends
    // before and after every write.
    for (size_t chunk_no = 1; chunk_no < options.count; chunk_no++) {
      ASSERT_EQ((chunk_no - 1) * options.chunk_size, reader.reader_size());
      ASSERT_EQ((chunk_no - 1) * options.chunk_size, writer.writer_size());
      auto target = writer.prepare_write();
      ASSERT_EQ(0u, reinterpret_cast<td::uint64>(target.data()) % options.alignment);
      auto piece = unwritten;
      piece.truncate(options.chunk_size);
      target.copy_from(piece);
      unwritten = unwritten.substr(piece.size());
      writer.confirm_write(piece.size());
      ASSERT_EQ(chunk_no * options.chunk_size, reader.reader_size());
      ASSERT_EQ(chunk_no * options.chunk_size, writer.writer_size());
    }

    // Alternate one read and one write until the writer has been closed and
    // the reader has nothing left.
    bool writer_done = false;
    for (;;) {
      {
        bool closed_before_read = reader.is_writer_closed();
        auto got = reader.prepare_read();
        ASSERT_EQ(unread.substr(0, got.size()), got);
        unread = unread.substr(got.size());
        reader.confirm_read(got.size());
        if (closed_before_read && got.empty()) {
          break;
        }
      }

      if (writer_done) {
        continue;
      }
      auto target = writer.prepare_write();
      auto piece = unwritten;
      piece.truncate(options.chunk_size);
      if (piece.empty()) {
        writer.close_writer(td::Status::OK());
        writer_done = true;
      } else {
        target.copy_from(piece);
        unwritten = unwritten.substr(piece.size());
        writer.confirm_write(piece.size());
      }
    }
    ASSERT_EQ(0u, unwritten.size());
    ASSERT_EQ(0u, unread.size());
  }
}
726

727
// Randomized CyclicBuffer test: 20 rounds in which reads and writes are
// interleaved by coin flips; everything written must be read back unchanged.
TEST(Buffers, CyclicBuffer) {
  for (int round = 0; round < 20; round++) {
    td::CyclicBuffer::Options options;
    options.chunk_size = 14;
    options.count = 10;
    options.alignment = 7;
    auto pipe = td::CyclicBuffer::create(options);
    auto reader = std::move(pipe.first);
    auto writer = std::move(pipe.second);
    auto data = td::rand_string('a', 'z', 100001);
    auto chunks = td::rand_split(data);

    size_t next_chunk = 0;
    std::string received;
    for (;;) {
      // Coin flip: try to read.
      if (td::Random::fast(0, 1) == 0) {
        bool closed_before_read = reader.is_writer_closed();
        auto got = reader.prepare_read();
        received += got.str();
        reader.confirm_read(got.size());
        if (closed_before_read && got.empty()) {
          reader.writer_status().ensure();
          break;
        }
      }
      // Coin flip: try to write (only while chunks remain).
      if (next_chunk < chunks.size() && td::Random::fast(0, 1) == 0) {
        auto target = writer.prepare_write();
        auto pending = Slice(chunks[next_chunk]);
        auto piece = pending.substr(0, target.size());
        target.copy_from(piece);
        writer.confirm_write(piece.size());
        auto rest = pending.substr(piece.size());
        if (rest.empty()) {
          next_chunk++;
          if (next_chunk == chunks.size()) {
            writer.close_writer(td::Status::OK());
          }
        } else {
          // Keep the unwritten tail for a later iteration.
          chunks[next_chunk] = rest.str();
        }
      }
    }
    ASSERT_EQ(data, received);
  }
}
772

773
// Randomized ChainBuffer test: 20 rounds in which reads and appends are
// interleaved by coin flips; everything appended must be read back unchanged.
TEST(Buffers, ChainBuffer) {
  for (int round = 0; round < 20; round++) {
    td::ChainBuffer::Options options;
    options.chunk_size = 14;
    auto pipe = td::ChainBuffer::create(options);
    auto reader = std::move(pipe.first);
    auto writer = std::move(pipe.second);
    auto data = td::rand_string('a', 'z', 100001);
    auto chunks = td::rand_split(data);

    size_t next_chunk = 0;
    std::string received;
    for (;;) {
      // Coin flip: try to read; prepare_read is only called when data is available.
      if (td::Random::fast(0, 1) == 0) {
        bool closed_before_read = reader.is_writer_closed();
        Slice got;
        if (reader.reader_size() != 0) {
          got = reader.prepare_read();
          received += got.str();
          reader.confirm_read(got.size());
        }
        if (closed_before_read && got.empty()) {
          reader.writer_status().ensure();
          break;
        }
      }
      // Coin flip: append the next whole chunk (only while chunks remain).
      if (next_chunk < chunks.size() && td::Random::fast(0, 1) == 0) {
        writer.append(chunks[next_chunk]);
        next_chunk++;
        if (next_chunk == chunks.size()) {
          writer.close_writer(td::Status::OK());
        }
      }
    }
    ASSERT_EQ(data.size(), received.size());
    ASSERT_EQ(data, received);
  }
}
811

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.