Ton

Форк
0
/
Binlog.cpp 
349 строк · 10.7 Кб
1
/*
2
    This file is part of TON Blockchain Library.
3

4
    TON Blockchain Library is free software: you can redistribute it and/or modify
5
    it under the terms of the GNU Lesser General Public License as published by
6
    the Free Software Foundation, either version 2 of the License, or
7
    (at your option) any later version.
8

9
    TON Blockchain Library is distributed in the hope that it will be useful,
10
    but WITHOUT ANY WARRANTY; without even the implied warranty of
11
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
    GNU Lesser General Public License for more details.
13

14
    You should have received a copy of the GNU Lesser General Public License
15
    along with TON Blockchain Library.  If not, see <http://www.gnu.org/licenses/>.
16

17
    Copyright 2017-2020 Telegram Systems LLP
18
*/
19
#include "Binlog.h"
20

21
#include "BinlogReaderHelper.h"
22

23
#include "td/db/utils/StreamInterface.h"
24
#include "td/db/utils/ChainBuffer.h"
25
#include "td/db/utils/CyclicBuffer.h"
26
#include "td/db/utils/FileSyncState.h"
27
#include "td/db/utils/StreamToFileActor.h"
28
#include "td/db/utils/FileToStreamActor.h"
29

30
#include "td/actor/actor.h"
31

32
#include "td/utils/misc.h"
33
#include "td/utils/port/path.h"
34
#include "td/utils/VectorQueue.h"
35

