17
#include "queue-size-counter.hpp"
18
#include "block/block-auto.h"
19
#include "block/block-parse.h"
20
#include "common/delay.h"
21
#include "td/actor/MultiPromise.h"
22
#include "td/utils/Random.h"
24
namespace ton::validator {
26
// Computes the outbound message queue size for `state` by walking the whole queue dictionary.
//
// Visible steps: obtain the queue descriptor via state->message_queue(), unpack its root cell
// as block::gen::OutMsgQueueInfo, build an AugmentedDictionary over out_queue and visit its
// entries with check_for_each.
//
// Returns an error Status with "invalid message queue" when the descriptor cannot be unpacked;
// "invalid message queue dict" is returned on the dictionary-failure path.
// NOTE(review): the statement that accumulates the count, the condition guarding the second
// error and the final return are not visible in this listing — confirm against the full file.
static td::Result<td::uint32> calc_queue_size(const td::Ref<ShardState> &state) {
28
TRY_RESULT(outq_descr, state->message_queue());
29
block::gen::OutMsgQueueInfo::Record qinfo;
30
if (!tlb::unpack_cell(outq_descr->root_cell(), qinfo)) {
31
return td::Status::Error("invalid message queue");
33
// NOTE(review): 352 is presumably the dictionary key length in bits — confirm.
vm::AugmentedDictionary queue{qinfo.out_queue->prefetch_ref(0), 352, block::tlb::aug_OutMsgQueue};
34
// The callback ignores all three arguments (value, key pointer, key length).
bool ok = queue.check_for_each([&](td::Ref<vm::CellSlice>, td::ConstBitPtr, int) -> bool {
39
return td::Status::Error("invalid message queue dict");
44
// Derives the queue size of `state` incrementally from the known size of `prev_state`.
//
// Both states' OutMsgQueueInfo are unpacked, their out_queue dictionaries are diffed with
// scan_diff, and the result is prev_size + add - rem.
//
// Parameters:
//   state      - state whose queue size is wanted.
//   prev_state - state whose queue size is already known.
//   prev_size  - queue size of prev_state.
// Returns the new size, or an error Status ("invalid message queue",
// "invalid message queue dict", or "negative value" when prev_size + add < rem).
// NOTE(review): the statements that update `add` / `rem` and the guard around the dict error
// are not visible in this listing — presumably `rem` counts entries present in the previous
// queue and `add` counts entries present in the new one; confirm against the full file.
static td::Result<td::uint32> recalc_queue_size(const td::Ref<ShardState> &state, const td::Ref<ShardState> &prev_state,
45
td::uint32 prev_size) {
46
// Unpack the new state's queue.
TRY_RESULT(outq_descr, state->message_queue());
47
block::gen::OutMsgQueueInfo::Record qinfo;
48
if (!tlb::unpack_cell(outq_descr->root_cell(), qinfo)) {
49
return td::Status::Error("invalid message queue");
51
vm::AugmentedDictionary queue{qinfo.out_queue->prefetch_ref(0), 352, block::tlb::aug_OutMsgQueue};
53
// Unpack the previous state's queue the same way.
TRY_RESULT(prev_outq_descr, prev_state->message_queue());
54
block::gen::OutMsgQueueInfo::Record prev_qinfo;
55
if (!tlb::unpack_cell(prev_outq_descr->root_cell(), prev_qinfo)) {
56
return td::Status::Error("invalid message queue");
58
vm::AugmentedDictionary prev_queue{prev_qinfo.out_queue->prefetch_ref(0), 352, block::tlb::aug_OutMsgQueue};
59
td::uint32 add = 0, rem = 0;
60
// The callback receives, per differing key, the value from prev_queue and the value from queue.
bool ok = prev_queue.scan_diff(
61
queue, [&](td::ConstBitPtr, int, td::Ref<vm::CellSlice> prev_val, td::Ref<vm::CellSlice> new_val) -> bool {
62
if (prev_val.not_null()) {
65
if (new_val.not_null()) {
71
return td::Status::Error("invalid message queue dict");
73
// Reject a result that would go below zero before doing the unsigned subtraction.
if (prev_size + add < rem) {
74
return td::Status::Error("negative value");
76
return prev_size + add - rem;
79
// Actor start-up hook.
// When an initial masterchain state is available, records its seqno in current_seqno_,
// processes its top shard blocks with init = true, then drops the stored state reference.
// NOTE(review): the body of the null-state branch and anything after the reset are not
// visible in this listing.
void QueueSizeCounter::start_up() {
80
if (init_masterchain_state_.is_null()) {
85
current_seqno_ = init_masterchain_state_->get_seqno();
86
process_top_shard_blocks_cont(init_masterchain_state_, true);
87
// Reset the member to an empty reference once it has been handed off.
init_masterchain_state_ = {};
91
// Requests the queue size for `block_id`; the result is delivered through `promise`.
// Delegates to get_queue_size_ex, asking for a whole-queue calculation when simple_mode_
// is set or is_block_too_old(block_id) is true.
void QueueSizeCounter::get_queue_size(BlockIdExt block_id, td::Promise<td::uint32> promise) {
92
get_queue_size_ex(block_id, simple_mode_ || is_block_too_old(block_id), std::move(promise));
95
// Requests the queue size for `block_id`, with explicit control over the calculation mode.
//
// Parameters:
//   block_id   - block whose outbound queue size is wanted.
//   calc_whole - stored in the entry as calc_whole_; read later by get_queue_size_cont.
//   promise    - receives the size or an error.
//
// The promise is either answered from the entry's queue_size_ or appended to the entry's
// pending promises. The first request for a block marks the entry as started and launches
// the asynchronous chain: get_block_handle -> wait_block_state -> get_queue_size_cont,
// routing any failure to on_error.
// NOTE(review): the conditions guarding the immediate set_result, the early return for an
// already-started entry and the error checks inside the callbacks are not visible here.
void QueueSizeCounter::get_queue_size_ex(ton::BlockIdExt block_id, bool calc_whole, td::Promise<td::uint32> promise) {
96
// NOTE(review): operator[] presumably creates a fresh Entry when none exists — confirm the type of results_.
Entry &entry = results_[block_id];
98
promise.set_result(entry.queue_size_);
101
entry.promises_.push_back(std::move(promise));
102
if (entry.started_) {
105
entry.started_ = true;
106
entry.calc_whole_ = calc_whole;
107
// Step 1: ask the manager for the block handle.
td::actor::send_closure(manager_, &ValidatorManager::get_block_handle, block_id, true,
108
[SelfId = actor_id(this), block_id, manager = manager_](td::Result<BlockHandle> R) mutable {
110
td::actor::send_closure(SelfId, &QueueSizeCounter::on_error, block_id, R.move_as_error());
113
BlockHandle handle = R.move_as_ok();
114
// Step 2: wait for the block's shard state, with timeout td::Timestamp::in(10.0).
td::actor::send_closure(
115
manager, &ValidatorManager::wait_block_state, handle, 0, td::Timestamp::in(10.0),
116
[SelfId, handle](td::Result<td::Ref<ShardState>> R) mutable {
118
td::actor::send_closure(SelfId, &QueueSizeCounter::on_error, handle->id(),
122
// Step 3: continue on this actor with the handle and the loaded state.
td::actor::send_closure(SelfId, &QueueSizeCounter::get_queue_size_cont,
123
std::move(handle), R.move_as_ok());
128
// Continuation of get_queue_size_ex once the block handle and its shard state are available.
//
// Two paths are visible:
//   * whole calculation - calc_queue_size(state); on success the size is stored in the entry
//     and every pending promise is fulfilled, on failure on_error is invoked;
//   * incremental calculation - obtain the previous block's size via get_queue_size, load the
//     previous state via wait_block_state_short, then hand over to get_queue_size_cont2.
// calc_whole starts as entry.calc_whole_ or seqno == 0, and is also assigned
// after_split || after_merge.
// NOTE(review): the conditions selecting between these paths (including any guard around the
// split/merge detection) and the early returns are not visible in this listing.
void QueueSizeCounter::get_queue_size_cont(BlockHandle handle, td::Ref<ShardState> state) {
129
Entry &entry = results_[handle->id()];
130
CHECK(entry.started_);
131
bool calc_whole = entry.calc_whole_ || handle->id().seqno() == 0;
133
CHECK(handle->inited_prev());
134
auto prev_blocks = handle->prev();
135
// One predecessor whose shard differs from this block's shard is treated as "after split".
bool after_split = prev_blocks.size() == 1 && handle->id().shard_full() != prev_blocks[0].shard_full();
136
// Two predecessors are treated as "after merge".
bool after_merge = prev_blocks.size() == 2;
137
calc_whole = after_split || after_merge;
140
auto r_size = calc_queue_size(state);
141
if (r_size.is_error()) {
142
on_error(handle->id(), r_size.move_as_error());
146
entry.queue_size_ = r_size.move_as_ok();
147
// Answer every caller that was waiting on this block.
for (auto &promise : entry.promises_) {
148
promise.set_result(entry.queue_size_);
150
entry.promises_.clear();
154
auto prev_block_id = handle->one_prev(true);
155
// Incremental path: first the previous block's size, then the previous block's state.
get_queue_size(prev_block_id, [=, SelfId = actor_id(this), manager = manager_](td::Result<td::uint32> R) {
157
td::actor::send_closure(SelfId, &QueueSizeCounter::on_error, state->get_block_id(), R.move_as_error());
160
td::uint32 prev_size = R.move_as_ok();
161
td::actor::send_closure(
162
manager, &ValidatorManager::wait_block_state_short, prev_block_id, 0, td::Timestamp::in(10.0),
163
[=](td::Result<td::Ref<ShardState>> R) {
165
td::actor::send_closure(SelfId, &QueueSizeCounter::on_error, state->get_block_id(), R.move_as_error());
168
td::actor::send_closure(SelfId, &QueueSizeCounter::get_queue_size_cont2, state, R.move_as_ok(), prev_size);
173
// Final step of the incremental path: computes the size for `state` from `prev_state` and
// `prev_size` via recalc_queue_size.
// On success the size is stored in the entry for state->get_block_id() and all pending
// promises are fulfilled and cleared; on failure on_error is invoked for that block.
// NOTE(review): the return after on_error and any further bookkeeping on the entry are not
// visible in this listing.
void QueueSizeCounter::get_queue_size_cont2(td::Ref<ShardState> state, td::Ref<ShardState> prev_state,
174
td::uint32 prev_size) {
175
BlockIdExt block_id = state->get_block_id();
176
Entry &entry = results_[block_id];
177
CHECK(entry.started_);
178
auto r_size = recalc_queue_size(state, prev_state, prev_size);
179
if (r_size.is_error()) {
180
on_error(block_id, r_size.move_as_error());
184
entry.queue_size_ = r_size.move_as_ok();
185
for (auto &promise : entry.promises_) {
186
promise.set_result(entry.queue_size_);
188
entry.promises_.clear();
191
// Reports `error` to every promise pending on `block_id`.
// The entry is looked up without creating one; each waiting promise receives its own clone
// of the error.
// NOTE(review): the handling of a missing entry and what happens to the entry after the loop
// are not visible in this listing.
void QueueSizeCounter::on_error(ton::BlockIdExt block_id, td::Status error) {
192
auto it = results_.find(block_id);
193
if (it == results_.end()) {
196
Entry &entry = it->second;
198
for (auto &promise : entry.promises_) {
199
// clone() so that the same error can be delivered to several promises.
promise.set_error(error.clone());
204
// Loads the masterchain state for current_seqno_ and hands it to process_top_shard_blocks_cont.
//
// Flow: get_block_by_seqno_from_db for the masterchain at current_seqno_, then
// wait_block_state_short for that block id, then process_top_shard_blocks_cont with init = false.
// Both failure paths log a warning and re-schedule process_top_shard_blocks through delay_action
// with td::Timestamp::in(5.0).
// NOTE(review): the error checks selecting the retry paths are not visible in this listing.
void QueueSizeCounter::process_top_shard_blocks() {
205
LOG(DEBUG) << "QueueSizeCounter::process_top_shard_blocks seqno=" << current_seqno_;
206
td::actor::send_closure(
207
manager_, &ValidatorManager::get_block_by_seqno_from_db, AccountIdPrefixFull{masterchainId, 0}, current_seqno_,
208
[SelfId = actor_id(this), manager = manager_](td::Result<ConstBlockHandle> R) {
210
LOG(WARNING) << "Failed to get masterchain block id: " << R.move_as_error();
211
// Retry the whole procedure later.
delay_action([=]() { td::actor::send_closure(SelfId, &QueueSizeCounter::process_top_shard_blocks); },
212
td::Timestamp::in(5.0));
215
td::actor::send_closure(
216
manager, &ValidatorManager::wait_block_state_short, R.ok()->id(), 0, td::Timestamp::in(10.0),
217
[=](td::Result<td::Ref<ShardState>> R) {
219
LOG(WARNING) << "Failed to get masterchain state: " << R.move_as_error();
220
delay_action([=]() { td::actor::send_closure(SelfId, &QueueSizeCounter::process_top_shard_blocks); },
221
td::Timestamp::in(5.0));
224
// The loaded ShardState is converted to td::Ref<MasterchainState> for the continuation.
td::actor::send_closure(SelfId, &QueueSizeCounter::process_top_shard_blocks_cont,
225
td::Ref<MasterchainState>(R.move_as_ok()), false);
230
// Starts queue size calculation for the masterchain block of `state` and for the top block of
// every shard listed in it.
//
// Parameters:
//   state - masterchain state whose block id and shards are enumerated.
//   init  - forwarded as calc_whole to get_queue_size_ex_retry for each block.
//
// last_top_blocks_ is rebuilt from scratch; when all per-block promises complete, the callback
// registered with add_promise sends process_top_shard_blocks_finish to this actor.
// NOTE(review): the declaration of `mp` (presumably a td::MultiPromise) and the condition around
// the init_top_blocks_ assignment are not visible in this listing.
void QueueSizeCounter::process_top_shard_blocks_cont(td::Ref<MasterchainState> state, bool init) {
231
LOG(DEBUG) << "QueueSizeCounter::process_top_shard_blocks_cont seqno=" << current_seqno_ << " init=" << init;
233
auto ig = mp.init_guard();
234
last_top_blocks_.clear();
235
// The masterchain block itself comes first, followed by each shard's top block.
last_top_blocks_.push_back(state->get_block_id());
236
for (auto &shard : state->get_shards()) {
237
last_top_blocks_.push_back(shard->top_block_id());
239
for (const BlockIdExt &block_id : last_top_blocks_) {
240
get_queue_size_ex_retry(block_id, init, ig.get_promise());
242
ig.add_promise([SelfId = actor_id(this)](td::Result<td::Unit> R) {
246
td::actor::send_closure(SelfId, &QueueSizeCounter::process_top_shard_blocks_finish);
249
init_top_blocks_ = last_top_blocks_;
253
// Wrapper around get_queue_size_ex that retries on failure instead of reporting it.
// On failure a warning is logged and the same call is re-sent to this actor after
// td::Timestamp::in(5.0); on success `promise` is completed with td::Unit (the size value
// itself is not forwarded).
// NOTE(review): the error check and the delay_action call wrapping the retry lambda are not
// visible in this listing.
void QueueSizeCounter::get_queue_size_ex_retry(BlockIdExt block_id, bool calc_whole, td::Promise<td::Unit> promise) {
254
get_queue_size_ex(block_id, calc_whole,
255
[=, promise = std::move(promise), SelfId = actor_id(this)](td::Result<td::uint32> R) mutable {
257
LOG(WARNING) << "Failed to calculate queue size for block " << block_id.to_str() << ": "
258
<< R.move_as_error();
260
// The promise is moved into the retry closure so that it is completed by a later attempt.
[=, promise = std::move(promise)]() mutable {
261
td::actor::send_closure(SelfId, &QueueSizeCounter::get_queue_size_ex_retry, block_id,
262
calc_whole, std::move(promise));
264
td::Timestamp::in(5.0));
267
promise.set_result(td::Unit());
271
// Sent to this actor by the completion callback registered in process_top_shard_blocks_cont.
// NOTE(review): the body is not visible in this listing — document its effects from the full file.
void QueueSizeCounter::process_top_shard_blocks_finish() {
276
// Asks the manager to wait for the shard client state at current_seqno_
// (timeout td::Timestamp::in(60.0)).
// Two outcomes are visible: re-scheduling wait_shard_client through delay_action with
// td::Timestamp::in(5.0), and sending process_top_shard_blocks to this actor.
// NOTE(review): the condition choosing between the two (presumably whether R is an error)
// is not visible in this listing.
void QueueSizeCounter::wait_shard_client() {
277
LOG(DEBUG) << "QueueSizeCounter::wait_shard_client seqno=" << current_seqno_;
278
td::actor::send_closure(
279
manager_, &ValidatorManager::wait_shard_client_state, current_seqno_, td::Timestamp::in(60.0),
280
[SelfId = actor_id(this)](td::Result<td::Unit> R) {
282
delay_action([=]() mutable { td::actor::send_closure(SelfId, &QueueSizeCounter::wait_shard_client); },
283
td::Timestamp::in(5.0));
286
td::actor::send_closure(SelfId, &QueueSizeCounter::process_top_shard_blocks);
290
// Periodic cleanup of cached results.
// Erases every entry of results_ that is marked done_ and whose block is_block_too_old,
// then re-arms the alarm using td::Random::fast(20.0, 40.0).
// NOTE(review): the iterator advance for entries that are kept is not visible in this listing.
void QueueSizeCounter::alarm() {
291
for (auto it = results_.begin(); it != results_.end();) {
292
if (it->second.done_ && is_block_too_old(it->first)) {
293
// erase() returns the iterator following the removed element, so iteration can continue.
it = results_.erase(it);
298
// NOTE(review): the randomised delay is presumably 20-40 seconds — confirm the units.
alarm_timestamp() = td::Timestamp::in(td::Random::fast(20.0, 40.0));