Ton

Форк
0
/
queue-size-counter.cpp 
301 строка · 12.0 Кб
1
/*
2
    This file is part of TON Blockchain Library.
3

4
    TON Blockchain Library is free software: you can redistribute it and/or modify
5
    it under the terms of the GNU Lesser General Public License as published by
6
    the Free Software Foundation, either version 2 of the License, or
7
    (at your option) any later version.
8

9
    TON Blockchain Library is distributed in the hope that it will be useful,
10
    but WITHOUT ANY WARRANTY; without even the implied warranty of
11
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
    GNU Lesser General Public License for more details.
13

14
    You should have received a copy of the GNU Lesser General Public License
15
    along with TON Blockchain Library.  If not, see <http://www.gnu.org/licenses/>.
16
*/
17
#include "queue-size-counter.hpp"
18
#include "block/block-auto.h"
19
#include "block/block-parse.h"
20
#include "common/delay.h"
21
#include "td/actor/MultiPromise.h"
22
#include "td/utils/Random.h"
23

24
namespace ton::validator {
25

26
static td::Result<td::uint32> calc_queue_size(const td::Ref<ShardState> &state) {
27
  td::uint32 size = 0;
28
  TRY_RESULT(outq_descr, state->message_queue());
29
  block::gen::OutMsgQueueInfo::Record qinfo;
30
  if (!tlb::unpack_cell(outq_descr->root_cell(), qinfo)) {
31
    return td::Status::Error("invalid message queue");
32
  }
33
  vm::AugmentedDictionary queue{qinfo.out_queue->prefetch_ref(0), 352, block::tlb::aug_OutMsgQueue};
34
  bool ok = queue.check_for_each([&](td::Ref<vm::CellSlice>, td::ConstBitPtr, int) -> bool {
35
    ++size;
36
    return true;
37
  });
38
  if (!ok) {
39
    return td::Status::Error("invalid message queue dict");
40
  }
41
  return size;
42
}
43

44
static td::Result<td::uint32> recalc_queue_size(const td::Ref<ShardState> &state, const td::Ref<ShardState> &prev_state,
45
                                                td::uint32 prev_size) {
46
  TRY_RESULT(outq_descr, state->message_queue());
47
  block::gen::OutMsgQueueInfo::Record qinfo;
48
  if (!tlb::unpack_cell(outq_descr->root_cell(), qinfo)) {
49
    return td::Status::Error("invalid message queue");
50
  }
51
  vm::AugmentedDictionary queue{qinfo.out_queue->prefetch_ref(0), 352, block::tlb::aug_OutMsgQueue};
52

53
  TRY_RESULT(prev_outq_descr, prev_state->message_queue());
54
  block::gen::OutMsgQueueInfo::Record prev_qinfo;
55
  if (!tlb::unpack_cell(prev_outq_descr->root_cell(), prev_qinfo)) {
56
    return td::Status::Error("invalid message queue");
57
  }
58
  vm::AugmentedDictionary prev_queue{prev_qinfo.out_queue->prefetch_ref(0), 352, block::tlb::aug_OutMsgQueue};
59
  td::uint32 add = 0, rem = 0;
60
  bool ok = prev_queue.scan_diff(
61
      queue, [&](td::ConstBitPtr, int, td::Ref<vm::CellSlice> prev_val, td::Ref<vm::CellSlice> new_val) -> bool {
62
        if (prev_val.not_null()) {
63
          ++rem;
64
        }
65
        if (new_val.not_null()) {
66
          ++add;
67
        }
68
        return true;
69
      });
70
  if (!ok) {
71
    return td::Status::Error("invalid message queue dict");
72
  }
73
  if (prev_size + add < rem) {
74
    return td::Status::Error("negative value");
75
  }
76
  return prev_size + add - rem;
77
}
78

79
void QueueSizeCounter::start_up() {
80
  if (init_masterchain_state_.is_null()) {
81
    // Used in manager-hardfork or manager-disk
82
    simple_mode_ = true;
83
    return;
84
  }
85
  current_seqno_ = init_masterchain_state_->get_seqno();
86
  process_top_shard_blocks_cont(init_masterchain_state_, true);
87
  init_masterchain_state_ = {};
88
  alarm();
89
}
90

91
void QueueSizeCounter::get_queue_size(BlockIdExt block_id, td::Promise<td::uint32> promise) {
92
  get_queue_size_ex(block_id, simple_mode_ || is_block_too_old(block_id), std::move(promise));
93
}
94

95
void QueueSizeCounter::get_queue_size_ex(ton::BlockIdExt block_id, bool calc_whole, td::Promise<td::uint32> promise) {
96
  Entry &entry = results_[block_id];
97
  if (entry.done_) {
98
    promise.set_result(entry.queue_size_);
99
    return;
100
  }
101
  entry.promises_.push_back(std::move(promise));
102
  if (entry.started_) {
103
    return;
104
  }
105
  entry.started_ = true;
106
  entry.calc_whole_ = calc_whole;
107
  td::actor::send_closure(manager_, &ValidatorManager::get_block_handle, block_id, true,
108
                          [SelfId = actor_id(this), block_id, manager = manager_](td::Result<BlockHandle> R) mutable {
109
                            if (R.is_error()) {
110
                              td::actor::send_closure(SelfId, &QueueSizeCounter::on_error, block_id, R.move_as_error());
111
                              return;
112
                            }
113
                            BlockHandle handle = R.move_as_ok();
114
                            td::actor::send_closure(
115
                                manager, &ValidatorManager::wait_block_state, handle, 0, td::Timestamp::in(10.0),
116
                                [SelfId, handle](td::Result<td::Ref<ShardState>> R) mutable {
117
                                  if (R.is_error()) {
118
                                    td::actor::send_closure(SelfId, &QueueSizeCounter::on_error, handle->id(),
119
                                                            R.move_as_error());
120
                                    return;
121
                                  }
122
                                  td::actor::send_closure(SelfId, &QueueSizeCounter::get_queue_size_cont,
123
                                                          std::move(handle), R.move_as_ok());
124
                                });
125
                          });
126
}
127

128
void QueueSizeCounter::get_queue_size_cont(BlockHandle handle, td::Ref<ShardState> state) {
129
  Entry &entry = results_[handle->id()];
130
  CHECK(entry.started_);
131
  bool calc_whole = entry.calc_whole_ || handle->id().seqno() == 0;
132
  if (!calc_whole) {
133
    CHECK(handle->inited_prev());
134
    auto prev_blocks = handle->prev();
135
    bool after_split = prev_blocks.size() == 1 && handle->id().shard_full() != prev_blocks[0].shard_full();
136
    bool after_merge = prev_blocks.size() == 2;
137
    calc_whole = after_split || after_merge;
138
  }
139
  if (calc_whole) {
140
    auto r_size = calc_queue_size(state);
141
    if (r_size.is_error()) {
142
      on_error(handle->id(), r_size.move_as_error());
143
      return;
144
    }
145
    entry.done_ = true;
146
    entry.queue_size_ = r_size.move_as_ok();
147
    for (auto &promise : entry.promises_) {
148
      promise.set_result(entry.queue_size_);
149
    }
150
    entry.promises_.clear();
151
    return;
152
  }
153

154
  auto prev_block_id = handle->one_prev(true);
155
  get_queue_size(prev_block_id, [=, SelfId = actor_id(this), manager = manager_](td::Result<td::uint32> R) {
156
    if (R.is_error()) {
157
      td::actor::send_closure(SelfId, &QueueSizeCounter::on_error, state->get_block_id(), R.move_as_error());
158
      return;
159
    }
160
    td::uint32 prev_size = R.move_as_ok();
161
    td::actor::send_closure(
162
        manager, &ValidatorManager::wait_block_state_short, prev_block_id, 0, td::Timestamp::in(10.0),
163
        [=](td::Result<td::Ref<ShardState>> R) {
164
          if (R.is_error()) {
165
            td::actor::send_closure(SelfId, &QueueSizeCounter::on_error, state->get_block_id(), R.move_as_error());
166
            return;
167
          }
168
          td::actor::send_closure(SelfId, &QueueSizeCounter::get_queue_size_cont2, state, R.move_as_ok(), prev_size);
169
        });
170
  });
171
}
172

173
void QueueSizeCounter::get_queue_size_cont2(td::Ref<ShardState> state, td::Ref<ShardState> prev_state,
174
                                            td::uint32 prev_size) {
175
  BlockIdExt block_id = state->get_block_id();
176
  Entry &entry = results_[block_id];
177
  CHECK(entry.started_);
178
  auto r_size = recalc_queue_size(state, prev_state, prev_size);
179
  if (r_size.is_error()) {
180
    on_error(block_id, r_size.move_as_error());
181
    return;
182
  }
183
  entry.done_ = true;
184
  entry.queue_size_ = r_size.move_as_ok();
185
  for (auto &promise : entry.promises_) {
186
    promise.set_result(entry.queue_size_);
187
  }
188
  entry.promises_.clear();
189
}
190

191
void QueueSizeCounter::on_error(ton::BlockIdExt block_id, td::Status error) {
192
  auto it = results_.find(block_id);
193
  if (it == results_.end()) {
194
    return;
195
  }
196
  Entry &entry = it->second;
197
  CHECK(!entry.done_);
198
  for (auto &promise : entry.promises_) {
199
    promise.set_error(error.clone());
200
  }
201
  results_.erase(it);
202
}
203

204
void QueueSizeCounter::process_top_shard_blocks() {
205
  LOG(DEBUG) << "QueueSizeCounter::process_top_shard_blocks seqno=" << current_seqno_;
206
  td::actor::send_closure(
207
      manager_, &ValidatorManager::get_block_by_seqno_from_db, AccountIdPrefixFull{masterchainId, 0}, current_seqno_,
208
      [SelfId = actor_id(this), manager = manager_](td::Result<ConstBlockHandle> R) {
209
        if (R.is_error()) {
210
          LOG(WARNING) << "Failed to get masterchain block id: " << R.move_as_error();
211
          delay_action([=]() { td::actor::send_closure(SelfId, &QueueSizeCounter::process_top_shard_blocks); },
212
                       td::Timestamp::in(5.0));
213
          return;
214
        }
215
        td::actor::send_closure(
216
            manager, &ValidatorManager::wait_block_state_short, R.ok()->id(), 0, td::Timestamp::in(10.0),
217
            [=](td::Result<td::Ref<ShardState>> R) {
218
              if (R.is_error()) {
219
                LOG(WARNING) << "Failed to get masterchain state: " << R.move_as_error();
220
                delay_action([=]() { td::actor::send_closure(SelfId, &QueueSizeCounter::process_top_shard_blocks); },
221
                             td::Timestamp::in(5.0));
222
                return;
223
              }
224
              td::actor::send_closure(SelfId, &QueueSizeCounter::process_top_shard_blocks_cont,
225
                                      td::Ref<MasterchainState>(R.move_as_ok()), false);
226
            });
227
      });
228
}
229

230
void QueueSizeCounter::process_top_shard_blocks_cont(td::Ref<MasterchainState> state, bool init) {
231
  LOG(DEBUG) << "QueueSizeCounter::process_top_shard_blocks_cont seqno=" << current_seqno_ << " init=" << init;
232
  td::MultiPromise mp;
233
  auto ig = mp.init_guard();
234
  last_top_blocks_.clear();
235
  last_top_blocks_.push_back(state->get_block_id());
236
  for (auto &shard : state->get_shards()) {
237
    last_top_blocks_.push_back(shard->top_block_id());
238
  }
239
  for (const BlockIdExt &block_id : last_top_blocks_) {
240
    get_queue_size_ex_retry(block_id, init, ig.get_promise());
241
  }
242
  ig.add_promise([SelfId = actor_id(this)](td::Result<td::Unit> R) {
243
    if (R.is_error()) {
244
      return;
245
    }
246
    td::actor::send_closure(SelfId, &QueueSizeCounter::process_top_shard_blocks_finish);
247
  });
248
  if (init) {
249
    init_top_blocks_ = last_top_blocks_;
250
  }
251
}
252

253
void QueueSizeCounter::get_queue_size_ex_retry(BlockIdExt block_id, bool calc_whole, td::Promise<td::Unit> promise) {
254
  get_queue_size_ex(block_id, calc_whole,
255
                    [=, promise = std::move(promise), SelfId = actor_id(this)](td::Result<td::uint32> R) mutable {
256
                      if (R.is_error()) {
257
                        LOG(WARNING) << "Failed to calculate queue size for block " << block_id.to_str() << ": "
258
                                     << R.move_as_error();
259
                        delay_action(
260
                            [=, promise = std::move(promise)]() mutable {
261
                              td::actor::send_closure(SelfId, &QueueSizeCounter::get_queue_size_ex_retry, block_id,
262
                                                      calc_whole, std::move(promise));
263
                            },
264
                            td::Timestamp::in(5.0));
265
                        return;
266
                      }
267
                      promise.set_result(td::Unit());
268
                    });
269
}
270

271
void QueueSizeCounter::process_top_shard_blocks_finish() {
272
  ++current_seqno_;
273
  wait_shard_client();
274
}
275

276
void QueueSizeCounter::wait_shard_client() {
277
  LOG(DEBUG) << "QueueSizeCounter::wait_shard_client seqno=" << current_seqno_;
278
  td::actor::send_closure(
279
      manager_, &ValidatorManager::wait_shard_client_state, current_seqno_, td::Timestamp::in(60.0),
280
      [SelfId = actor_id(this)](td::Result<td::Unit> R) {
281
        if (R.is_error()) {
282
          delay_action([=]() mutable { td::actor::send_closure(SelfId, &QueueSizeCounter::wait_shard_client); },
283
                       td::Timestamp::in(5.0));
284
          return;
285
        }
286
        td::actor::send_closure(SelfId, &QueueSizeCounter::process_top_shard_blocks);
287
      });
288
}
289

290
void QueueSizeCounter::alarm() {
291
  for (auto it = results_.begin(); it != results_.end();) {
292
    if (it->second.done_ && is_block_too_old(it->first)) {
293
      it = results_.erase(it);
294
    } else {
295
      ++it;
296
    }
297
  }
298
  alarm_timestamp() = td::Timestamp::in(td::Random::fast(20.0, 40.0));
299
}
300

301
}  // namespace ton::validator

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.