2
This file is part of TON Blockchain Library.
4
TON Blockchain Library is free software: you can redistribute it and/or modify
5
it under the terms of the GNU Lesser General Public License as published by
6
the Free Software Foundation, either version 2 of the License, or
7
(at your option) any later version.
9
TON Blockchain Library is distributed in the hope that it will be useful,
10
but WITHOUT ANY WARRANTY; without even the implied warranty of
11
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12
GNU Lesser General Public License for more details.
14
You should have received a copy of the GNU Lesser General Public License
15
along with TON Blockchain Library. If not, see <http://www.gnu.org/licenses/>.
17
Copyright 2017-2020 Telegram Systems LLP
21
#include "BinlogReaderHelper.h"
23
#include "td/db/utils/StreamInterface.h"
24
#include "td/db/utils/ChainBuffer.h"
25
#include "td/db/utils/CyclicBuffer.h"
26
#include "td/db/utils/FileSyncState.h"
27
#include "td/db/utils/StreamToFileActor.h"
28
#include "td/db/utils/FileToStreamActor.h"
30
#include "td/actor/actor.h"
32
#include "td/utils/misc.h"
33
#include "td/utils/port/path.h"
34
#include "td/utils/VectorQueue.h"
38
class BinlogReplayActor : public actor::Actor {
40
BinlogReplayActor(StreamReader stream_reader, actor::ActorOwn<FileToStreamActor> file_to_stream,
41
std::shared_ptr<BinlogReaderInterface> binlog_reader, Promise<Unit> promise)
42
: stream_reader_(std::move(stream_reader))
43
, file_to_stream_(std::move(file_to_stream))
44
, binlog_reader_(std::move(binlog_reader))
45
, promise_(std::move(promise)) {
49
StreamReader stream_reader_;
50
actor::ActorOwn<FileToStreamActor> file_to_stream_;
51
std::shared_ptr<BinlogReaderInterface> binlog_reader_;
52
Promise<Unit> promise_;
54
bool is_writer_closed_{false};
55
BinlogReaderHelper binlog_reader_helper_;
57
unique_ptr<FileToStreamActor::Callback> create_callback() {
58
class Callback : public FileToStreamActor::Callback {
60
Callback(actor::ActorShared<> actor) : actor_(std::move(actor)) {
62
void got_more() override {
63
send_signals_later(actor_, actor::ActorSignals::wakeup());
67
actor::ActorShared<> actor_;
69
return make_unique<Callback>(actor_shared(this));
72
void start_up() override {
73
send_closure_later(file_to_stream_, &FileToStreamActor::set_callback, create_callback());
75
void notify_writer() {
76
send_signals_later(file_to_stream_, actor::ActorSignals::wakeup());
79
void loop() override {
80
auto status = do_loop();
81
if (status.is_error()) {
82
stream_reader_.close_reader(status.clone());
83
promise_.set_error(std::move(status));
86
if (is_writer_closed_) {
87
stream_reader_.close_reader(Status::OK());
88
promise_.set_value(Unit());
93
is_writer_closed_ = stream_reader_.is_writer_closed();
94
if (is_writer_closed_) {
95
TRY_STATUS(std::move(stream_reader_.writer_status()));
98
// TODO: watermark want_more/got_more logic
99
int64 got_size = stream_reader_.reader_size();
100
while (got_size > 0) {
101
auto slice = stream_reader_.prepare_read();
102
TRY_STATUS(binlog_reader_helper_.parse(*binlog_reader_, slice));
103
stream_reader_.confirm_read(slice.size());
104
got_size -= slice.size();
108
if (is_writer_closed_) {
109
if (binlog_reader_helper_.unparsed_size() != 0) {
110
return Status::Error(PSLICE() << "Got " << binlog_reader_helper_.unparsed_size()
111
<< " unparsed bytes in binlog");
119
Binlog::Binlog(string path) : path_(std::move(path)) {
122
Status Binlog::replay_sync(BinlogReaderInterface& binlog_reader) {
123
TRY_RESULT(fd, FileFd::open(path_, FileFd::Flags::Read));
124
// No need to use Cyclic buffer, but CyclicBuffer is important for async version
125
CyclicBuffer::Options options;
126
options.chunk_size = 256;
128
auto reader_writer = CyclicBuffer::create(options);
130
auto buf_reader = std::move(reader_writer.first);
131
auto buf_writer = std::move(reader_writer.second);
133
TRY_RESULT(fd_size, fd.get_size());
135
BinlogReaderHelper helper;
136
while (fd_size != 0) {
137
auto read_to = buf_writer.prepare_write();
138
if (static_cast<int64>(read_to.size()) > fd_size) {
139
read_to.truncate(narrow_cast<size_t>(fd_size));
141
TRY_RESULT(read, fd.read(read_to));
143
return Status::Error("Unexpected end of file");
146
buf_writer.confirm_write(read);
148
auto data = buf_reader.prepare_read();
149
CHECK(data.size() == read);
150
TRY_STATUS(helper.parse(binlog_reader, data));
151
buf_reader.confirm_read(data.size());
154
if (helper.unparsed_size() != 0) {
155
return Status::Error(PSLICE() << "Got " << helper.unparsed_size() << " unparsed bytes in binlog");
159
//TODO: allow binlog truncate
163
void Binlog::replay_async(std::shared_ptr<BinlogReaderInterface> binlog_reader, Promise<Unit> promise) {
164
auto r_fd = FileFd::open(path_, FileFd::Flags::Read);
165
if (r_fd.is_error()) {
166
promise.set_error(r_fd.move_as_error());
169
auto fd = r_fd.move_as_ok();
170
CyclicBuffer::Options buf_options;
171
buf_options.chunk_size = 256;
172
auto reader_writer = CyclicBuffer::create(buf_options);
174
auto buf_reader = std::move(reader_writer.first);
175
auto buf_writer = std::move(reader_writer.second);
177
auto r_fd_size = fd.get_size();
178
if (r_fd_size.is_error()) {
179
promise.set_error(r_fd_size.move_as_error());
181
auto options = FileToStreamActor::Options{};
182
options.limit = r_fd_size.move_as_ok();
183
auto file_to_stream =
184
actor::create_actor<FileToStreamActor>("FileToStream", std::move(fd), std::move(buf_writer), options);
185
auto stream_to_binlog = actor::create_actor<BinlogReplayActor>(
186
"BinlogReplay", std::move(buf_reader), std::move(file_to_stream), std::move(binlog_reader), std::move(promise));
187
stream_to_binlog.release();
190
void Binlog::destroy(CSlice path) {
191
td::unlink(path).ignore();
194
void Binlog::destroy() {
198
BinlogWriter::BinlogWriter(std::string path) : path_(std::move(path)) {
201
Status BinlogWriter::open() {
202
TRY_RESULT(fd, FileFd::open(path_, FileFd::Flags::Write | FileFd::Flags::Append | FileFd::Create));
204
ChainBuffer::Options buf_options;
205
buf_options.max_io_slices = 128;
206
buf_options.chunk_size = 256;
207
auto reader_writer = ChainBuffer::create(buf_options);
208
buf_reader_ = std::move(reader_writer.first);
209
buf_writer_ = std::move(reader_writer.second);
213
Status BinlogWriter::lazy_flush() {
214
if (buf_reader_.reader_size() < 512) {
220
Status BinlogWriter::flush() {
221
while (buf_reader_.reader_size() != 0) {
222
TRY_RESULT(written, fd_.writev(buf_reader_.prepare_readv()));
223
buf_reader_.confirm_read(written);
227
Status BinlogWriter::sync() {
232
Status BinlogWriter::close() {
239
class FlushHelperActor : public actor::Actor {
241
FlushHelperActor(FileSyncState::Reader sync_state_reader, actor::ActorOwn<StreamToFileActor> actor)
242
: sync_state_reader_(std::move(sync_state_reader)), actor_(std::move(actor)) {
247
void sync(size_t position, Promise<Unit> promise) {
248
sync_state_reader_.set_requested_sync_size(position);
250
queries_.emplace(position, std::move(promise));
252
send_signals_later(actor_, actor::ActorSignals::wakeup());
255
void close(Promise<> promise) {
256
close_promise_ = std::move(promise);
261
FileSyncState::Reader sync_state_reader_;
262
actor::ActorOwn<StreamToFileActor> actor_;
263
Promise<> close_promise_;
266
Query(size_t position, Promise<Unit> promise) : position(position), promise(std::move(promise)) {
269
Promise<Unit> promise;
271
VectorQueue<Query> queries_;
273
unique_ptr<StreamToFileActor::Callback> create_callback() {
274
class Callback : public StreamToFileActor::Callback {
276
Callback(actor::ActorShared<> actor) : actor_(std::move(actor)) {
278
void on_sync_state_changed() override {
279
send_signals_later(actor_, actor::ActorSignals::wakeup());
283
actor::ActorShared<> actor_;
285
return make_unique<Callback>(actor_shared(this));
288
void start_up() override {
289
send_closure_later(actor_, &StreamToFileActor::set_callback, create_callback());
292
void loop() override {
293
auto synced_position = sync_state_reader_.synced_size();
294
while (!queries_.empty() && queries_.front().position <= synced_position) {
295
queries_.front().promise.set_value(Unit());
300
void hangup_shared() override {
303
void tear_down() override {
304
if (close_promise_) {
305
close_promise_.set_value(Unit());
310
BinlogWriterAsync::BinlogWriterAsync(std::string path) : path_(std::move(path)) {
312
BinlogWriterAsync::~BinlogWriterAsync() = default;
314
Status BinlogWriterAsync::open() {
315
TRY_RESULT(fd, FileFd::open(path_, FileFd::Flags::Write | FileFd::Flags::Append | FileFd::Create));
316
ChainBuffer::Options buf_options;
317
buf_options.max_io_slices = 128;
318
buf_options.chunk_size = 256;
319
auto reader_writer = ChainBuffer::create(buf_options);
320
buf_writer_ = std::move(reader_writer.second);
322
auto sync_state_reader_writer = td::FileSyncState::create();
323
auto writer_actor = actor::create_actor<StreamToFileActor>("StreamToFile", std::move(reader_writer.first),
324
std::move(fd), std::move(sync_state_reader_writer.second));
325
writer_actor_ = writer_actor.get();
326
sync_state_reader_ = std::move(sync_state_reader_writer.first);
328
flush_helper_actor_ =
329
actor::create_actor<detail::FlushHelperActor>("FlushHelperActor", sync_state_reader_, std::move(writer_actor));
334
void BinlogWriterAsync::close(Promise<> promise) {
335
send_closure(std::move(flush_helper_actor_), &detail::FlushHelperActor::close, std::move(promise));
338
void BinlogWriterAsync::lazy_flush() {
339
send_signals_later(writer_actor_, actor::ActorSignals::wakeup());
342
void BinlogWriterAsync::flush() {
343
send_closure(flush_helper_actor_, &detail::FlushHelperActor::flush);
345
void BinlogWriterAsync::sync(Promise<Unit> promise) {
346
send_closure(flush_helper_actor_, &detail::FlushHelperActor::sync, buf_writer_.writer_size(), std::move(promise));