Ton

Форк
0
/
Binlog.cpp 
493 строки · 14.9 Кб
1
/*
2
    This file is part of TON Blockchain Library.
3

4
    TON Blockchain Library is free software: you can redistribute it and/or modify
5
    it under the terms of the GNU Lesser General Public License as published by
6
    the Free Software Foundation, either version 2 of the License, or
7
    (at your option) any later version.
8

9
    TON Blockchain Library is distributed in the hope that it will be useful,
10
    but WITHOUT ANY WARRANTY; without even the implied warranty of
11
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
    GNU Lesser General Public License for more details.
13

14
    You should have received a copy of the GNU Lesser General Public License
15
    along with TON Blockchain Library.  If not, see <http://www.gnu.org/licenses/>.
16

17
    Copyright 2017-2020 Telegram Systems LLP
18
*/
19
#include "crypto/block/Binlog.h"
20

21
#include "td/utils/as.h"
22
#include "td/utils/misc.h"
23
#include "td/utils/port/path.h"
24

25
#include <sstream>
26

27
namespace block {
28
/*
29
 * 
30
 *    GENERIC BINLOG (move to separate file)
31
 * 
32
 */
33

34
BinlogBuffer::BinlogBuffer(std::unique_ptr<BinlogCallback> cb, std::size_t _max_size, td::FileFd fd)
35
    : cb(std::move(cb))
36
    , need_more_bytes(0)
37
    , eptr(nullptr)
38
    , log_rpos(0)
39
    , log_cpos(0)
40
    , log_wpos(0)
41
    , fd(std::move(fd))
42
    , replica(false)
43
    , writing(false)
44
    , dirty(false)
45
    , created(false)
46
    , ok(false) {
47
  max_size = _max_size;
48
  start = static_cast<unsigned char*>(std::malloc(max_size));
49
  DCHECK(start);
50
  rptr = wptr = cptr = start;
51
  end = start + max_size;
52
}
53

54
unsigned char* BinlogBuffer::alloc_log_event_force(std::size_t size) {
55
  unsigned char* res = alloc_log_event(size);
56
  if (!res) {
57
    throw LevAllocError{size};
58
  }
59
  return res;
60
}
61

62
unsigned char* BinlogBuffer::try_alloc_log_event(std::size_t size) {
63
  if (!eptr) {
64
    if (end - wptr >= (long)size) {
65
      unsigned char* res = wptr;
66
      wptr += size;
67
      log_wpos += size;
68
      return res;
69
    }
70
    eptr = wptr;
71
    wptr = start;
72
    if (rptr == eptr) {
73
      rptr = start;
74
    }
75
    if (cptr == eptr) {
76
      cptr = start;
77
    }
78
  }
79
  if (rptr - wptr > (long)size) {
80
    unsigned char* res = wptr;
81
    wptr += size;
82
    log_wpos += size;
83
    return res;
84
  }
85
  return nullptr;
86
}
87

88
bool BinlogBuffer::flush(int mode) {
89
  auto r_res = try_flush(mode);
90
  if (r_res.is_ok()) {
91
    return r_res.ok();
92
  }
93
  std::string msg = PSTRING() << "cannot flush binlog file " << binlog_name << " at position " << log_rpos << " "
94
                              << r_res.error();
95
  LOG(ERROR) << msg;
96
  throw BinlogError{msg};
97
}
98

99
td::Result<bool> BinlogBuffer::try_flush(int mode) {
100
  LOG(DEBUG) << "in flush: writing=" << writing << " r=" << rptr - start << " c=" << cptr - start
101
             << " w=" << wptr - start << "; rp=" << log_rpos << " cp=" << log_cpos << " wp=" << log_wpos;
102
  if (!writing || rptr == cptr) {
103
    return false;  // nothing to flush
104
  }
105
  DCHECK(!fd.empty());  // must have an open binlog file
106
  while (rptr != cptr) {
107
    unsigned char* tptr = (cptr >= rptr ? cptr : eptr);
108
    DCHECK(rptr <= tptr);
109
    auto sz = tptr - rptr;
110
    if (sz) {
111
      LOG(INFO) << "writing " << sz << " bytes to binlog " << binlog_name << " at position " << log_rpos;
112
      TRY_RESULT(res, fd.pwrite(td::Slice(rptr, sz), log_rpos));
113
      if (static_cast<td::int64>(res) != sz) {
114
        return td::Status::Error(PSLICE() << "written " << res << " bytes instead of " << sz);
115
      }
116
      log_rpos += sz;
117
      rptr += sz;
118
    }
119
    if (rptr == eptr) {
120
      rptr = start;
121
      eptr = nullptr;
122
    }
123
  }
124
  if (mode >= 3) {
125
    LOG(INFO) << "syncing binlog " << binlog_name << " (position " << log_rpos << ")";
126
    TRY_STATUS(fd.sync());
127
  }
128
  return true;
129
}
130

131
unsigned char* BinlogBuffer::alloc_log_event(std::size_t size) {
132
  if (!writing) {
133
    throw BinlogError{"cannot create new binlog event: binlog not open for writing"};
134
  }
135
  if (size >= max_size || size > max_event_size) {
136
    return nullptr;
137
  }
138
  size = (size + 3) & -4;
139
  unsigned char* res = try_alloc_log_event(size);
140
  if (!res) {
141
    flush();
142
    return try_alloc_log_event(size);
143
  } else {
144
    return res;
145
  }
146
}
147

148
bool BinlogBuffer::commit_range(unsigned long long pos_start, unsigned long long pos_end) {
149
  // TODO: make something more clever, with partially committed/uncommitted segments in [cpos..wpos] range
150
  if (pos_start != log_cpos || pos_end < pos_start || pos_end > log_wpos) {
151
    return false;
152
  }
153
  if (!pos_start && pos_end >= pos_start + 4 && td::as<unsigned>(cptr) != 0x0442446b) {
154
    throw BinlogError{"incorrect magic"};
155
  }
156
  long long size = pos_end - pos_start;
157
  replay_range(cptr, pos_start, pos_end);
158
  log_cpos = pos_end;
159
  cptr += size;
160
  if (eptr && cptr >= eptr) {
161
    cptr -= eptr - start;
162
  }
163
  return true;
164
}
165

166
bool BinlogBuffer::rollback_range(unsigned long long pos_start, unsigned long long pos_end) {
167
  if (pos_start < log_cpos || pos_end < pos_start || pos_end != log_wpos) {
168
    return false;
169
  }
170
  long long size = pos_end - pos_start;
171
  log_wpos = pos_end;
172
  if (size >= wptr - start) {
173
    wptr -= size;
174
  } else {
175
    DCHECK(eptr);
176
    wptr += eptr - start - size;
177
  }
178
  return true;
179
}
180

181
void BinlogBuffer::NewBinlogEvent::commit() {
182
  //LOG(DEBUG) << "in NewBinlogEvent::commit (status = " << status << ")";
183
  if (!(status & 4)) {
184
    throw BinlogError{"cannot commit new binlog event: already committed or rolled back"};
185
  }
186
  if (!bb.commit_range(pos, pos + size)) {
187
    throw BinlogError{"cannot commit new binlog event: possibly some earlier log events are not committed yet"};
188
  }
189
  status = 1;
190
  //LOG(DEBUG) << "after NewBinlogEvent::commit (status = " << status << ")";
191
}
192

193
void BinlogBuffer::NewBinlogEvent::rollback() {
194
  if (!(status & 4)) {
195
    throw BinlogError{"cannot roll back new binlog event: already committed or rolled back"};
196
  }
197
  if (!bb.rollback_range(pos, pos + size)) {
198
    throw BinlogError{"cannot roll back new binlog event: possibly some later log event are already committed"};
199
  }
200
  status = 2;
201
}
202

203
BinlogBuffer::NewBinlogEvent::~NewBinlogEvent() {
204
  if (status & 4) {
205
    if (status == 5) {
206
      status = 4;
207
      commit();
208
    } else if (status == 6) {
209
      status = 4;
210
      rollback();
211
    } else {
212
      LOG(ERROR) << "newly-allocated binlog event is neither committed nor rolled back (automatically rolling back)";
213
      rollback();
214
    }
215
  }
216
}
217

218
void BinlogBuffer::replay_range(unsigned char* ptr, unsigned long long pos_start, unsigned long long pos_end) {
219
  unsigned char* tptr = (ptr <= wptr ? wptr : eptr);
220
  long long avail = tptr - ptr;
221
  while (pos_start < pos_end) {
222
    if (ptr == eptr) {
223
      ptr = start;
224
      tptr = wptr;
225
      avail = tptr - ptr;
226
      if (avail > (long long)(pos_end - pos_start)) {
227
        avail = pos_end - pos_start;
228
      }
229
    }
230
    int res = (avail >= 4 ? cb->replay_log_event(*this, reinterpret_cast<const unsigned*>(ptr),
231
                                                 td::narrow_cast<size_t>(avail), pos_start)
232
                          : -0x7ffffffc);
233
    if (res <= 0 || res > avail) {
234
      std::ostringstream ss;
235
      ss << "cannot interpret newly-committed binlog event 0x" << std::hex
236
         << (avail >= 4 ? (unsigned)td::as<unsigned>(ptr) : 0u) << std::dec << ": error " << res;
237
      throw BinlogError{ss.str()};
238
    }
239
    ptr += res;
240
    pos_start += res;
241
    avail -= res;
242
  }
243
}
244

245
int BinlogBuffer::replay_pending(bool allow_partial) {
246
  if (rptr == cptr) {
247
    return 0;
248
  }
249
  unsigned char* tptr = (rptr <= cptr ? cptr : eptr);
250
  long long avail = tptr - rptr;
251
  DCHECK(tptr && avail >= 0);
252
  while (rptr != cptr) {
253
    int res = (avail >= 4 ? cb->replay_log_event(*this, reinterpret_cast<const unsigned*>(rptr),
254
                                                 td::narrow_cast<size_t>(avail), log_rpos)
255
                          : -0x7ffffffc);
256
    if (res > 0) {
257
      if (res > avail) {
258
        throw BinlogError{"binlog event used more bytes than available"};
259
      }
260
      avail -= res;
261
      log_rpos += res;
262
      rptr += res;
263
      if (rptr != eptr) {
264
        continue;
265
      }
266
      rptr = start;
267
      tptr = cptr;
268
      avail = tptr - rptr;
269
      continue;
270
    }
271
    long long prev_need = 0;
272
    while (res < -0x40000000) {
273
      long long need = res - 0x80000000;
274
      need = (need + 3) & -4;
275
      if (need > (long long)max_event_size) {
276
        throw BinlogError{"binlog event requires too many bytes"};
277
      }
278
      if (need <= avail) {
279
        throw BinlogError{"binlog event requires more bytes, but we already had them"};
280
      }
281
      if (need <= prev_need) {
282
        throw BinlogError{"binlog event requires more bytes, but we already had them"};
283
      }
284
      prev_need = need;
285
      long long total_avail = avail + (rptr > cptr ? cptr - start : 0);
286
      if (need > total_avail) {
287
        if (allow_partial) {
288
          need_more_bytes = td::narrow_cast<size_t>(need - total_avail);
289
          return 2;
290
        } else {
291
          throw BinlogError{"binlog event extends past end of buffer"};
292
        }
293
      }
294
      if (need <= 1024) {
295
        unsigned char tmp[1024];
296
        std::memcpy(tmp, rptr, td::narrow_cast<size_t>(avail));
297
        std::memcpy(tmp + avail, start, td::narrow_cast<size_t>(need - avail));
298
        res = cb->replay_log_event(*this, reinterpret_cast<const unsigned*>(tmp), td::narrow_cast<size_t>(need),
299
                                   log_rpos);
300
      } else {
301
        unsigned char* tmp = static_cast<unsigned char*>(std::malloc(td::narrow_cast<size_t>(need)));
302
        std::memcpy(tmp, rptr, td::narrow_cast<size_t>(avail));
303
        std::memcpy(tmp + avail, start, td::narrow_cast<size_t>(need - avail));
304
        res = cb->replay_log_event(*this, reinterpret_cast<const unsigned*>(tmp), td::narrow_cast<size_t>(need),
305
                                   log_rpos);
306
        std::free(tmp);
307
      }
308
      if (res > need) {
309
        throw BinlogError{"binlog event used more bytes than available"};
310
      }
311
    }
312
    if (res < 0) {
313
      return res;
314
    }
315
    if (!res) {
316
      throw BinlogError{"unknown error while interpreting binlog event"};
317
    }
318
    if (res < avail) {
319
      avail -= res;
320
      log_rpos += res;
321
      rptr += res;
322
      continue;
323
    }
324
    DCHECK(eptr);
325
    log_rpos += res;
326
    rptr += res;
327
    rptr = start + (rptr - eptr);
328
    eptr = nullptr;
329
    DCHECK(start <= rptr && rptr <= cptr && cptr <= wptr && wptr <= end);
330
  }
331
  return 1;
332
}
333

334
BinlogBuffer::~BinlogBuffer() {
335
  if (start) {
336
    if (writing) {
337
      flush(2);
338
    }
339
    std::free(start);
340
  }
341
}
342

343
td::Status BinlogBuffer::set_binlog(std::string new_binlog_name, int mode) {
344
  if (!binlog_name.empty() || !fd.empty()) {
345
    return td::Status::Error("binlog buffer already attached to a file");
346
  }
347
  td::int32 flags = td::FileFd::Read;
348
  if ((mode & 1) != 0) {
349
    flags |= td::FileFd::Write;
350
  }
351
  auto r_fd = td::FileFd::open(new_binlog_name, flags, 0640);
352
  if (r_fd.is_error()) {
353
    if (!(~mode & 3)) {
354
      TRY_RESULT(new_fd, td::FileFd::open(new_binlog_name, flags | td::FileFd::CreateNew, 0640));
355
      fd = std::move(new_fd);
356
      created = true;
357
    } else {
358
      return r_fd.move_as_error();
359
    }
360
  } else {
361
    fd = r_fd.move_as_ok();
362
  }
363
  replica = !(mode & 1);
364
  if (!replica) {
365
    TRY_STATUS(fd.lock(td::FileFd::LockFlags::Write, new_binlog_name, 100));
366
  }
367
  if (created) {
368
    writing = true;
369
    td::Status res;
370
    try {
371
      res = cb->init_new_binlog(*this);
372
    } catch (BinlogBuffer::BinlogError& err) {
373
      res = td::Status::Error(err.msg);
374
    }
375
    if (res.is_error()) {
376
      fd.close();
377
      td::unlink(new_binlog_name).ignore();
378
      writing = false;
379
      return res;
380
    }
381
    binlog_name = new_binlog_name;
382
    ok = true;
383
    return td::Status::OK();
384
  }
385
  binlog_name = new_binlog_name;
386
  auto res = replay_binlog(replica);
387
  if (res.is_error()) {
388
    return res.move_as_error();
389
  }
390
  if (!replica) {
391
    if (log_rpos != log_wpos || log_rpos != log_cpos || rptr != wptr || rptr != cptr) {
392
      std::string msg = (PSLICE() << "error while interpreting binlog `" << binlog_name << "`: " << log_wpos - log_rpos
393
                                  << " bytes left uninterpreted at position " << log_rpos << ", truncated binlog?")
394
                            .c_str();
395
      LOG(ERROR) << msg;
396
      return td::Status::Error(msg);
397
    }
398
    //rptr = wptr = cptr = start;
399
    //eptr = nullptr;
400
    LOG(INFO) << "read and interpreted " << res.move_as_ok() << " bytes from binlog `" << binlog_name
401
              << "`, final position " << log_rpos << ", reopening in write mode";
402
    writing = true;
403
    if (!log_rpos) {
404
      td::Status status;
405
      try {
406
        status = cb->init_new_binlog(*this);
407
      } catch (BinlogBuffer::BinlogError& err) {
408
        status = td::Status::Error(err.msg);
409
      }
410
      if (status.is_error()) {
411
        fd.close();
412
        td::unlink(new_binlog_name).ignore();
413
        writing = false;
414
        return status;
415
      }
416
    }
417
  }
418
  ok = true;
419
  return td::Status::OK();
420
}
421

422
td::Result<long long> BinlogBuffer::replay_binlog(bool allow_partial) {
423
  if (writing) {
424
    return 0;
425
  }
426
  long long total = 0;
427
  while (true) {
428
    auto res = read_file();
429
    if (res.is_error()) {
430
      return res.move_as_error();
431
    }
432
    long long sz = res.move_as_ok();
433
    total += sz;
434
    try {
435
      cptr = wptr;
436
      log_cpos = log_wpos;
437
      if (!log_rpos && rptr == start && wptr >= rptr + 4 && td::as<unsigned>(rptr) != 0x0442446b) {
438
        throw BinlogError{"incorrect magic"};
439
      }
440
      int r = replay_pending(allow_partial || sz != 0);
441
      if (r < 0 && r >= -0x40000000) {
442
        throw InterpretError{(PSLICE() << "binlog error " << r).c_str()};
443
      }
444
    } catch (BinlogError err) {
445
      LOG(ERROR) << "error reading binlog " << binlog_name << ": " << err.msg << " at position " << log_rpos;
446
      return td::Status::Error(PSLICE() << "error reading binlog " << binlog_name << ": " << err.msg << " at position "
447
                                        << log_rpos);
448
    } catch (InterpretError err) {
449
      LOG(ERROR) << "error interpreting binlog " << binlog_name << ": " << err.msg << " at position " << log_rpos;
450
      return td::Status::Error(PSLICE() << "error interpreting binlog " << binlog_name << ": " << err.msg
451
                                        << " at position " << log_rpos);
452
    }
453
    if (!sz) {
454
      break;
455
    }
456
  };
457
  return total;
458
}
459

460
td::Result<int> BinlogBuffer::read_file() {
461
  unsigned char* ptr = wptr;
462
  std::size_t sz = end - wptr;
463
  if (rptr > wptr) {
464
    DCHECK(eptr);
465
    sz = rptr - wptr;
466
    if (sz <= 4) {
467
      return 0;  // buffer full
468
    }
469
    sz -= 4;
470
  } else if (!sz) {
471
    DCHECK(!eptr);
472
    if (rptr <= start + 4) {
473
      return 0;  // buffer full
474
    }
475
    eptr = end;
476
    ptr = wptr = start;
477
    sz = rptr - start - 4;
478
  }
479
  auto r_res = fd.pread(td::MutableSlice(ptr, sz), log_wpos);
480
  if (r_res.is_error()) {
481
    std::string msg = PSTRING() << "error reading binlog file `" << binlog_name << "` at position " << log_wpos << " : "
482
                                << r_res.error();
483
    LOG(ERROR) << msg;
484
    return td::Status::Error(msg);
485
  }
486
  auto res = r_res.move_as_ok();
487
  DCHECK(std::size_t(res) <= sz);
488
  LOG(INFO) << "read " << res << " bytes from binlog `" << binlog_name << "` at position " << log_wpos;
489
  log_wpos += res;
490
  wptr += res;
491
  return (int)res;
492
}
493
}  // namespace block
494

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.