2
This file is part of TON Blockchain Library.
4
TON Blockchain Library is free software: you can redistribute it and/or modify
5
it under the terms of the GNU Lesser General Public License as published by
6
the Free Software Foundation, either version 2 of the License, or
7
(at your option) any later version.
9
TON Blockchain Library is distributed in the hope that it will be useful,
10
but WITHOUT ANY WARRANTY; without even the implied warranty of
11
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12
GNU Lesser General Public License for more details.
14
You should have received a copy of the GNU Lesser General Public License
15
along with TON Blockchain Library. If not, see <http://www.gnu.org/licenses/>.
17
Copyright 2017-2020 Telegram Systems LLP
19
#include "state-serializer.hpp"
20
#include "td/utils/Random.h"
21
#include "adnl/utils.hpp"
22
#include "ton/ton-io.hpp"
23
#include "common/delay.h"
29
// Start-up: reports when the serializer is disabled in the options, sets the
// alarm timestamp, and asks the validator manager for the serializer state it
// holds; the answer is forwarded to got_self_state().
void AsyncStateSerializer::start_up() {
30
if (!opts_->get_state_serializer_enabled()) {
31
LOG(ERROR) << "Persistent state serializer is disabled";
33
// Alarm timestamp = now + 1.0 s + a random offset taken from td::Random::fast(0, 10).
alarm_timestamp() = td::Timestamp::in(1.0 + td::Random::fast(0, 10) * 1.0);
37
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<AsyncSerializerState> R) {
39
// NOTE(review): move_as_ok() needs R to hold a value; confirm the error case is checked before this line.
td::actor::send_closure(SelfId, &AsyncStateSerializer::got_self_state, R.move_as_ok());
41
td::actor::send_closure(manager_, &ValidatorManager::get_async_serializer_state, std::move(P));
44
// Receives the serializer state requested in start_up(). When it carries a
// valid last_block_id, the saved progress is copied into last_block_id_,
// last_key_block_id_ and last_key_block_ts_. The function also contains a
// request for the block handle of last_block_id_, answered in got_init_handle().
// NOTE(review): the lines that decide whether that request runs on every call
// or only when no valid saved state exists are not present here; verify.
void AsyncStateSerializer::got_self_state(AsyncSerializerState state) {
45
if (state.last_block_id.is_valid()) {
46
last_block_id_ = state.last_block_id;
47
// The "last written" fields of the saved state become the last key block id/timestamp.
last_key_block_id_ = state.last_written_block_id;
48
last_key_block_ts_ = state.last_written_block_ts;
54
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<BlockHandle> R) {
56
// NOTE(review): move_as_ok() needs R to hold a value; confirm the error case is checked before this line.
td::actor::send_closure(SelfId, &AsyncStateSerializer::got_init_handle, R.move_as_ok());
58
td::actor::send_closure(manager_, &ValidatorManager::get_block_handle, last_block_id_, true, std::move(P));
62
// Receives the block handle requested by got_self_state(). The handle's id and
// unix time are recorded as the last key block, and the handle itself becomes
// the current masterchain_handle_.
void AsyncStateSerializer::got_init_handle(BlockHandle handle) {
63
// The initial block must be either the block with seqno 0 or a key block.
CHECK(handle->id().id.seqno == 0 || handle->is_key_block());
64
last_key_block_id_ = handle->id();
65
last_key_block_ts_ = handle->unix_time();
67
masterchain_handle_ = std::move(handle);
75
// Alarm handler: sets the next alarm timestamp and asks the validator manager
// for the top masterchain block id, which is forwarded to
// got_top_masterchain_handle().
void AsyncStateSerializer::alarm() {
76
// Next alarm timestamp = now + 1.0 s + a random offset taken from td::Random::fast(0, 10).
alarm_timestamp() = td::Timestamp::in(1.0 + td::Random::fast(0, 10) * 1.0);
80
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<BlockIdExt> R) {
82
// NOTE(review): move_as_ok() needs R to hold a value; confirm the error case is checked before this line.
td::actor::send_closure(SelfId, &AsyncStateSerializer::got_top_masterchain_handle, R.move_as_ok());
84
td::actor::send_closure(manager_, &ValidatorManager::get_top_masterchain_block, std::move(P));
87
// Requests the shard state of masterchain_handle_ from the database. The
// promise then asks the validator manager for a cell DB reader and finally
// passes both the state (as td::Ref<MasterchainState>) and the reader to
// got_masterchain_state(). Failures are routed to fail_handler() with a
// descriptive prefix.
// NOTE(review): the conditions selecting the failure paths are not present in
// the lines below; presumably they test R.is_error() - confirm.
void AsyncStateSerializer::request_masterchain_state() {
88
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), manager = manager_](td::Result<td::Ref<ShardState>> R) {
90
// Failure path for the state lookup.
td::actor::send_closure(SelfId, &AsyncStateSerializer::fail_handler,
91
R.move_as_error_prefix("failed to get masterchain state: "));
93
// The state is moved into the nested callback so it is still available once the reader arrives.
td::actor::send_closure(manager, &ValidatorManager::get_cell_db_reader,
94
[SelfId, state = td::Ref<MasterchainState>(R.move_as_ok())](
95
td::Result<std::shared_ptr<vm::CellDbReader>> R) mutable {
97
// Failure path for the cell DB reader request.
td::actor::send_closure(SelfId, &AsyncStateSerializer::fail_handler,
98
R.move_as_error_prefix("failed to get cell db reader: "));
100
td::actor::send_closure(SelfId, &AsyncStateSerializer::got_masterchain_state,
101
std::move(state), R.move_as_ok());
106
td::actor::send_closure(manager_, &ValidatorManager::get_shard_state_from_db, masterchain_handle_, std::move(P));
109
// Requests the block handle for the given shard block id from the validator
// manager; the handle is forwarded to got_shard_handle().
void AsyncStateSerializer::request_shard_state(BlockIdExt shard) {
110
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<BlockHandle> R) {
112
// NOTE(review): move_as_ok() needs R to hold a value; confirm the error case is checked before this line.
td::actor::send_closure(SelfId, &AsyncStateSerializer::got_shard_handle, R.move_as_ok());
114
return td::actor::send_closure(manager_, &ValidatorManager::get_block_handle, shard, true, std::move(P));
117
// Advances the serializer by one step for the block identified by
// last_block_id_. The visible stages are:
//   1. obtain masterchain_handle_ if it is not cached;
//   2. test that the handle is sufficiently initialized and applied;
//   3. if the block qualifies, schedule serialization of the masterchain
//      state and then of each monitored shard state (each with a random delay);
//   4. record the block as the last serialized key block and persist the
//      serializer state through the validator manager;
//   5. move on to the next masterchain block and reset per-block flags.
// NOTE(review): the early returns, closing braces and some call openers that
// separate these stages are not present in the lines below, so the exact
// control flow between the stages must be confirmed against a complete copy.
void AsyncStateSerializer::next_iteration() {
121
// Stage 1: no handle cached - request it; the result goes to got_masterchain_handle().
if (!masterchain_handle_) {
123
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<BlockHandle> R) {
125
td::actor::send_closure(SelfId, &AsyncStateSerializer::got_masterchain_handle, R.move_as_ok());
127
td::actor::send_closure(manager_, &ValidatorManager::get_block_handle, last_block_id_, true, std::move(P));
130
// Stage 2: true when unix time or the key-block flag is not initialized, or the block is not applied.
if (!masterchain_handle_->inited_unix_time() || !masterchain_handle_->inited_is_key_block() ||
131
!masterchain_handle_->is_applied()) {
134
CHECK(masterchain_handle_->id() == last_block_id_);
135
// Stage 3: serialize only while attempt_ is below max_attempt(), the block's seqno is above that of
// last_key_block_id_, need_serialize() accepts the block and the serializer is enabled in the options.
if (attempt_ < max_attempt() && last_key_block_id_.id.seqno < last_block_id_.id.seqno &&
136
need_serialize(masterchain_handle_) && opts_->get_state_serializer_enabled()) {
137
if (!have_masterchain_state_) {
138
LOG(ERROR) << "started serializing persistent state for " << masterchain_handle_->id().id.to_str();
139
// block next attempts immediately, but send actual request later
141
// Delay value comes from td::Random::fast(0, 3600) and is logged in seconds.
double delay = td::Random::fast(0, 3600);
142
LOG(WARNING) << "serializer delay = " << delay << "s";
143
delay_action([SelfId = actor_id(
144
this)]() { td::actor::send_closure(SelfId, &AsyncStateSerializer::request_masterchain_state); },
145
td::Timestamp::in(delay));
148
// Walk the shard list starting at next_idx_, testing need_monitor() for each entry.
while (next_idx_ < shards_.size()) {
149
if (!need_monitor(shards_[next_idx_].shard_full())) {
152
// block next attempts immediately, but send actual request later
154
// Delay value comes from td::Random::fast(0, 1800) and is logged in seconds.
double delay = td::Random::fast(0, 1800);
155
LOG(WARNING) << "serializer delay = " << delay << "s";
157
// Deferred callback: asks for the state of the shard block at the current index.
[SelfId = actor_id(this), shard = shards_[next_idx_]]() {
158
td::actor::send_closure(SelfId, &AsyncStateSerializer::request_shard_state, shard);
160
td::Timestamp::in(delay));
164
// Stage 4: remember this block as the last serialized key block.
LOG(ERROR) << "finished serializing persistent state for " << masterchain_handle_->id().id.to_str();
165
last_key_block_ts_ = masterchain_handle_->unix_time();
166
last_key_block_id_ = masterchain_handle_->id();
170
// Persist {current block id, last key block id, last key block ts}; completion is reported to saved_to_db().
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<td::Unit> R) {
172
td::actor::send_closure(SelfId, &AsyncStateSerializer::saved_to_db);
174
td::actor::send_closure(manager_, &ValidatorManager::update_async_serializer_state,
175
AsyncSerializerState{masterchain_handle_->id(), last_key_block_id_, last_key_block_ts_},
179
// Stage 5: once the next block link is known, advance to it and reset per-block state.
if (masterchain_handle_->inited_next_left()) {
180
// Logged when the block would have been serialized but the serializer is disabled.
if (need_serialize(masterchain_handle_) && !opts_->get_state_serializer_enabled()) {
181
LOG(ERROR) << "skipping serializing persistent state for " << masterchain_handle_->id().id.to_str();
183
last_block_id_ = masterchain_handle_->one_next(true);
184
have_masterchain_state_ = false;
185
masterchain_handle_ = nullptr;
186
saved_to_db_ = false;
193
// Receives the top masterchain block id requested by alarm().
// NOTE(review): the statements following the CHECK are not present here;
// what is triggered afterwards must be confirmed against a complete copy.
void AsyncStateSerializer::got_top_masterchain_handle(BlockIdExt block_id) {
194
// Only relevant when a handle is cached and its seqno is below the reported top block's seqno.
if (masterchain_handle_ && masterchain_handle_->id().id.seqno < block_id.id.seqno) {
195
// In that case the cached handle is required to have its next-block link initialized.
CHECK(masterchain_handle_->inited_next_left());
199
// Receives the handle requested by next_iteration() and caches it as
// masterchain_handle_. No handle may be cached at this point.
void AsyncStateSerializer::got_masterchain_handle(BlockHandle handle) {
200
CHECK(!masterchain_handle_);
201
masterchain_handle_ = std::move(handle);
207
// Receives the masterchain state and a cell DB reader. Marks the masterchain
// state as available, fills shards_ with the top block id of every shard
// listed in the state, and asks the validator manager to store a persistent
// state file whose contents are produced by a large-BOC serialization of the
// state's root cell.
//
// state          - masterchain state for masterchain_handle_.
// cell_db_reader - reader handed to the serialization routine.
void AsyncStateSerializer::got_masterchain_state(td::Ref<MasterchainState> state,
208
std::shared_ptr<vm::CellDbReader> cell_db_reader) {
209
// Serializer disabled: stored_masterchain_state() is called directly.
// NOTE(review): a return after this call is expected but is not present in these lines.
if (!opts_->get_state_serializer_enabled()) {
210
stored_masterchain_state();
213
LOG(ERROR) << "serializing masterchain state " << masterchain_handle_->id().id.to_str();
214
have_masterchain_state_ = true;
215
// The shard list must be empty and unread before it is filled.
CHECK(next_idx_ == 0);
216
CHECK(shards_.size() == 0);
218
auto vec = state->get_shards();
219
for (auto& v : vec) {
220
shards_.push_back(v->top_block_id());
223
// Writer callback: captures the root cell hash, the reader and a cancellation token obtained
// from cancellation_token_source_, and serializes into the provided file descriptor.
auto write_data = [hash = state->root_cell()->get_hash(), cell_db_reader,
224
cancellation_token = cancellation_token_source_.get_cancellation_token()](td::FileFd& fd) mutable {
225
return vm::std_boc_serialize_to_file_large(cell_db_reader, hash, fd, 31, std::move(cancellation_token));
227
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<td::Unit> R) {
228
// An error whose code equals `cancelled` (declared outside this function) is logged as a cancellation.
if (R.is_error() && R.error().code() == cancelled) {
229
LOG(ERROR) << "Persistent state serialization cancelled";
233
td::actor::send_closure(SelfId, &AsyncStateSerializer::stored_masterchain_state);
236
// The masterchain block id is passed for both block-id arguments of the store request.
td::actor::send_closure(manager_, &ValidatorManager::store_persistent_state_file_gen, masterchain_handle_->id(),
237
masterchain_handle_->id(), write_data, std::move(P));
240
// Called once the masterchain state has been stored (or directly from
// got_masterchain_state() when the serializer is disabled); logs completion.
// NOTE(review): any statements after the log line are not present here.
void AsyncStateSerializer::stored_masterchain_state() {
241
LOG(ERROR) << "finished serializing masterchain state " << masterchain_handle_->id().id.to_str();
246
// Receives the shard block handle requested by request_shard_state() and
// requests that block's state from the database. The promise then asks the
// validator manager for a cell DB reader and forwards handle, state and reader
// to got_shard_state(). Failures are routed to fail_handler().
// NOTE(review): the conditions selecting the failure paths are not present in
// the lines below; presumably they test R.is_error() - confirm.
void AsyncStateSerializer::got_shard_handle(BlockHandle handle) {
247
auto P = td::PromiseCreator::lambda(
248
[SelfId = actor_id(this), handle, manager = manager_](td::Result<td::Ref<ShardState>> R) {
250
// Failure path for the state lookup (error forwarded without a prefix).
td::actor::send_closure(SelfId, &AsyncStateSerializer::fail_handler, R.move_as_error());
252
// The state and the handle are captured so they are available once the reader arrives.
td::actor::send_closure(
253
manager, &ValidatorManager::get_cell_db_reader,
254
[SelfId, state = R.move_as_ok(), handle](td::Result<std::shared_ptr<vm::CellDbReader>> R) mutable {
256
// Failure path for the cell DB reader request.
td::actor::send_closure(SelfId, &AsyncStateSerializer::fail_handler,
257
R.move_as_error_prefix("failed to get cell db reader: "));
259
td::actor::send_closure(SelfId, &AsyncStateSerializer::got_shard_state, handle, std::move(state),
266
td::actor::send_closure(manager_, &ValidatorManager::get_shard_state_from_db, handle, std::move(P));
269
// Receives a shard state together with its block handle and a cell DB reader,
// and asks the validator manager to store a persistent state file for it. The
// file contents are produced by a large-BOC serialization of the state's root
// cell; completion is reported to success_handler().
//
// handle         - handle of the shard block being serialized.
// state          - shard state whose root cell is written.
// cell_db_reader - reader handed to the serialization routine.
void AsyncStateSerializer::got_shard_state(BlockHandle handle, td::Ref<ShardState> state,
270
std::shared_ptr<vm::CellDbReader> cell_db_reader) {
271
// NOTE(review): the statements executed when the serializer is disabled are not present here.
if (!opts_->get_state_serializer_enabled()) {
275
LOG(ERROR) << "serializing shard state " << handle->id().id.to_str();
276
// Writer callback: captures the root cell hash, the reader and a cancellation token obtained
// from cancellation_token_source_, and serializes into the provided file descriptor.
auto write_data = [hash = state->root_cell()->get_hash(), cell_db_reader,
277
cancellation_token = cancellation_token_source_.get_cancellation_token()](td::FileFd& fd) mutable {
278
return vm::std_boc_serialize_to_file_large(cell_db_reader, hash, fd, 31, std::move(cancellation_token));
280
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), handle](td::Result<td::Unit> R) {
281
// An error whose code equals `cancelled` (declared outside this function) is logged as a cancellation.
if (R.is_error() && R.error().code() == cancelled) {
282
LOG(ERROR) << "Persistent state serialization cancelled";
285
LOG(ERROR) << "finished serializing shard state " << handle->id().id.to_str();
287
td::actor::send_closure(SelfId, &AsyncStateSerializer::success_handler);
289
// The file is keyed by the shard block id together with the current masterchain block id.
td::actor::send_closure(manager_, &ValidatorManager::store_persistent_state_file_gen, handle->id(),
290
masterchain_handle_->id(), write_data, std::move(P));
294
// Failure entry point used by the request callbacks: logs the reason at
// VALIDATOR_NOTICE verbosity and schedules fail_handler_cont() with a
// 16-second timestamp.
// NOTE(review): the opener of the call that takes the callback and timestamp
// below is not present in these lines (presumably delay_action) - confirm.
void AsyncStateSerializer::fail_handler(td::Status reason) {
295
VLOG(VALIDATOR_NOTICE) << "failure: " << reason;
298
[SelfId = actor_id(this)]() { td::actor::send_closure(SelfId, &AsyncStateSerializer::fail_handler_cont); },
299
td::Timestamp::in(16.0));
302
// Continuation that fail_handler() schedules via send_closure after its delay.
// NOTE(review): the body of this function is not present here.
void AsyncStateSerializer::fail_handler_cont() {
307
// Invoked via send_closure from the completion promise in got_shard_state().
// NOTE(review): the body of this function is not present here.
void AsyncStateSerializer::success_handler() {
312
// Replaces the stored validator options. If the new options have the state
// serializer disabled, cancellation is signalled through
// cancellation_token_source_, the source of the tokens captured by the
// serialization writer callbacks.
void AsyncStateSerializer::update_options(td::Ref<ValidatorManagerOptions> opts) {
313
opts_ = std::move(opts);
314
if (!opts_->get_state_serializer_enabled()) {
315
cancellation_token_source_.cancel();
320
// Returns whether the given shard should be monitored, as decided by the
// current validator options.
bool AsyncStateSerializer::need_monitor(ShardIdFull shard) {
321
return opts_->need_monitor(shard);
324
// Decides whether a persistent state should be written for the block behind
// `handle`.
bool AsyncStateSerializer::need_serialize(BlockHandle handle) {
325
// Tests for the block with seqno 0 or a block that is not a key block.
// NOTE(review): the value returned for this case is not present in these lines.
if (handle->id().id.seqno == 0 || !handle->is_key_block()) {
328
// Otherwise both must hold: is_persistent_state() accepts the block time relative to
// last_key_block_ts_, and persistent_state_ttl() for that time is later than the system clock now.
return ValidatorManager::is_persistent_state(handle->unix_time(), last_key_block_ts_) &&
329
ValidatorManager::persistent_state_ttl(handle->unix_time()) > (UnixTime)td::Clocks::system();
332
} // namespace validator