Ton

Форк
0
/
state-serializer.cpp 
334 строки · 13.3 Кб
1
/*
2
    This file is part of TON Blockchain Library.
3

4
    TON Blockchain Library is free software: you can redistribute it and/or modify
5
    it under the terms of the GNU Lesser General Public License as published by
6
    the Free Software Foundation, either version 2 of the License, or
7
    (at your option) any later version.
8

9
    TON Blockchain Library is distributed in the hope that it will be useful,
10
    but WITHOUT ANY WARRANTY; without even the implied warranty of
11
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
    GNU Lesser General Public License for more details.
13

14
    You should have received a copy of the GNU Lesser General Public License
15
    along with TON Blockchain Library.  If not, see <http://www.gnu.org/licenses/>.
16

17
    Copyright 2017-2020 Telegram Systems LLP
18
*/
19
#include "state-serializer.hpp"
20
#include "td/utils/Random.h"
21
#include "adnl/utils.hpp"
22
#include "ton/ton-io.hpp"
23
#include "common/delay.h"
24

25
namespace ton {
26

27
namespace validator {
28

29
void AsyncStateSerializer::start_up() {
30
  if (!opts_->get_state_serializer_enabled()) {
31
    LOG(ERROR) << "Persistent state serializer is disabled";
32
  }
33
  alarm_timestamp() = td::Timestamp::in(1.0 + td::Random::fast(0, 10) * 1.0);
34
  running_ = true;
35

36
  //next_iteration();
37
  auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<AsyncSerializerState> R) {
38
    R.ensure();
39
    td::actor::send_closure(SelfId, &AsyncStateSerializer::got_self_state, R.move_as_ok());
40
  });
41
  td::actor::send_closure(manager_, &ValidatorManager::get_async_serializer_state, std::move(P));
42
}
43

44
void AsyncStateSerializer::got_self_state(AsyncSerializerState state) {
45
  if (state.last_block_id.is_valid()) {
46
    last_block_id_ = state.last_block_id;
47
    last_key_block_id_ = state.last_written_block_id;
48
    last_key_block_ts_ = state.last_written_block_ts;
49

50
    running_ = false;
51

52
    next_iteration();
53
  } else {
54
    auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<BlockHandle> R) {
55
      R.ensure();
56
      td::actor::send_closure(SelfId, &AsyncStateSerializer::got_init_handle, R.move_as_ok());
57
    });
58
    td::actor::send_closure(manager_, &ValidatorManager::get_block_handle, last_block_id_, true, std::move(P));
59
  }
60
}
61

62
void AsyncStateSerializer::got_init_handle(BlockHandle handle) {
63
  CHECK(handle->id().id.seqno == 0 || handle->is_key_block());
64
  last_key_block_id_ = handle->id();
65
  last_key_block_ts_ = handle->unix_time();
66

67
  masterchain_handle_ = std::move(handle);
68

69
  running_ = false;
70
  saved_to_db_ = false;
71

72
  next_iteration();
73
}
74

75
void AsyncStateSerializer::alarm() {
76
  alarm_timestamp() = td::Timestamp::in(1.0 + td::Random::fast(0, 10) * 1.0);
77

78
  next_iteration();
79

80
  auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<BlockIdExt> R) {
81
    R.ensure();
82
    td::actor::send_closure(SelfId, &AsyncStateSerializer::got_top_masterchain_handle, R.move_as_ok());
83
  });
84
  td::actor::send_closure(manager_, &ValidatorManager::get_top_masterchain_block, std::move(P));
85
}
86

