2
This file is part of TON Blockchain source code.
4
TON Blockchain is free software; you can redistribute it and/or
5
modify it under the terms of the GNU General Public License
6
as published by the Free Software Foundation; either version 2
7
of the License, or (at your option) any later version.
9
TON Blockchain is distributed in the hope that it will be useful,
10
but WITHOUT ANY WARRANTY; without even the implied warranty of
11
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12
GNU General Public License for more details.
14
You should have received a copy of the GNU General Public License
15
along with TON Blockchain. If not, see <http://www.gnu.org/licenses/>.
17
In addition, as a special exception, the copyright holders give permission
18
to link the code of portions of this program with the OpenSSL library.
19
You must obey the GNU General Public License in all respects for all
20
of the code used other than OpenSSL. If you modify file(s) with this
21
exception, you may extend this exception to your version of the file(s),
22
but you are not obligated to do so. If you do not wish to do so, delete this
23
exception statement from your version. If you delete this exception statement
24
from all source files in the program, then also delete it here.
26
Copyright 2017-2020 Telegram Systems LLP
28
#include "td/utils/OptionParser.h"
29
#include "td/utils/filesystem.h"
30
#include "td/utils/port/FileFd.h"
31
#include "td/utils/Timer.h"
32
#include "td/utils/crypto.h"
33
#include "td/utils/BufferedReader.h"
34
#include "td/utils/optional.h"
35
#include "td/actor/actor.h"
37
#include "td/db/utils/StreamInterface.h"
38
#include "td/db/utils/ChainBuffer.h"
39
#include "td/db/utils/CyclicBuffer.h"
40
#include "td/db/utils/FileSyncState.h"
41
#include "td/db/utils/StreamToFileActor.h"
42
#include "td/db/utils/FileToStreamActor.h"
47
// Actor that reads chunks from a CyclicBuffer::Reader and forwards them to a Callback.
// NOTE(review): this excerpt is missing several original lines (the header of the nested
// Callback declaration, some branch conditions and the closing braces).
class AsyncCyclicBufferReader : public td::actor::Actor {
53
// Callback hook called from loop(); see the TODO in loop() about when it should fire.
virtual void want_more() = 0;
54
// Callback hook that receives the chunk returned by prepare_read(); an error status makes
// loop() report on_closed() with that status.
virtual Status process(Slice data) = 0;
55
// Callback hook that receives either the writer's status or the error returned by process().
virtual void on_closed(Status status) = 0;
57
// Takes ownership of both the buffer reader and the callback.
AsyncCyclicBufferReader(CyclicBuffer::Reader reader, td::unique_ptr<Callback> callback)
58
: reader_(std::move(reader)), callback_(std::move(callback)) {
62
CyclicBuffer::Reader reader_;
63
td::unique_ptr<Callback> callback_;
65
// One processing step: fetch a chunk, hand it to the callback, then confirm it as read.
void loop() override {
67
auto data = reader_.prepare_read();
69
// NOTE(review): the condition guarding this branch is not visible here; presumably it is
// taken only when `data` is empty -- confirm against the full file.
if (reader_.is_writer_closed()) {
70
callback_->on_closed(std::move(reader_.writer_status()));
73
callback_->want_more();
76
auto status = callback_->process(data);
77
if (status.is_error()) {
78
callback_->on_closed(std::move(status));
80
// Mark exactly the bytes that were handed to process() as consumed.
reader_.confirm_read(data.size());
81
//TODO: better condition for want_more. May be reader should decide if it is ready for more writes
82
callback_->want_more();
91
// Folds `slice` into two running CRC32C accumulators (`res` and `res2`).
// NOTE(review): the enclosing class and the declarations of res/res2 are not visible here.
void process(td::Slice slice) {
92
res = crc32c_extend(res, slice);
93
res2 = crc32c_extend(res2, slice);
104
// Baseline read benchmark: loads the whole file at `path` with td::read_file, runs the
// processor over it once and logs the processor's result.
void read_baseline(td::CSlice path) {
105
LOG(ERROR) << "BASELINE";
106
td::PerfWarningTimer timer("read file");
107
// move_as_ok() is used without checking the Result first.
auto data = td::read_file(path).move_as_ok();
110
// Separate timer so that processing is timed apart from reading.
td::PerfWarningTimer process_timer("process file", 0);
112
processor.process(data.as_slice());
113
process_timer.reset();
114
LOG(ERROR) << processor.result();
117
// Read benchmark using td::BufferedReader: reads the file at `path` into a local buffer of
// `buffer_size` bytes and feeds what was read to the processor.
// NOTE(review): the loop around the read/process statements is not visible in this excerpt.
void read_buffered(td::CSlice path, size_t buffer_size) {
118
LOG(ERROR) << "BufferedReader";
119
auto fd = td::FileFd::open(path, td::FileFd::Read).move_as_ok();
120
td::BufferedReader reader(fd, buffer_size);
121
// Destination buffer for reader.read(); same size as the reader's own buffer argument.
std::vector<char> buf(buffer_size);
124
auto slice = td::MutableSlice(&buf[0], buf.size());
125
auto size = reader.read(slice).move_as_ok();
129
// Only the first `size` bytes of the buffer were filled by this read.
processor.process(slice.truncate(size));
131
LOG(ERROR) << processor.result();
134
// Asynchronous read benchmark: a FileToStreamActor writes the file into a CyclicBuffer and an
// AsyncCyclicBufferReader actor consumes it, each waking the other through callbacks.
// NOTE(review): `buffer_size` is not used in the lines visible here; several original lines
// (closing braces, the `reader` variable declaration, scheduler run) are missing from this excerpt.
void read_async(td::CSlice path, size_t buffer_size) {
135
LOG(ERROR) << "Async";
136
auto fd = td::FileFd::open(path, td::FileFd::Read).move_as_ok();
137
td::actor::Scheduler scheduler({2});
138
scheduler.run_in_context([&] {
139
// `first` is handed to the consuming actor below, `second` to the file-reading actor.
auto reader_writer = td::CyclicBuffer::create();
142
td::actor::create_actor<td::FileToStreamActor>("Reader", std::move(fd), std::move(reader_writer.second));
143
// Consumer-side callback: owns the file reader actor and wakes it when more data is wanted.
class Callback : public td::AsyncCyclicBufferReader::Callback {
145
Callback(td::actor::ActorOwn<> reader) : reader_(std::move(reader)) {
147
void want_more() override {
148
td::actor::send_signals_later(reader_, td::actor::ActorSignals::wakeup());
150
// Feeds the chunk to the processor; never reports an error.
td::Status process(td::Slice data) override {
151
processor.process(data);
152
return td::Status::OK();
154
// Logs the processor's result and stops the scheduler; `status` is not used in the visible lines.
void on_closed(td::Status status) override {
155
LOG(ERROR) << processor.result();
156
td::actor::SchedulerContext::get()->stop();
160
td::actor::ActorOwn<> reader_;
163
// Keep a non-owning id before ownership of `reader` moves into the callback.
auto reader_copy = reader.get();
164
auto callback = td::make_unique<Callback>(std::move(reader));
165
auto processor = td::actor::create_actor<td::AsyncCyclicBufferReader>(
166
"BufferReader", std::move(reader_writer.first), std::move(callback));
167
// Producer-side callback: wakes the consuming actor whenever the file reader got more data.
class ReaderCallback : public td::FileToStreamActor::Callback {
169
ReaderCallback(td::actor::ActorId<> actor) : actor_(std::move(actor)) {
171
void got_more() override {
172
td::actor::send_signals_later(actor_, td::actor::ActorSignals::wakeup());
176
td::actor::ActorId<> actor_;
178
// processor.release() gives up ownership of the consumer actor and passes its id to the callback.
send_closure(reader_copy, &td::FileToStreamActor::set_callback,
179
td::make_unique<ReaderCallback>(processor.release()));
184
// Static 100,000,000-byte backing storage from which read_o_direct() carves an aligned buffer.
static char o_direct_buf[100000000];
185
// Read benchmark that opens the file with the td::FileFd::Direct flag and reads into an
// aligned region of o_direct_buf.
// NOTE(review): nothing in the visible lines checks that `buffer_size` (plus alignment slack)
// fits inside o_direct_buf -- confirm against the full file.
void read_o_direct(td::CSlice path, size_t buffer_size) {
186
LOG(ERROR) << "Direct";
187
auto fd = td::FileFd::open(path, td::FileFd::Read | td::FileFd::Direct).move_as_ok();
190
// Adds align - 1 to the buffer address and masks it with td::bits_negate64(align); presumably
// this rounds the address up to a multiple of `align` (declaration of `align` not visible here).
reinterpret_cast<char *>((reinterpret_cast<std::uintptr_t>(o_direct_buf) + align - 1) & td::bits_negate64(align));
192
td::BufferedReader reader(fd, buffer_size);
195
auto slice = td::MutableSlice(ptr, buffer_size);
196
auto size = reader.read(slice).move_as_ok();
200
// Only the first `size` bytes were filled by this read.
processor.process(slice.truncate(size));
202
LOG(ERROR) << processor.result();
207
// NOTE(review): the enclosing class header and the header of the generating method are not
// visible in this excerpt; callers elsewhere in the file use this type as `DataGenerator`.
// True while fewer than total_size bytes have been generated.
operator bool() const {
208
return generated_size < total_size;
212
// Always picks words_[2], i.e. the 20-character string of 'b', and accounts for its size.
auto res = words_[2];
213
generated_size += res.size();
218
std::vector<std::string> words_{"a", "fjdksalfdfs", std::string(20, 'b'), std::string(1000, 'a')};
219
// 600 * 2^20 bytes in total.
size_t total_size = (1 << 20) * 600;
220
size_t generated_size = 0;
223
// Baseline write benchmark: creates/truncates the file at `path` and writes every generated
// chunk with its own fd.write() call.
// NOTE(review): the loop header around generator.next() is not visible here, and `buf` is
// not used in the visible lines.
void write_baseline(td::CSlice path, size_t buffer_size) {
224
LOG(ERROR) << "Baseline";
225
auto fd = td::FileFd::open(path, td::FileFd::Flags::Create | td::FileFd::Flags::Truncate | td::FileFd::Flags::Write)
227
std::vector<char> buf(buffer_size);
229
DataGenerator generator;
231
auto slice = generator.next();
232
// ensure() aborts on a write error instead of returning it.
fd.write(slice).ensure();
236
// Buffered write benchmark: generated chunks are copied into a local buffer of `buffer_size`
// bytes, whose filled prefix is written to the file with fd.write().
// NOTE(review): the declaration of `data_size`, the flush helper header and the generator
// loop are not visible in this excerpt.
void write_buffered(td::CSlice path, size_t buffer_size) {
237
LOG(ERROR) << "Buffered";
238
auto fd = td::FileFd::open(path, td::FileFd::Flags::Create | td::FileFd::Flags::Truncate | td::FileFd::Flags::Write)
240
std::vector<char> buf(buffer_size);
244
// Write out the first data_size bytes of the buffer.
auto slice = td::Slice(buf.data(), data_size);
245
fd.write(slice).ensure();
246
//auto io_slice = as_io_slice(slice);
247
//fd.writev({&io_slice, 1}).ensure();
250
// Appends `slice` to the buffer; the overflow branch body is not visible here.
auto append = [&](td::Slice slice) {
251
if (data_size + slice.size() > buffer_size) {
255
// Copy into the unused tail of the buffer, starting at offset data_size.
td::MutableSlice(buf.data(), buffer_size).substr(data_size).copy_from(slice);
256
data_size += slice.size();
259
DataGenerator generator;
261
auto slice = generator.next();
272
// Takes ownership of `fd` and allocates a raw copy buffer of `buffer_size` bytes.
// Each of the three queues gets capacity for 1024 entries up front.
FileWriter(FileFd fd, size_t buffer_size) : fd_(std::move(fd)), raw_buffer_(buffer_size) {
274
buffer_slices_.reserve(1024);
275
strings_.reserve(1024);
276
ios_slices_.reserve(1024);
279
// Queues an owned std::string for writing and counts its size in cached_size_.
// NOTE(review): the body of the small-data branch and the trailing statements are not
// visible in this excerpt.
void append(std::string data) {
280
cached_size_ += data.size();
281
if (data.size() <= max_copy_size) {
284
// Abort rather than let strings_ grow past its reserved capacity; presumably this is because
// reallocation could invalidate io slices that point into already stored strings.
CHECK(strings_.size() < strings_.capacity());
285
strings_.push_back(std::move(data));
286
// The io slice refers to the string now stored in strings_.
ios_slices_.push_back(as_io_slice(strings_.back()));
287
should_merge_ = false;
292
// Queues an owned BufferSlice for writing and counts its size in cached_size_.
// The slice is stored in buffer_slices_ and an io slice referring to it is queued.
// NOTE(review): the body of the small-data branch and the trailing statements are not
// visible in this excerpt; the visible lines are kept in their original layout.
void append(BufferSlice data) {
293
cached_size_ += data.size();
294
if (data.size() <= max_copy_size) {
297
buffer_slices_.push_back(std::move(data));
298
// The io slice must reference the BufferSlice stored just above. The previous code used
// strings_.back(), which is unrelated to this data and is undefined behavior when strings_
// is empty.
ios_slices_.push_back(as_io_slice(buffer_slices_.back().as_slice()));
299
should_merge_ = false;
304
// Queues non-owned data, choosing a strategy by size:
//  - at most max_copy_size: branch body not visible in this excerpt;
//  - larger than min_immediate_write_size: the caller's memory is queued directly as an io slice;
//  - otherwise: the data is copied into a BufferSlice and queued through that overload.
// NOTE(review): the statements following the io-slice push are not visible here.
void append(Slice data) {
305
if (data.size() <= max_copy_size) {
308
} else if (data.size() > min_immediate_write_size) {
309
ios_slices_.push_back(as_io_slice(data));
312
append(BufferSlice(data));
317
// NOTE(review): the headers of the functions these statements belong to are not visible in
// this excerpt; they look like the bodies of a flush step and a sync step -- confirm.
// Guard on there being no queued io slices (branch body not visible here).
if (ios_slices_.empty()) {
320
// Everything cached so far is counted as flushed, then written with a single writev call.
flushed_size_ += cached_size_;
321
fd_.writev(ios_slices_).ensure();
327
// Record that everything flushed so far is now counted as synced.
synced_size_ = flushed_size_;
331
// True when at least one byte is cached and not yet flushed.
bool may_flush() const {
332
return cached_size_ != 0;
334
// Bytes flushed so far plus bytes still cached.
size_t total_size() const {
335
return flushed_size() + cached_size_;
337
// Bytes counted as flushed so far.
size_t flushed_size() const {
338
return flushed_size_;
340
// NOTE(review): body not visible in this excerpt; presumably returns synced_size_.
size_t synced_size() const {
345
// must_flush() reports true once cached_size_ reaches this many bytes (256 * 1024).
static constexpr size_t max_cached_size = 256 * (1 << 10);
346
// append(Slice) queues the caller's memory directly for data larger than this (32 * 1024 bytes).
static constexpr size_t min_immediate_write_size = 32 * (1 << 10);
350
// Backing storage for small appended data; its size is set in the constructor.
std::vector<char> raw_buffer_;
351
// Size threshold compared against data.size() in the append overloads: one eighth of the raw
// buffer, capped at 4096 bytes. Relies on raw_buffer_ being declared (and so initialized) first.
size_t max_copy_size = min(raw_buffer_.size() / 8, size_t(4096u));
352
// Still-unused tail of raw_buffer_.
MutableSlice buffer_;
353
// Set by append_copy() and cleared by the other append paths.
bool should_merge_ = false;
355
// Owners of queued data; ios_slices_ holds the io slices passed to writev.
std::vector<BufferSlice> buffer_slices_;
356
std::vector<std::string> strings_;
357
std::vector<IoSlice> ios_slices_;
358
// Byte counters: queued but not flushed / flushed / synced.
size_t cached_size_{0};
359
size_t flushed_size_{0};
360
size_t synced_size_{0};
362
// Copies `data` into the front of buffer_ and makes the queued io slices cover the copy.
// NOTE(review): the condition choosing between the "extend last slice" and "push new slice"
// paths is not visible in this excerpt; presumably it tests should_merge_ -- confirm.
void append_copy(Slice data) {
363
buffer_.copy_from(data);
365
// Extend the last queued io slice by data.size() bytes.
auto back = as_slice(ios_slices_.back());
366
back = Slice(back.data(), back.size() + data.size());
367
ios_slices_.back() = as_io_slice(back);
369
// Queue a new io slice over the bytes just copied and set should_merge_.
ios_slices_.push_back(as_io_slice(buffer_.substr(0, data.size())));
370
should_merge_ = true;
372
// Advance past the bytes that were just used.
buffer_ = buffer_.substr(data.size());
376
// NOTE(review): the header of the function containing these statements is not visible in
// this excerpt, and some of its statements are missing.
// Make the whole raw buffer available again and drop the stored BufferSlices.
buffer_ = MutableSlice(raw_buffer_.data(), raw_buffer_.size());
377
buffer_slices_.clear();
380
should_merge_ = false;
384
// True when any limit is hit: less than max_copy_size bytes left in buffer_, the io-slice
// vector is at capacity, or cached_size_ has reached max_cached_size.
bool must_flush() const {
385
return buffer_.size() < max_copy_size || ios_slices_.size() == ios_slices_.capacity() ||
386
cached_size_ >= max_cached_size;
396
// Actor that drains a ChainBufferReader into a file using vectored writes.
// NOTE(review): this excerpt is missing several original lines (the fd_ member, the header of
// the flushing method, closing braces and most of loop()).
class AsyncFileWriterActor : public actor::Actor {
398
// Stores the sync-state reader and reserves room for 100 io slices per writev call.
AsyncFileWriterActor(FileSyncState::Reader state) : state_(std::move(state)) {
399
io_slices_.reserve(100);
404
ChainBufferReader reader_;
405
FileSyncState::Reader state_;
406
std::vector<IoSlice> io_slices_;
408
size_t flushed_size_{0};
409
size_t synced_size_{0};
412
// Sync with the writer side, then write out everything currently readable in batches.
reader_.sync_with_writer();
413
while (!reader_.empty()) {
414
// Work on a clone so reader_ is only replaced (below) after the write has been checked.
auto it = reader_.clone();
415
size_t io_slices_size = 0;
416
// Collect chunks until the clone is drained or io_slices_ reaches its reserved capacity.
while (!it.empty() && io_slices_.size() < io_slices_.capacity()) {
417
auto slice = it.prepare_read();
418
io_slices_.push_back(as_io_slice(slice));
419
io_slices_size += slice.size();
420
it.confirm_read(slice.size());
422
if (!io_slices_.empty()) {
423
auto r_written = fd_.writev(io_slices_);
424
LOG_IF(FATAL, r_written.is_error()) << r_written.error();
425
auto written = r_written.move_as_ok();
426
// A short write is treated as a fatal invariant violation.
CHECK(written == io_slices_size);
427
flushed_size_ += written;
430
// Commit the progress made on the clone.
reader_ = std::move(it);
434
void loop() override {
435
reader_.sync_with_writer();
442
// Vectored write benchmark: creates/truncates the file at `path` and pushes every generated
// chunk through td::FileWriter.
// NOTE(review): the loop header and any final flush are not visible in this excerpt.
void write_vector(td::CSlice path, size_t buffer_size) {
443
LOG(ERROR) << "io vector";
444
auto fd = td::FileFd::open(path, td::FileFd::Flags::Create | td::FileFd::Flags::Truncate | td::FileFd::Flags::Write)
446
td::FileWriter writer(std::move(fd), buffer_size);
448
DataGenerator generator;
450
auto slice = generator.next();
451
// Moves the generated value into the writer.
writer.append(std::move(slice));
456
// Asynchronous write benchmark: a local Writer actor generates data into a ChainBuffer that
// a td::StreamToFileActor drains into the file, with progress reported via FileSyncState.
// NOTE(review): this excerpt is missing several original lines (member declarations such as
// fd_ and buffer_size_, some branch conditions, closing braces and the scheduler run).
void write_async(td::CSlice path, size_t buffer_size) {
457
LOG(ERROR) << "Async";
458
auto fd = td::FileFd::open(path, td::FileFd::Flags::Create | td::FileFd::Flags::Truncate | td::FileFd::Flags::Write)
460
td::actor::Scheduler scheduler({1});
461
scheduler.run_in_context([&] {
462
// Producer actor: owns the file-writing actor and throttles generation on flushed size.
class Writer : public td::actor::Actor {
464
Writer(td::FileFd fd, size_t buffer_size) : fd_(std::move(fd)), buffer_size_(buffer_size) {
466
// Wakes the parent Writer whenever the file actor reports a sync-state change.
class Callback : public td::StreamToFileActor::Callback {
468
Callback(td::actor::ActorShared<> parent) : parent_(std::move(parent)) {
470
void on_sync_state_changed() override {
471
td::actor::send_signals_later(parent_, td::actor::ActorSignals::wakeup());
475
td::actor::ActorShared<> parent_;
478
// Creates the buffer and sync-state pairs, starts the file actor and installs the callback.
void start_up() override {
479
auto buffer_reader_writer = td::ChainBuffer::create();
480
// This actor keeps the writer end; the reader end goes to the file actor.
buffer_writer_ = std::move(buffer_reader_writer.second);
481
auto buffer_reader = std::move(buffer_reader_writer.first);
483
auto sync_state_reader_writer = td::FileSyncState::create();
484
// This actor keeps the reader end of the sync state; the writer end goes to the file actor.
fd_sync_state_ = std::move(sync_state_reader_writer.first);
485
auto sync_state_writer = std::move(sync_state_reader_writer.second);
486
auto options = td::StreamToFileActor::Options{};
487
writer_ = td::actor::create_actor<td::StreamToFileActor>(td::actor::ActorOptions().with_name("FileWriterActor"),
488
std::move(buffer_reader), std::move(fd_),
489
std::move(sync_state_writer), options);
490
send_closure(writer_, &td::StreamToFileActor::set_callback, td::make_unique<Callback>(actor_shared(this)));
496
td::optional<td::ChainBuffer::Writer> buffer_writer_;
497
td::optional<td::FileSyncState::Reader> fd_sync_state_;
498
td::actor::ActorOwn<td::StreamToFileActor> writer_;
500
DataGenerator generator_;
501
// Bytes appended to the buffer so far.
size_t total_size_{0};
502
bool was_sync_{false};
504
// Generates more data while staying within buffer_size_ * 10 bytes of what has been flushed.
void loop() override {
505
auto flushed_size = fd_sync_state_.value().flushed_size();
506
while (generator_ && total_size_ < flushed_size + buffer_size_ * 10) {
507
auto str = generator_.next();
508
total_size_ += str.size();
509
buffer_writer_.value().append(str);
511
// Wake the file actor so it picks up the newly appended data.
td::actor::send_signals_later(writer_, td::actor::ActorSignals::wakeup());
514
// NOTE(review): the preceding `if` condition is not visible in this excerpt.
} else if (!was_sync_) {
516
// Request a sync up to everything appended so far.
fd_sync_state_.value().set_requested_sync_size(total_size_);
517
td::actor::send_signals_later(writer_, td::actor::ActorSignals::wakeup());
519
// Branch taken once everything appended is reported synced (branch body not visible here).
if (fd_sync_state_.value().synced_size() == total_size_) {
523
// Stops the scheduler when the shared reference held by the file actor's callback hangs up.
void hangup_shared() override {
524
td::actor::SchedulerContext::get()->stop();
528
// release() detaches the owning handle returned by create_actor.
td::actor::create_actor<Writer>("Writer", std::move(fd), buffer_size).release();
533
// Second asynchronous write benchmark: a Writer actor appends generated data to a
// ChainBufferWriter and a Worker actor drains the matching reader with fd.write().
// NOTE(review): `buffer_size` is not used in the lines visible here; several original lines
// (fd_ members, loops, closing braces, scheduler run) are missing from this excerpt.
void write_async2(td::CSlice path, size_t buffer_size) {
534
LOG(ERROR) << "Async2";
535
auto fd = td::FileFd::open(path, td::FileFd::Flags::Create | td::FileFd::Flags::Truncate | td::FileFd::Flags::Write)
537
td::actor::Scheduler scheduler({1});
538
scheduler.run_in_context([&] {
539
// Consumer actor: writes every readable chunk of the chain buffer to the file.
class Worker : public td::actor::Actor {
541
Worker(td::FileFd fd, td::ChainBufferReader reader, td::actor::ActorShared<> parent)
542
: fd_(std::move(fd)), reader_(std::move(reader)), parent_(std::move(parent)) {
547
td::ChainBufferReader reader_;
548
td::actor::ActorShared<> parent_;
549
// Sync with the writer side, then write and confirm chunk by chunk until the reader is empty.
void loop() override {
550
reader_.sync_with_writer();
551
while (!reader_.empty()) {
552
auto slice = reader_.prepare_read();
553
fd_.write(slice).ensure();
554
reader_.confirm_read(slice.size());
557
// NOTE(review): body not visible in this excerpt.
void hangup() override {
563
// Producer actor: owns the Worker and feeds it through writer_.
class Writer : public td::actor::Actor {
565
Writer(td::FileFd fd) : fd_(std::move(fd)) {
570
td::actor::ActorOwn<> worker_;
571
td::ChainBufferWriter writer_;
572
DataGenerator generator_;
574
void start_up() override {
576
// The Worker gets the file descriptor, the reader end of writer_ and a shared reference to this actor.
td::actor::create_actor<Worker>("Worker", std::move(fd_), writer_.extract_reader(), actor_shared(this));
578
// NOTE(review): the meaning of the second argument (65536) is not visible here; presumably a size hint.
writer_.append(generator_.next(), 65536);
579
send_signals_later(worker_, td::actor::ActorSignals::wakeup());
583
// Stops the scheduler when the shared reference held by the Worker hangs up.
void hangup_shared() override {
584
td::actor::SchedulerContext::get()->stop();
588
td::actor::create_actor<Writer>(td::actor::ActorOptions().with_name("Writer").with_poll(), std::move(fd)).release();
593
// Entry point: parses the -f/--from, -m/--mode and -b/--buffer options and dispatches to one
// of the read_* or write_* benchmarks.
// NOTE(review): this excerpt is missing many original lines (the declarations of `from` and
// the Type variable, the bodies of the option handlers, the switch/case labels and the
// closing braces), so the mapping from mode values to benchmarks is not visible here.
int main(int argc, char **argv) {
595
enum Type { Read, Write };
597
enum Mode { Baseline, Buffered, Direct, Async, WriteV, Async2 };
598
// Defaults used when the corresponding options are not given.
Mode mode = Baseline;
599
size_t buffer_size = 1024;
601
td::OptionParser options_parser;
// Option 'f' / "from": file path argument (handler body not visible here).
602
options_parser.add_checked_option('f', td::Slice("from"), td::Slice("read from file"),
603
[&](td::Slice arg) -> td::Status {
605
return td::Status::OK();
607
// Option 'm' / "mode": parsed as an int; a value matching none of the cases yields "unknown mode".
options_parser.add_checked_option('m', td::Slice("mode"), td::Slice("mode"), [&](td::Slice arg) -> td::Status {
608
TRY_RESULT(x, td::to_integer_safe<int>(arg));
612
return td::Status::OK();
615
return td::Status::OK();
618
return td::Status::OK();
621
return td::Status::OK();
624
return td::Status::OK();
627
return td::Status::OK();
629
return td::Status::Error("unknown mode");
631
// Option 'b' / "buffer": parsed as size_t (the assignment is not visible here).
options_parser.add_checked_option('b', td::Slice("buffer"), td::Slice("buffer size"),
632
[&](td::Slice arg) -> td::Status {
633
TRY_RESULT(x, td::to_integer_safe<size_t>(arg));
635
return td::Status::OK();
638
auto status = options_parser.run(argc, argv);
639
// On a parse error, log the error followed by the parser itself.
if (status.is_error()) {
640
LOG(ERROR) << status.error() << "\n" << options_parser;
651
// Read benchmarks.
read_buffered(from, buffer_size);
654
read_o_direct(from, buffer_size);
657
read_async(from, buffer_size);
661
LOG(FATAL) << "Not supported mode for Read test";
667
// Write benchmarks.
write_baseline(from, buffer_size);
670
write_buffered(from, buffer_size);
673
write_vector(from, buffer_size);
676
write_async(from, buffer_size);
679
write_async2(from, buffer_size);
682
LOG(FATAL) << "Unimplemented";