2
This file is part of TON Blockchain Library.
4
TON Blockchain Library is free software: you can redistribute it and/or modify
5
it under the terms of the GNU Lesser General Public License as published by
6
the Free Software Foundation, either version 2 of the License, or
7
(at your option) any later version.
9
TON Blockchain Library is distributed in the hope that it will be useful,
10
but WITHOUT ANY WARRANTY; without even the implied warranty of
11
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12
GNU Lesser General Public License for more details.
14
You should have received a copy of the GNU Lesser General Public License
15
along with TON Blockchain Library. If not, see <http://www.gnu.org/licenses/>.
17
Copyright 2017-2020 Telegram Systems LLP
21
#include "td/utils/buffer.h"
22
#include "validator-group.hpp"
23
#include "adnl/utils.hpp"
24
#include "downloaders/wait-block-state.hpp"
25
#include "downloaders/wait-block-state-merge.hpp"
26
#include "downloaders/wait-block-data.hpp"
27
#include "validator-group.hpp"
30
#include "validate-broadcast.hpp"
31
#include "ton/ton-tl.hpp"
32
#include "ton/ton-io.hpp"
33
#include "state-serializer.hpp"
34
#include "get-next-key-blocks.h"
35
#include "import-db-slice.hpp"
37
#include "auto/tl/lite_api.h"
38
#include "tl-utils/lite-utils.hpp"
39
#include "auto/tl/ton_api_json.h"
40
#include "tl/tl_json.h"
42
#include "td/utils/Random.h"
43
#include "td/utils/port/path.h"
44
#include "td/utils/JsonBuilder.h"
46
#include "common/delay.h"
48
#include "validator/stats-merger.h"
56
void ValidatorManagerImpl::validate_block_is_next_proof(BlockIdExt prev_block_id, BlockIdExt next_block_id,
57
td::BufferSlice proof, td::Promise<td::Unit> promise) {
58
if (!prev_block_id.is_masterchain() || !next_block_id.is_masterchain()) {
59
VLOG(VALIDATOR_NOTICE) << "prev=" << prev_block_id << " next=" << next_block_id;
61
td::Status::Error(ErrorCode::protoviolation, "validate_block_is_next_proof() can only work for masterchain"));
64
if (prev_block_id.seqno() + 1 != next_block_id.seqno()) {
65
VLOG(VALIDATOR_NOTICE) << "prev=" << prev_block_id << " next=" << next_block_id;
66
promise.set_error(td::Status::Error(ErrorCode::protoviolation, "validate_block_is_next_proof(): bad seqno"));
69
CHECK(last_masterchain_state_.not_null());
70
auto pp = create_proof(next_block_id, std::move(proof));
72
promise.set_error(pp.move_as_error_prefix("failed to create proof: "));
76
if (last_masterchain_seqno_ == prev_block_id.seqno()) {
77
CHECK(last_masterchain_block_id_ == prev_block_id);
79
auto P = td::PromiseCreator::lambda(
80
[promise = std::move(promise), id = prev_block_id](td::Result<BlockHandle> R) mutable {
82
promise.set_error(R.move_as_error());
85
auto handle = R.move_as_ok();
86
CHECK(!handle->merge_before());
87
if (handle->one_prev(true) != id) {
88
promise.set_error(td::Status::Error(ErrorCode::protoviolation, "prev block mismatch"));
91
promise.set_value(td::Unit());
94
run_check_proof_query(next_block_id, pp.move_as_ok(), actor_id(this), td::Timestamp::in(2.0), std::move(P),
95
last_masterchain_state_, opts_->is_hardfork(next_block_id));
98
td::PromiseCreator::lambda([promise = std::move(promise), next_block_id](td::Result<BlockHandle> R) mutable {
100
auto handle = R.move_as_ok();
101
CHECK(handle->inited_next_left());
102
if (handle->one_next(true) == next_block_id) {
103
promise.set_value(td::Unit());
105
promise.set_error(td::Status::Error("next block id mismatch"));
108
get_block_handle(prev_block_id, false, std::move(P));
112
void ValidatorManagerImpl::validate_block_proof(BlockIdExt block_id, td::BufferSlice proof,
113
td::Promise<td::Unit> promise) {
114
auto pp = create_proof(block_id, std::move(proof));
116
promise.set_error(pp.move_as_error_prefix(PSTRING() << "failed to create proof for " << block_id << ": "));
120
auto P = td::PromiseCreator::lambda([promise = std::move(promise)](td::Result<BlockHandle> R) mutable {
122
promise.set_error(R.move_as_error());
124
promise.set_value(td::Unit());
127
run_check_proof_query(block_id, pp.move_as_ok(), actor_id(this), td::Timestamp::in(2.0), std::move(P),
128
opts_->is_hardfork(block_id));
131
void ValidatorManagerImpl::validate_block_proof_link(BlockIdExt block_id, td::BufferSlice proof,
132
td::Promise<td::Unit> promise) {
133
auto pp = create_proof_link(block_id, std::move(proof));
135
promise.set_error(pp.move_as_error_prefix(PSTRING() << "failed to create proof link for " << block_id << ": "));
139
auto P = td::PromiseCreator::lambda([promise = std::move(promise)](td::Result<BlockHandle> R) mutable {
141
promise.set_error(R.move_as_error());
143
promise.set_value(td::Unit());
146
run_check_proof_link_query(block_id, pp.move_as_ok(), actor_id(this), td::Timestamp::in(2.0), std::move(P));
149
void ValidatorManagerImpl::validate_block_proof_rel(BlockIdExt block_id, BlockIdExt rel_block_id, td::BufferSlice proof,
150
td::Promise<td::Unit> promise) {
151
auto pp = create_proof(block_id, std::move(proof));
153
promise.set_error(pp.move_as_error_prefix(PSTRING() << "failed to create proof for " << block_id << ": "));
157
auto Q = td::PromiseCreator::lambda([promise = std::move(promise)](td::Result<BlockHandle> R) mutable {
159
promise.set_error(R.move_as_error());
161
promise.set_value(td::Unit());
165
if (rel_block_id.id.seqno == 0) {
166
auto P = td::PromiseCreator::lambda(
167
[block_id, SelfId = actor_id(this), proof = pp.move_as_ok(), promise = std::move(Q),
168
skip_sig = opts_->is_hardfork(block_id)](td::Result<td::Ref<ShardState>> R) mutable {
170
promise.set_error(R.move_as_error());
172
auto s = td::Ref<MasterchainState>{R.move_as_ok()};
174
run_check_proof_query(block_id, std::move(proof), SelfId, td::Timestamp::in(2.0), std::move(promise),
175
std::move(s), skip_sig);
178
get_shard_state_from_db_short(rel_block_id, std::move(P));
181
td::PromiseCreator::lambda([block_id, SelfId = actor_id(this), proof = pp.move_as_ok(), promise = std::move(Q),
182
skip_sig = opts_->is_hardfork(block_id)](td::Result<td::Ref<ProofLink>> R) mutable {
184
promise.set_error(R.move_as_error());
186
run_check_proof_query(block_id, std::move(proof), SelfId, td::Timestamp::in(2.0), std::move(promise),
187
R.move_as_ok(), skip_sig);
190
get_block_proof_link_from_db_short(rel_block_id, std::move(P));
194
void ValidatorManagerImpl::validate_block(ReceivedBlock block, td::Promise<BlockHandle> promise) {
195
auto blkid = block.id;
196
auto pp = create_block(std::move(block));
198
promise.set_error(pp.move_as_error_prefix(PSTRING() << "failed to create block for " << blkid << ": "));
201
CHECK(blkid.is_masterchain());
203
auto P = td::PromiseCreator::lambda(
204
[SelfId = actor_id(this), promise = std::move(promise), id = block.id](td::Result<td::Unit> R) mutable {
206
promise.set_error(R.move_as_error());
208
td::actor::send_closure(SelfId, &ValidatorManager::get_block_handle, id, true, std::move(promise));
211
run_apply_block_query(block.id, pp.move_as_ok(), block.id, actor_id(this), td::Timestamp::in(10.0), std::move(P));
214
void ValidatorManagerImpl::prevalidate_block(BlockBroadcast broadcast, td::Promise<td::Unit> promise) {
216
promise.set_error(td::Status::Error(ErrorCode::notready, "node not started"));
219
td::actor::create_actor<ValidateBroadcast>("broadcast", std::move(broadcast), last_masterchain_block_handle_,
220
last_masterchain_state_, last_known_key_block_handle_, actor_id(this),
221
td::Timestamp::in(2.0), std::move(promise))
225
void ValidatorManagerImpl::sync_complete(td::Promise<td::Unit> promise) {
228
VLOG(VALIDATOR_WARNING) << "completed sync. Validating " << validator_groups_.size() << " groups";
229
for (auto &v : validator_groups_) {
230
if (!v.second.actor.empty()) {
231
td::actor::send_closure(v.second.actor, &ValidatorGroup::create_session);
234
for (auto &v : next_validator_groups_) {
235
if (!v.second.actor.empty()) {
236
td::actor::send_closure(v.second.actor, &ValidatorGroup::create_session);
241
void ValidatorManagerImpl::get_next_block(BlockIdExt block_id, td::Promise<BlockHandle> promise) {
242
auto P = td::PromiseCreator::lambda(
243
[SelfId = actor_id(this), promise = std::move(promise)](td::Result<BlockHandle> R) mutable {
245
promise.set_error(R.move_as_error());
248
auto handle = R.move_as_ok();
249
if (!handle->inited_next_left()) {
250
promise.set_error(td::Status::Error(ErrorCode::notready, "next block not known"));
254
td::actor::send_closure(SelfId, &ValidatorManagerImpl::get_block_handle, handle->one_next(true), true,
258
get_block_handle(block_id, false, std::move(P));
261
void ValidatorManagerImpl::get_next_key_blocks(BlockIdExt block_id, td::uint32 cnt,
262
td::Promise<std::vector<BlockIdExt>> promise) {
263
if (!last_masterchain_block_handle_ || !last_key_block_handle_) {
264
promise.set_error(td::Status::Error(ErrorCode::notready, "not inited"));
268
td::actor::create_actor<GetNextKeyBlocks>("nextkeyblocks", block_id, cnt, last_key_block_handle_,
269
last_masterchain_state_, actor_id(this), td::Timestamp::in(2.0),
274
void ValidatorManagerImpl::get_block_data(BlockHandle handle, td::Promise<td::BufferSlice> promise) {
275
auto P = td::PromiseCreator::lambda([promise = std::move(promise)](td::Result<td::Ref<BlockData>> R) mutable {
277
promise.set_error(R.move_as_error());
279
auto B = R.move_as_ok();
280
promise.set_value(B->data());
284
td::actor::send_closure(db_, &Db::get_block_data, std::move(handle), std::move(P));
287
void ValidatorManagerImpl::check_zero_state_exists(BlockIdExt block_id, td::Promise<bool> promise) {
288
td::actor::send_closure(db_, &Db::check_zero_state_file_exists, block_id, std::move(promise));
290
void ValidatorManagerImpl::get_zero_state(BlockIdExt block_id, td::Promise<td::BufferSlice> promise) {
291
td::actor::send_closure(db_, &Db::get_zero_state_file, block_id, std::move(promise));
294
void ValidatorManagerImpl::check_persistent_state_exists(BlockIdExt block_id, BlockIdExt masterchain_block_id,
295
td::Promise<bool> promise) {
296
td::actor::send_closure(db_, &Db::check_persistent_state_file_exists, block_id, masterchain_block_id,
299
void ValidatorManagerImpl::get_persistent_state(BlockIdExt block_id, BlockIdExt masterchain_block_id,
300
td::Promise<td::BufferSlice> promise) {
301
td::actor::send_closure(db_, &Db::get_persistent_state_file, block_id, masterchain_block_id, std::move(promise));
304
void ValidatorManagerImpl::get_persistent_state_slice(BlockIdExt block_id, BlockIdExt masterchain_block_id,
305
td::int64 offset, td::int64 max_length,
306
td::Promise<td::BufferSlice> promise) {
307
td::actor::send_closure(db_, &Db::get_persistent_state_file_slice, block_id, masterchain_block_id, offset, max_length,
311
void ValidatorManagerImpl::get_block_proof(BlockHandle handle, td::Promise<td::BufferSlice> promise) {
312
auto P = td::PromiseCreator::lambda([promise = std::move(promise)](td::Result<td::Ref<Proof>> R) mutable {
314
promise.set_error(R.move_as_error());
316
auto B = R.move_as_ok();
317
promise.set_value(B->data());
321
td::actor::send_closure(db_, &Db::get_block_proof, std::move(handle), std::move(P));
324
void ValidatorManagerImpl::get_block_proof_link(BlockHandle handle, td::Promise<td::BufferSlice> promise) {
325
auto P = td::PromiseCreator::lambda([promise = std::move(promise)](td::Result<td::Ref<ProofLink>> R) mutable {
327
promise.set_error(R.move_as_error());
329
auto B = R.move_as_ok();
330
promise.set_value(B->data());
334
td::actor::send_closure(db_, &Db::get_block_proof_link, std::move(handle), std::move(P));
337
void ValidatorManagerImpl::get_key_block_proof(BlockIdExt block_id, td::Promise<td::BufferSlice> promise) {
338
auto P = td::PromiseCreator::lambda([promise = std::move(promise)](td::Result<td::Ref<Proof>> R) mutable {
340
promise.set_error(R.move_as_error());
342
auto B = R.move_as_ok();
343
promise.set_value(B->data());
347
td::actor::send_closure(db_, &Db::get_key_block_proof, block_id, std::move(P));
350
void ValidatorManagerImpl::get_key_block_proof_link(BlockIdExt block_id, td::Promise<td::BufferSlice> promise) {
351
auto P = td::PromiseCreator::lambda(
352
[promise = std::move(promise), block_id, db = db_.get()](td::Result<td::Ref<Proof>> R) mutable {
354
auto P = td::PromiseCreator::lambda([promise = std::move(promise)](td::Result<td::Ref<ProofLink>> R) mutable {
356
promise.set_error(R.move_as_error());
358
auto B = R.move_as_ok();
359
promise.set_value(B->data());
363
td::actor::send_closure(db, &Db::get_key_block_proof_link, block_id, std::move(P));
365
auto B = R.move_as_ok()->export_as_proof_link().move_as_ok();
366
promise.set_value(B->data());
370
td::actor::send_closure(db_, &Db::get_key_block_proof, block_id, std::move(P));
373
void ValidatorManagerImpl::new_external_message(td::BufferSlice data, int priority) {
374
if (!is_validator()) {
377
if (last_masterchain_state_.is_null()) {
378
VLOG(VALIDATOR_NOTICE) << "dropping ext message: validator is not ready";
381
if (ext_msgs_[priority].ext_messages_.size() > (size_t)max_mempool_num()) {
384
auto R = create_ext_message(std::move(data), last_masterchain_state_->get_ext_msg_limits());
386
VLOG(VALIDATOR_NOTICE) << "dropping bad ext message: " << R.move_as_error();
389
add_external_message(R.move_as_ok(), priority);
392
void ValidatorManagerImpl::add_external_message(td::Ref<ExtMessage> msg, int priority) {
393
auto &msgs = ext_msgs_[priority];
394
auto message = std::make_unique<MessageExt<ExtMessage>>(msg);
395
auto id = message->ext_id();
396
auto address = message->address();
397
unsigned long per_address_limit = 256;
398
auto it = msgs.ext_addr_messages_.find(address);
399
if (it != msgs.ext_addr_messages_.end() && it->second.size() >= per_address_limit) {
402
auto it2 = ext_messages_hashes_.find(id.hash);
403
if (it2 != ext_messages_hashes_.end()) {
404
int old_priority = it2->second.first;
405
if (old_priority >= priority) {
408
ext_msgs_[old_priority].erase(id);
410
msgs.ext_messages_.emplace(id, std::move(message));
411
msgs.ext_addr_messages_[address].emplace(id.hash, id);
412
ext_messages_hashes_[id.hash] = {priority, id};
414
void ValidatorManagerImpl::check_external_message(td::BufferSlice data, td::Promise<td::Ref<ExtMessage>> promise) {
415
auto state = do_get_last_liteserver_state();
416
if (state.is_null()) {
417
promise.set_error(td::Status::Error(ErrorCode::notready, "not ready"));
420
auto R = create_ext_message(std::move(data), state->get_ext_msg_limits());
422
promise.set_error(R.move_as_error_prefix("failed to parse external message: "));
425
auto message = R.move_as_ok();
426
WorkchainId wc = message->wc();
427
StdSmcAddress addr = message->addr();
428
if (checked_ext_msg_counter_.get_msg_count(wc, addr) >= max_ext_msg_per_addr()) {
430
td::Status::Error(PSTRING() << "too many external messages to address " << wc << ":" << addr.to_hex()));
434
promise = [self = this, wc, addr, promise = std::move(promise),
435
SelfId = actor_id(this)](td::Result<td::Ref<ExtMessage>> R) mutable {
437
promise.set_error(R.move_as_error());
440
td::actor::send_lambda(SelfId, [=, promise = std::move(promise), message = R.move_as_ok()]() mutable {
441
if (self->checked_ext_msg_counter_.inc_msg_count(wc, addr) > max_ext_msg_per_addr()) {
443
td::Status::Error(PSTRING() << "too many external messages to address " << wc << ":" << addr.to_hex()));
446
promise.set_result(std::move(message));
449
++ls_stats_check_ext_messages_;
450
run_check_external_message(std::move(message), actor_id(this), std::move(promise));
453
void ValidatorManagerImpl::new_ihr_message(td::BufferSlice data) {
454
if (!is_validator()) {
457
auto R = create_ihr_message(std::move(data));
459
VLOG(VALIDATOR_NOTICE) << "dropping bad ihr message: " << R.move_as_error();
462
auto M = std::make_unique<MessageExt<IhrMessage>>(R.move_as_ok());
463
auto id = M->ext_id();
464
if (ihr_messages_hashes_.count(id.hash) == 0) {
465
ihr_messages_.emplace(id, std::move(M));
466
ihr_messages_hashes_.emplace(id.hash, id);
470
void ValidatorManagerImpl::new_shard_block(BlockIdExt block_id, CatchainSeqno cc_seqno, td::BufferSlice data) {
471
if (!is_validator()) {
474
if (!last_masterchain_block_handle_) {
475
VLOG(VALIDATOR_DEBUG) << "dropping top shard block broadcast: not inited";
481
auto it = shard_blocks_.find(ShardTopBlockDescriptionId{block_id.shard_full(), cc_seqno});
482
if (it != shard_blocks_.end() && block_id.id.seqno <= it->second->block_id().id.seqno) {
483
VLOG(VALIDATOR_DEBUG) << "dropping duplicate shard block broadcast";
486
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<td::Ref<ShardTopBlockDescription>> R) {
488
VLOG(VALIDATOR_NOTICE) << "dropping invalid new shard block description: " << R.move_as_error();
490
td::actor::send_closure(SelfId, &ValidatorManagerImpl::add_shard_block_description, R.move_as_ok());
493
run_validate_shard_block_description(std::move(data), last_masterchain_block_handle_, last_masterchain_state_,
494
actor_id(this), td::Timestamp::in(2.0), std::move(P));
497
void ValidatorManagerImpl::new_block_candidate(BlockIdExt block_id, td::BufferSlice data) {
498
if (!last_masterchain_block_handle_) {
499
VLOG(VALIDATOR_DEBUG) << "dropping top shard block broadcast: not inited";
505
add_cached_block_candidate(ReceivedBlock{block_id, std::move(data)});
508
void ValidatorManagerImpl::add_shard_block_description(td::Ref<ShardTopBlockDescription> desc) {
509
if (desc->may_be_valid(last_masterchain_block_handle_, last_masterchain_state_)) {
510
auto it = shard_blocks_.find(ShardTopBlockDescriptionId{desc->shard(), desc->catchain_seqno()});
511
if (it != shard_blocks_.end() && desc->block_id().id.seqno <= it->second->block_id().id.seqno) {
512
VLOG(VALIDATOR_DEBUG) << "dropping duplicate shard block broadcast";
515
shard_blocks_[ShardTopBlockDescriptionId{desc->block_id().shard_full(), desc->catchain_seqno()}] = desc;
516
VLOG(VALIDATOR_DEBUG) << "new shard block descr for " << desc->block_id();
517
if (last_masterchain_block_handle_ && last_masterchain_seqno_ > 0 &&
518
desc->generated_at() < last_masterchain_block_handle_->unix_time() + 60) {
520
[SelfId = actor_id(this), desc]() {
521
auto P = td::PromiseCreator::lambda([](td::Result<td::Ref<ShardState>> R) {
523
auto S = R.move_as_error();
524
if (S.code() != ErrorCode::timeout && S.code() != ErrorCode::notready) {
525
VLOG(VALIDATOR_NOTICE) << "failed to get shard state: " << S;
527
VLOG(VALIDATOR_DEBUG) << "failed to get shard state: " << S;
531
td::actor::send_closure(SelfId, &ValidatorManager::wait_block_state_short, desc->block_id(), 0,
532
td::Timestamp::in(60.0), std::move(P));
534
td::Timestamp::in(1.0));
539
void ValidatorManagerImpl::add_cached_block_candidate(ReceivedBlock block) {
540
BlockIdExt id = block.id;
541
if (block.id.is_masterchain()) {
544
if (cached_block_candidates_.emplace(id, std::move(block)).second) {
545
cached_block_candidates_lru_.push_back(id);
547
auto it = wait_block_data_.find(id);
548
if (it != wait_block_data_.end()) {
549
auto r_block = create_block(cached_block_candidates_[id].clone());
550
if (r_block.is_ok()) {
551
td::actor::send_closure(it->second.actor_, &WaitBlockData::got_block_data_from_net, r_block.move_as_ok());
556
auto it = wait_state_.find(id);
557
if (it != wait_state_.end()) {
558
// Proof link is not ready at this point, but this will force WaitBlockState to redo send_get_proof_link_request
559
td::actor::send_closure(it->second.actor_, &WaitBlockState::after_get_proof_link);
563
if (cached_block_candidates_lru_.size() > max_cached_candidates()) {
564
CHECK(cached_block_candidates_.erase(cached_block_candidates_lru_.front()));
565
cached_block_candidates_lru_.pop_front();
569
void ValidatorManagerImpl::add_ext_server_id(adnl::AdnlNodeIdShort id) {
570
class Cb : public adnl::Adnl::Callback {
572
td::actor::ActorId<ValidatorManagerImpl> id_;
574
void receive_message(adnl::AdnlNodeIdShort src, adnl::AdnlNodeIdShort dst, td::BufferSlice data) override {
576
void receive_query(adnl::AdnlNodeIdShort src, adnl::AdnlNodeIdShort dst, td::BufferSlice data,
577
td::Promise<td::BufferSlice> promise) override {
578
td::actor::send_closure(id_, &ValidatorManagerImpl::run_ext_query, std::move(data), std::move(promise));
582
Cb(td::actor::ActorId<ValidatorManagerImpl> id) : id_(id) {
586
td::actor::send_closure(adnl_, &adnl::Adnl::subscribe, id,
587
adnl::Adnl::int_to_bytestring(lite_api::liteServer_query::ID),
588
std::make_unique<Cb>(actor_id(this)));
590
if (lite_server_.empty()) {
591
pending_ext_ids_.push_back(id);
593
td::actor::send_closure(lite_server_, &adnl::AdnlExtServer::add_local_id, id);
597
void ValidatorManagerImpl::add_ext_server_port(td::uint16 port) {
598
if (lite_server_.empty()) {
599
pending_ext_ports_.push_back(port);
601
td::actor::send_closure(lite_server_, &adnl::AdnlExtServer::add_tcp_port, port);
605
void ValidatorManagerImpl::created_ext_server(td::actor::ActorOwn<adnl::AdnlExtServer> server) {
606
lite_server_ = std::move(server);
607
for (auto &id : pending_ext_ids_) {
608
td::actor::send_closure(lite_server_, &adnl::AdnlExtServer::add_local_id, id);
610
for (auto port : pending_ext_ports_) {
611
td::actor::send_closure(lite_server_, &adnl::AdnlExtServer::add_tcp_port, port);
613
pending_ext_ids_.clear();
614
pending_ext_ports_.clear();
617
void ValidatorManagerImpl::run_ext_query(td::BufferSlice data, td::Promise<td::BufferSlice> promise) {
619
promise.set_error(td::Status::Error(ErrorCode::notready, "node not synced"));
622
auto F = fetch_tl_object<lite_api::liteServer_query>(data.clone(), true);
624
data = std::move(F.move_as_ok()->data_);
626
auto G = fetch_tl_prefix<lite_api::liteServer_queryPrefix>(data, true);
628
promise.set_error(G.move_as_error());
633
auto P = td::PromiseCreator::lambda([promise = std::move(promise)](td::Result<td::BufferSlice> R) mutable {
634
td::BufferSlice data;
636
auto S = R.move_as_error();
637
data = create_serialize_tl_object<lite_api::liteServer_error>(S.code(), S.message().c_str());
639
data = R.move_as_ok();
641
promise.set_value(std::move(data));
644
auto E = fetch_tl_prefix<lite_api::liteServer_waitMasterchainSeqno>(data, true);
646
run_liteserver_query(std::move(data), actor_id(this), lite_server_cache_.get(), std::move(P));
648
auto e = E.move_as_ok();
649
if (static_cast<BlockSeqno>(e->seqno_) <= min_confirmed_masterchain_seqno_) {
650
run_liteserver_query(std::move(data), actor_id(this), lite_server_cache_.get(), std::move(P));
652
auto t = e->timeout_ms_ < 10000 ? e->timeout_ms_ * 0.001 : 10.0;
654
td::PromiseCreator::lambda([data = std::move(data), SelfId = actor_id(this), cache = lite_server_cache_.get(),
655
promise = std::move(P)](td::Result<td::Unit> R) mutable {
657
promise.set_error(R.move_as_error());
660
run_liteserver_query(std::move(data), SelfId, cache, std::move(promise));
662
wait_shard_client_state(e->seqno_, td::Timestamp::in(t), std::move(Q));
667
void ValidatorManagerImpl::wait_block_state(BlockHandle handle, td::uint32 priority, td::Timestamp timeout,
668
td::Promise<td::Ref<ShardState>> promise) {
669
auto it0 = block_state_cache_.find(handle->id());
670
if (it0 != block_state_cache_.end()) {
671
it0->second.ttl_ = td::Timestamp::in(30.0);
672
promise.set_result(it0->second.state_);
675
auto it = wait_state_.find(handle->id());
676
if (it == wait_state_.end()) {
677
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), handle](td::Result<td::Ref<ShardState>> R) {
678
td::actor::send_closure(SelfId, &ValidatorManagerImpl::finished_wait_state, handle, std::move(R));
680
auto id = td::actor::create_actor<WaitBlockState>("waitstate", handle, priority, actor_id(this),
681
td::Timestamp::in(10.0), std::move(P))
683
wait_state_[handle->id()].actor_ = id;
684
it = wait_state_.find(handle->id());
687
it->second.waiting_.emplace_back(timeout, priority, std::move(promise));
688
auto X = it->second.get_timeout();
689
td::actor::send_closure(it->second.actor_, &WaitBlockState::update_timeout, X.first, X.second);
692
void ValidatorManagerImpl::wait_block_state_short(BlockIdExt block_id, td::uint32 priority, td::Timestamp timeout,
693
td::Promise<td::Ref<ShardState>> promise) {
694
auto P = td::PromiseCreator::lambda(
695
[SelfId = actor_id(this), priority, timeout, promise = std::move(promise)](td::Result<BlockHandle> R) mutable {
697
promise.set_error(R.move_as_error());
700
td::actor::send_closure(SelfId, &ValidatorManagerImpl::wait_block_state, R.move_as_ok(), priority, timeout,
703
get_block_handle(block_id, true, std::move(P));
706
void ValidatorManagerImpl::wait_block_data(BlockHandle handle, td::uint32 priority, td::Timestamp timeout,
707
td::Promise<td::Ref<BlockData>> promise) {
708
auto it = wait_block_data_.find(handle->id());
709
if (it == wait_block_data_.end()) {
710
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), handle](td::Result<td::Ref<BlockData>> R) {
711
td::actor::send_closure(SelfId, &ValidatorManagerImpl::finished_wait_data, handle, std::move(R));
713
auto id = td::actor::create_actor<WaitBlockData>("waitdata", handle, priority, actor_id(this),
714
td::Timestamp::in(10.0), std::move(P))
716
wait_block_data_[handle->id()].actor_ = id;
717
it = wait_block_data_.find(handle->id());
720
it->second.waiting_.emplace_back(timeout, priority, std::move(promise));
721
auto X = it->second.get_timeout();
722
td::actor::send_closure(it->second.actor_, &WaitBlockData::update_timeout, X.first, X.second);
725
void ValidatorManagerImpl::wait_block_data_short(BlockIdExt block_id, td::uint32 priority, td::Timestamp timeout,
726
td::Promise<td::Ref<BlockData>> promise) {
727
auto P = td::PromiseCreator::lambda(
728
[SelfId = actor_id(this), priority, timeout, promise = std::move(promise)](td::Result<BlockHandle> R) mutable {
730
promise.set_error(R.move_as_error());
733
td::actor::send_closure(SelfId, &ValidatorManagerImpl::wait_block_data, R.move_as_ok(), priority, timeout,
736
get_block_handle(block_id, true, std::move(P));
739
void ValidatorManagerImpl::wait_block_state_merge(BlockIdExt left_id, BlockIdExt right_id, td::uint32 priority,
740
td::Timestamp timeout, td::Promise<td::Ref<ShardState>> promise) {
741
td::actor::create_actor<WaitBlockStateMerge>("merge", left_id, right_id, priority, actor_id(this), timeout,
746
void ValidatorManagerImpl::wait_prev_block_state(BlockHandle handle, td::uint32 priority, td::Timestamp timeout,
747
td::Promise<td::Ref<ShardState>> promise) {
749
CHECK(!handle->is_zero());
750
if (!handle->merge_before()) {
751
auto shard = handle->id().shard_full();
752
auto prev_shard = handle->one_prev(true).shard_full();
753
if (shard == prev_shard) {
754
wait_block_state_short(handle->one_prev(true), priority, timeout, std::move(promise));
756
CHECK(shard_parent(shard) == prev_shard);
757
bool left = shard_child(prev_shard, true) == shard;
759
td::PromiseCreator::lambda([promise = std::move(promise), left](td::Result<td::Ref<ShardState>> R) mutable {
761
promise.set_error(R.move_as_error());
763
auto s = R.move_as_ok();
766
promise.set_error(r.move_as_error());
768
auto v = r.move_as_ok();
769
promise.set_value(left ? std::move(v.first) : std::move(v.second));
773
wait_block_state_short(handle->one_prev(true), priority, timeout, std::move(P));
776
wait_block_state_merge(handle->one_prev(true), handle->one_prev(false), priority, timeout, std::move(promise));
780
void ValidatorManagerImpl::wait_block_proof(BlockHandle handle, td::Timestamp timeout,
781
td::Promise<td::Ref<Proof>> promise) {
782
td::actor::send_closure(db_, &Db::get_block_proof, std::move(handle), std::move(promise));
785
void ValidatorManagerImpl::wait_block_proof_short(BlockIdExt block_id, td::Timestamp timeout,
786
td::Promise<td::Ref<Proof>> promise) {
787
auto P = td::PromiseCreator::lambda(
788
[SelfId = actor_id(this), timeout, promise = std::move(promise)](td::Result<BlockHandle> R) mutable {
790
promise.set_error(R.move_as_error());
793
td::actor::send_closure(SelfId, &ValidatorManagerImpl::wait_block_proof, R.move_as_ok(), timeout,
796
get_block_handle(block_id, true, std::move(P));
799
void ValidatorManagerImpl::wait_block_proof_link(BlockHandle handle, td::Timestamp timeout,
800
td::Promise<td::Ref<ProofLink>> promise) {
801
td::actor::send_closure(db_, &Db::get_block_proof_link, std::move(handle), std::move(promise));
804
void ValidatorManagerImpl::wait_block_proof_link_short(BlockIdExt block_id, td::Timestamp timeout,
805
td::Promise<td::Ref<ProofLink>> promise) {
806
auto P = td::PromiseCreator::lambda(
807
[SelfId = actor_id(this), timeout, promise = std::move(promise)](td::Result<BlockHandle> R) mutable {
809
promise.set_error(R.move_as_error());
812
td::actor::send_closure(SelfId, &ValidatorManagerImpl::wait_block_proof_link, R.move_as_ok(), timeout,
815
get_block_handle(block_id, true, std::move(P));
818
void ValidatorManagerImpl::wait_block_signatures(BlockHandle handle, td::Timestamp timeout,
819
td::Promise<td::Ref<BlockSignatureSet>> promise) {
820
td::actor::send_closure(db_, &Db::get_block_signatures, handle, std::move(promise));
823
void ValidatorManagerImpl::wait_block_signatures_short(BlockIdExt block_id, td::Timestamp timeout,
824
td::Promise<td::Ref<BlockSignatureSet>> promise) {
825
auto P = td::PromiseCreator::lambda(
826
[SelfId = actor_id(this), timeout, promise = std::move(promise)](td::Result<BlockHandle> R) mutable {
828
promise.set_error(R.move_as_error());
831
td::actor::send_closure(SelfId, &ValidatorManagerImpl::wait_block_signatures, R.move_as_ok(), timeout,
834
get_block_handle(block_id, true, std::move(P));
837
void ValidatorManagerImpl::wait_block_message_queue(BlockHandle handle, td::uint32 priority, td::Timestamp timeout,
838
td::Promise<td::Ref<MessageQueue>> promise) {
839
auto P = td::PromiseCreator::lambda([promise = std::move(promise)](td::Result<td::Ref<ShardState>> R) mutable {
841
promise.set_error(R.move_as_error());
843
auto state = R.move_as_ok();
844
promise.set_result(state->message_queue());
848
wait_block_state(handle, priority, timeout, std::move(P));
851
void ValidatorManagerImpl::wait_block_message_queue_short(BlockIdExt block_id, td::uint32 priority,
852
td::Timestamp timeout,
853
td::Promise<td::Ref<MessageQueue>> promise) {
854
auto P = td::PromiseCreator::lambda(
855
[SelfId = actor_id(this), priority, timeout, promise = std::move(promise)](td::Result<BlockHandle> R) mutable {
857
promise.set_error(R.move_as_error());
860
td::actor::send_closure(SelfId, &ValidatorManagerImpl::wait_block_message_queue, R.move_as_ok(), priority,
861
timeout, std::move(promise));
863
get_block_handle(block_id, true, std::move(P));
866
void ValidatorManagerImpl::get_external_messages(
867
ShardIdFull shard, td::Promise<std::vector<std::pair<td::Ref<ExtMessage>, int>>> promise) {
869
size_t processed = 0, deleted = 0;
870
std::vector<std::pair<td::Ref<ExtMessage>, int>> res;
871
MessageId<ExtMessage> left{AccountIdPrefixFull{shard.workchain, shard.shard & (shard.shard - 1)}, Bits256::zero()};
872
size_t total_msgs = 0;
873
td::Random::Fast rnd;
874
for (auto iter = ext_msgs_.rbegin(); iter != ext_msgs_.rend(); ++iter) {
875
std::vector<std::pair<td::Ref<ExtMessage>, int>> cur_res;
876
int priority = iter->first;
877
auto &msgs = iter->second;
878
auto it = msgs.ext_messages_.lower_bound(left);
879
while (it != msgs.ext_messages_.end()) {
881
if (!shard_contains(shard, s.dst)) {
885
if (it->second->expired()) {
886
msgs.ext_addr_messages_[it->second->address()].erase(it->first.hash);
887
ext_messages_hashes_.erase(it->first.hash);
888
it = msgs.ext_messages_.erase(it);
892
if (it->second->is_active()) {
893
cur_res.emplace_back(it->second->message(), priority);
897
td::random_shuffle(td::as_mutable_span(cur_res), rnd);
898
res.insert(res.end(), cur_res.begin(), cur_res.end());
899
total_msgs += msgs.ext_messages_.size();
901
LOG(WARNING) << "get_external_messages to shard " << shard.to_str() << " : time=" << t.elapsed()
902
<< " result_size=" << res.size() << " processed=" << processed << " expired=" << deleted
903
<< " total_size=" << total_msgs;
904
promise.set_value(std::move(res));
907
void ValidatorManagerImpl::get_ihr_messages(ShardIdFull shard, td::Promise<std::vector<td::Ref<IhrMessage>>> promise) {
908
std::vector<td::Ref<IhrMessage>> res;
909
MessageId<IhrMessage> left{AccountIdPrefixFull{shard.workchain, shard.shard & (shard.shard - 1)}, Bits256::zero()};
910
auto it = ihr_messages_.lower_bound(left);
911
while (it != ihr_messages_.end()) {
913
if (!shard_contains(shard, s.dst)) {
916
if (it->second->expired()) {
917
ihr_messages_hashes_.erase(it->first.hash);
918
it = ihr_messages_.erase(it);
921
if (it->second->is_active()) {
922
res.push_back(it->second->message());
926
promise.set_value(std::move(res));
929
void ValidatorManagerImpl::get_shard_blocks(BlockIdExt masterchain_block_id,
930
td::Promise<std::vector<td::Ref<ShardTopBlockDescription>>> promise) {
931
std::vector<td::Ref<ShardTopBlockDescription>> v;
932
for (auto &b : shard_blocks_) {
933
v.push_back(b.second);
935
promise.set_value(std::move(v));
938
void ValidatorManagerImpl::complete_external_messages(std::vector<ExtMessage::Hash> to_delay,
939
std::vector<ExtMessage::Hash> to_delete) {
940
for (auto &hash : to_delete) {
941
auto it = ext_messages_hashes_.find(hash);
942
if (it != ext_messages_hashes_.end()) {
943
int priority = it->second.first;
944
auto msg_id = it->second.second;
945
ext_msgs_[priority].erase(msg_id);
946
ext_messages_hashes_.erase(it);
949
unsigned long soft_mempool_limit = 1024;
950
for (auto &hash : to_delay) {
951
auto it = ext_messages_hashes_.find(hash);
952
if (it != ext_messages_hashes_.end()) {
953
int priority = it->second.first;
954
auto msg_id = it->second.second;
955
auto &msgs = ext_msgs_[priority];
956
auto it2 = msgs.ext_messages_.find(msg_id);
957
if ((msgs.ext_messages_.size() < soft_mempool_limit) && it2->second->can_postpone()) {
958
it2->second->postpone();
961
ext_messages_hashes_.erase(it);
967
void ValidatorManagerImpl::complete_ihr_messages(std::vector<IhrMessage::Hash> to_delay,
968
std::vector<IhrMessage::Hash> to_delete) {
969
for (auto &hash : to_delete) {
970
auto it = ihr_messages_hashes_.find(hash);
971
if (it != ihr_messages_hashes_.end()) {
972
ihr_messages_.erase(it->second);
973
ihr_messages_hashes_.erase(it);
976
for (auto &hash : to_delay) {
977
auto it = ihr_messages_hashes_.find(hash);
978
if (it != ihr_messages_hashes_.end()) {
979
auto it2 = ihr_messages_.find(it->second);
980
CHECK(it2 != ihr_messages_.end());
981
if (it2->second->can_postpone()) {
982
it2->second->postpone();
984
ihr_messages_.erase(it2);
985
ihr_messages_hashes_.erase(it);
991
void ValidatorManagerImpl::get_block_data_from_db(ConstBlockHandle handle, td::Promise<td::Ref<BlockData>> promise) {
992
td::actor::send_closure(db_, &Db::get_block_data, std::move(handle), std::move(promise));
995
void ValidatorManagerImpl::get_block_data_from_db_short(BlockIdExt block_id, td::Promise<td::Ref<BlockData>> promise) {
997
td::PromiseCreator::lambda([db = db_.get(), promise = std::move(promise)](td::Result<BlockHandle> R) mutable {
999
promise.set_error(R.move_as_error());
1001
auto handle = R.move_as_ok();
1002
td::actor::send_closure(db, &Db::get_block_data, std::move(handle), std::move(promise));
1005
get_block_handle(block_id, false, std::move(P));
1008
void ValidatorManagerImpl::get_shard_state_from_db(ConstBlockHandle handle, td::Promise<td::Ref<ShardState>> promise) {
1009
td::actor::send_closure(db_, &Db::get_block_state, handle, std::move(promise));
1012
void ValidatorManagerImpl::get_shard_state_from_db_short(BlockIdExt block_id,
1013
td::Promise<td::Ref<ShardState>> promise) {
1015
td::PromiseCreator::lambda([db = db_.get(), promise = std::move(promise)](td::Result<BlockHandle> R) mutable {
1017
promise.set_error(R.move_as_error());
1019
auto handle = R.move_as_ok();
1020
td::actor::send_closure(db, &Db::get_block_state, std::move(handle), std::move(promise));
1023
get_block_handle(block_id, false, std::move(P));
1026
void ValidatorManagerImpl::get_block_candidate_from_db(PublicKey source, BlockIdExt id,
1027
FileHash collated_data_file_hash,
1028
td::Promise<BlockCandidate> promise) {
1029
td::actor::send_closure(db_, &Db::get_block_candidate, source, id, collated_data_file_hash, std::move(promise));
1032
void ValidatorManagerImpl::get_block_proof_from_db(ConstBlockHandle handle, td::Promise<td::Ref<Proof>> promise) {
1033
td::actor::send_closure(db_, &Db::get_block_proof, std::move(handle), std::move(promise));
1036
void ValidatorManagerImpl::get_block_proof_from_db_short(BlockIdExt block_id, td::Promise<td::Ref<Proof>> promise) {
1038
td::PromiseCreator::lambda([db = db_.get(), promise = std::move(promise)](td::Result<BlockHandle> R) mutable {
1040
promise.set_error(R.move_as_error());
1042
auto handle = R.move_as_ok();
1043
td::actor::send_closure(db, &Db::get_block_proof, std::move(handle), std::move(promise));
1046
get_block_handle(block_id, false, std::move(P));
1049
void ValidatorManagerImpl::get_block_proof_link_from_db(ConstBlockHandle handle,
1050
td::Promise<td::Ref<ProofLink>> promise) {
1051
if (handle->inited_proof_link()) {
1052
td::actor::send_closure(db_, &Db::get_block_proof_link, std::move(handle), std::move(promise));
1053
} else if (handle->inited_proof()) {
1054
auto P = td::PromiseCreator::lambda([promise = std::move(promise)](td::Result<td::Ref<Proof>> R) mutable {
1056
promise.set_error(R.move_as_error());
1058
promise.set_result(R.move_as_ok()->export_as_proof_link());
1061
td::actor::send_closure(db_, &Db::get_block_proof, std::move(handle), std::move(P));
1063
promise.set_error(td::Status::Error(ErrorCode::notready, "not in db"));
1067
void ValidatorManagerImpl::get_block_proof_link_from_db_short(BlockIdExt block_id,
1068
td::Promise<td::Ref<ProofLink>> promise) {
1069
auto P = td::PromiseCreator::lambda(
1070
[SelfId = actor_id(this), promise = std::move(promise)](td::Result<BlockHandle> R) mutable {
1072
promise.set_error(R.move_as_error());
1074
auto handle = R.move_as_ok();
1075
td::actor::send_closure(SelfId, &ValidatorManager::get_block_proof_link_from_db, std::move(handle),
1076
std::move(promise));
1079
get_block_handle(block_id, false, std::move(P));
1082
void ValidatorManagerImpl::get_block_by_lt_from_db(AccountIdPrefixFull account, LogicalTime lt,
1083
td::Promise<ConstBlockHandle> promise) {
1084
td::actor::send_closure(db_, &Db::get_block_by_lt, account, lt, std::move(promise));
1087
void ValidatorManagerImpl::get_block_by_unix_time_from_db(AccountIdPrefixFull account, UnixTime ts,
1088
td::Promise<ConstBlockHandle> promise) {
1089
td::actor::send_closure(db_, &Db::get_block_by_unix_time, account, ts, std::move(promise));
1092
void ValidatorManagerImpl::get_block_by_seqno_from_db(AccountIdPrefixFull account, BlockSeqno seqno,
1093
td::Promise<ConstBlockHandle> promise) {
1094
td::actor::send_closure(db_, &Db::get_block_by_seqno, account, seqno, std::move(promise));
1097
void ValidatorManagerImpl::finished_wait_state(BlockHandle handle, td::Result<td::Ref<ShardState>> R) {
1099
block_state_cache_[handle->id()] = {R.ok(), td::Timestamp::in(30.0)};
1101
auto it = wait_state_.find(handle->id());
1102
if (it != wait_state_.end()) {
1104
auto S = R.move_as_error();
1105
if (S.code() != ErrorCode::timeout) {
1106
for (auto &X : it->second.waiting_) {
1107
X.promise.set_error(S.clone());
1109
} else if (it->second.waiting_.size() != 0) {
1110
auto X = it->second.get_timeout();
1111
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), handle](td::Result<td::Ref<ShardState>> R) {
1112
td::actor::send_closure(SelfId, &ValidatorManagerImpl::finished_wait_state, handle, std::move(R));
1114
auto id = td::actor::create_actor<WaitBlockState>("waitstate", handle, X.second, actor_id(this), X.first,
1117
it->second.actor_ = id;
1121
auto r = R.move_as_ok();
1122
for (auto &X : it->second.waiting_) {
1123
X.promise.set_result(r);
1126
wait_state_.erase(it);
1130
void ValidatorManagerImpl::finished_wait_data(BlockHandle handle, td::Result<td::Ref<BlockData>> R) {
1131
auto it = wait_block_data_.find(handle->id());
1132
if (it != wait_block_data_.end()) {
1134
auto S = R.move_as_error();
1135
if (S.code() != ErrorCode::timeout) {
1136
for (auto &X : it->second.waiting_) {
1137
X.promise.set_error(S.clone());
1140
auto X = it->second.get_timeout();
1141
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), handle](td::Result<td::Ref<BlockData>> R) {
1142
td::actor::send_closure(SelfId, &ValidatorManagerImpl::finished_wait_data, handle, std::move(R));
1145
td::actor::create_actor<WaitBlockData>("waitdata", handle, X.second, actor_id(this), X.first, std::move(P))
1147
it->second.actor_ = id;
1151
auto r = R.move_as_ok();
1152
for (auto &X : it->second.waiting_) {
1153
X.promise.set_result(r);
1156
wait_block_data_.erase(it);
1160
void ValidatorManagerImpl::set_block_state(BlockHandle handle, td::Ref<ShardState> state,
1161
td::Promise<td::Ref<ShardState>> promise) {
1162
auto P = td::PromiseCreator::lambda(
1163
[SelfId = actor_id(this), handle, promise = std::move(promise)](td::Result<td::Ref<ShardState>> R) mutable {
1165
promise.set_error(R.move_as_error());
1167
promise.set_value(R.move_as_ok());
1168
td::actor::send_closure(SelfId, &ValidatorManagerImpl::written_handle, std::move(handle), [](td::Unit) {});
1171
td::actor::send_closure(db_, &Db::store_block_state, handle, state, std::move(P));
1174
void ValidatorManagerImpl::get_cell_db_reader(td::Promise<std::shared_ptr<vm::CellDbReader>> promise) {
1175
td::actor::send_closure(db_, &Db::get_cell_db_reader, std::move(promise));
1178
void ValidatorManagerImpl::store_persistent_state_file(BlockIdExt block_id, BlockIdExt masterchain_block_id,
1179
td::BufferSlice state, td::Promise<td::Unit> promise) {
1180
td::actor::send_closure(db_, &Db::store_persistent_state_file, block_id, masterchain_block_id, std::move(state),
1181
std::move(promise));
1184
void ValidatorManagerImpl::store_persistent_state_file_gen(BlockIdExt block_id, BlockIdExt masterchain_block_id,
1185
std::function<td::Status(td::FileFd&)> write_data,
1186
td::Promise<td::Unit> promise) {
1187
td::actor::send_closure(db_, &Db::store_persistent_state_file_gen, block_id, masterchain_block_id,
1188
std::move(write_data), std::move(promise));
1191
void ValidatorManagerImpl::store_zero_state_file(BlockIdExt block_id, td::BufferSlice state,
1192
td::Promise<td::Unit> promise) {
1193
td::actor::send_closure(db_, &Db::store_zero_state_file, block_id, std::move(state), std::move(promise));
1196
void ValidatorManagerImpl::set_block_data(BlockHandle handle, td::Ref<BlockData> data, td::Promise<td::Unit> promise) {
1197
auto P = td::PromiseCreator::lambda(
1198
[SelfId = actor_id(this), data, handle, promise = std::move(promise)](td::Result<td::Unit> R) mutable {
1200
promise.set_error(R.move_as_error());
1202
promise.set_value(td::Unit());
1203
td::actor::send_closure(SelfId, &ValidatorManagerImpl::written_handle, std::move(handle), [](td::Unit) {});
1206
td::actor::send_closure(db_, &Db::store_block_data, handle, std::move(data), std::move(P));
1209
void ValidatorManagerImpl::set_block_proof(BlockHandle handle, td::Ref<Proof> proof, td::Promise<td::Unit> promise) {
1210
auto P = td::PromiseCreator::lambda(
1211
[SelfId = actor_id(this), handle, promise = std::move(promise)](td::Result<td::Unit> R) mutable {
1213
promise.set_error(R.move_as_error());
1215
promise.set_value(td::Unit());
1216
td::actor::send_closure(SelfId, &ValidatorManagerImpl::written_handle, std::move(handle), [](td::Unit) {});
1219
td::actor::send_closure(db_, &Db::store_block_proof, handle, std::move(proof), std::move(P));
1222
void ValidatorManagerImpl::set_block_proof_link(BlockHandle handle, td::Ref<ProofLink> proof,
1223
td::Promise<td::Unit> promise) {
1224
auto P = td::PromiseCreator::lambda(
1225
[SelfId = actor_id(this), handle, promise = std::move(promise)](td::Result<td::Unit> R) mutable {
1227
promise.set_error(R.move_as_error());
1229
promise.set_value(td::Unit());
1230
td::actor::send_closure(SelfId, &ValidatorManagerImpl::written_handle, std::move(handle), [](td::Unit) {});
1233
td::actor::send_closure(db_, &Db::store_block_proof_link, handle, std::move(proof), std::move(P));
1236
/*void ValidatorManagerImpl::set_zero_state(ZeroStateIdExt id, td::Ref<ShardState> state, td::Promise<td::Unit> promise) {
1237
td::actor::send_closure(db_, &Db::store_zero_state, id, std::move(state), std::move(promise));
1240
void ValidatorManagerImpl::set_block_signatures(BlockHandle handle, td::Ref<BlockSignatureSet> signatures,
1241
td::Promise<td::Unit> promise) {
1242
td::actor::send_closure(db_, &Db::store_block_signatures, handle, std::move(signatures), std::move(promise));
1245
void ValidatorManagerImpl::set_next_block(BlockIdExt block_id, BlockIdExt next, td::Promise<td::Unit> promise) {
1246
auto P = td::PromiseCreator::lambda(
1247
[SelfId = actor_id(this), next, promise = std::move(promise)](td::Result<BlockHandle> R) mutable {
1249
promise.set_error(R.move_as_error());
1251
auto handle = R.move_as_ok();
1252
handle->set_next(next);
1253
if (handle->need_flush()) {
1254
handle->flush(SelfId, handle, std::move(promise));
1256
promise.set_value(td::Unit());
1260
get_block_handle(block_id, true, std::move(P));
1263
void ValidatorManagerImpl::set_block_candidate(BlockIdExt id, BlockCandidate candidate, CatchainSeqno cc_seqno,
1264
td::uint32 validator_set_hash, td::Promise<td::Unit> promise) {
1265
if (!candidates_buffer_.empty()) {
1266
td::actor::send_closure(candidates_buffer_, &CandidatesBuffer::add_new_candidate, id,
1267
PublicKey{pubkeys::Ed25519{candidate.pubkey.as_bits256()}}, candidate.collated_file_hash);
1269
if (!id.is_masterchain()) {
1270
add_cached_block_candidate(ReceivedBlock{id, candidate.data.clone()});
1271
callback_->send_block_candidate(id, cc_seqno, validator_set_hash, candidate.data.clone());
1273
td::actor::send_closure(db_, &Db::store_block_candidate, std::move(candidate), std::move(promise));
1276
void ValidatorManagerImpl::write_handle(BlockHandle handle, td::Promise<td::Unit> promise) {
1277
auto P = td::PromiseCreator::lambda(
1278
[SelfId = actor_id(this), handle, promise = std::move(promise)](td::Result<td::Unit> R) mutable {
1280
promise.set_error(R.move_as_error());
1282
td::actor::send_closure(SelfId, &ValidatorManagerImpl::written_handle, std::move(handle), std::move(promise));
1285
td::actor::send_closure(db_, &Db::store_block_handle, std::move(handle), std::move(P));
1288
void ValidatorManagerImpl::written_handle(BlockHandle handle, td::Promise<td::Unit> promise) {
1289
bool received = handle->received();
1290
bool inited_state = handle->received_state();
1291
bool inited_proof = handle->id().is_masterchain() ? handle->inited_proof() : handle->inited_proof_link();
1293
if (handle->need_flush()) {
1294
handle->flush(actor_id(this), handle, std::move(promise));
1298
if (received && inited_proof) {
1299
auto it = wait_block_data_.find(handle->id());
1300
if (it != wait_block_data_.end()) {
1301
td::actor::send_closure(it->second.actor_, &WaitBlockData::force_read_from_db);
1305
auto it = wait_state_.find(handle->id());
1306
if (it != wait_state_.end()) {
1307
td::actor::send_closure(it->second.actor_, &WaitBlockState::force_read_from_db);
1310
if (handle->inited_proof_link()) {
1311
auto it = wait_state_.find(handle->id());
1312
if (it != wait_state_.end()) {
1313
td::actor::send_closure(it->second.actor_, &WaitBlockState::after_get_proof_link);
1316
if (handle->id().is_masterchain() && handle->inited_proof()) {
1317
auto it = wait_state_.find(handle->id());
1318
if (it != wait_state_.end()) {
1319
td::actor::send_closure(it->second.actor_, &WaitBlockState::after_get_proof);
1324
promise.set_value(td::Unit());
1327
void ValidatorManagerImpl::new_block_cont(BlockHandle handle, td::Ref<ShardState> state,
1328
td::Promise<td::Unit> promise) {
1329
if (state->get_shard().is_masterchain() && handle->id().id.seqno > last_masterchain_seqno_) {
1330
if (handle->id().id.seqno == last_masterchain_seqno_ + 1) {
1331
last_masterchain_seqno_ = handle->id().id.seqno;
1332
last_masterchain_state_ = td::Ref<MasterchainState>{state};
1333
last_masterchain_block_id_ = handle->id();
1334
last_masterchain_block_handle_ = handle;
1335
last_masterchain_block_handle_->set_processed();
1337
new_masterchain_block();
1339
promise.set_value(td::Unit());
1342
auto it = pending_masterchain_states_.find(last_masterchain_seqno_ + 1);
1343
if (it != pending_masterchain_states_.end()) {
1344
CHECK(it == pending_masterchain_states_.begin());
1345
last_masterchain_block_handle_ = std::move(std::get<0>(it->second));
1346
last_masterchain_state_ = std::move(std::get<1>(it->second));
1347
last_masterchain_block_id_ = last_masterchain_block_handle_->id();
1348
last_masterchain_seqno_ = last_masterchain_block_id_.id.seqno;
1349
CHECK(it->first == last_masterchain_seqno_);
1351
auto l_promise = std::move(std::get<2>(it->second));
1352
last_masterchain_block_handle_->set_processed();
1354
pending_masterchain_states_.erase(it);
1356
new_masterchain_block();
1358
for (auto &p : l_promise) {
1359
p.set_value(td::Unit());
1366
auto it = pending_masterchain_states_.find(handle->id().id.seqno);
1367
if (it != pending_masterchain_states_.end()) {
1368
std::get<2>(it->second).emplace_back(std::move(promise));
1370
std::vector<td::Promise<td::Unit>> v;
1371
v.emplace_back(std::move(promise));
1372
pending_masterchain_states_.emplace(
1373
handle->id().id.seqno,
1374
std::forward_as_tuple(handle, td::Ref<MasterchainState>{std::move(state)}, std::move(v)));
1378
handle->set_processed();
1379
promise.set_value(td::Unit());
1383
void ValidatorManagerImpl::new_block(BlockHandle handle, td::Ref<ShardState> state, td::Promise<td::Unit> promise) {
1384
if (handle->is_applied()) {
1385
return new_block_cont(std::move(handle), std::move(state), std::move(promise));
1387
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), handle, state = std::move(state),
1388
promise = std::move(promise)](td::Result<td::Unit> R) mutable {
1390
promise.set_error(R.move_as_error());
1392
td::actor::send_closure(SelfId, &ValidatorManagerImpl::new_block_cont, std::move(handle), std::move(state),
1393
std::move(promise));
1396
td::actor::send_closure(db_, &Db::apply_block, handle, std::move(P));
1400
void ValidatorManagerImpl::get_block_handle(BlockIdExt id, bool force, td::Promise<BlockHandle> promise) {
1401
if (!id.is_valid()) {
1402
promise.set_error(td::Status::Error(ErrorCode::protoviolation, "bad block id"));
1406
// updates LRU position if found
1407
auto B = get_handle_from_lru(id);
1409
CHECK(B->id() == id);
1410
promise.set_value(std::move(B));
1414
auto it = handles_.find(id);
1415
if (it != handles_.end()) {
1416
auto handle = it->second.lock();
1418
CHECK(handle->id() == id);
1419
promise.set_value(std::move(handle));
1426
auto it2 = wait_block_handle_.find(id);
1427
if (it2 != wait_block_handle_.end()) {
1428
it2->second.waiting_.emplace_back(std::move(promise));
1432
wait_block_handle_.emplace(id, WaitBlockHandle{});
1433
wait_block_handle_[id].waiting_.emplace_back(std::move(promise));
1435
auto P = td::PromiseCreator::lambda([id, force = true, SelfId = actor_id(this)](td::Result<BlockHandle> R) mutable {
1438
auto S = R.move_as_error();
1439
if (S.code() == ErrorCode::notready && force) {
1440
handle = create_empty_block_handle(id);
1442
LOG(FATAL) << "db error: failed to get block " << id << ": " << S;
1446
handle = R.move_as_ok();
1449
CHECK(handle->id() == id);
1450
td::actor::send_closure(SelfId, &ValidatorManagerImpl::register_block_handle, std::move(handle));
1453
td::actor::send_closure(db_, &Db::get_block_handle, id, std::move(P));
1456
void ValidatorManagerImpl::register_block_handle(BlockHandle handle) {
1457
CHECK(handles_.find(handle->id()) == handles_.end());
1458
handles_.emplace(handle->id(), std::weak_ptr<BlockHandleInterface>(handle));
1459
add_handle_to_lru(handle);
1461
auto it = wait_block_handle_.find(handle->id());
1462
CHECK(it != wait_block_handle_.end());
1463
for (auto &p : it->second.waiting_) {
1464
p.set_result(handle);
1466
wait_block_handle_.erase(it);
1470
void ValidatorManagerImpl::get_top_masterchain_state(td::Promise<td::Ref<MasterchainState>> promise) {
1471
if (last_masterchain_state_.is_null()) {
1472
promise.set_error(td::Status::Error(ton::ErrorCode::notready, "not started"));
1474
promise.set_result(last_masterchain_state_);
1478
td::Ref<MasterchainState> ValidatorManagerImpl::do_get_last_liteserver_state() {
1479
if (last_masterchain_state_.is_null()) {
1482
if (last_liteserver_state_.is_null()) {
1483
last_liteserver_state_ = last_masterchain_state_;
1484
return last_liteserver_state_;
1486
if (last_liteserver_state_->get_seqno() == last_masterchain_state_->get_seqno()) {
1487
return last_liteserver_state_;
1489
// If liteserver seqno (i.e. shard client) lags then use last masterchain state for liteserver
1490
// Allowed lag depends on the block rate
1491
double time_per_block = double(last_masterchain_state_->get_unix_time() - last_liteserver_state_->get_unix_time()) /
1492
double(last_masterchain_state_->get_seqno() - last_liteserver_state_->get_seqno());
1493
if (td::Clocks::system() - double(last_liteserver_state_->get_unix_time()) > std::min(time_per_block * 8, 180.0)) {
1494
last_liteserver_state_ = last_masterchain_state_;
1496
return last_liteserver_state_;
1499
void ValidatorManagerImpl::get_top_masterchain_block(td::Promise<BlockIdExt> promise) {
1500
if (!last_masterchain_block_id_.is_valid()) {
1501
promise.set_error(td::Status::Error(ton::ErrorCode::notready, "not started"));
1503
promise.set_result(last_masterchain_block_id_);
1507
void ValidatorManagerImpl::get_top_masterchain_state_block(
1508
td::Promise<std::pair<td::Ref<MasterchainState>, BlockIdExt>> promise) {
1509
if (last_masterchain_state_.is_null()) {
1510
promise.set_error(td::Status::Error(ton::ErrorCode::notready, "not started"));
1513
std::pair<td::Ref<MasterchainState>, BlockIdExt>{last_masterchain_state_, last_masterchain_block_id_});
1517
void ValidatorManagerImpl::get_last_liteserver_state_block(
1518
td::Promise<std::pair<td::Ref<MasterchainState>, BlockIdExt>> promise) {
1519
auto state = do_get_last_liteserver_state();
1520
if (state.is_null()) {
1521
promise.set_error(td::Status::Error(ton::ErrorCode::notready, "not started"));
1523
promise.set_result(std::pair<td::Ref<MasterchainState>, BlockIdExt>{state, state->get_block_id()});
1527
void ValidatorManagerImpl::send_get_block_request(BlockIdExt id, td::uint32 priority,
1528
td::Promise<ReceivedBlock> promise) {
1530
auto it = cached_block_candidates_.find(id);
1531
if (it != cached_block_candidates_.end()) {
1532
LOG(DEBUG) << "send_get_block_request: got result from candidates cache for " << id.to_str();
1533
return promise.set_value(it->second.clone());
1536
callback_->download_block(id, priority, td::Timestamp::in(10.0), std::move(promise));
1539
void ValidatorManagerImpl::send_get_zero_state_request(BlockIdExt id, td::uint32 priority,
1540
td::Promise<td::BufferSlice> promise) {
1541
callback_->download_zero_state(id, priority, td::Timestamp::in(10.0), std::move(promise));
1544
void ValidatorManagerImpl::send_get_persistent_state_request(BlockIdExt id, BlockIdExt masterchain_block_id,
1545
td::uint32 priority,
1546
td::Promise<td::BufferSlice> promise) {
1547
callback_->download_persistent_state(id, masterchain_block_id, priority, td::Timestamp::in(3600 * 3),
1548
std::move(promise));
1551
void ValidatorManagerImpl::send_get_block_proof_request(BlockIdExt block_id, td::uint32 priority,
1552
td::Promise<td::BufferSlice> promise) {
1553
callback_->download_block_proof(block_id, priority, td::Timestamp::in(10.0), std::move(promise));
1556
void ValidatorManagerImpl::send_get_block_proof_link_request(BlockIdExt block_id, td::uint32 priority,
1557
td::Promise<td::BufferSlice> promise) {
1558
if (!block_id.is_masterchain()) {
1559
auto it = cached_block_candidates_.find(block_id);
1560
if (it != cached_block_candidates_.end()) {
1561
// Proof link can be created from the cached block candidate
1562
LOG(DEBUG) << "send_get_block_proof_link_request: creating proof link from cached caniddate for "
1563
<< block_id.to_str();
1564
TRY_RESULT_PROMISE_PREFIX(promise, block_root, vm::std_boc_deserialize(it->second.data),
1565
"failed to create proof link: ");
1566
TRY_RESULT_PROMISE_PREFIX(promise, proof_link, WaitBlockData::generate_proof_link(it->second.id, block_root),
1567
"failed to create proof link: ");
1568
promise.set_result(std::move(proof_link));
1572
callback_->download_block_proof_link(block_id, priority, td::Timestamp::in(10.0), std::move(promise));
1575
void ValidatorManagerImpl::send_get_next_key_blocks_request(BlockIdExt block_id, td::uint32 priority,
1576
td::Promise<std::vector<BlockIdExt>> promise) {
1577
callback_->get_next_key_blocks(block_id, td::Timestamp::in(10.0), std::move(promise));
1580
void ValidatorManagerImpl::send_external_message(td::Ref<ExtMessage> message) {
1581
callback_->send_ext_message(message->shard(), message->serialize());
1582
add_external_message(std::move(message), 0);
1585
void ValidatorManagerImpl::send_ihr_message(td::Ref<IhrMessage> message) {
1586
callback_->send_ihr_message(message->shard(), message->serialize());
1589
void ValidatorManagerImpl::send_top_shard_block_description(td::Ref<ShardTopBlockDescription> desc) {
1590
if (!resend_shard_blocks_at_) {
1591
resend_shard_blocks_at_ = td::Timestamp::in(td::Random::fast(0, 100) * 0.01 + 2.0);
1592
alarm_timestamp().relax(resend_shard_blocks_at_);
1594
auto it = out_shard_blocks_.find(ShardTopBlockDescriptionId{desc->block_id().shard_full(), desc->catchain_seqno()});
1595
if (it != out_shard_blocks_.end() && desc->block_id().id.seqno <= it->second->block_id().id.seqno) {
1596
VLOG(VALIDATOR_DEBUG) << "dropping duplicate top block description";
1598
out_shard_blocks_[ShardTopBlockDescriptionId{desc->block_id().shard_full(), desc->catchain_seqno()}] = desc;
1599
callback_->send_shard_block_info(desc->block_id(), desc->catchain_seqno(), desc->serialize());
1603
void ValidatorManagerImpl::send_block_broadcast(BlockBroadcast broadcast, bool custom_overlays_only) {
1604
callback_->send_broadcast(std::move(broadcast), custom_overlays_only);
1607
void ValidatorManagerImpl::start_up() {
1608
db_ = create_db_actor(actor_id(this), db_root_, opts_);
1609
lite_server_cache_ = create_liteserver_cache_actor(actor_id(this), db_root_);
1610
token_manager_ = td::actor::create_actor<TokenManager>("tokenmanager");
1611
td::mkdir(db_root_ + "/tmp/").ensure();
1612
td::mkdir(db_root_ + "/catchains/").ensure();
1615
td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<td::actor::ActorOwn<adnl::AdnlExtServer>> R) {
1617
td::actor::send_closure(SelfId, &ValidatorManagerImpl::created_ext_server, R.move_as_ok());
1619
td::actor::send_closure(adnl_, &adnl::Adnl::create_ext_server, std::vector<adnl::AdnlNodeIdShort>{},
1620
std::vector<td::uint16>{}, std::move(Q));
1622
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<ValidatorManagerInitResult> R) {
1624
td::actor::send_closure(SelfId, &ValidatorManagerImpl::started, R.move_as_ok());
1627
auto to_import_dir = db_root_ + "/import";
1628
auto S = td::WalkPath::run(to_import_dir, [&](td::CSlice cfname, td::WalkPath::Type t) -> void {
1629
auto fname = td::Slice(cfname);
1630
if (t == td::WalkPath::Type::NotDir) {
1631
auto d = fname.rfind(TD_DIR_SLASH);
1632
if (d != td::Slice::npos) {
1633
fname = fname.substr(d + 1);
1635
if (fname.size() <= 13) {
1638
if (fname.substr(fname.size() - 5) != ".pack") {
1641
fname = fname.substr(0, fname.size() - 5);
1642
if (fname.substr(0, 8) != "archive.") {
1645
fname = fname.substr(8);
1647
while (fname.size() > 1 && fname[0] == '0') {
1648
fname.remove_prefix(1);
1650
auto v = td::to_integer_safe<BlockSeqno>(fname);
1654
auto pos = v.move_as_ok();
1655
LOG(INFO) << "found archive slice '" << cfname << "' for position " << pos;
1656
to_import_[pos] = std::make_pair(cfname.str(), true);
1660
LOG(INFO) << "failed to load blocks from import dir: " << S;
1663
validator_manager_init(opts_, actor_id(this), db_.get(), std::move(P));
1665
check_waiters_at_ = td::Timestamp::in(1.0);
1666
alarm_timestamp().relax(check_waiters_at_);
1669
void ValidatorManagerImpl::started(ValidatorManagerInitResult R) {
1671
CHECK(R.state.not_null());
1672
last_masterchain_block_handle_ = std::move(R.handle);
1673
last_masterchain_block_id_ = last_masterchain_block_handle_->id();
1674
last_masterchain_seqno_ = last_masterchain_block_id_.id.seqno;
1675
last_masterchain_state_ = std::move(R.state);
1677
last_key_block_handle_ = std::move(R.last_key_block_handle_);
1678
last_known_key_block_handle_ = last_key_block_handle_;
1680
CHECK(last_masterchain_block_handle_->is_applied());
1681
if (last_known_key_block_handle_->inited_is_key_block()) {
1682
callback_->new_key_block(last_key_block_handle_);
1685
gc_masterchain_handle_ = std::move(R.gc_handle);
1686
gc_masterchain_state_ = std::move(R.gc_state);
1688
shard_client_ = std::move(R.clients);
1690
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<std::vector<ValidatorSessionId>> R) {
1692
if (R.error().code() == ErrorCode::notready) {
1693
td::actor::send_closure(SelfId, &ValidatorManagerImpl::read_gc_list, std::vector<ValidatorSessionId>{});
1695
LOG(FATAL) << "db error: " << R.move_as_error();
1698
td::actor::send_closure(SelfId, &ValidatorManagerImpl::read_gc_list, R.move_as_ok());
1701
td::actor::send_closure(db_, &Db::get_destroyed_validator_sessions, std::move(P));
1703
if (opts_->nonfinal_ls_queries_enabled()) {
1704
candidates_buffer_ = td::actor::create_actor<CandidatesBuffer>("candidates-buffer", actor_id(this));
1708
void ValidatorManagerImpl::read_gc_list(std::vector<ValidatorSessionId> list) {
1709
for (auto &v : list) {
1710
check_gc_list_.insert(v);
1713
new_masterchain_block();
1716
td::actor::create_actor<AsyncStateSerializer>("serializer", last_key_block_handle_->id(), opts_, actor_id(this));
1718
if (last_masterchain_block_handle_->inited_next_left()) {
1719
auto b = last_masterchain_block_handle_->one_next(true);
1720
if (opts_->is_hardfork(b) && !out_of_sync()) {
1721
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), b](td::Result<td::BufferSlice> R) {
1723
LOG(INFO) << "NO HARDFORK BLOCK IN STATIC FILES";
1724
td::actor::send_closure(SelfId, &ValidatorManagerImpl::applied_hardfork);
1728
auto dataR = create_block(b, R.move_as_ok());
1731
auto P = td::PromiseCreator::lambda([SelfId](td::Result<td::Unit> R) {
1733
td::actor::send_closure(SelfId, &ValidatorManagerImpl::applied_hardfork);
1735
run_hardfork_accept_block_query(b, dataR.move_as_ok(), SelfId, std::move(P));
1737
td::actor::send_closure(db_, &Db::try_get_static_file, b.file_hash, std::move(P));
1742
if (!out_of_sync()) {
1743
completed_prestart_sync();
1749
void ValidatorManagerImpl::applied_hardfork() {
1750
if (!out_of_sync()) {
1751
completed_prestart_sync();
1757
bool ValidatorManagerImpl::out_of_sync() {
1758
auto seqno = std::min(last_masterchain_seqno_, shard_client_handle_->id().seqno());
1759
if (seqno < opts_->sync_upto()) {
1762
if (shard_client_handle_->id().seqno() + 16 < last_masterchain_seqno_) {
1765
if (last_masterchain_block_handle_->unix_time() + 600 > td::Clocks::system()) {
1769
if (last_masterchain_seqno_ < last_known_key_block_handle_->id().seqno()) {
1773
bool masterchain_validator = false;
1774
if (!validator_groups_.size()) {
1775
auto val_set = last_masterchain_state_->get_validator_set(ShardIdFull{masterchainId});
1776
if (!get_validator(ShardIdFull{masterchainId}, val_set).is_zero()) {
1777
masterchain_validator = true;
1781
if ((masterchain_validator || validator_groups_.size() > 0) &&
1782
last_known_key_block_handle_->id().seqno() <= last_masterchain_seqno_) {
1785
LOG(INFO) << "groups=" << validator_groups_.size() << " seqno=" << last_known_key_block_handle_->id().seqno()
1786
<< " our_seqno=" << last_masterchain_seqno_;
1791
void ValidatorManagerImpl::prestart_sync() {
1792
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<td::Unit> R) {
1794
td::actor::send_closure(SelfId, &ValidatorManagerImpl::download_next_archive);
1796
td::actor::send_closure(db_, &Db::set_async_mode, false, std::move(P));
1799
void ValidatorManagerImpl::download_next_archive() {
1800
if (!out_of_sync()) {
1801
finish_prestart_sync();
1805
auto seqno = std::min(last_masterchain_seqno_, shard_client_handle_->id().seqno());
1806
auto it = to_import_.upper_bound(seqno + 1);
1807
if (it != to_import_.begin()) {
1809
if (it->second.second) {
1810
it->second.second = false;
1811
downloaded_archive_slice(it->second.first, false);
1815
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<std::string> R) {
1817
LOG(INFO) << "failed to download archive slice: " << R.error();
1818
delay_action([SelfId]() { td::actor::send_closure(SelfId, &ValidatorManagerImpl::download_next_archive); },
1819
td::Timestamp::in(2.0));
1821
td::actor::send_closure(SelfId, &ValidatorManagerImpl::downloaded_archive_slice, R.move_as_ok(), true);
1824
callback_->download_archive(seqno + 1, db_root_ + "/tmp/", td::Timestamp::in(36000.0), std::move(P));
1827
void ValidatorManagerImpl::downloaded_archive_slice(std::string name, bool is_tmp) {
1828
LOG(INFO) << "downloaded archive slice: " << name;
1829
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), name, is_tmp](td::Result<std::vector<BlockSeqno>> R) {
1831
td::unlink(name).ensure();
1834
LOG(INFO) << "failed to check downloaded archive slice: " << R.error();
1835
delay_action([SelfId]() { td::actor::send_closure(SelfId, &ValidatorManagerImpl::download_next_archive); },
1836
td::Timestamp::in(2.0));
1838
td::actor::send_closure(SelfId, &ValidatorManagerImpl::checked_archive_slice, R.move_as_ok());
1842
auto seqno = std::min(last_masterchain_seqno_, shard_client_handle_->id().seqno());
1844
td::actor::create_actor<ArchiveImporter>("archiveimport", name, last_masterchain_state_, seqno, opts_, actor_id(this),
1849
void ValidatorManagerImpl::checked_archive_slice(std::vector<BlockSeqno> seqno) {
1850
CHECK(seqno.size() == 2);
1851
LOG(INFO) << "checked downloaded archive slice: mc_top_seqno=" << seqno[0] << " shard_top_seqno_=" << seqno[1];
1852
CHECK(seqno[0] <= last_masterchain_seqno_);
1853
CHECK(seqno[1] <= last_masterchain_seqno_);
1856
if (seqno[1] < last_masterchain_seqno_) {
1857
CHECK(last_masterchain_state_->get_old_mc_block_id(seqno[1], b));
1859
b = last_masterchain_block_id_;
1862
auto P = td::PromiseCreator::lambda(
1863
[SelfId = actor_id(this), db = db_.get(), client = shard_client_.get()](td::Result<BlockHandle> R) {
1865
auto handle = R.move_as_ok();
1866
auto P = td::PromiseCreator::lambda([SelfId, client, handle](td::Result<td::Ref<ShardState>> R) mutable {
1867
auto P = td::PromiseCreator::lambda([SelfId](td::Result<td::Unit> R) {
1869
td::actor::send_closure(SelfId, &ValidatorManagerImpl::download_next_archive);
1871
td::actor::send_closure(client, &ShardClient::force_update_shard_client_ex, std::move(handle),
1872
td::Ref<MasterchainState>{R.move_as_ok()}, std::move(P));
1874
td::actor::send_closure(db, &Db::get_block_state, std::move(handle), std::move(P));
1876
get_block_handle(b, true, std::move(P));
1879
void ValidatorManagerImpl::finish_prestart_sync() {
1882
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<td::Unit> R) {
1884
td::actor::send_closure(SelfId, &ValidatorManagerImpl::completed_prestart_sync);
1886
td::actor::send_closure(db_, &Db::set_async_mode, false, std::move(P));
1889
void ValidatorManagerImpl::completed_prestart_sync() {
1890
td::actor::send_closure(shard_client_, &ShardClient::start);
1892
send_peek_key_block_request();
1894
LOG(WARNING) << "initial read complete: " << last_masterchain_block_handle_->id() << " "
1895
<< last_masterchain_block_id_;
1896
callback_->initial_read_complete(last_masterchain_block_handle_);
1899
void ValidatorManagerImpl::new_masterchain_block() {
1900
if (last_masterchain_seqno_ > 0 && last_masterchain_block_handle_->is_key_block()) {
1901
last_key_block_handle_ = last_masterchain_block_handle_;
1902
if (last_key_block_handle_->id().seqno() > last_known_key_block_handle_->id().seqno()) {
1903
last_known_key_block_handle_ = last_key_block_handle_;
1904
callback_->new_key_block(last_key_block_handle_);
1909
update_shard_blocks();
1911
if (!shard_client_.empty()) {
1912
td::actor::send_closure(shard_client_, &ShardClient::new_masterchain_block_notification,
1913
last_masterchain_block_handle_, last_masterchain_state_);
1916
if (last_masterchain_seqno_ % 1024 == 0) {
1917
LOG(WARNING) << "applied masterchain block " << last_masterchain_block_id_;
1921
void ValidatorManagerImpl::update_shards() {
1922
if ((last_masterchain_state_->rotated_all_shards() || last_masterchain_seqno_ == 0) &&
1923
opts_->get_last_fork_masterchain_seqno() <= last_masterchain_seqno_) {
1924
allow_validate_ = true;
1926
auto exp_vec = last_masterchain_state_->get_shards();
1927
auto config = last_masterchain_state_->get_consensus_config();
1928
validatorsession::ValidatorSessionOptions opts{config};
1929
td::uint32 threshold = 9407194;
1930
bool force_group_id_upgrade = last_masterchain_seqno_ == threshold;
1931
auto legacy_opts_hash = opts.get_hash();
1932
if (last_masterchain_seqno_ >= threshold) { //TODO move to get_consensus_config()
1933
opts.proto_version = std::max<td::uint32>(opts.proto_version, 1);
1935
auto opts_hash = opts.get_hash();
1937
std::map<ShardIdFull, std::vector<BlockIdExt>> new_shards;
1938
std::set<ShardIdFull> future_shards;
1940
auto cur_time = static_cast<UnixTime>(td::Clocks::system());
1942
for (auto &v : exp_vec) {
1943
auto shard = v->shard();
1944
if (v->before_split()) {
1945
CHECK(!v->before_merge());
1946
ShardIdFull l_shard{shard.workchain, shard_child(shard.shard, true)};
1947
ShardIdFull r_shard{shard.workchain, shard_child(shard.shard, false)};
1948
new_shards.emplace(l_shard, std::vector<BlockIdExt>{v->top_block_id()});
1949
new_shards.emplace(r_shard, std::vector<BlockIdExt>{v->top_block_id()});
1950
} else if (v->before_merge()) {
1951
ShardIdFull p_shard{shard.workchain, shard_parent(shard.shard)};
1952
auto it = new_shards.find(p_shard);
1953
if (it == new_shards.end()) {
1954
new_shards.emplace(p_shard, std::vector<BlockIdExt>(2));
1957
bool left = shard_child(p_shard.shard, true) == shard.shard;
1958
new_shards[p_shard][left ? 0 : 1] = v->top_block_id();
1960
new_shards.emplace(shard, std::vector<BlockIdExt>{v->top_block_id()});
1962
switch (v->fsm_state()) {
1963
case McShardHash::FsmState::fsm_none: {
1964
future_shards.insert(shard);
1967
case McShardHash::FsmState::fsm_split: {
1968
if (v->fsm_utime() < cur_time + 60) {
1969
ShardIdFull l_shard{shard.workchain, shard_child(shard.shard, true)};
1970
ShardIdFull r_shard{shard.workchain, shard_child(shard.shard, false)};
1971
future_shards.insert(l_shard);
1972
future_shards.insert(r_shard);
1974
future_shards.insert(shard);
1978
case McShardHash::FsmState::fsm_merge: {
1979
if (v->fsm_utime() < cur_time + 60) {
1980
ShardIdFull p_shard{shard.workchain, shard_parent(shard.shard)};
1981
future_shards.insert(p_shard);
1983
future_shards.insert(shard);
1988
LOG(FATAL) << "state=" << static_cast<td::uint32>(v->fsm_state());
1992
new_shards.emplace(ShardIdFull{masterchainId, shardIdAll}, std::vector<BlockIdExt>{last_masterchain_block_id_});
1993
future_shards.insert(ShardIdFull{masterchainId, shardIdAll});
1995
VLOG(VALIDATOR_DEBUG) << "total shards=" << new_shards.size() << " config shards=" << exp_vec.size();
1997
std::map<ValidatorSessionId, ValidatorGroupEntry> new_validator_groups_;
1998
std::map<ValidatorSessionId, ValidatorGroupEntry> new_next_validator_groups_;
2000
bool force_recover = false;
2002
auto val_set = last_masterchain_state_->get_validator_set(ShardIdFull{masterchainId});
2003
auto r = opts_->check_unsafe_catchain_rotate(last_masterchain_seqno_, val_set->get_catchain_seqno());
2004
force_recover = r > 0;
2007
BlockSeqno key_seqno = last_key_block_handle_->id().seqno();
2009
if (force_group_id_upgrade) {
2010
for (auto &desc : new_shards) {
2011
auto shard = desc.first;
2012
auto prev = desc.second;
2013
for (auto &p : prev) {
2014
CHECK(p.is_valid());
2016
auto val_set = last_masterchain_state_->get_validator_set(shard);
2017
auto validator_id = get_validator(shard, val_set);
2019
if (!validator_id.is_zero()) {
2020
auto legacy_val_group_id = get_validator_set_id(shard, val_set, legacy_opts_hash, key_seqno, opts);
2021
auto val_group_id = get_validator_set_id(shard, val_set, opts_hash, key_seqno, opts);
2024
auto it = validator_groups_.find(legacy_val_group_id);
2025
if (it != validator_groups_.end()) {
2026
new_validator_groups_.emplace(val_group_id, std::move(it->second));
2028
auto it2 = next_validator_groups_.find(legacy_val_group_id);
2029
if (it2 != next_validator_groups_.end()) {
2030
if (!it2->second.actor.empty()) {
2031
td::actor::send_closure(it2->second.actor, &ValidatorGroup::start, prev, last_masterchain_block_id_,
2032
last_masterchain_state_->get_unix_time());
2034
new_validator_groups_.emplace(val_group_id, std::move(it2->second));
2036
auto G = create_validator_group(val_group_id, shard, val_set, opts, started_);
2038
td::actor::send_closure(G, &ValidatorGroup::start, prev, last_masterchain_block_id_,
2039
last_masterchain_state_->get_unix_time());
2041
new_validator_groups_.emplace(val_group_id, ValidatorGroupEntry{std::move(G), shard});
2048
if (allow_validate_) {
2049
for (auto &desc : new_shards) {
2050
auto shard = desc.first;
2051
if (force_recover && !desc.first.is_masterchain()) {
2054
auto prev = desc.second;
2055
for (auto &p : prev) {
2056
CHECK(p.is_valid());
2058
auto val_set = last_masterchain_state_->get_validator_set(shard);
2059
auto x = val_set->export_vector();
2061
auto validator_id = get_validator(shard, val_set);
2063
if (!validator_id.is_zero()) {
2064
auto val_group_id = get_validator_set_id(shard, val_set, opts_hash, key_seqno, opts);
2066
if (force_recover) {
2067
auto r = opts_->check_unsafe_catchain_rotate(last_masterchain_seqno_, val_set->get_catchain_seqno());
2070
td::MutableSlice x{b, 36};
2071
x.copy_from(val_group_id.as_slice());
2072
x.remove_prefix(32);
2073
CHECK(x.size() == 4);
2074
x.copy_from(td::Slice(reinterpret_cast<const td::uint8 *>(&r), 4));
2075
val_group_id = sha256_bits256(td::Slice(b, 36));
2079
VLOG(VALIDATOR_DEBUG) << "validating group " << val_group_id;
2080
auto it = validator_groups_.find(val_group_id);
2081
if (it != validator_groups_.end()) {
2082
new_validator_groups_.emplace(val_group_id, std::move(it->second));
2084
auto it2 = next_validator_groups_.find(val_group_id);
2085
if (it2 != next_validator_groups_.end()) {
2086
if (!it2->second.actor.empty()) {
2087
td::actor::send_closure(it2->second.actor, &ValidatorGroup::start, prev, last_masterchain_block_id_,
2088
last_masterchain_state_->get_unix_time());
2090
new_validator_groups_.emplace(val_group_id, std::move(it2->second));
2092
auto G = create_validator_group(val_group_id, shard, val_set, opts, started_);
2094
td::actor::send_closure(G, &ValidatorGroup::start, prev, last_masterchain_block_id_,
2095
last_masterchain_state_->get_unix_time());
2097
new_validator_groups_.emplace(val_group_id, ValidatorGroupEntry{std::move(G), shard});
2103
for (auto &shard : future_shards) {
2104
auto val_set = last_masterchain_state_->get_next_validator_set(shard);
2105
if (val_set.is_null()) {
2109
auto validator_id = get_validator(shard, val_set);
2110
if (!validator_id.is_zero()) {
2111
auto val_group_id = get_validator_set_id(shard, val_set, opts_hash, key_seqno, opts);
2112
auto it = next_validator_groups_.find(val_group_id);
2113
if (it != next_validator_groups_.end()) {
2114
//CHECK(!it->second.empty());
2115
new_next_validator_groups_.emplace(val_group_id, std::move(it->second));
2117
new_next_validator_groups_.emplace(
2119
ValidatorGroupEntry{create_validator_group(val_group_id, shard, val_set, opts, started_), shard});
2124
std::vector<td::actor::ActorId<ValidatorGroup>> gc;
2125
for (auto &v : validator_groups_) {
2126
if (!v.second.actor.empty()) {
2127
gc_list_.push_back(v.first);
2128
gc.push_back(v.second.actor.release());
2131
for (auto &v : next_validator_groups_) {
2132
if (!v.second.actor.empty()) {
2133
gc_list_.push_back(v.first);
2134
gc.push_back(v.second.actor.release());
2138
validator_groups_ = std::move(new_validator_groups_);
2139
next_validator_groups_ = std::move(new_next_validator_groups_);
2141
if (last_masterchain_state_->rotated_all_shards()) {
2143
check_gc_list_.clear();
2144
CHECK(last_masterchain_block_handle_->received_state());
2145
auto P = td::PromiseCreator::lambda(
2146
[SelfId = actor_id(this), gc = std::move(gc), block_id = last_masterchain_block_id_](td::Result<td::Unit> R) {
2148
td::actor::send_closure(SelfId, &ValidatorManagerImpl::written_destroyed_validator_sessions, std::move(gc));
2149
td::actor::send_closure(SelfId, &ValidatorManagerImpl::updated_init_block, block_id);
2151
td::actor::send_closure(db_, &Db::update_init_masterchain_block, last_masterchain_block_id_, std::move(P));
2153
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), gc = std::move(gc)](td::Result<td::Unit> R) {
2155
td::actor::send_closure(SelfId, &ValidatorManagerImpl::written_destroyed_validator_sessions, std::move(gc));
2157
td::actor::send_closure(db_, &Db::update_destroyed_validator_sessions, gc_list_, std::move(P));
2159
} // namespace validator
2161
void ValidatorManagerImpl::written_destroyed_validator_sessions(std::vector<td::actor::ActorId<ValidatorGroup>> list) {
2162
for (auto &v : list) {
2163
td::actor::send_closure(v, &ValidatorGroup::destroy);
2167
void ValidatorManagerImpl::update_shard_blocks() {
2169
auto it = shard_blocks_.begin();
2170
while (it != shard_blocks_.end()) {
2171
auto &B = it->second;
2172
if (!B->may_be_valid(last_masterchain_block_handle_, last_masterchain_state_)) {
2174
shard_blocks_.erase(it2);
2182
auto it = out_shard_blocks_.begin();
2183
while (it != out_shard_blocks_.end()) {
2184
auto &B = it->second;
2185
if (!B->may_be_valid(last_masterchain_block_handle_, last_masterchain_state_)) {
2187
out_shard_blocks_.erase(it2);
2195
ValidatorSessionId ValidatorManagerImpl::get_validator_set_id(ShardIdFull shard, td::Ref<ValidatorSet> val_set,
2196
td::Bits256 opts_hash, BlockSeqno last_key_block_seqno,
2197
const validatorsession::ValidatorSessionOptions &opts) {
2198
std::vector<tl_object_ptr<ton_api::validator_groupMember>> vec;
2199
auto v = val_set->export_vector();
2200
auto vert_seqno = opts_->get_maximal_vertical_seqno();
2202
auto pub_key = PublicKey{pubkeys::Ed25519{n.key}};
2204
create_tl_object<ton_api::validator_groupMember>(pub_key.compute_short_id().bits256_value(), n.addr, n.weight));
2206
if (!opts.new_catchain_ids) {
2207
if (vert_seqno == 0) {
2208
return create_hash_tl_object<ton_api::validator_group>(shard.workchain, shard.shard,
2209
val_set->get_catchain_seqno(), opts_hash, std::move(vec));
2211
return create_hash_tl_object<ton_api::validator_groupEx>(
2212
shard.workchain, shard.shard, vert_seqno, val_set->get_catchain_seqno(), opts_hash, std::move(vec));
2215
return create_hash_tl_object<ton_api::validator_groupNew>(shard.workchain, shard.shard, vert_seqno,
2216
last_key_block_seqno, val_set->get_catchain_seqno(),
2217
opts_hash, std::move(vec));
2221
td::actor::ActorOwn<ValidatorGroup> ValidatorManagerImpl::create_validator_group(
2222
ValidatorSessionId session_id, ShardIdFull shard, td::Ref<ValidatorSet> validator_set,
2223
validatorsession::ValidatorSessionOptions opts, bool init_session) {
2224
if (check_gc_list_.count(session_id) == 1) {
2225
return td::actor::ActorOwn<ValidatorGroup>{};
2227
// Call get_external_messages to cleanup mempool for the shard
2228
get_external_messages(shard, [](td::Result<std::vector<std::pair<td::Ref<ExtMessage>, int>>>) {});
2230
auto validator_id = get_validator(shard, validator_set);
2231
CHECK(!validator_id.is_zero());
2232
auto G = td::actor::create_actor<ValidatorGroup>(
2233
"validatorgroup", shard, validator_id, session_id, validator_set, opts, keyring_, adnl_, rldp_, overlays_,
2234
db_root_, actor_id(this), init_session,
2235
opts_->check_unsafe_resync_allowed(validator_set->get_catchain_seqno()), opts_);
2240
void ValidatorManagerImpl::add_handle_to_lru(BlockHandle handle) {
2241
auto it = handle_lru_map_.find(handle->id());
2242
if (it != handle_lru_map_.end()) {
2243
CHECK(it->second->handle() == handle);
2244
it->second->remove();
2245
handle_lru_.put(it->second.get());
2247
auto id = handle->id();
2248
auto x = std::make_unique<BlockHandleLru>(std::move(handle));
2249
handle_lru_.put(x.get());
2250
handle_lru_map_.emplace(id, std::move(x));
2252
if (handle_lru_size_ > handle_lru_max_size_) {
2253
auto to_remove = BlockHandleLru::from_list_node(handle_lru_.get());
2255
CHECK(handle_lru_map_.count(to_remove->handle()->id()) == 1);
2256
handle_lru_map_.erase(to_remove->handle()->id());
2262
BlockHandle ValidatorManagerImpl::get_handle_from_lru(BlockIdExt id) {
2263
auto it = handle_lru_map_.find(id);
2264
if (it != handle_lru_map_.end()) {
2265
it->second->remove();
2266
handle_lru_.put(it->second.get());
2267
auto handle = it->second->handle();
2268
CHECK(handle->id() == id);
2275
void ValidatorManagerImpl::try_advance_gc_masterchain_block() {
2276
if (gc_masterchain_handle_ && last_masterchain_seqno_ > 0 && !gc_advancing_ &&
2277
gc_masterchain_handle_->inited_next_left() &&
2278
gc_masterchain_handle_->id().id.seqno < last_rotate_block_id_.id.seqno &&
2279
gc_masterchain_handle_->id().id.seqno < last_masterchain_state_->min_ref_masterchain_seqno() &&
2280
gc_masterchain_handle_->id().id.seqno + 1024 < last_masterchain_seqno_ &&
2281
gc_masterchain_handle_->id().id.seqno < last_masterchain_state_->last_key_block_id().seqno() &&
2282
gc_masterchain_handle_->id().id.seqno < min_confirmed_masterchain_seqno_ &&
2283
gc_masterchain_handle_->id().id.seqno < state_serializer_masterchain_seqno_ &&
2284
gc_masterchain_state_->get_unix_time() < td::Clocks::system() - state_ttl()) {
2285
gc_advancing_ = true;
2286
auto block_id = gc_masterchain_handle_->one_next(true);
2288
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<BlockHandle> R) {
2290
td::actor::send_closure(SelfId, &ValidatorManagerImpl::got_next_gc_masterchain_handle, R.move_as_ok());
2292
get_block_handle(block_id, true, std::move(P));
2296
void ValidatorManagerImpl::allow_persistent_state_file_gc(BlockIdExt block_id, BlockIdExt masterchain_block_id,
2297
td::Promise<bool> promise) {
2298
if (!gc_masterchain_handle_) {
2299
promise.set_result(false);
2302
if (masterchain_block_id.seqno() == 0) {
2303
promise.set_result(false);
2306
if (masterchain_block_id.seqno() >= gc_masterchain_handle_->id().seqno()) {
2307
promise.set_result(false);
2310
auto P = td::PromiseCreator::lambda([promise = std::move(promise)](td::Result<BlockHandle> R) mutable {
2312
auto handle = R.move_as_ok();
2313
CHECK(handle->is_key_block());
2314
promise.set_result(ValidatorManager::persistent_state_ttl(handle->unix_time()) < td::Clocks::system());
2316
get_block_handle(masterchain_block_id, false, std::move(P));
2319
void ValidatorManagerImpl::allow_archive(BlockIdExt block_id, td::Promise<bool> promise) {
2320
/*if (!gc_masterchain_handle_) {
2321
promise.set_result(false);
2324
if (!block_id.is_masterchain()) {
2325
if (!gc_masterchain_state_->workchain_is_active(block_id.id.workchain)) {
2326
promise.set_result(false);
2330
auto S = gc_masterchain_state_->get_shard_from_config(block_id.shard_full());
2332
if (block_id.id.seqno >= S->top_block_id().id.seqno) {
2333
promise.set_result(false);
2338
auto shards = gc_masterchain_state_->get_shards();
2339
for (auto shard : shards) {
2340
if (shard_intersects(shard->shard(), block_id.shard_full())) {
2341
if (block_id.id.seqno >= shard->top_block_id().id.seqno) {
2342
promise.set_result(false);
2351
if (block_id.id.seqno >= gc_masterchain_handle_->id().id.seqno) {
2352
promise.set_result(false);
2356
auto P = td::PromiseCreator::lambda([promise = std::move(promise)](td::Result<td::Unit> R) mutable {
2358
promise.set_error(R.move_as_error());
2360
promise.set_result(true);
2363
td::actor::send_closure(db_, &Db::archive, block_id, std::move(P));*/
2364
promise.set_result(false);
2367
void ValidatorManagerImpl::allow_delete(BlockIdExt block_id, td::Promise<bool> promise) {
2368
auto key_ttl = td::Clocks::system() - opts_->key_proof_ttl();
2369
auto ttl = td::Clocks::system() - opts_->archive_ttl();
2370
auto P = td::PromiseCreator::lambda(
2371
[SelfId = actor_id(this), promise = std::move(promise), ttl, key_ttl](td::Result<BlockHandle> R) mutable {
2373
promise.set_result(true);
2376
auto handle = R.move_as_ok();
2377
if (!handle->inited_unix_time()) {
2378
promise.set_result(true);
2381
if (!handle->inited_is_key_block() || !handle->is_key_block()) {
2382
promise.set_result(handle->unix_time() <= ttl);
2384
promise.set_result(handle->unix_time() <= key_ttl);
2387
get_block_handle(block_id, false, std::move(P));
2390
void ValidatorManagerImpl::allow_block_state_gc(BlockIdExt block_id, td::Promise<bool> promise) {
2391
if (!gc_masterchain_handle_) {
2392
promise.set_result(false);
2395
if (block_id.is_masterchain()) {
2396
promise.set_result(block_id.id.seqno < gc_masterchain_handle_->id().id.seqno);
2399
if (!gc_masterchain_state_->workchain_is_active(block_id.id.workchain)) {
2400
promise.set_result(false);
2403
auto S = gc_masterchain_state_->get_shard_from_config(block_id.shard_full());
2405
promise.set_result(block_id.id.seqno < S->top_block_id().id.seqno);
2408
auto shards = gc_masterchain_state_->get_shards();
2409
for (auto shard : shards) {
2410
if (shard_intersects(shard->shard(), block_id.shard_full())) {
2411
promise.set_result(block_id.id.seqno < shard->top_block_id().id.seqno);
2418
void ValidatorManagerImpl::allow_block_info_gc(BlockIdExt block_id, td::Promise<bool> promise) {
2420
td::PromiseCreator::lambda([db = db_.get(), promise = std::move(promise)](td::Result<BlockHandle> R) mutable {
2422
promise.set_result(false);
2424
auto handle = R.move_as_ok();
2425
if (!handle->moved_to_archive() || !handle->is_applied()) {
2426
promise.set_result(false);
2428
auto P = td::PromiseCreator::lambda([promise = std::move(promise)](td::Result<td::Unit> R) mutable {
2430
promise.set_result(true);
2432
td::actor::send_closure(db, &Db::store_block_handle, handle, std::move(P));
2436
get_block_handle(block_id, false, std::move(P));
2439
void ValidatorManagerImpl::got_next_gc_masterchain_handle(BlockHandle handle) {
2440
CHECK(gc_advancing_);
2441
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), handle](td::Result<td::Ref<ShardState>> R) {
2443
if (R.error().code() == ErrorCode::timeout) {
2444
LOG(ERROR) << "Failed to get gc masterchain state, retrying: " << R.move_as_error();
2445
td::actor::send_closure(SelfId, &ValidatorManagerImpl::got_next_gc_masterchain_handle, std::move(handle));
2447
LOG(FATAL) << "Failed to get gc masterchain state: " << R.move_as_error();
2451
td::actor::send_closure(SelfId, &ValidatorManagerImpl::got_next_gc_masterchain_state, std::move(handle),
2452
td::Ref<MasterchainState>{R.move_as_ok()});
2454
wait_block_state(handle, 0, td::Timestamp::in(60.0), std::move(P));
2457
void ValidatorManagerImpl::got_next_gc_masterchain_state(BlockHandle handle, td::Ref<MasterchainState> state) {
2458
auto P = td::PromiseCreator::lambda([handle, state, SelfId = actor_id(this)](td::Result<td::Unit> R) {
2460
td::actor::send_closure(SelfId, &ValidatorManagerImpl::advance_gc, handle, state);
2462
update_gc_block_handle(std::move(handle), std::move(P));
2465
void ValidatorManagerImpl::update_gc_block_handle(BlockHandle handle, td::Promise<td::Unit> promise) {
2466
td::actor::send_closure(db_, &Db::update_gc_masterchain_block, handle->id(), std::move(promise));
2469
void ValidatorManagerImpl::advance_gc(BlockHandle handle, td::Ref<MasterchainState> state) {
2470
CHECK(gc_advancing_);
2471
gc_advancing_ = false;
2472
gc_masterchain_handle_ = std::move(handle);
2473
gc_masterchain_state_ = std::move(state);
2474
try_advance_gc_masterchain_block();
2477
void ValidatorManagerImpl::update_shard_client_block_handle(BlockHandle handle, td::Ref<MasterchainState> state,
2478
td::Promise<td::Unit> promise) {
2479
shard_client_handle_ = std::move(handle);
2480
auto seqno = shard_client_handle_->id().seqno();
2481
if (state.not_null()) {
2482
shard_client_shards_ = state->get_shards();
2483
if (last_liteserver_state_.is_null() || last_liteserver_state_->get_block_id().seqno() < seqno) {
2484
last_liteserver_state_ = std::move(state);
2487
shard_client_update(seqno);
2488
promise.set_value(td::Unit());
2491
void ValidatorManagerImpl::shard_client_update(BlockSeqno seqno) {
2492
if (min_confirmed_masterchain_seqno_ < seqno) {
2493
min_confirmed_masterchain_seqno_ = seqno;
2497
while (shard_client_waiters_.size() > 0) {
2498
auto it = shard_client_waiters_.begin();
2499
if (it->first > min_confirmed_masterchain_seqno_) {
2502
for (auto &y : it->second.waiting_) {
2503
y.promise.set_value(td::Unit());
2505
shard_client_waiters_.erase(it);
2509
void ValidatorManagerImpl::state_serializer_update(BlockSeqno seqno) {
2510
if (state_serializer_masterchain_seqno_ < seqno) {
2511
state_serializer_masterchain_seqno_ = seqno;
2515
void ValidatorManagerImpl::alarm() {
2516
try_advance_gc_masterchain_block();
2517
alarm_timestamp() = td::Timestamp::in(1.0);
2518
if (shard_client_handle_ && gc_masterchain_handle_) {
2519
td::actor::send_closure(db_, &Db::run_gc, shard_client_handle_->unix_time(), gc_masterchain_handle_->unix_time(),
2520
static_cast<UnixTime>(opts_->archive_ttl()));
2522
if (log_status_at_.is_in_past()) {
2523
if (last_masterchain_block_handle_) {
2524
LOG(ERROR) << "STATUS: last_masterchain_block_ago="
2525
<< td::format::as_time(td::Clocks::system() - last_masterchain_block_handle_->unix_time())
2526
<< " last_known_key_block_ago="
2527
<< td::format::as_time(td::Clocks::system() - (last_known_key_block_handle_->inited_unix_time()
2528
? last_known_key_block_handle_->unix_time()
2530
<< " shard_client_ago="
2531
<< td::format::as_time(td::Clocks::system() -
2532
(shard_client_handle_ ? shard_client_handle_->unix_time() : 0));
2534
log_status_at_ = td::Timestamp::in(60.0);
2536
alarm_timestamp().relax(log_status_at_);
2537
if (resend_shard_blocks_at_ && resend_shard_blocks_at_.is_in_past()) {
2538
resend_shard_blocks_at_ = td::Timestamp::never();
2539
for (auto &B : out_shard_blocks_) {
2540
callback_->send_shard_block_info(B.second->block_id(), B.second->catchain_seqno(), B.second->serialize());
2542
if (out_shard_blocks_.size() > 0) {
2543
resend_shard_blocks_at_ = td::Timestamp::in(td::Random::fast(0, 100) * 0.01 + 2);
2546
alarm_timestamp().relax(resend_shard_blocks_at_);
2547
if (check_waiters_at_.is_in_past()) {
2548
check_waiters_at_ = td::Timestamp::in(1.0);
2549
for (auto &w : wait_block_data_) {
2550
w.second.check_timers();
2552
for (auto &w : wait_state_) {
2553
w.second.check_timers();
2555
for (auto &w : shard_client_waiters_) {
2556
w.second.check_timers();
2558
for (auto it = block_state_cache_.begin(); it != block_state_cache_.end();) {
2559
bool del = it->second.ttl_.is_in_past();
2561
auto block_id = it->first;
2562
if (block_id.is_masterchain()) {
2563
if (block_id.seqno() == last_masterchain_seqno_) {
2564
it->second.ttl_ = td::Timestamp::in(30.0);
2567
} else if (last_masterchain_state_.not_null()) {
2568
auto shard = last_masterchain_state_->get_shard_from_config(block_id.shard_full());
2569
if (shard.not_null()) {
2570
if (block_id.seqno() == shard->top_block_id().seqno()) {
2571
it->second.ttl_ = td::Timestamp::in(30.0);
2578
it = block_state_cache_.erase(it);
2584
alarm_timestamp().relax(check_waiters_at_);
2585
if (check_shard_clients_.is_in_past()) {
2586
check_shard_clients_ = td::Timestamp::in(10.0);
2588
if (!serializer_.empty()) {
2589
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<BlockSeqno> R) {
2591
VLOG(VALIDATOR_WARNING) << "failed to get shard client status: " << R.move_as_error();
2593
td::actor::send_closure(SelfId, &ValidatorManagerImpl::state_serializer_update, R.move_as_ok());
2596
td::actor::send_closure(serializer_, &AsyncStateSerializer::get_masterchain_seqno, std::move(P));
2599
alarm_timestamp().relax(check_shard_clients_);
2601
if (log_ls_stats_at_.is_in_past()) {
2602
if (!ls_stats_.empty() || ls_stats_check_ext_messages_ != 0) {
2603
td::StringBuilder sb;
2604
sb << "Liteserver stats (1 minute):";
2605
td::uint32 total = 0;
2606
for (const auto &p : ls_stats_) {
2607
sb << " " << lite_query_name_by_id(p.first) << ":" << p.second;
2611
sb << " TOTAL:" << total;
2613
if (ls_stats_check_ext_messages_ > 0) {
2614
sb << " checkExtMessage:" << ls_stats_check_ext_messages_;
2616
LOG(WARNING) << sb.as_cslice();
2619
ls_stats_check_ext_messages_ = 0;
2620
log_ls_stats_at_ = td::Timestamp::in(60.0);
2622
alarm_timestamp().relax(log_ls_stats_at_);
2623
if (cleanup_mempool_at_.is_in_past()) {
2624
if (is_validator()) {
2625
get_external_messages(ShardIdFull{masterchainId, shardIdAll},
2626
[](td::Result<std::vector<std::pair<td::Ref<ExtMessage>, int>>>) {});
2627
get_external_messages(ShardIdFull{basechainId, shardIdAll},
2628
[](td::Result<std::vector<std::pair<td::Ref<ExtMessage>, int>>>) {});
2630
cleanup_mempool_at_ = td::Timestamp::in(250.0);
2632
alarm_timestamp().relax(cleanup_mempool_at_);
2635
void ValidatorManagerImpl::update_shard_client_state(BlockIdExt masterchain_block_id, td::Promise<td::Unit> promise) {
2636
td::actor::send_closure(db_, &Db::update_shard_client_state, masterchain_block_id, std::move(promise));
2639
void ValidatorManagerImpl::get_shard_client_state(bool from_db, td::Promise<BlockIdExt> promise) {
2640
if (shard_client_handle_ && !from_db) {
2641
promise.set_result(shard_client_handle_->id());
2643
td::actor::send_closure(db_, &Db::get_shard_client_state, std::move(promise));
2647
void ValidatorManagerImpl::subscribe_to_shard(ShardIdFull shard) {
2648
callback_->add_shard(shard);
2651
void ValidatorManagerImpl::update_async_serializer_state(AsyncSerializerState state, td::Promise<td::Unit> promise) {
2652
td::actor::send_closure(db_, &Db::update_async_serializer_state, std::move(state), std::move(promise));
2655
void ValidatorManagerImpl::get_async_serializer_state(td::Promise<AsyncSerializerState> promise) {
2656
td::actor::send_closure(db_, &Db::get_async_serializer_state, std::move(promise));
2659
void ValidatorManagerImpl::try_get_static_file(FileHash file_hash, td::Promise<td::BufferSlice> promise) {
2660
td::actor::send_closure(db_, &Db::try_get_static_file, file_hash, std::move(promise));
2663
void ValidatorManagerImpl::get_archive_id(BlockSeqno masterchain_seqno, td::Promise<td::uint64> promise) {
2664
if (masterchain_seqno > last_masterchain_seqno_) {
2665
promise.set_error(td::Status::Error(ErrorCode::notready, "masterchain seqno too big"));
2668
td::actor::send_closure(db_, &Db::get_archive_id, masterchain_seqno, std::move(promise));
2671
void ValidatorManagerImpl::get_archive_slice(td::uint64 archive_id, td::uint64 offset, td::uint32 limit,
2672
td::Promise<td::BufferSlice> promise) {
2673
td::actor::send_closure(db_, &Db::get_archive_slice, archive_id, offset, limit, std::move(promise));
2676
bool ValidatorManagerImpl::is_validator() {
2677
return temp_keys_.size() > 0 || permanent_keys_.size() > 0;
2680
PublicKeyHash ValidatorManagerImpl::get_validator(ShardIdFull shard, td::Ref<ValidatorSet> val_set) {
2681
if (!opts_->need_validate(shard, val_set->get_catchain_seqno())) {
2682
return PublicKeyHash::zero();
2684
for (auto &key : temp_keys_) {
2685
if (val_set->is_validator(key.bits256_value())) {
2689
return PublicKeyHash::zero();
2692
void ValidatorManagerImpl::got_next_key_blocks(std::vector<BlockIdExt> r) {
2693
if (r.size() == 0) {
2694
delay_action([SelfId = actor_id(
2695
this)]() { td::actor::send_closure(SelfId, &ValidatorManagerImpl::send_peek_key_block_request); },
2696
td::Timestamp::in(2.0 + td::Random::fast(0, 100) * 0.01));
2699
auto block_id = *r.rbegin();
2700
if (block_id.seqno() <= last_known_key_block_handle_->id().seqno()) {
2701
delay_action([SelfId = actor_id(
2702
this)]() { td::actor::send_closure(SelfId, &ValidatorManagerImpl::send_peek_key_block_request); },
2703
td::Timestamp::in(2.0 + td::Random::fast(0, 100) * 0.01));
2707
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<BlockHandle> R) {
2709
td::actor::send_closure(SelfId, &ValidatorManagerImpl::update_last_known_key_block, R.move_as_ok(), true);
2711
get_block_handle(block_id, false, std::move(P));
2714
void ValidatorManagerImpl::update_last_known_key_block(BlockHandle handle, bool send_request) {
2715
if (last_known_key_block_handle_ && handle->id().seqno() > last_known_key_block_handle_->id().seqno()) {
2716
last_known_key_block_handle_ = std::move(handle);
2717
callback_->new_key_block(last_known_key_block_handle_);
2720
delay_action([SelfId = actor_id(
2721
this)]() { td::actor::send_closure(SelfId, &ValidatorManagerImpl::send_peek_key_block_request); },
2722
td::Timestamp::in(0.1 + td::Random::fast(0, 100) * 0.001));
2726
void ValidatorManagerImpl::send_peek_key_block_request() {
2727
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<std::vector<BlockIdExt>> R) {
2729
td::actor::send_closure(SelfId, &ValidatorManagerImpl::got_next_key_blocks, std::vector<BlockIdExt>{});
2731
td::actor::send_closure(SelfId, &ValidatorManagerImpl::got_next_key_blocks, R.move_as_ok());
2735
send_get_next_key_blocks_request(last_known_key_block_handle_->id(), 1, std::move(P));
2738
void ValidatorManagerImpl::prepare_stats(td::Promise<std::vector<std::pair<std::string, std::string>>> promise) {
2739
auto merger = StatsMerger::create(std::move(promise));
2741
std::vector<std::pair<std::string, std::string>> vec;
2742
vec.emplace_back("unixtime", td::to_string(static_cast<UnixTime>(td::Clocks::system())));
2743
if (last_masterchain_block_handle_) {
2744
vec.emplace_back("masterchainblock", last_masterchain_block_id_.to_str());
2745
vec.emplace_back("masterchainblocktime", td::to_string(last_masterchain_block_handle_->unix_time()));
2746
vec.emplace_back("gcmasterchainblock", gc_masterchain_handle_->id().to_str());
2747
vec.emplace_back("keymasterchainblock", last_key_block_handle_->id().to_str());
2748
vec.emplace_back("knownkeymasterchainblock", last_known_key_block_handle_->id().to_str());
2749
vec.emplace_back("rotatemasterchainblock", last_rotate_block_id_.to_str());
2750
//vec.emplace_back("shardclientmasterchainseqno", td::to_string(min_confirmed_masterchain_seqno_));
2751
vec.emplace_back("stateserializermasterchainseqno", td::to_string(state_serializer_masterchain_seqno_));
2754
if (!shard_client_.empty()) {
2755
auto P = td::PromiseCreator::lambda([promise = merger.make_promise("")](td::Result<BlockSeqno> R) mutable {
2757
promise.set_error(R.move_as_error());
2760
std::vector<std::pair<std::string, std::string>> vec;
2761
vec.emplace_back("shardclientmasterchainseqno", td::to_string(R.move_as_ok()));
2762
promise.set_value(std::move(vec));
2764
td::actor::send_closure(shard_client_, &ShardClient::get_processed_masterchain_block, std::move(P));
2767
merger.make_promise("").set_value(std::move(vec));
2769
td::actor::send_closure(db_, &Db::prepare_stats, merger.make_promise("db."));
2772
void ValidatorManagerImpl::prepare_perf_timer_stats(td::Promise<std::vector<PerfTimerStats>> promise) {
2773
promise.set_value(std::vector<PerfTimerStats>(perf_timer_stats));
2776
void ValidatorManagerImpl::add_perf_timer_stat(std::string name, double duration) {
2777
for (auto &s : perf_timer_stats) {
2778
if (s.name == name) {
2779
double now = td::Time::now();
2780
while (!s.stats.empty() && s.stats.front().first < now - 3600.0) {
2781
s.stats.pop_front();
2783
s.stats.push_back({td::Time::now(), duration});
2787
perf_timer_stats.push_back({name, {{td::Time::now(), duration}}});
2790
void ValidatorManagerImpl::truncate(BlockSeqno seqno, ConstBlockHandle handle, td::Promise<td::Unit> promise) {
2791
td::actor::send_closure(db_, &Db::truncate, seqno, std::move(handle), std::move(promise));
2794
void ValidatorManagerImpl::wait_shard_client_state(BlockSeqno seqno, td::Timestamp timeout,
2795
td::Promise<td::Unit> promise) {
2796
if (seqno <= min_confirmed_masterchain_seqno_) {
2797
promise.set_value(td::Unit());
2800
if (timeout.is_in_past()) {
2801
promise.set_error(td::Status::Error(ErrorCode::timeout, "timeout"));
2804
if (seqno > min_confirmed_masterchain_seqno_ + 100) {
2805
promise.set_error(td::Status::Error(ErrorCode::notready, "too big masterchain block seqno"));
2809
shard_client_waiters_[seqno].waiting_.emplace_back(timeout, 0, std::move(promise));
2812
void ValidatorManagerImpl::log_validator_session_stats(BlockIdExt block_id,
2813
validatorsession::ValidatorSessionStats stats) {
2814
std::string fname = opts_->get_session_logs_file();
2815
if (fname.empty()) {
2819
std::vector<tl_object_ptr<ton_api::validatorSession_statsRound>> rounds;
2820
for (const auto &round : stats.rounds) {
2821
std::vector<tl_object_ptr<ton_api::validatorSession_statsProducer>> producers;
2822
for (const auto &producer : round.producers) {
2823
producers.push_back(create_tl_object<ton_api::validatorSession_statsProducer>(
2824
producer.id.bits256_value(), producer.candidate_id, producer.block_status, producer.comment,
2825
producer.block_timestamp, producer.is_accepted, producer.is_ours, producer.got_submit_at,
2826
producer.collation_time, producer.collated_at, producer.collation_cached, producer.validation_time,
2827
producer.validated_at, producer.validation_cached, producer.gen_utime, producer.approved_weight,
2828
producer.approved_33pct_at, producer.approved_66pct_at, producer.signed_weight, producer.signed_33pct_at,
2829
producer.signed_66pct_at, producer.serialize_time, producer.deserialize_time, producer.serialized_size));
2831
rounds.push_back(create_tl_object<ton_api::validatorSession_statsRound>(round.timestamp, std::move(producers)));
2834
auto obj = create_tl_object<ton_api::validatorSession_stats>(
2835
stats.success, create_tl_block_id(block_id), stats.timestamp, stats.self.bits256_value(), stats.session_id,
2836
stats.cc_seqno, stats.creator.bits256_value(), stats.total_validators, stats.total_weight, stats.signatures,
2837
stats.signatures_weight, stats.approve_signatures, stats.approve_signatures_weight, stats.first_round,
2839
auto s = td::json_encode<std::string>(td::ToJson(*obj.get()), false);
2840
s.erase(std::remove_if(s.begin(), s.end(), [](char c) { return c == '\n' || c == '\r'; }), s.end());
2843
file.open(fname, std::ios_base::app);
2847
LOG(INFO) << "Writing validator session stats for " << block_id.id.to_str();
2850
void ValidatorManagerImpl::log_new_validator_group_stats(validatorsession::NewValidatorGroupStats stats) {
2851
std::string fname = opts_->get_session_logs_file();
2852
if (fname.empty()) {
2855
std::vector<tl_object_ptr<ton_api::validatorSession_newValidatorGroupStats_node>> nodes;
2856
for (const auto &node : stats.nodes) {
2858
create_tl_object<ton_api::validatorSession_newValidatorGroupStats_node>(node.id.bits256_value(), node.weight));
2860
auto obj = create_tl_object<ton_api::validatorSession_newValidatorGroupStats>(
2861
stats.session_id, stats.shard.workchain, stats.shard.shard, stats.cc_seqno, stats.timestamp, stats.self_idx,
2863
auto s = td::json_encode<std::string>(td::ToJson(*obj.get()), false);
2864
s.erase(std::remove_if(s.begin(), s.end(), [](char c) { return c == '\n' || c == '\r'; }), s.end());
2867
file.open(fname, std::ios_base::app);
2871
LOG(INFO) << "Writing new validator group stats for " << stats.shard.to_str();
2874
void ValidatorManagerImpl::get_block_handle_for_litequery(BlockIdExt block_id, td::Promise<ConstBlockHandle> promise) {
2875
get_block_handle(block_id, false,
2876
[SelfId = actor_id(this), block_id, promise = std::move(promise),
2877
allow_not_applied = opts_->nonfinal_ls_queries_enabled()](td::Result<BlockHandle> R) mutable {
2878
if (R.is_ok() && (allow_not_applied || R.ok()->is_applied())) {
2879
promise.set_value(R.move_as_ok());
2881
td::actor::send_closure(SelfId, &ValidatorManagerImpl::process_block_handle_for_litequery_error,
2882
block_id, std::move(R), std::move(promise));
2887
void ValidatorManagerImpl::get_block_data_for_litequery(BlockIdExt block_id, td::Promise<td::Ref<BlockData>> promise) {
2888
if (candidates_buffer_.empty()) {
2889
get_block_handle_for_litequery(
2890
block_id, [manager = actor_id(this), promise = std::move(promise)](td::Result<ConstBlockHandle> R) mutable {
2891
TRY_RESULT_PROMISE(promise, handle, std::move(R));
2892
td::actor::send_closure_later(manager, &ValidatorManager::get_block_data_from_db, std::move(handle),
2893
std::move(promise));
2896
td::actor::send_closure(
2897
candidates_buffer_, &CandidatesBuffer::get_block_data, block_id,
2898
[manager = actor_id(this), promise = std::move(promise), block_id](td::Result<td::Ref<BlockData>> R) mutable {
2900
promise.set_result(R.move_as_ok());
2903
td::actor::send_closure(manager, &ValidatorManagerImpl::get_block_handle_for_litequery, block_id,
2904
[manager, promise = std::move(promise)](td::Result<ConstBlockHandle> R) mutable {
2905
TRY_RESULT_PROMISE(promise, handle, std::move(R));
2906
td::actor::send_closure_later(manager, &ValidatorManager::get_block_data_from_db,
2907
std::move(handle), std::move(promise));
2913
void ValidatorManagerImpl::get_block_state_for_litequery(BlockIdExt block_id,
2914
td::Promise<td::Ref<ShardState>> promise) {
2915
if (candidates_buffer_.empty()) {
2916
get_block_handle_for_litequery(
2917
block_id, [manager = actor_id(this), promise = std::move(promise)](td::Result<ConstBlockHandle> R) mutable {
2918
TRY_RESULT_PROMISE(promise, handle, std::move(R));
2919
td::actor::send_closure_later(manager, &ValidatorManager::get_shard_state_from_db, std::move(handle),
2920
std::move(promise));
2923
td::actor::send_closure(
2924
candidates_buffer_, &CandidatesBuffer::get_block_state, block_id,
2925
[manager = actor_id(this), promise = std::move(promise), block_id](td::Result<td::Ref<ShardState>> R) mutable {
2927
promise.set_result(R.move_as_ok());
2930
td::actor::send_closure(manager, &ValidatorManagerImpl::get_block_handle_for_litequery,
2931
block_id, [manager, promise = std::move(promise)](td::Result<ConstBlockHandle> R) mutable {
2932
TRY_RESULT_PROMISE(promise, handle, std::move(R));
2933
td::actor::send_closure_later(manager, &ValidatorManager::get_shard_state_from_db, std::move(handle),
2934
std::move(promise));
2940
void ValidatorManagerImpl::get_block_by_lt_for_litequery(AccountIdPrefixFull account, LogicalTime lt,
2941
td::Promise<ConstBlockHandle> promise) {
2942
get_block_by_lt_from_db(
2943
account, lt, [=, SelfId = actor_id(this), promise = std::move(promise)](td::Result<ConstBlockHandle> R) mutable {
2944
if (R.is_ok() && R.ok()->is_applied()) {
2945
promise.set_value(R.move_as_ok());
2947
td::actor::send_closure(SelfId, &ValidatorManagerImpl::process_lookup_block_for_litequery_error, account, 0,
2948
lt, std::move(R), std::move(promise));
2953
void ValidatorManagerImpl::get_block_by_unix_time_for_litequery(AccountIdPrefixFull account, UnixTime ts,
2954
td::Promise<ConstBlockHandle> promise) {
2955
get_block_by_unix_time_from_db(
2956
account, ts, [=, SelfId = actor_id(this), promise = std::move(promise)](td::Result<ConstBlockHandle> R) mutable {
2957
if (R.is_ok() && R.ok()->is_applied()) {
2958
promise.set_value(R.move_as_ok());
2960
td::actor::send_closure(SelfId, &ValidatorManagerImpl::process_lookup_block_for_litequery_error, account, 1,
2961
ts, std::move(R), std::move(promise));
2966
void ValidatorManagerImpl::get_block_by_seqno_for_litequery(AccountIdPrefixFull account, BlockSeqno seqno,
2967
td::Promise<ConstBlockHandle> promise) {
2968
get_block_by_seqno_from_db(
2970
[=, SelfId = actor_id(this), promise = std::move(promise)](td::Result<ConstBlockHandle> R) mutable {
2971
if (R.is_ok() && R.ok()->is_applied()) {
2972
promise.set_value(R.move_as_ok());
2974
td::actor::send_closure(SelfId, &ValidatorManagerImpl::process_lookup_block_for_litequery_error, account, 2,
2975
seqno, std::move(R), std::move(promise));
2980
void ValidatorManagerImpl::process_block_handle_for_litequery_error(BlockIdExt block_id,
2981
td::Result<BlockHandle> r_handle,
2982
td::Promise<ConstBlockHandle> promise) {
2984
if (r_handle.is_error()) {
2985
err = r_handle.move_as_error();
2987
auto handle = r_handle.move_as_ok();
2988
if (handle->is_applied()) {
2989
promise.set_value(std::move(handle));
2992
if (!handle->received() || !handle->received_state()) {
2993
err = td::Status::Error(ErrorCode::notready, PSTRING() << "block " << block_id.id.to_str() << " is not in db");
2995
err = td::Status::Error(ErrorCode::notready, PSTRING() << "block " << block_id.id.to_str() << " is not applied");
2998
if (block_id.is_masterchain()) {
2999
if (block_id.seqno() > last_masterchain_seqno_) {
3000
err = err.move_as_error_suffix(PSTRING() << " (last known masterchain block: " << last_masterchain_seqno_ << ")");
3003
for (auto &shard : shard_client_shards_) {
3004
if (shard_intersects(shard->shard(), block_id.shard_full())) {
3005
if (block_id.seqno() > shard->top_block_id().seqno()) {
3006
err = err.move_as_error_suffix(
3007
PSTRING() << " (possibly out of sync: shard_client_seqno="
3008
<< (shard_client_handle_ ? shard_client_handle_->id().seqno() : 0) << " ls_seqno="
3009
<< (last_liteserver_state_.not_null() ? last_liteserver_state_->get_seqno() : 0) << ")");
3015
promise.set_error(std::move(err));
3018
void ValidatorManagerImpl::process_lookup_block_for_litequery_error(AccountIdPrefixFull account, int type,
3020
td::Result<ConstBlockHandle> r_handle,
3021
td::Promise<ConstBlockHandle> promise) {
3023
if (r_handle.is_error()) {
3024
err = r_handle.move_as_error();
3026
auto handle = r_handle.move_as_ok();
3027
if (handle->is_applied()) {
3028
promise.set_value(std::move(handle));
3031
if (!handle->received() || !handle->received_state()) {
3032
err = td::Status::Error(ErrorCode::notready, PSTRING() << "block " << handle->id().to_str() << " is not in db");
3034
err = td::Status::Error(ErrorCode::notready, PSTRING() << "block " << handle->id().to_str() << " is not applied");
3037
if (account.is_masterchain()) {
3038
if (value > (type == 0
3039
? last_masterchain_state_->get_logical_time()
3040
: (type == 1 ? last_masterchain_state_->get_unix_time() : last_masterchain_state_->get_seqno()))) {
3041
err = err.move_as_error_suffix(PSTRING() << " (last known masterchain block: " << last_masterchain_seqno_ << ")");
3044
for (auto &shard : shard_client_shards_) {
3045
if (shard_intersects(shard->shard(), account.as_leaf_shard())) {
3046
if (value > (type == 0 ? shard->end_lt()
3047
: (type == 1 ? (shard_client_handle_ ? shard_client_handle_->unix_time() : 0)
3048
: shard->top_block_id().seqno()))) {
3049
err = err.move_as_error_suffix(
3050
PSTRING() << " (possibly out of sync: shard_client_seqno="
3051
<< (shard_client_handle_ ? shard_client_handle_->id().seqno() : 0) << " ls_seqno="
3052
<< (last_liteserver_state_.not_null() ? last_liteserver_state_->get_seqno() : 0) << ")");
3058
static std::string names[3] = {"lt", "utime", "seqno"};
3059
err = err.move_as_error_prefix(PSTRING() << "cannot find block " << account.to_str() << " " << names[type] << "="
3061
promise.set_error(std::move(err));
3064
void ValidatorManagerImpl::get_block_candidate_for_litequery(PublicKey source, BlockIdExt block_id,
3065
FileHash collated_data_hash,
3066
td::Promise<BlockCandidate> promise) {
3067
if (!opts_->nonfinal_ls_queries_enabled()) {
3068
promise.set_error(td::Status::Error("query is not allowed"));
3071
get_block_candidate_from_db(source, block_id, collated_data_hash, std::move(promise));
3074
void ValidatorManagerImpl::get_validator_groups_info_for_litequery(
3075
td::optional<ShardIdFull> shard,
3076
td::Promise<tl_object_ptr<lite_api::liteServer_nonfinal_validatorGroups>> promise) {
3077
if (!opts_->nonfinal_ls_queries_enabled()) {
3078
promise.set_error(td::Status::Error("query is not allowed"));
3081
class Actor : public td::actor::Actor {
3083
explicit Actor(std::vector<td::actor::ActorId<ValidatorGroup>> groups,
3084
td::Promise<tl_object_ptr<lite_api::liteServer_nonfinal_validatorGroups>> promise)
3085
: groups_(std::move(groups)), promise_(std::move(promise)) {
3088
void start_up() override {
3089
pending_ = groups_.size();
3090
if (pending_ == 0) {
3091
promise_.set_result(std::move(result_));
3095
for (auto &x : groups_) {
3096
td::actor::send_closure(
3097
x, &ValidatorGroup::get_validator_group_info_for_litequery,
3098
[SelfId = actor_id(this)](td::Result<tl_object_ptr<lite_api::liteServer_nonfinal_validatorGroupInfo>> R) {
3099
td::actor::send_closure(SelfId, &Actor::on_result, R.is_ok() ? R.move_as_ok() : nullptr);
3104
void on_result(tl_object_ptr<lite_api::liteServer_nonfinal_validatorGroupInfo> r) {
3106
result_->groups_.push_back(std::move(r));
3109
if (pending_ == 0) {
3110
promise_.set_result(std::move(result_));
3116
std::vector<td::actor::ActorId<ValidatorGroup>> groups_;
3118
td::Promise<tl_object_ptr<lite_api::liteServer_nonfinal_validatorGroups>> promise_;
3119
tl_object_ptr<lite_api::liteServer_nonfinal_validatorGroups> result_ =
3120
create_tl_object<lite_api::liteServer_nonfinal_validatorGroups>();
3122
std::vector<td::actor::ActorId<ValidatorGroup>> groups;
3123
for (auto &x : validator_groups_) {
3124
if (x.second.actor.empty()) {
3127
if (shard && shard.value() != x.second.shard) {
3130
groups.push_back(x.second.actor.get());
3132
td::actor::create_actor<Actor>("get-validator-groups-info", std::move(groups), std::move(promise)).release();
3135
void ValidatorManagerImpl::update_options(td::Ref<ValidatorManagerOptions> opts) {
3136
// Currently options can be updated only to change state_serializer_enabled flag
3137
if (!serializer_.empty()) {
3138
td::actor::send_closure(serializer_, &AsyncStateSerializer::update_options, opts);
3140
opts_ = std::move(opts);
3143
td::actor::ActorOwn<ValidatorManagerInterface> ValidatorManagerFactory::create(
3144
td::Ref<ValidatorManagerOptions> opts, std::string db_root, td::actor::ActorId<keyring::Keyring> keyring,
3145
td::actor::ActorId<adnl::Adnl> adnl, td::actor::ActorId<rldp::Rldp> rldp,
3146
td::actor::ActorId<overlay::Overlays> overlays) {
3147
return td::actor::create_actor<validator::ValidatorManagerImpl>("manager", std::move(opts), db_root, keyring, adnl,
3151
size_t ValidatorManagerImpl::CheckedExtMsgCounter::get_msg_count(WorkchainId wc, StdSmcAddress addr) {
3153
auto it1 = counter_cur_.find({wc, addr});
3154
auto it2 = counter_prev_.find({wc, addr});
3155
return (it1 == counter_cur_.end() ? 0 : it1->second) + (it2 == counter_prev_.end() ? 0 : it2->second);
3157
size_t ValidatorManagerImpl::CheckedExtMsgCounter::inc_msg_count(WorkchainId wc, StdSmcAddress addr) {
3159
auto it2 = counter_prev_.find({wc, addr});
3160
return (it2 == counter_prev_.end() ? 0 : it2->second) + ++counter_cur_[{wc, addr}];
3162
void ValidatorManagerImpl::CheckedExtMsgCounter::before_query() {
3163
while (cleanup_at_.is_in_past()) {
3164
counter_prev_ = std::move(counter_cur_);
3165
counter_cur_.clear();
3166
if (counter_prev_.empty()) {
3167
cleanup_at_ = td::Timestamp::in(max_ext_msg_per_addr_time_window() / 2.0);
3170
cleanup_at_ += max_ext_msg_per_addr_time_window() / 2.0;
3174
} // namespace validator