2
This file is part of TON Blockchain Library.
4
TON Blockchain Library is free software: you can redistribute it and/or modify
5
it under the terms of the GNU Lesser General Public License as published by
6
the Free Software Foundation, either version 2 of the License, or
7
(at your option) any later version.
9
TON Blockchain Library is distributed in the hope that it will be useful,
10
but WITHOUT ANY WARRANTY; without even the implied warranty of
11
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12
GNU Lesser General Public License for more details.
14
You should have received a copy of the GNU Lesser General Public License
15
along with TON Blockchain Library. If not, see <http://www.gnu.org/licenses/>.
17
Copyright 2017-2020 Telegram Systems LLP
19
#include "crypto/block/Binlog.h"
21
#include "td/utils/as.h"
22
#include "td/utils/misc.h"
23
#include "td/utils/port/path.h"
30
* GENERIC BINLOG (move to separate file)
34
BinlogBuffer::BinlogBuffer(std::unique_ptr<BinlogCallback> cb, std::size_t _max_size, td::FileFd fd)
48
start = static_cast<unsigned char*>(std::malloc(max_size));
50
rptr = wptr = cptr = start;
51
end = start + max_size;
54
unsigned char* BinlogBuffer::alloc_log_event_force(std::size_t size) {
55
unsigned char* res = alloc_log_event(size);
57
throw LevAllocError{size};
62
unsigned char* BinlogBuffer::try_alloc_log_event(std::size_t size) {
64
if (end - wptr >= (long)size) {
65
unsigned char* res = wptr;
79
if (rptr - wptr > (long)size) {
80
unsigned char* res = wptr;
88
bool BinlogBuffer::flush(int mode) {
89
auto r_res = try_flush(mode);
93
std::string msg = PSTRING() << "cannot flush binlog file " << binlog_name << " at position " << log_rpos << " "
96
throw BinlogError{msg};
99
td::Result<bool> BinlogBuffer::try_flush(int mode) {
100
LOG(DEBUG) << "in flush: writing=" << writing << " r=" << rptr - start << " c=" << cptr - start
101
<< " w=" << wptr - start << "; rp=" << log_rpos << " cp=" << log_cpos << " wp=" << log_wpos;
102
if (!writing || rptr == cptr) {
103
return false; // nothing to flush
105
DCHECK(!fd.empty()); // must have an open binlog file
106
while (rptr != cptr) {
107
unsigned char* tptr = (cptr >= rptr ? cptr : eptr);
108
DCHECK(rptr <= tptr);
109
auto sz = tptr - rptr;
111
LOG(INFO) << "writing " << sz << " bytes to binlog " << binlog_name << " at position " << log_rpos;
112
TRY_RESULT(res, fd.pwrite(td::Slice(rptr, sz), log_rpos));
113
if (static_cast<td::int64>(res) != sz) {
114
return td::Status::Error(PSLICE() << "written " << res << " bytes instead of " << sz);
125
LOG(INFO) << "syncing binlog " << binlog_name << " (position " << log_rpos << ")";
126
TRY_STATUS(fd.sync());
131
unsigned char* BinlogBuffer::alloc_log_event(std::size_t size) {
133
throw BinlogError{"cannot create new binlog event: binlog not open for writing"};
135
if (size >= max_size || size > max_event_size) {
138
size = (size + 3) & -4;
139
unsigned char* res = try_alloc_log_event(size);
142
return try_alloc_log_event(size);
148
bool BinlogBuffer::commit_range(unsigned long long pos_start, unsigned long long pos_end) {
149
// TODO: make something more clever, with partially committed/uncommitted segments in [cpos..wpos] range
150
if (pos_start != log_cpos || pos_end < pos_start || pos_end > log_wpos) {
153
if (!pos_start && pos_end >= pos_start + 4 && td::as<unsigned>(cptr) != 0x0442446b) {
154
throw BinlogError{"incorrect magic"};
156
long long size = pos_end - pos_start;
157
replay_range(cptr, pos_start, pos_end);
160
if (eptr && cptr >= eptr) {
161
cptr -= eptr - start;
166
bool BinlogBuffer::rollback_range(unsigned long long pos_start, unsigned long long pos_end) {
167
if (pos_start < log_cpos || pos_end < pos_start || pos_end != log_wpos) {
170
long long size = pos_end - pos_start;
172
if (size >= wptr - start) {
176
wptr += eptr - start - size;
181
void BinlogBuffer::NewBinlogEvent::commit() {
182
//LOG(DEBUG) << "in NewBinlogEvent::commit (status = " << status << ")";
184
throw BinlogError{"cannot commit new binlog event: already committed or rolled back"};
186
if (!bb.commit_range(pos, pos + size)) {
187
throw BinlogError{"cannot commit new binlog event: possibly some earlier log events are not committed yet"};
190
//LOG(DEBUG) << "after NewBinlogEvent::commit (status = " << status << ")";
193
void BinlogBuffer::NewBinlogEvent::rollback() {
195
throw BinlogError{"cannot roll back new binlog event: already committed or rolled back"};
197
if (!bb.rollback_range(pos, pos + size)) {
198
throw BinlogError{"cannot roll back new binlog event: possibly some later log event are already committed"};
203
BinlogBuffer::NewBinlogEvent::~NewBinlogEvent() {
208
} else if (status == 6) {
212
LOG(ERROR) << "newly-allocated binlog event is neither committed nor rolled back (automatically rolling back)";
218
void BinlogBuffer::replay_range(unsigned char* ptr, unsigned long long pos_start, unsigned long long pos_end) {
219
unsigned char* tptr = (ptr <= wptr ? wptr : eptr);
220
long long avail = tptr - ptr;
221
while (pos_start < pos_end) {
226
if (avail > (long long)(pos_end - pos_start)) {
227
avail = pos_end - pos_start;
230
int res = (avail >= 4 ? cb->replay_log_event(*this, reinterpret_cast<const unsigned*>(ptr),
231
td::narrow_cast<size_t>(avail), pos_start)
233
if (res <= 0 || res > avail) {
234
std::ostringstream ss;
235
ss << "cannot interpret newly-committed binlog event 0x" << std::hex
236
<< (avail >= 4 ? (unsigned)td::as<unsigned>(ptr) : 0u) << std::dec << ": error " << res;
237
throw BinlogError{ss.str()};
245
int BinlogBuffer::replay_pending(bool allow_partial) {
249
unsigned char* tptr = (rptr <= cptr ? cptr : eptr);
250
long long avail = tptr - rptr;
251
DCHECK(tptr && avail >= 0);
252
while (rptr != cptr) {
253
int res = (avail >= 4 ? cb->replay_log_event(*this, reinterpret_cast<const unsigned*>(rptr),
254
td::narrow_cast<size_t>(avail), log_rpos)
258
throw BinlogError{"binlog event used more bytes than available"};
271
long long prev_need = 0;
272
while (res < -0x40000000) {
273
long long need = res - 0x80000000;
274
need = (need + 3) & -4;
275
if (need > (long long)max_event_size) {
276
throw BinlogError{"binlog event requires too many bytes"};
279
throw BinlogError{"binlog event requires more bytes, but we already had them"};
281
if (need <= prev_need) {
282
throw BinlogError{"binlog event requires more bytes, but we already had them"};
285
long long total_avail = avail + (rptr > cptr ? cptr - start : 0);
286
if (need > total_avail) {
288
need_more_bytes = td::narrow_cast<size_t>(need - total_avail);
291
throw BinlogError{"binlog event extends past end of buffer"};
295
unsigned char tmp[1024];
296
std::memcpy(tmp, rptr, td::narrow_cast<size_t>(avail));
297
std::memcpy(tmp + avail, start, td::narrow_cast<size_t>(need - avail));
298
res = cb->replay_log_event(*this, reinterpret_cast<const unsigned*>(tmp), td::narrow_cast<size_t>(need),
301
unsigned char* tmp = static_cast<unsigned char*>(std::malloc(td::narrow_cast<size_t>(need)));
302
std::memcpy(tmp, rptr, td::narrow_cast<size_t>(avail));
303
std::memcpy(tmp + avail, start, td::narrow_cast<size_t>(need - avail));
304
res = cb->replay_log_event(*this, reinterpret_cast<const unsigned*>(tmp), td::narrow_cast<size_t>(need),
309
throw BinlogError{"binlog event used more bytes than available"};
316
throw BinlogError{"unknown error while interpreting binlog event"};
327
rptr = start + (rptr - eptr);
329
DCHECK(start <= rptr && rptr <= cptr && cptr <= wptr && wptr <= end);
334
BinlogBuffer::~BinlogBuffer() {
343
td::Status BinlogBuffer::set_binlog(std::string new_binlog_name, int mode) {
344
if (!binlog_name.empty() || !fd.empty()) {
345
return td::Status::Error("binlog buffer already attached to a file");
347
td::int32 flags = td::FileFd::Read;
348
if ((mode & 1) != 0) {
349
flags |= td::FileFd::Write;
351
auto r_fd = td::FileFd::open(new_binlog_name, flags, 0640);
352
if (r_fd.is_error()) {
354
TRY_RESULT(new_fd, td::FileFd::open(new_binlog_name, flags | td::FileFd::CreateNew, 0640));
355
fd = std::move(new_fd);
358
return r_fd.move_as_error();
361
fd = r_fd.move_as_ok();
363
replica = !(mode & 1);
365
TRY_STATUS(fd.lock(td::FileFd::LockFlags::Write, new_binlog_name, 100));
371
res = cb->init_new_binlog(*this);
372
} catch (BinlogBuffer::BinlogError& err) {
373
res = td::Status::Error(err.msg);
375
if (res.is_error()) {
377
td::unlink(new_binlog_name).ignore();
381
binlog_name = new_binlog_name;
383
return td::Status::OK();
385
binlog_name = new_binlog_name;
386
auto res = replay_binlog(replica);
387
if (res.is_error()) {
388
return res.move_as_error();
391
if (log_rpos != log_wpos || log_rpos != log_cpos || rptr != wptr || rptr != cptr) {
392
std::string msg = (PSLICE() << "error while interpreting binlog `" << binlog_name << "`: " << log_wpos - log_rpos
393
<< " bytes left uninterpreted at position " << log_rpos << ", truncated binlog?")
396
return td::Status::Error(msg);
398
//rptr = wptr = cptr = start;
400
LOG(INFO) << "read and interpreted " << res.move_as_ok() << " bytes from binlog `" << binlog_name
401
<< "`, final position " << log_rpos << ", reopening in write mode";
406
status = cb->init_new_binlog(*this);
407
} catch (BinlogBuffer::BinlogError& err) {
408
status = td::Status::Error(err.msg);
410
if (status.is_error()) {
412
td::unlink(new_binlog_name).ignore();
419
return td::Status::OK();
422
td::Result<long long> BinlogBuffer::replay_binlog(bool allow_partial) {
428
auto res = read_file();
429
if (res.is_error()) {
430
return res.move_as_error();
432
long long sz = res.move_as_ok();
437
if (!log_rpos && rptr == start && wptr >= rptr + 4 && td::as<unsigned>(rptr) != 0x0442446b) {
438
throw BinlogError{"incorrect magic"};
440
int r = replay_pending(allow_partial || sz != 0);
441
if (r < 0 && r >= -0x40000000) {
442
throw InterpretError{(PSLICE() << "binlog error " << r).c_str()};
444
} catch (BinlogError err) {
445
LOG(ERROR) << "error reading binlog " << binlog_name << ": " << err.msg << " at position " << log_rpos;
446
return td::Status::Error(PSLICE() << "error reading binlog " << binlog_name << ": " << err.msg << " at position "
448
} catch (InterpretError err) {
449
LOG(ERROR) << "error interpreting binlog " << binlog_name << ": " << err.msg << " at position " << log_rpos;
450
return td::Status::Error(PSLICE() << "error interpreting binlog " << binlog_name << ": " << err.msg
451
<< " at position " << log_rpos);
460
td::Result<int> BinlogBuffer::read_file() {
461
unsigned char* ptr = wptr;
462
std::size_t sz = end - wptr;
467
return 0; // buffer full
472
if (rptr <= start + 4) {
473
return 0; // buffer full
477
sz = rptr - start - 4;
479
auto r_res = fd.pread(td::MutableSlice(ptr, sz), log_wpos);
480
if (r_res.is_error()) {
481
std::string msg = PSTRING() << "error reading binlog file `" << binlog_name << "` at position " << log_wpos << " : "
484
return td::Status::Error(msg);
486
auto res = r_res.move_as_ok();
487
DCHECK(std::size_t(res) <= sz);
488
LOG(INFO) << "read " << res << " bytes from binlog `" << binlog_name << "` at position " << log_wpos;