87
void AsyncStateSerializer::request_masterchain_state() {
88
  auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), manager = manager_](td::Result<td::Ref<ShardState>> R) {
89
    if (R.is_error()) {
90
      td::actor::send_closure(SelfId, &AsyncStateSerializer::fail_handler,
91
                              R.move_as_error_prefix("failed to get masterchain state: "));
92
    } else {
93
      td::actor::send_closure(manager, &ValidatorManager::get_cell_db_reader,
94
                              [SelfId, state = td::Ref<MasterchainState>(R.move_as_ok())](
95
                                  td::Result<std::shared_ptr<vm::CellDbReader>> R) mutable {
96
                                if (R.is_error()) {
97
                                  td::actor::send_closure(SelfId, &AsyncStateSerializer::fail_handler,
98
                                                          R.move_as_error_prefix("failed to get cell db reader: "));
99
                                } else {
100
                                  td::actor::send_closure(SelfId, &AsyncStateSerializer::got_masterchain_state,
101
                                                          std::move(state), R.move_as_ok());
102
                                }
103
                              });
104
    }
105
  });
106
  td::actor::send_closure(manager_, &ValidatorManager::get_shard_state_from_db, masterchain_handle_, std::move(P));
107
}
108

109
void AsyncStateSerializer::request_shard_state(BlockIdExt shard) {
110
  auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<BlockHandle> R) {
111
    R.ensure();
112
    td::actor::send_closure(SelfId, &AsyncStateSerializer::got_shard_handle, R.move_as_ok());
113
  });
114
  return td::actor::send_closure(manager_, &ValidatorManager::get_block_handle, shard, true, std::move(P));
115
}
116

117
void AsyncStateSerializer::next_iteration() {
118
  if (running_) {
119
    return;
120
  }
121
  if (!masterchain_handle_) {
122
    running_ = true;
123
    auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<BlockHandle> R) {
124
      R.ensure();
125
      td::actor::send_closure(SelfId, &AsyncStateSerializer::got_masterchain_handle, R.move_as_ok());
126
    });
127
    td::actor::send_closure(manager_, &ValidatorManager::get_block_handle, last_block_id_, true, std::move(P));
128
    return;
129
  }
130
  if (!masterchain_handle_->inited_unix_time() || !masterchain_handle_->inited_is_key_block() ||
131
      !masterchain_handle_->is_applied()) {
132
    return;
133
  }
134
  CHECK(masterchain_handle_->id() == last_block_id_);
135
  if (attempt_ < max_attempt() && last_key_block_id_.id.seqno < last_block_id_.id.seqno &&
136
      need_serialize(masterchain_handle_) && opts_->get_state_serializer_enabled()) {
137
    if (!have_masterchain_state_) {
138
      LOG(ERROR) << "started serializing persistent state for " << masterchain_handle_->id().id.to_str();
139
      // block next attempts immediately, but send actual request later
140
      running_ = true;
141
      double delay = td::Random::fast(0, 3600);
142
      LOG(WARNING) << "serializer delay = " << delay << "s";
143
      delay_action([SelfId = actor_id(
144
                        this)]() { td::actor::send_closure(SelfId, &AsyncStateSerializer::request_masterchain_state); },
145
                   td::Timestamp::in(delay));
146
      return;
147
    }
148
    while (next_idx_ < shards_.size()) {
149
      if (!need_monitor(shards_[next_idx_].shard_full())) {
150
        next_idx_++;
151
      } else {
152
        // block next attempts immediately, but send actual request later
153
        running_ = true;
154
        double delay = td::Random::fast(0, 1800);
155
        LOG(WARNING) << "serializer delay = " << delay << "s";
156
        delay_action(
157
            [SelfId = actor_id(this), shard = shards_[next_idx_]]() {
158
              td::actor::send_closure(SelfId, &AsyncStateSerializer::request_shard_state, shard);
159
            },
160
            td::Timestamp::in(delay));
161
        return;
162
      }
163
    }
164
    LOG(ERROR) << "finished serializing persistent state for " << masterchain_handle_->id().id.to_str();
165
    last_key_block_ts_ = masterchain_handle_->unix_time();
166
    last_key_block_id_ = masterchain_handle_->id();
167
  }