36
namespace td {
37
namespace {
38
class BinlogReplayActor : public actor::Actor {
39
 public:
40
  BinlogReplayActor(StreamReader stream_reader, actor::ActorOwn<FileToStreamActor> file_to_stream,
41
                    std::shared_ptr<BinlogReaderInterface> binlog_reader, Promise<Unit> promise)
42
      : stream_reader_(std::move(stream_reader))
43
      , file_to_stream_(std::move(file_to_stream))
44
      , binlog_reader_(std::move(binlog_reader))
45
      , promise_(std::move(promise)) {
46
  }
47

48
 private:
49
  StreamReader stream_reader_;
50
  actor::ActorOwn<FileToStreamActor> file_to_stream_;
51
  std::shared_ptr<BinlogReaderInterface> binlog_reader_;
52
  Promise<Unit> promise_;
53

54
  bool is_writer_closed_{false};
55
  BinlogReaderHelper binlog_reader_helper_;
56

57
  unique_ptr<FileToStreamActor::Callback> create_callback() {
58
    class Callback : public FileToStreamActor::Callback {
59
     public:
60
      Callback(actor::ActorShared<> actor) : actor_(std::move(actor)) {
61
      }
62
      void got_more() override {
63
        send_signals_later(actor_, actor::ActorSignals::wakeup());
64
      }
65

66
     private:
67
      actor::ActorShared<> actor_;
68
    };
69
    return make_unique<Callback>(actor_shared(this));
70
  }
71

72
  void start_up() override {
73
    send_closure_later(file_to_stream_, &FileToStreamActor::set_callback, create_callback());
74
  }
75
  void notify_writer() {
76
    send_signals_later(file_to_stream_, actor::ActorSignals::wakeup());
77
  }
78

79
  void loop() override {
80
    auto status = do_loop();
81
    if (status.is_error()) {
82
      stream_reader_.close_reader(status.clone());
83
      promise_.set_error(std::move(status));
84
      return stop();
85
    }
86
    if (is_writer_closed_) {
87
      stream_reader_.close_reader(Status::OK());
88
      promise_.set_value(Unit());
89
      return stop();
90
    }
91
  }
92
  Status do_loop() {
93
    is_writer_closed_ = stream_reader_.is_writer_closed();
94
    if (is_writer_closed_) {
95
      TRY_STATUS(std::move(stream_reader_.writer_status()));
96
    }
97

98
    // TODO: watermark want_more/got_more logic
99
    int64 got_size = stream_reader_.reader_size();
100
    while (got_size > 0) {
101
      auto slice = stream_reader_.prepare_read();
102
      TRY_STATUS(binlog_reader_helper_.parse(*binlog_reader_, slice));
103
      stream_reader_.confirm_read(slice.size());
104
      got_size -= slice.size();
105
    }
106
    notify_writer();
107

108
    if (is_writer_closed_) {
109
      if (binlog_reader_helper_.unparsed_size() != 0) {
110
        return Status::Error(PSLICE() << "Got " << binlog_reader_helper_.unparsed_size()
111
                                      << " unparsed bytes in binlog");
112
      }
113
    }
114

115
    return Status::OK();
116
  }
117
};
118
}  // namespace
119
Binlog::Binlog(string path) : path_(std::move(path)) {
120
}
121

122
Status Binlog::replay_sync(BinlogReaderInterface& binlog_reader) {
123
  TRY_RESULT(fd, FileFd::open(path_, FileFd::Flags::Read));
124
  // No need to use Cyclic buffer, but CyclicBuffer is important for async version
125
  CyclicBuffer::Options options;
126
  options.chunk_size = 256;
127
  options.count = 1;
128
  auto reader_writer = CyclicBuffer::create(options);
129

130
  auto buf_reader = std::move(reader_writer.first);
131
  auto buf_writer = std::move(reader_writer.second);
132

133
  TRY_RESULT(fd_size, fd.get_size());
134

135
  BinlogReaderHelper helper;
136
  while (fd_size != 0) {
137
    auto read_to = buf_writer.prepare_write();
138
    if (static_cast<int64>(read_to.size()) > fd_size) {
139
      read_to.truncate(narrow_cast<size_t>(fd_size));
140
    }
141
    TRY_RESULT(read, fd.read(read_to));
142
    if (read == 0) {
143
      return Status::Error("Unexpected end of file");
144
    }
145
    fd_size -= read;
146
    buf_writer.confirm_write(read);
147

148
    auto data = buf_reader.prepare_read();
149
    CHECK(data.size() == read);
150
    TRY_STATUS(helper.parse(binlog_reader, data));
151
    buf_reader.confirm_read(data.size());
152
  }
153

154
  if (helper.unparsed_size() != 0) {
155
    return Status::Error(PSLICE() << "Got " << helper.unparsed_size() << " unparsed bytes in binlog");
156
  }
157

158
  //TODO: check crc32
159
  //TODO: allow binlog truncate
160
  return Status::OK();
161
}
162

163
void Binlog::replay_async(std::shared_ptr<BinlogReaderInterface> binlog_reader, Promise<Unit> promise) {
164
  auto r_fd = FileFd::open(path_, FileFd::Flags::Read);
165
  if (r_fd.is_error()) {
166
    promise.set_error(r_fd.move_as_error());
167
    return;
168
  }
169
  auto fd = r_fd.move_as_ok();
170
  CyclicBuffer::Options buf_options;
171
  buf_options.chunk_size = 256;
172
  auto reader_writer = CyclicBuffer::create(buf_options);
173

174
  auto buf_reader = std::move(reader_writer.first);
175
  auto buf_writer = std::move(reader_writer.second);
176

177
  auto r_fd_size = fd.get_size();
178
  if (r_fd_size.is_error()) {
179
    promise.set_error(r_fd_size.move_as_error());
180
  }
181
  auto options = FileToStreamActor::Options{};
182
  options.limit = r_fd_size.move_as_ok();
183
  auto file_to_stream =
184
      actor::create_actor<FileToStreamActor>("FileToStream", std::move(fd), std::move(buf_writer), options);
185
  auto stream_to_binlog = actor::create_actor<BinlogReplayActor>(
186
      "BinlogReplay", std::move(buf_reader), std::move(file_to_stream), std::move(binlog_reader), std::move(promise));
187
  stream_to_binlog.release();
188
}
189

190
void Binlog::destroy(CSlice path) {
191
  td::unlink(path).ignore();
192
}
193

194
void Binlog::destroy() {
195
  destroy(path_);
196
}
197

198
BinlogWriter::BinlogWriter(std::string path) : path_(std::move(path)) {
199
}
200

201
Status BinlogWriter::open() {
202
  TRY_RESULT(fd, FileFd::open(path_, FileFd::Flags::Write | FileFd::Flags::Append | FileFd::Create));
203
  fd_ = std::move(fd);
204
  ChainBuffer::Options buf_options;
205
  buf_options.max_io_slices = 128;
206
  buf_options.chunk_size = 256;
207
  auto reader_writer = ChainBuffer::create(buf_options);
208
  buf_reader_ = std::move(reader_writer.first);
209
  buf_writer_ = std::move(reader_writer.second);
210
  return Status::OK();
211
}
212

213
Status BinlogWriter::lazy_flush() {
214
  if (buf_reader_.reader_size() < 512) {
215
    return Status::OK();
216
  }
217
  return flush();
218
}
219

220
Status BinlogWriter::flush() {
221
  while (buf_reader_.reader_size() != 0) {
222
    TRY_RESULT(written, fd_.writev(buf_reader_.prepare_readv()));
223
    buf_reader_.confirm_read(written);
224
  }
225
  return Status::OK();
226
}
227
Status BinlogWriter::sync() {
228
  flush();
229
  return fd_.sync();
230
}
231

232
Status BinlogWriter::close() {
233
  sync();
234
  fd_.close();
235
  return Status::OK();
236
}
237

238
namespace detail {
239
class FlushHelperActor : public actor::Actor {
240
 public:
241
  FlushHelperActor(FileSyncState::Reader sync_state_reader, actor::ActorOwn<StreamToFileActor> actor)
242
      : sync_state_reader_(std::move(sync_state_reader)), actor_(std::move(actor)) {
243
  }
244
  void flush() {
245
    //TODO;
246
  }
247
  void sync(size_t position, Promise<Unit> promise) {
248
    sync_state_reader_.set_requested_sync_size(position);
249
    if (promise) {
250
      queries_.emplace(position, std::move(promise));
251
    }
252
    send_signals_later(actor_, actor::ActorSignals::wakeup());
253
  }
254

255
  void close(Promise<> promise) {
256
    close_promise_ = std::move(promise);
257
    actor_.reset();
258
  }
259

260
 private:
261
  FileSyncState::Reader sync_state_reader_;
262
  actor::ActorOwn<StreamToFileActor> actor_;
263
  Promise<> close_promise_;
264

265
  struct Query {
266
    Query(size_t position, Promise<Unit> promise) : position(position), promise(std::move(promise)) {
267
    }
268
    size_t position;
269
    Promise<Unit> promise;
270
  };
271
  VectorQueue<Query> queries_;
272

273
  unique_ptr<StreamToFileActor::Callback> create_callback() {
274
    class Callback : public StreamToFileActor::Callback {
275
     public:
276
      Callback(actor::ActorShared<> actor) : actor_(std::move(actor)) {
277
      }
278
      void on_sync_state_changed() override {
279
        send_signals_later(actor_, actor::ActorSignals::wakeup());
280
      }
281

282
     private:
283
      actor::ActorShared<> actor_;
284
    };
285
    return make_unique<Callback>(actor_shared(this));
286
  }
287

288
  void start_up() override {
289
    send_closure_later(actor_, &StreamToFileActor::set_callback, create_callback());
290
  }
291

292
  void loop() override {
293
    auto synced_position = sync_state_reader_.synced_size();
294
    while (!queries_.empty() && queries_.front().position <= synced_position) {
295
      queries_.front().promise.set_value(Unit());
296
      queries_.pop();
297
    }
298
  }
299

300
  void hangup_shared() override {
301
    stop();
302
  }
303
  void tear_down() override {
304
    if (close_promise_) {
305
      close_promise_.set_value(Unit());
306
    }
307
  }
308
};
309
}  // namespace detail
310
BinlogWriterAsync::BinlogWriterAsync(std::string path) : path_(std::move(path)) {
311
}
312
BinlogWriterAsync::~BinlogWriterAsync() = default;
313

314
Status BinlogWriterAsync::open() {
315
  TRY_RESULT(fd, FileFd::open(path_, FileFd::Flags::Write | FileFd::Flags::Append | FileFd::Create));
316
  ChainBuffer::Options buf_options;
317
  buf_options.max_io_slices = 128;
318
  buf_options.chunk_size = 256;
319
  auto reader_writer = ChainBuffer::create(buf_options);
320
  buf_writer_ = std::move(reader_writer.second);
321

322
  auto sync_state_reader_writer = td::FileSyncState::create();
323
  auto writer_actor = actor::create_actor<StreamToFileActor>("StreamToFile", std::move(reader_writer.first),
324
                                                             std::move(fd), std::move(sync_state_reader_writer.second));
325
  writer_actor_ = writer_actor.get();
326
  sync_state_reader_ = std::move(sync_state_reader_writer.first);
327

328
  flush_helper_actor_ =
329
      actor::create_actor<detail::FlushHelperActor>("FlushHelperActor", sync_state_reader_, std::move(writer_actor));
330

331
  return Status::OK();
332
}
333

334
void BinlogWriterAsync::close(Promise<> promise) {
335
  send_closure(std::move(flush_helper_actor_), &detail::FlushHelperActor::close, std::move(promise));
336
  writer_actor_ = {};
337
}
338
void BinlogWriterAsync::lazy_flush() {
339
  send_signals_later(writer_actor_, actor::ActorSignals::wakeup());
340
}
341

342
void BinlogWriterAsync::flush() {
343
  send_closure(flush_helper_actor_, &detail::FlushHelperActor::flush);
344
}
345
void BinlogWriterAsync::sync(Promise<Unit> promise) {
346
  send_closure(flush_helper_actor_, &detail::FlushHelperActor::sync, buf_writer_.writer_size(), std::move(promise));
347
}
348

349
}  // namespace td
350

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.