Ton

Форк
0
/
PeerActor.cpp 
534 строки · 18.5 Кб
1
/*
2
    This file is part of TON Blockchain Library.
3

4
    TON Blockchain Library is free software: you can redistribute it and/or modify
5
    it under the terms of the GNU Lesser General Public License as published by
6
    the Free Software Foundation, either version 2 of the License, or
7
    (at your option) any later version.
8

9
    TON Blockchain Library is distributed in the hope that it will be useful,
10
    but WITHOUT ANY WARRANTY; without even the implied warranty of
11
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
    GNU Lesser General Public License for more details.
13

14
    You should have received a copy of the GNU Lesser General Public License
15
    along with TON Blockchain Library.  If not, see <http://www.gnu.org/licenses/>.
16

17
    Copyright 2017-2020 Telegram Systems LLP
18
*/
19

20
#include "PeerActor.h"
21
#include "auto/tl/ton_api.hpp"
22

23
#include "tl-utils/tl-utils.hpp"
24

25
#include "td/utils/overloaded.h"
26
#include "td/utils/Random.h"
27
#include "vm/boc.h"
28
#include "common/delay.h"
29

30
namespace ton {
31

32
PeerState::State from_ton_api(const ton::ton_api::storage_state &state) {
33
  PeerState::State res;
34
  res.want_download = state.want_download_;
35
  res.will_upload = state.will_upload_;
36
  return res;
37
}
38

39
ton::ton_api::object_ptr<ton::ton_api::storage_state> to_ton_api(const PeerState::State &state) {
40
  return ton::ton_api::make_object<ton::ton_api::storage_state>(state.will_upload, state.want_download);
41
}
42

43
PeerActor::PeerActor(td::unique_ptr<Callback> callback, std::shared_ptr<PeerState> state)
44
    : callback_(std::move(callback)), state_(std::move(state)) {
45
  CHECK(callback_);
46
}
47

48
template <class T, class... ArgsT>
49
td::uint64 PeerActor::create_and_send_query(ArgsT &&...args) {
50
  return send_query(ton::create_serialize_tl_object<T>(std::forward<ArgsT>(args)...));
51
}
52

53
td::uint64 PeerActor::send_query(td::BufferSlice query) {
54
  auto query_id = next_query_id_++;
55
  callback_->send_query(query_id, std::move(query));
56
  return query_id;
57
}
58

59
void PeerActor::schedule_loop() {
60
  yield();
61
}
62

63
void PeerActor::notify_node() {
64
  need_notify_node_ = true;
65
}
66

67
void PeerActor::execute_query(td::BufferSlice query, td::Promise<td::BufferSlice> promise) {
68
  on_pong();
69
  TRY_RESULT_PROMISE(promise, f, ton::fetch_tl_object<ton::ton_api::Function>(std::move(query), true));
70
  ton::ton_api::downcast_call(
71
      *f, td::overloaded(
72
              [&](ton::ton_api::storage_ping &ping) {
73
                execute_ping(static_cast<td::uint64>(ping.session_id_), std::move(promise));
74
              },
75
              [&](ton::ton_api::storage_addUpdate &add_update) { execute_add_update(add_update, std::move(promise)); },
76
              [&](ton::ton_api::storage_getPiece &get_piece) { execute_get_piece(get_piece, std::move(promise)); },
77
              [&](ton::ton_api::storage_getTorrentInfo &) { execute_get_torrent_info(std::move(promise)); },
78
              [&](auto &other) { promise.set_error(td::Status::Error("Unknown function")); }));
79
  schedule_loop();
80
}
81

82
void PeerActor::on_ping_result(td::Result<td::BufferSlice> r_answer) {
83
  ping_query_id_ = {};
84
  if (r_answer.is_ok()) {
85
    on_pong();
86
  }
87
}
88

89
void PeerActor::on_pong() {
90
  wait_pong_till_ = td::Timestamp::in(10);
91
  if (!state_->peer_online_.exchange(true)) {
92
    state_->peer_online_set_ = true;
93
  }
94
  notify_node();
95
}
96

97
void PeerActor::on_update_result(td::Result<td::BufferSlice> r_answer) {
98
  update_query_id_ = {};
99
  if (r_answer.is_ok()) {
100
    if (!peer_is_inited_) {
101
      peer_init_offset_ += UPDATE_INIT_BLOCK_SIZE;
102
      if (peer_init_offset_ >= have_pieces_.as_slice().size()) {
103
        peer_is_inited_ = true;
104
      }
105
    }
106
  } else {
107
    have_pieces_list_.insert(have_pieces_list_.end(), sent_have_pieces_list_.begin(), sent_have_pieces_list_.end());
108
  }
109
  sent_have_pieces_list_.clear();
110
}
111

112
void PeerActor::on_get_piece_result(PartId piece_id, td::Result<td::BufferSlice> r_answer) {
113
  //TODO: handle errors ???
114
  auto res = [&]() -> td::Result<PeerState::Part> {
115
    TRY_RESULT(slice, std::move(r_answer));
116
    TRY_RESULT(piece, ton::fetch_result<ton::ton_api::storage_getPiece>(slice.as_slice()));
117
    PeerState::Part res;
118
    res.data = std::move(piece->data_);
119
    res.proof = std::move(piece->proof_);
120
    return std::move(res);
121
  }();
122
  if (res.is_error()) {
123
    LOG(DEBUG) << "getPiece " << piece_id << " query: " << res.error();
124
  } else {
125
    LOG(DEBUG) << "getPiece " << piece_id << " query: OK";
126
  }
127
  state_->node_queries_results_.add_element(std::make_pair(piece_id, std::move(res)));
128
  notify_node();
129
}
130

131
void PeerActor::on_update_state_result(td::Result<td::BufferSlice> r_answer) {
132
  if (r_answer.is_error()) {
133
    update_state_query_.query_id = {};
134
  }
135
}
136

137
void PeerActor::on_get_info_result(td::Result<td::BufferSlice> r_answer) {
138
  get_info_query_id_ = {};
139
  next_get_info_at_ = td::Timestamp::in(5.0);
140
  alarm_timestamp().relax(next_get_info_at_);
141
  if (r_answer.is_error()) {
142
    LOG(DEBUG) << "getTorrentInfo query: " << r_answer.move_as_error();
143
    return;
144
  }
145
  auto R = fetch_tl_object<ton::ton_api::storage_torrentInfo>(r_answer.move_as_ok(), true);
146
  if (R.is_error()) {
147
    LOG(DEBUG) << "getTorrentInfo query: " << R.move_as_error();
148
    return;
149
  }
150
  td::BufferSlice data = std::move(R.ok_ref()->data_);
151
  LOG(DEBUG) << "getTorrentInfo query: got result (" << data.size() << " bytes)";
152
  if (!data.empty() && !state_->torrent_info_ready_) {
153
    state_->torrent_info_response_callback_(std::move(data));
154
  }
155
}
156

157
void PeerActor::on_query_result(td::uint64 query_id, td::Result<td::BufferSlice> r_answer) {
158
  if (r_answer.is_ok()) {
159
    on_pong();
160
  }
161
  if (ping_query_id_ && ping_query_id_.value() == query_id) {
162
    on_ping_result(std::move(r_answer));
163
  } else if (update_query_id_ && update_query_id_.value() == query_id) {
164
    on_update_result(std::move(r_answer));
165
  } else if (update_state_query_.query_id && update_state_query_.query_id.value() == query_id) {
166
    on_update_state_result(std::move(r_answer));
167
  } else if (get_info_query_id_ && get_info_query_id_.value() == query_id) {
168
    on_get_info_result(std::move(r_answer));
169
  } else {
170
    for (auto &query_it : node_get_piece_) {
171
      if (query_it.second.query_id && query_it.second.query_id.value() == query_id) {
172
        on_get_piece_result(query_it.first, std::move(r_answer));
173
        node_get_piece_.erase(query_it.first);
174
        break;
175
      }
176
    }
177
  }
178

179
  schedule_loop();
180
}
181

182
void PeerActor::start_up() {
183
  callback_->register_self(actor_id(this));
184

185
  node_session_id_ = td::Random::secure_uint64();
186

187
  state_->peer = actor_id(this);
188
  state_->peer_ready_ = true;
189

190
  notify_node();
191
  schedule_loop();
192
}
193

194
void PeerActor::loop() {
195
  loop_ping();
196
  loop_pong();
197

198
  loop_update_init();
199
  loop_update_state();
200
  loop_update_pieces();
201
  loop_get_torrent_info();
202

203
  loop_node_get_piece();
204
  loop_peer_get_piece();
205

206
  loop_notify_node();
207
}
208

209
void PeerActor::loop_pong() {
210
  if (wait_pong_till_ && wait_pong_till_.is_in_past()) {
211
    wait_pong_till_ = {};
212
    LOG(DEBUG) << "Disconnected from peer";
213
    state_->peer_online_ = false;
214
    notify_node();
215
  }
216
  alarm_timestamp().relax(wait_pong_till_);
217
}
218

219
void PeerActor::loop_ping() {
220
  if (ping_query_id_) {
221
    return;
222
  }
223
  if (!next_ping_at_.is_in_past()) {
224
    alarm_timestamp().relax(next_ping_at_);
225
    return;
226
  }
227

228
  next_ping_at_ = td::Timestamp::in(2);
229
  alarm_timestamp().relax(next_ping_at_);
230
  ping_query_id_ = create_and_send_query<ton::ton_api::storage_ping>(node_session_id_);
231
}
232

233
td::BufferSlice PeerActor::create_update_query(ton::tl_object_ptr<ton::ton_api::storage_Update> update) {
234
  auto session_id = static_cast<td::int64>(peer_session_id_.value());
235
  auto seqno = static_cast<td::int32>(++node_seqno_);
236
  return ton::create_serialize_tl_object<ton::ton_api::storage_addUpdate>(session_id, seqno, std::move(update));
237
}
238

239
void PeerActor::loop_update_init() {
240
  if (!peer_session_id_ || update_query_id_ || peer_is_inited_) {
241
    return;
242
  }
243

244
  update_have_pieces();
245

246
  auto node_state = state_->node_state_.load();
247
  auto s = have_pieces_.as_slice();
248
  if (s.size() <= peer_init_offset_) {
249
    peer_is_inited_ = true;
250
    return;
251
  }
252
  s = s.substr(peer_init_offset_, UPDATE_INIT_BLOCK_SIZE);
253
  auto query = create_update_query(ton::create_tl_object<ton::ton_api::storage_updateInit>(
254
      td::BufferSlice(s), (int)peer_init_offset_, to_ton_api(node_state)));
255

256
  // take care about update_state_query initial state
257
  update_state_query_.state = node_state;
258
  update_state_query_.query_id = 0;
259

260
  LOG(DEBUG) << "Sending updateInit query (" << update_state_query_.state.want_download << ", "
261
             << update_state_query_.state.will_upload << ", offset=" << peer_init_offset_ * 8 << ")";
262
  update_query_id_ = send_query(std::move(query));
263
}
264

265
void PeerActor::loop_update_state() {
266
  if (!peer_is_inited_) {
267
    return;
268
  }
269

270
  auto node_state = state_->node_state_.load();
271
  if (!(update_state_query_.state == node_state)) {
272
    update_state_query_.state = node_state;
273
    update_state_query_.query_id = {};
274
  }
275

276
  if (update_state_query_.query_id) {
277
    return;
278
  }
279

280
  auto query = create_update_query(
281
      ton::create_tl_object<ton::ton_api::storage_updateState>(to_ton_api(update_state_query_.state)));
282
  LOG(DEBUG) << "Sending updateState query (" << update_state_query_.state.want_download << ", "
283
             << update_state_query_.state.will_upload << ")";
284
  update_state_query_.query_id = send_query(std::move(query));
285
}
286

287
void PeerActor::update_have_pieces() {
288
  auto node_ready_parts = state_->node_ready_parts_.read();
289
  for (auto piece_id : node_ready_parts) {
290
    if (piece_id < (peer_init_offset_ + UPDATE_INIT_BLOCK_SIZE) * 8 && !have_pieces_.get(piece_id)) {
291
      have_pieces_list_.push_back(piece_id);
292
    }
293
    have_pieces_.set_one(piece_id);
294
  }
295
}
296

297
void PeerActor::loop_update_pieces() {
298
  if (update_query_id_ || !peer_is_inited_) {
299
    return;
300
  }
301

302
  update_have_pieces();
303

304
  if (!have_pieces_list_.empty()) {
305
    size_t count = std::min<size_t>(have_pieces_list_.size(), 1500);
306
    sent_have_pieces_list_.assign(have_pieces_list_.end() - count, have_pieces_list_.end());
307
    have_pieces_list_.erase(have_pieces_list_.end() - count, have_pieces_list_.end());
308
    auto query = create_update_query(ton::create_tl_object<ton::ton_api::storage_updateHavePieces>(
309
        td::transform(sent_have_pieces_list_, [](auto x) { return static_cast<td::int32>(x); })));
310
    LOG(DEBUG) << "Sending updateHavePieces query (" << sent_have_pieces_list_.size() << " pieces)";
311
    update_query_id_ = send_query(std::move(query));
312
  }
313
}
314

315
void PeerActor::loop_get_torrent_info() {
316
  if (state_->torrent_info_ready_) {
317
    if (!torrent_info_) {
318
      torrent_info_ = state_->torrent_info_;
319
      for (const auto &u : pending_update_peer_parts_) {
320
        process_update_peer_parts(u);
321
      }
322
      pending_update_peer_parts_.clear();
323
    }
324
    return;
325
  }
326
  if (get_info_query_id_) {
327
    return;
328
  }
329
  if (next_get_info_at_ && !next_get_info_at_.is_in_past()) {
330
    return;
331
  }
332
  LOG(DEBUG) << "Sending getTorrentInfo query";
333
  get_info_query_id_ = create_and_send_query<ton::ton_api::storage_getTorrentInfo>();
334
}
335

336
void PeerActor::loop_node_get_piece() {
337
  for (auto part : state_->node_queries_.read()) {
338
    if (state_->speed_limiters_.download.empty()) {
339
      node_get_piece_.emplace(part, NodePieceQuery{});
340
    } else {
341
      if (!torrent_info_) {
342
        CHECK(state_->torrent_info_ready_);
343
        loop_get_torrent_info();
344
      }
345
      auto piece_size =
346
          std::min<td::uint64>(torrent_info_->piece_size, torrent_info_->file_size - part * torrent_info_->piece_size);
347
      td::Timestamp timeout = td::Timestamp::in(3.0);
348
      td::actor::send_closure(
349
          state_->speed_limiters_.download, &SpeedLimiter::enqueue, (double)piece_size, timeout,
350
          [=, SelfId = actor_id(this)](td::Result<td::Unit> R) {
351
            if (R.is_ok()) {
352
              td::actor::send_closure(SelfId, &PeerActor::node_get_piece_query_ready, part, std::move(R));
353
            } else {
354
              delay_action(
355
                  [=, R = std::move(R)]() mutable {
356
                    td::actor::send_closure(SelfId, &PeerActor::node_get_piece_query_ready, part, std::move(R));
357
                  },
358
                  timeout);
359
            }
360
          });
361
    }
362
  }
363

364
  for (auto &query_it : node_get_piece_) {
365
    if (query_it.second.query_id) {
366
      continue;
367
    }
368

369
    LOG(DEBUG) << "Sending getPiece " << query_it.first << " query";
370
    query_it.second.query_id =
371
        create_and_send_query<ton::ton_api::storage_getPiece>(static_cast<td::int32>(query_it.first));
372
  }
373
}
374

375
void PeerActor::node_get_piece_query_ready(PartId part, td::Result<td::Unit> R) {
376
  if (R.is_error()) {
377
    on_get_piece_result(part, R.move_as_error());
378
  } else {
379
    node_get_piece_.emplace(part, NodePieceQuery{});
380
  }
381
  schedule_loop();
382
}
383

384
void PeerActor::loop_peer_get_piece() {
385
  // process answers
386
  for (auto &p : state_->peer_queries_results_.read()) {
387
    state_->peer_queries_active_.erase(p.first);
388
    auto promise_it = peer_get_piece_.find(p.first);
389
    if (promise_it == peer_get_piece_.end()) {
390
      continue;
391
    }
392
    td::Promise<PeerState::Part> promise =
393
        [i = p.first, promise = std::move(promise_it->second.promise)](td::Result<PeerState::Part> R) mutable {
394
          LOG(DEBUG) << "Responding to getPiece " << i << ": " << (R.is_ok() ? "OK" : R.error().to_string());
395
          promise.set_result(R.move_map([](PeerState::Part part) {
396
            return create_serialize_tl_object<ton_api::storage_piece>(std::move(part.proof), std::move(part.data));
397
          }));
398
        };
399
    if (p.second.is_error()) {
400
      promise.set_error(p.second.move_as_error());
401
    } else {
402
      auto part = p.second.move_as_ok();
403
      auto size = (double)part.data.size();
404
      td::Promise<td::Unit> P = promise.wrap([part = std::move(part)](td::Unit) mutable { return std::move(part); });
405
      if (state_->speed_limiters_.upload.empty()) {
406
        P.set_result(td::Unit());
407
      } else {
408
        td::actor::send_closure(state_->speed_limiters_.upload, &SpeedLimiter::enqueue, size, td::Timestamp::in(3.0),
409
                                std::move(P));
410
      }
411
    }
412
    peer_get_piece_.erase(promise_it);
413
    notify_node();
414
  }
415

416
  // create queries
417
  std::vector<td::uint32> new_peer_queries;
418
  for (auto &query_it : peer_get_piece_) {
419
    if (state_->peer_queries_active_.insert(query_it.first).second) {
420
      new_peer_queries.push_back(query_it.first);
421
      notify_node();
422
    }
423
  }
424
  state_->peer_queries_.add_elements(std::move(new_peer_queries));
425
}
426

427
void PeerActor::loop_notify_node() {
428
  if (!need_notify_node_) {
429
    return;
430
  }
431
  need_notify_node_ = false;
432
  state_->notify_node();
433
}
434

435
void PeerActor::execute_ping(td::uint64 session_id, td::Promise<td::BufferSlice> promise) {
436
  if (!peer_session_id_ || peer_session_id_.value() != session_id) {
437
    peer_session_id_ = session_id;
438
    peer_is_inited_ = false;
439
    peer_init_offset_ = 0;
440

441
    update_query_id_ = {};
442
    update_state_query_.query_id = {};
443
  }
444

445
  promise.set_value(ton::create_serialize_tl_object<ton::ton_api::storage_pong>());
446
}
447

448
void PeerActor::execute_add_update(ton::ton_api::storage_addUpdate &add_update, td::Promise<td::BufferSlice> promise) {
449
  auto session_id = static_cast<td::uint64>(add_update.session_id_);
450
  if (session_id != node_session_id_) {
451
    promise.set_error(td::Status::Error(404, "INVALID_SESSION"));
452
    return;
453
  }
454
  promise.set_value(ton::create_serialize_tl_object<ton::ton_api::storage_ok>());
455

456
  auto seqno = static_cast<td::uint32>(add_update.seqno_);
457
  auto update_peer_state = [&](PeerState::State peer_state) {
458
    if (peer_seqno_ >= seqno) {
459
      return;
460
    }
461
    if (state_->peer_state_ready_ && state_->peer_state_.load() == peer_state) {
462
      return;
463
    }
464
    LOG(DEBUG) << "Processing update peer state query (" << peer_state.want_download << ", " << peer_state.will_upload
465
               << ")";
466
    peer_seqno_ = seqno;
467
    state_->peer_state_.exchange(peer_state);
468
    state_->peer_state_ready_ = true;
469
    notify_node();
470
  };
471
  downcast_call(
472
      *add_update.update_,
473
      td::overloaded(
474
          [&](const ton::ton_api::storage_updateHavePieces &have_pieces) {},
475
          [&](const ton::ton_api::storage_updateState &state) { update_peer_state(from_ton_api(*state.state_)); },
476
          [&](const ton::ton_api::storage_updateInit &init) { update_peer_state(from_ton_api(*init.state_)); }));
477
  if (torrent_info_) {
478
    process_update_peer_parts(add_update.update_);
479
  } else {
480
    pending_update_peer_parts_.push_back(std::move(add_update.update_));
481
  }
482
}
483

484
void PeerActor::process_update_peer_parts(const tl_object_ptr<ton_api::storage_Update> &update) {
485
  CHECK(torrent_info_);
486
  td::uint64 pieces_count = torrent_info_->pieces_count();
487
  std::vector<td::uint32> new_peer_ready_parts;
488
  auto add_piece = [&](PartId id) {
489
    if (id < pieces_count && !peer_have_pieces_.get(id)) {
490
      peer_have_pieces_.set_one(id);
491
      new_peer_ready_parts.push_back(id);
492
      notify_node();
493
    }
494
  };
495
  downcast_call(*update,
496
                td::overloaded(
497
                    [&](const ton::ton_api::storage_updateHavePieces &have_pieces) {
498
                      LOG(DEBUG) << "Processing updateHavePieces query (" << have_pieces.piece_id_ << " pieces)";
499
                      for (auto id : have_pieces.piece_id_) {
500
                        add_piece(id);
501
                      }
502
                    },
503
                    [&](const ton::ton_api::storage_updateState &state) {},
504
                    [&](const ton::ton_api::storage_updateInit &init) {
505
                      LOG(DEBUG) << "Processing updateInit query (offset=" << init.have_pieces_offset_ * 8 << ")";
506
                      td::Bitset new_bitset;
507
                      new_bitset.set_raw(init.have_pieces_.as_slice().str());
508
                      size_t offset = init.have_pieces_offset_ * 8;
509
                      for (auto size = new_bitset.size(), i = size_t(0); i < size; i++) {
510
                        if (new_bitset.get(i)) {
511
                          add_piece(static_cast<PartId>(offset + i));
512
                        }
513
                      }
514
                    }));
515
  state_->peer_ready_parts_.add_elements(std::move(new_peer_ready_parts));
516
}
517

518
void PeerActor::execute_get_piece(ton::ton_api::storage_getPiece &get_piece, td::Promise<td::BufferSlice> promise) {
519
  PartId piece_id = get_piece.piece_id_;
520
  peer_get_piece_[piece_id] = {std::move(promise)};
521
}
522

523
void PeerActor::execute_get_torrent_info(td::Promise<td::BufferSlice> promise) {
524
  td::BufferSlice result = create_serialize_tl_object<ton_api::storage_torrentInfo>(
525
      state_->torrent_info_ready_ ? vm::std_boc_serialize(state_->torrent_info_->as_cell()).move_as_ok()
526
                                  : td::BufferSlice());
527
  if (state_->torrent_info_ready_) {
528
    LOG(DEBUG) << "Responding to getTorrentInfo: " << result.size() << " bytes";
529
  } else {
530
    LOG(DEBUG) << "Responding to getTorrentInfo: no info";
531
  }
532
  promise.set_result(std::move(result));
533
}
534
}  // namespace ton
535

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.