168
  if (!saved_to_db_) {
169
    running_ = true;
170
    auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<td::Unit> R) {
171
      R.ensure();
172
      td::actor::send_closure(SelfId, &AsyncStateSerializer::saved_to_db);
173
    });
174
    td::actor::send_closure(manager_, &ValidatorManager::update_async_serializer_state,
175
                            AsyncSerializerState{masterchain_handle_->id(), last_key_block_id_, last_key_block_ts_},
176
                            std::move(P));
177
    return;
178
  }
179
  if (masterchain_handle_->inited_next_left()) {
180
    if (need_serialize(masterchain_handle_) && !opts_->get_state_serializer_enabled()) {
181
      LOG(ERROR) << "skipping serializing persistent state for " << masterchain_handle_->id().id.to_str();
182
    }
183
    last_block_id_ = masterchain_handle_->one_next(true);
184
    have_masterchain_state_ = false;
185
    masterchain_handle_ = nullptr;
186
    saved_to_db_ = false;
187
    shards_.clear();
188
    next_idx_ = 0;
189
    next_iteration();
190
  }
191
}
192

193
void AsyncStateSerializer::got_top_masterchain_handle(BlockIdExt block_id) {
194
  if (masterchain_handle_ && masterchain_handle_->id().id.seqno < block_id.id.seqno) {
195
    CHECK(masterchain_handle_->inited_next_left());
196
  }
197
}
198

199
void AsyncStateSerializer::got_masterchain_handle(BlockHandle handle) {
200
  CHECK(!masterchain_handle_);
201
  masterchain_handle_ = std::move(handle);
202
  running_ = false;
203
  attempt_ = 0;
204
  next_iteration();
205
}
206

207
void AsyncStateSerializer::got_masterchain_state(td::Ref<MasterchainState> state,
208
                                                 std::shared_ptr<vm::CellDbReader> cell_db_reader) {
209
  if (!opts_->get_state_serializer_enabled()) {
210
    stored_masterchain_state();
211
    return;
212
  }
213
  LOG(ERROR) << "serializing masterchain state " << masterchain_handle_->id().id.to_str();
214
  have_masterchain_state_ = true;
215
  CHECK(next_idx_ == 0);
216
  CHECK(shards_.size() == 0);
217

218
  auto vec = state->get_shards();
219
  for (auto& v : vec) {
220
    shards_.push_back(v->top_block_id());
221
  }
222

223
  auto write_data = [hash = state->root_cell()->get_hash(), cell_db_reader,
224
                     cancellation_token = cancellation_token_source_.get_cancellation_token()](td::FileFd& fd) mutable {
225
    return vm::std_boc_serialize_to_file_large(cell_db_reader, hash, fd, 31, std::move(cancellation_token));
226
  };
227
  auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<td::Unit> R) {
228
    if (R.is_error() && R.error().code() == cancelled) {
229
      LOG(ERROR) << "Persistent state serialization cancelled";
230
    } else {
231
      R.ensure();
232
    }
233
    td::actor::send_closure(SelfId, &AsyncStateSerializer::stored_masterchain_state);
234
  });
235

236
  td::actor::send_closure(manager_, &ValidatorManager::store_persistent_state_file_gen, masterchain_handle_->id(),
237
                          masterchain_handle_->id(), write_data, std::move(P));
238
}
239

240
void AsyncStateSerializer::stored_masterchain_state() {
241
  LOG(ERROR) << "finished serializing masterchain state " << masterchain_handle_->id().id.to_str();
242
  running_ = false;
243
  next_iteration();
244
}
245

246
void AsyncStateSerializer::got_shard_handle(BlockHandle handle) {
247
  auto P = td::PromiseCreator::lambda(
248
      [SelfId = actor_id(this), handle, manager = manager_](td::Result<td::Ref<ShardState>> R) {
249
        if (R.is_error()) {
250
          td::actor::send_closure(SelfId, &AsyncStateSerializer::fail_handler, R.move_as_error());
251
        } else {
252
          td::actor::send_closure(
253
              manager, &ValidatorManager::get_cell_db_reader,
254
              [SelfId, state = R.move_as_ok(), handle](td::Result<std::shared_ptr<vm::CellDbReader>> R) mutable {
255
                if (R.is_error()) {
256
                  td::actor::send_closure(SelfId, &AsyncStateSerializer::fail_handler,
257
                                          R.move_as_error_prefix("failed to get cell db reader: "));
258
                } else {
259
                  td::actor::send_closure(SelfId, &AsyncStateSerializer::got_shard_state, handle, std::move(state),
260
                                          R.move_as_ok());
261
                }
262
              });
263
        }
264
      });
265

266
  td::actor::send_closure(manager_, &ValidatorManager::get_shard_state_from_db, handle, std::move(P));
267
}
268

269
void AsyncStateSerializer::got_shard_state(BlockHandle handle, td::Ref<ShardState> state,
270
                                           std::shared_ptr<vm::CellDbReader> cell_db_reader) {
271
  if (!opts_->get_state_serializer_enabled()) {
272
    success_handler();
273
    return;
274
  }
275
  LOG(ERROR) << "serializing shard state " << handle->id().id.to_str();
276
  auto write_data = [hash = state->root_cell()->get_hash(), cell_db_reader,
277
                     cancellation_token = cancellation_token_source_.get_cancellation_token()](td::FileFd& fd) mutable {
278
    return vm::std_boc_serialize_to_file_large(cell_db_reader, hash, fd, 31, std::move(cancellation_token));
279
  };
280
  auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), handle](td::Result<td::Unit> R) {
281
    if (R.is_error() && R.error().code() == cancelled) {
282
      LOG(ERROR) << "Persistent state serialization cancelled";
283
    } else {
284
      R.ensure();
285
      LOG(ERROR) << "finished serializing shard state " << handle->id().id.to_str();
286
    }
287
    td::actor::send_closure(SelfId, &AsyncStateSerializer::success_handler);
288
  });
289
  td::actor::send_closure(manager_, &ValidatorManager::store_persistent_state_file_gen, handle->id(),
290
                          masterchain_handle_->id(), write_data, std::move(P));
291
  next_idx_++;
292
}
293

294
void AsyncStateSerializer::fail_handler(td::Status reason) {
295
  VLOG(VALIDATOR_NOTICE) << "failure: " << reason;
296
  attempt_++;
297
  delay_action(
298
      [SelfId = actor_id(this)]() { td::actor::send_closure(SelfId, &AsyncStateSerializer::fail_handler_cont); },
299
      td::Timestamp::in(16.0));
300
}
301

302
void AsyncStateSerializer::fail_handler_cont() {
303
  running_ = false;
304
  next_iteration();
305
}
306

307
void AsyncStateSerializer::success_handler() {
308
  running_ = false;
309
  next_iteration();
310
}
311

312
void AsyncStateSerializer::update_options(td::Ref<ValidatorManagerOptions> opts) {
313
  opts_ = std::move(opts);
314
  if (!opts_->get_state_serializer_enabled()) {
315
    cancellation_token_source_.cancel();
316
  }
317
}
318

319

320
bool AsyncStateSerializer::need_monitor(ShardIdFull shard) {
321
  return opts_->need_monitor(shard);
322
}
323

324
bool AsyncStateSerializer::need_serialize(BlockHandle handle) {
325
  if (handle->id().id.seqno == 0 || !handle->is_key_block()) {
326
    return false;
327
  }
328
  return ValidatorManager::is_persistent_state(handle->unix_time(), last_key_block_ts_) &&
329
         ValidatorManager::persistent_state_ttl(handle->unix_time()) > (UnixTime)td::Clocks::system();
330
}
331

332
}  // namespace validator
333

334
}  // namespace ton
335

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.