Ton

Форк
0
/
overlay.cpp 
658 строк · 25.6 Кб
1
/*
2
    This file is part of TON Blockchain Library.
3

4
    TON Blockchain Library is free software: you can redistribute it and/or modify
5
    it under the terms of the GNU Lesser General Public License as published by
6
    the Free Software Foundation, either version 2 of the License, or
7
    (at your option) any later version.
8

9
    TON Blockchain Library is distributed in the hope that it will be useful,
10
    but WITHOUT ANY WARRANTY; without even the implied warranty of
11
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
    GNU Lesser General Public License for more details.
13

14
    You should have received a copy of the GNU Lesser General Public License
15
    along with TON Blockchain Library.  If not, see <http://www.gnu.org/licenses/>.
16

17
    Copyright 2017-2020 Telegram Systems LLP
18
*/
19
#include "auto/tl/ton_api.h"
20
#include "td/utils/Random.h"
21

22
#include "adnl/utils.hpp"
23
#include "dht/dht.h"
24

25
#include "overlay.hpp"
26
#include "auto/tl/ton_api.hpp"
27

28
#include "keys/encryptor.h"
29
#include "td/utils/StringBuilder.h"
30

31
namespace ton {
32

33
namespace overlay {
34

35
// Creates a public overlay actor.
//
// All arguments are forwarded to the OverlayImpl constructor with the "public" flag set to true
// and an empty list of pre-configured peers. The by-value arguments (`overlay_id`, `callback`,
// `rules`, `scope`, `opts`) are moved into the actor rather than copied a second time.
td::actor::ActorOwn<Overlay> Overlay::create(td::actor::ActorId<keyring::Keyring> keyring,
                                             td::actor::ActorId<adnl::Adnl> adnl,
                                             td::actor::ActorId<OverlayManager> manager,
                                             td::actor::ActorId<dht::Dht> dht_node, adnl::AdnlNodeIdShort local_id,
                                             OverlayIdFull overlay_id, std::unique_ptr<Overlays::Callback> callback,
                                             OverlayPrivacyRules rules, td::string scope, OverlayOptions opts) {
  auto impl = td::actor::create_actor<OverlayImpl>("overlay", keyring, adnl, manager, dht_node, local_id,
                                                   std::move(overlay_id), true, std::vector<adnl::AdnlNodeIdShort>(),
                                                   std::move(callback), std::move(rules), std::move(scope),
                                                   std::move(opts));
  return td::actor::ActorOwn<Overlay>(std::move(impl));
}
46

47
// Creates a private overlay actor whose membership is the supplied `nodes` list.
//
// Everything is forwarded to the OverlayImpl constructor with the "public" flag set to false.
// No OverlayOptions argument is passed by this overload.
td::actor::ActorOwn<Overlay> Overlay::create(td::actor::ActorId<keyring::Keyring> keyring,
                                             td::actor::ActorId<adnl::Adnl> adnl,
                                             td::actor::ActorId<OverlayManager> manager,
                                             td::actor::ActorId<dht::Dht> dht_node, adnl::AdnlNodeIdShort local_id,
                                             OverlayIdFull overlay_id, std::vector<adnl::AdnlNodeIdShort> nodes,
                                             std::unique_ptr<Overlays::Callback> callback, OverlayPrivacyRules rules,
                                             std::string scope) {
  auto impl =
      td::actor::create_actor<OverlayImpl>("overlay", keyring, adnl, manager, dht_node, local_id, std::move(overlay_id),
                                           false, std::move(nodes), std::move(callback), std::move(rules),
                                           std::move(scope));
  return td::actor::ActorOwn<Overlay>(std::move(impl));
}
59

60
// Stores the actor handles, identity, callback, privacy rules and options, derives the short
// overlay id from the full id, and registers the pre-configured peers.
//
// `pub` selects public (true) or private (false) mode. A non-empty `nodes` list is only accepted
// for private overlays; this is enforced with CHECK for every supplied node.
OverlayImpl::OverlayImpl(td::actor::ActorId<keyring::Keyring> keyring, td::actor::ActorId<adnl::Adnl> adnl,
                         td::actor::ActorId<OverlayManager> manager, td::actor::ActorId<dht::Dht> dht_node,
                         adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id, bool pub,
                         std::vector<adnl::AdnlNodeIdShort> nodes, std::unique_ptr<Overlays::Callback> callback,
                         OverlayPrivacyRules rules, td::string scope, OverlayOptions opts)
    : keyring_(keyring)
    , adnl_(adnl)
    , manager_(manager)
    , dht_node_(dht_node)
    , local_id_(local_id)
    , id_full_(std::move(overlay_id))
    , callback_(std::move(callback))
    , public_(pub)
    , rules_(std::move(rules))
    , scope_(scope)
    , announce_self_(opts.announce_self_)
    , frequent_dht_lookup_(opts.frequent_dht_lookup_) {
  overlay_id_ = id_full_.compute_short_id();

  VLOG(OVERLAY_INFO) << this << ": creating " << (public_ ? "public" : "private");

  // Each pre-configured member becomes a peer entry bound to this overlay's short id.
  for (auto &member : nodes) {
    CHECK(!public_);
    do_add_peer(OverlayNode{member, overlay_id_});
  }

  const auto initial_peer_count = static_cast<td::uint32>(nodes.size());
  update_neighbours(initial_peer_count);
}
89

90
void OverlayImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::overlay_getRandomPeers &query,
91
                                td::Promise<td::BufferSlice> promise) {
92
  if (public_) {
93
    VLOG(OVERLAY_DEBUG) << this << ": received " << query.peers_->nodes_.size() << " nodes from " << src
94
                        << " in getRandomPeers query";
95
    std::vector<OverlayNode> nodes;
96
    for (auto &n : query.peers_->nodes_) {
97
      auto N = OverlayNode::create(n);
98
      if (N.is_ok()) {
99
        nodes.emplace_back(N.move_as_ok());
100
      }
101
    }
102
    add_peers(std::move(nodes));
103
    send_random_peers(src, std::move(promise));
104
  } else {
105
    VLOG(OVERLAY_WARNING) << this << ": DROPPING getRandomPeers query from " << src << " in private overlay";
106
    promise.set_error(td::Status::Error(ErrorCode::protoviolation, "overlay is private"));
107
  }
108
}
109

110
void OverlayImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::overlay_getBroadcast &query,
111
                                td::Promise<td::BufferSlice> promise) {
112
  auto it = broadcasts_.find(query.hash_);
113
  if (it == broadcasts_.end()) {
114
    VLOG(OVERLAY_NOTICE) << this << ": received getBroadcastQuery(" << query.hash_ << ") from " << src
115
                         << " but broadcast is unknown";
116
    promise.set_value(create_serialize_tl_object<ton_api::overlay_broadcastNotFound>());
117
    return;
118
  }
119
  if (delivered_broadcasts_.find(query.hash_) != delivered_broadcasts_.end()) {
120
    VLOG(OVERLAY_DEBUG) << this << ": received getBroadcastQuery(" << query.hash_ << ") from " << src
121
                        << " but broadcast already deleted";
122
    promise.set_value(create_serialize_tl_object<ton_api::overlay_broadcastNotFound>());
123
    return;
124
  }
125

126
  VLOG(OVERLAY_DEBUG) << this << ": received getBroadcastQuery(" << query.hash_ << ") from " << src
127
                      << " sending broadcast";
128
  promise.set_value(it->second->serialize());
129
}
130

131
void OverlayImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::overlay_getBroadcastList &query,
132
                                td::Promise<td::BufferSlice> promise) {
133
  VLOG(OVERLAY_WARNING) << this << ": DROPPING getBroadcastList query";
134
  promise.set_error(td::Status::Error(ErrorCode::protoviolation, "dropping get broadcast list query"));
135
}
136

137
/*void OverlayImpl::process_query(adnl::AdnlNodeIdShort src, adnl::AdnlQueryId query_id, ton_api::overlay_customQuery &query) {
138
  callback_->receive_query(src, query_id, id_, std::move(query.data_));
139
}
140
*/
141

142
// Entry point for an incoming query addressed to this overlay.
//
// Private overlays reject queries from sources that are not known peers; public overlays record
// a successful ping for the source. The payload is then parsed as a ton_api::Function: if parsing
// fails it is treated as a custom query and handed to the user callback, otherwise it is
// dispatched to the matching process_query() overload.
void OverlayImpl::receive_query(adnl::AdnlNodeIdShort src, td::BufferSlice data, td::Promise<td::BufferSlice> promise) {
  if (public_) {
    on_ping_result(src, true);
  } else if (peers_.get(src) == nullptr) {
    VLOG(OVERLAY_WARNING) << this << ": received query in private overlay from unknown source " << src;
    promise.set_error(td::Status::Error(ErrorCode::protoviolation, "overlay is private"));
    return;
  }

  // Parse a clone so that the untouched payload can still be forwarded as a custom query.
  auto parsed = fetch_tl_object<ton_api::Function>(data.clone(), true);
  if (parsed.is_error()) {
    // allow custom query to be here
    callback_->receive_query(src, overlay_id_, std::move(data), std::move(promise));
    return;
  }

  auto query = parsed.move_as_ok();

  // Use the same "<overlay>: message" layout as the other log lines in this file.
  VLOG(OVERLAY_EXTRA_DEBUG) << this << ": query from " << src << ": " << ton_api::to_string(query);

  ton_api::downcast_call(*query.get(), [&](auto &object) { this->process_query(src, object, std::move(promise)); });
}
167

168
// Handles a simple (non-FEC) broadcast by delegating to BroadcastSimple::create and returning
// its status.
td::Status OverlayImpl::process_broadcast(adnl::AdnlNodeIdShort message_from,
                                          tl_object_ptr<ton_api::overlay_broadcast> bcast) {
  td::Status status = BroadcastSimple::create(this, message_from, std::move(bcast));
  return status;
}
172

173
// Handles a FEC broadcast part by delegating to OverlayFecBroadcastPart::create and returning
// its status.
td::Status OverlayImpl::process_broadcast(adnl::AdnlNodeIdShort message_from,
                                          tl_object_ptr<ton_api::overlay_broadcastFec> b) {
  td::Status status = OverlayFecBroadcastPart::create(this, message_from, std::move(b));
  return status;
}
177

178
// Handles a short-form FEC broadcast part by delegating to OverlayFecBroadcastPart::create and
// returning its status.
td::Status OverlayImpl::process_broadcast(adnl::AdnlNodeIdShort message_from,
                                          tl_object_ptr<ton_api::overlay_broadcastFecShort> b) {
  td::Status status = OverlayFecBroadcastPart::create(this, message_from, std::move(b));
  return status;
}
182

183
// overlay.broadcastNotFound is not expected as an incoming message here; it is always reported
// as a protoviolation error naming the sender.
td::Status OverlayImpl::process_broadcast(adnl::AdnlNodeIdShort message_from,
                                          tl_object_ptr<ton_api::overlay_broadcastNotFound> bcast) {
  auto reason = PSTRING() << "received strange message broadcastNotFound from " << message_from;
  return td::Status::Error(ErrorCode::protoviolation, reason);
}
188

189
// Handles an overlay.fec.received notice: if the referenced FEC broadcast is known, the sender is
// recorded via add_received(); unknown hashes are only logged. Always returns OK.
td::Status OverlayImpl::process_broadcast(adnl::AdnlNodeIdShort message_from,
                                          tl_object_ptr<ton_api::overlay_fec_received> msg) {
  auto known = fec_broadcasts_.find(msg->hash_);
  if (known == fec_broadcasts_.end()) {
    VLOG(OVERLAY_DEBUG) << this << ": received fec opt-out message from " << message_from << " for unknown broadcast "
                        << msg->hash_;
    return td::Status::OK();
  }

  VLOG(OVERLAY_DEBUG) << this << ": received fec opt-out message from " << message_from << " for broadcast "
                      << msg->hash_;
  known->second->add_received(message_from);
  return td::Status::OK();
}
202

203
// Handles an overlay.fec.completed notice: if the referenced FEC broadcast is known, the sender
// is recorded via add_completed(); unknown hashes are only logged. Always returns OK.
td::Status OverlayImpl::process_broadcast(adnl::AdnlNodeIdShort message_from,
                                          tl_object_ptr<ton_api::overlay_fec_completed> msg) {
  auto known = fec_broadcasts_.find(msg->hash_);
  if (known == fec_broadcasts_.end()) {
    VLOG(OVERLAY_DEBUG) << this << ": received fec completed message from " << message_from << " for unknown broadcast "
                        << msg->hash_;
    return td::Status::OK();
  }

  VLOG(OVERLAY_DEBUG) << this << ": received fec completed message from " << message_from << " for broadcast "
                      << msg->hash_;
  known->second->add_completed(message_from);
  return td::Status::OK();
}
216

217
// Handles an overlay.unicast message: the payload is passed to the user callback as a regular
// message from `message_from`. Always returns OK.
td::Status OverlayImpl::process_broadcast(adnl::AdnlNodeIdShort message_from,
                                          tl_object_ptr<ton_api::overlay_unicast> msg) {
  VLOG(OVERLAY_DEBUG) << this << ": received unicast from " << message_from;
  auto &payload = msg->data_;
  callback_->receive_message(message_from, overlay_id_, std::move(payload));
  return td::Status::OK();
}
223

224
// Entry point for an incoming message addressed to this overlay.
//
// Private overlays silently drop messages from sources that are not known peers; public overlays
// record a successful ping for the source. The payload is parsed as ton_api::overlay_Broadcast:
// if parsing fails it is delivered to the user callback as a custom message, otherwise it is
// dispatched to the matching process_broadcast() overload. A failure status from that overload
// is logged instead of being discarded.
void OverlayImpl::receive_message(adnl::AdnlNodeIdShort src, td::BufferSlice data) {
  if (public_) {
    on_ping_result(src, true);
  } else if (peers_.get(src) == nullptr) {
    VLOG(OVERLAY_WARNING) << this << ": received message in private overlay from unknown source " << src;
    return;
  }

  // Parse a clone so that the untouched payload can still be forwarded as a custom message.
  auto parsed = fetch_tl_object<ton_api::overlay_Broadcast>(data.clone(), true);
  if (parsed.is_error()) {
    VLOG(OVERLAY_DEBUG) << this << ": received custom message";
    callback_->receive_message(src, overlay_id_, std::move(data));
    return;
  }

  auto bcast = parsed.move_as_ok();
  ton_api::downcast_call(*bcast.get(), [Self = this, &bcast, &src](auto &object) {
    // Ownership of the parsed object is transferred to the handler as its concrete type.
    auto S = Self->process_broadcast(src, move_tl_object_as<std::remove_reference_t<decltype(object)>>(bcast));
    if (S.is_error()) {
      VLOG(OVERLAY_DEBUG) << Self << ": failed to process broadcast message from " << src << ": " << S;
    }
  });
}
244

245
void OverlayImpl::alarm() {
246
  bcast_gc();
247
  
248
  if(update_throughput_at_.is_in_past()) {
249
    double t_elapsed = td::Time::now() - last_throughput_update_.at();
250

251
    auto SelfId = actor_id(this);
252
    peers_.iterate([&](const adnl::AdnlNodeIdShort &key, OverlayPeer &peer) {
253
      peer.throughput_out_bytes = static_cast<td::uint32>(peer.throughput_out_bytes_ctr / t_elapsed);
254
      peer.throughput_in_bytes = static_cast<td::uint32>(peer.throughput_in_bytes_ctr / t_elapsed);
255
      
256
      peer.throughput_out_packets = static_cast<td::uint32>(peer.throughput_out_packets_ctr / t_elapsed);
257
      peer.throughput_in_packets = static_cast<td::uint32>(peer.throughput_in_packets_ctr / t_elapsed);
258
      
259
      peer.throughput_out_bytes_ctr = 0;
260
      peer.throughput_in_bytes_ctr = 0;
261
      
262
      peer.throughput_out_packets_ctr = 0;
263
      peer.throughput_in_packets_ctr = 0;
264
      
265
      auto P = td::PromiseCreator::lambda([SelfId, peer_id = key](td::Result<td::string> result) {
266
        result.ensure();
267
        td::actor::send_closure(SelfId, &Overlay::update_peer_ip_str, peer_id, result.move_as_ok());
268
      });
269
      
270
      td::actor::send_closure(adnl_, &adnl::AdnlSenderInterface::get_conn_ip_str, local_id_, key, std::move(P));
271
    });
272
    
273
    update_throughput_at_ = td::Timestamp::in(50.0);
274
    last_throughput_update_ = td::Timestamp::now();
275
  }
276
  
277
  if (public_) {
278
    if (peers_.size() > 0) {
279
      auto P = get_random_peer();
280
      if (P) {
281
        send_random_peers(P->get_id(), {});
282
      }
283
    }
284
    if (next_dht_query_ && next_dht_query_.is_in_past()) {
285
      next_dht_query_ = td::Timestamp::never();
286
      auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<dht::DhtValue> res) {
287
        td::actor::send_closure(SelfId, &OverlayImpl::receive_dht_nodes, std::move(res), true);
288
      });
289
      td::actor::send_closure(dht_node_, &dht::Dht::get_value, dht::DhtKey{overlay_id_.pubkey_hash(), "nodes", 0},
290
                              std::move(P));
291
    }
292
    if (update_db_at_.is_in_past()) {
293
      if (peers_.size() > 0) {
294
        std::vector<OverlayNode> vec;
295
        for (td::uint32 i = 0; i < 20; i++) {
296
          auto P = get_random_peer();
297
          if (!P) {
298
            break;
299
          }
300
          vec.push_back(P->get());
301
        }
302
        td::actor::send_closure(manager_, &OverlayManager::save_to_db, local_id_, overlay_id_, std::move(vec));
303
      }
304
      update_db_at_ = td::Timestamp::in(60.0);
305
    }
306

307
    alarm_timestamp() = td::Timestamp::in(1.0);
308
  } else {
309
    update_neighbours(0);
310
    alarm_timestamp() = td::Timestamp::in(60.0 + td::Random::fast(0, 100) * 0.6);
311
  }
312
}
313

314
// Completion handler for the DHT "nodes" lookup of a public overlay.
//
// On success the value is parsed as overlay.nodes and every entry accepted by OverlayNode::create
// is added as a peer; lookup and parse failures are only logged. Afterwards, if a DHT store is
// due and `announce_self_` is set, a signed self node is requested and passed on to
// update_dht_nodes(); in every other case the DHT round is closed with finish_dht_query().
// `dummy` is not used.
void OverlayImpl::receive_dht_nodes(td::Result<dht::DhtValue> res, bool dummy) {
  CHECK(public_);
  if (res.is_error()) {
    VLOG(OVERLAY_NOTICE) << this << ": can not get value from DHT: " << res.move_as_error();
  } else {
    auto v = res.move_as_ok();
    auto R = fetch_tl_object<ton_api::overlay_nodes>(v.value().clone(), true);
    if (R.is_error()) {
      VLOG(OVERLAY_WARNING) << this << ": incorrect value in DHT for overlay nodes: " << R.move_as_error();
    } else {
      auto r = R.move_as_ok();
      VLOG(OVERLAY_INFO) << this << ": received " << r->nodes_.size() << " nodes from overlay";
      VLOG(OVERLAY_EXTRA_DEBUG) << this << ": nodes: " << ton_api::to_string(r);
      std::vector<OverlayNode> nodes;
      for (auto &n : r->nodes_) {
        auto N = OverlayNode::create(n);
        if (N.is_ok()) {
          nodes.emplace_back(N.move_as_ok());
        }
      }
      add_peers(std::move(nodes));
    }
  }

  const bool store_due = next_dht_store_query_ && next_dht_store_query_.is_in_past();
  if (!store_due) {
    finish_dht_query();
    return;
  }
  // Disarm the store timer before deciding whether to announce.
  next_dht_store_query_ = td::Timestamp::never();
  if (!announce_self_) {
    finish_dht_query();
    return;
  }

  VLOG(OVERLAY_INFO) << this << ": adding self node to DHT overlay's nodes";
  auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), oid = print_id()](td::Result<OverlayNode> R) {
    if (R.is_error()) {
      // Same "<overlay>: message" layout as the other log lines that print the overlay id.
      LOG(ERROR) << oid << ": cannot get self node";
      td::actor::send_closure(SelfId, &OverlayImpl::finish_dht_query);
      return;
    }
    td::actor::send_closure(SelfId, &OverlayImpl::update_dht_nodes, R.move_as_ok());
  });
  get_self_node(std::move(P));
}
359

360
// Publishes `node` under this overlay's "nodes" DHT key.
//
// Does nothing for private overlays. The value is an overlay.nodes list containing only `node`,
// with an expiry of the current system clock plus 3600 seconds. Key description and value are
// validated with ensure(). finish_dht_query() is scheduled once the DHT store completes; a store
// error is only logged.
void OverlayImpl::update_dht_nodes(OverlayNode node) {
  if (!public_) {
    return;
  }

  auto self_list = create_tl_object<ton_api::overlay_nodes>(std::vector<tl_object_ptr<ton_api::overlay_node>>());
  self_list->nodes_.emplace_back(node.tl());

  dht::DhtKey key{overlay_id_.pubkey_hash(), "nodes", 0};
  auto rule = dht::DhtUpdateRuleOverlayNodes::create();
  dht::DhtKeyDescription key_description(std::move(key), id_full_.pubkey(), rule.move_as_ok(), td::BufferSlice());
  key_description.check().ensure();

  dht::DhtValue value{std::move(key_description), serialize_tl_object(self_list, true),
                      static_cast<td::uint32>(td::Clocks::system() + 3600), td::BufferSlice()};
  value.check().ensure();

  auto on_stored = td::PromiseCreator::lambda([SelfId = actor_id(this), oid = print_id()](td::Result<td::Unit> res) {
    if (res.is_error()) {
      VLOG(OVERLAY_NOTICE) << oid << ": error storing to DHT: " << res.move_as_error();
    }
    td::actor::send_closure(SelfId, &OverlayImpl::finish_dht_query);
  });

  td::actor::send_closure(dht_node_, &dht::Dht::set_value, std::move(value), std::move(on_stored));
}
386

387
void OverlayImpl::bcast_gc() {
388
  while (broadcasts_.size() > max_data_bcasts()) {
389
    auto bcast = BroadcastSimple::from_list_node(bcast_data_lru_.get());
390
    CHECK(bcast);
391
    auto hash = bcast->get_hash();
392
    broadcasts_.erase(hash);
393
    if (delivered_broadcasts_.insert(hash).second) {
394
      bcast_lru_.push(hash);
395
    }
396
  }
397
  while (fec_broadcasts_.size() > 0) {
398
    auto bcast = BroadcastFec::from_list_node(bcast_fec_lru_.prev);
399
    CHECK(bcast);
400
    if (bcast->get_date() > td::Clocks::system() - 60) {
401
      break;
402
    }
403
    auto hash = bcast->get_hash();
404
    CHECK(fec_broadcasts_.count(hash) == 1);
405
    fec_broadcasts_.erase(hash);
406
    if (delivered_broadcasts_.insert(hash).second) {
407
      bcast_lru_.push(hash);
408
    }
409
  }
410
  while (bcast_lru_.size() > max_bcasts()) {
411
    auto Id = bcast_lru_.front();
412
    bcast_lru_.pop();
413
    CHECK(delivered_broadcasts_.erase(Id));
414
  }
415
  CHECK(delivered_broadcasts_.size() == bcast_lru_.size());
416
}
417

418
// Sends a copy of `data` to every current neighbour through the overlay manager.
void OverlayImpl::send_message_to_neighbours(td::BufferSlice data) {
  for (auto &neighbour : neighbours_) {
    auto payload = data.clone();
    td::actor::send_closure(manager_, &OverlayManager::send_message, neighbour, local_id_, overlay_id_,
                            std::move(payload));
  }
}
423

424
// Starts a new simple broadcast of `data` signed as `send_as`; a creation failure is only logged.
void OverlayImpl::send_broadcast(PublicKeyHash send_as, td::uint32 flags, td::BufferSlice data) {
  auto status = BroadcastSimple::create_new(actor_id(this), keyring_, send_as, std::move(data), flags);
  if (!status.is_error()) {
    return;
  }
  LOG(WARNING) << "failed to send broadcast: " << status;
}
430

431
// Starts a new outbound FEC broadcast of `data` sent as `send_as`, with this actor as the
// owning overlay.
void OverlayImpl::send_broadcast_fec(PublicKeyHash send_as, td::uint32 flags, td::BufferSlice data) {
  auto self = actor_id(this);
  OverlayOutboundFecBroadcast::create(std::move(data), flags, self, send_as);
}
434

435
// Writes this overlay to `sb` using the stream operator defined for the overlay pointer.
void OverlayImpl::print(td::StringBuilder &sb) {
  sb << this;
}
438

439
// Checks that `date` lies within 20 seconds of the current system clock.
//
// Returns a notready error ("too old broadcast" / "too new broadcast") when it is outside that
// window, OK otherwise.
td::Status OverlayImpl::check_date(td::uint32 date) {
  const auto now = td::Clocks::system();
  const bool too_old = date < now - 20;
  if (too_old) {
    return td::Status::Error(ErrorCode::notready, "too old broadcast");
  }
  const bool too_new = date > now + 20;
  if (too_new) {
    return td::Status::Error(ErrorCode::notready, "too new broadcast");
  }
  return td::Status::OK();
}
449

450
// Decides whether `source` may send a broadcast of `size` bytes (FEC or simple) in this overlay.
//
// Zero-sized broadcasts are always Forbidden. The privacy rules are consulted for the source
// itself; if that already yields Allowed, or no certificate is supplied, that result is returned.
// Otherwise the certificate is checked for the source, the outcome is limited (min) by what the
// rules grant to the certificate issuer, and the better (max) of the direct and the
// certificate-based result is returned.
BroadcastCheckResult OverlayImpl::check_source_eligible(PublicKey source, const Certificate *cert, td::uint32 size,
                                                        bool is_fec) {
  if (size == 0) {
    return BroadcastCheckResult::Forbidden;
  }
  // Computed once and reused for both the rules lookup and the certificate check.
  auto short_id = source.compute_short_id();

  auto r = rules_.check_rules(short_id, size, is_fec);
  if (!cert || r == BroadcastCheckResult::Allowed) {
    return r;
  }

  auto r2 = cert->check(short_id, overlay_id_, static_cast<td::int32>(td::Clocks::system()), size, is_fec);
  r2 = broadcast_check_result_min(r2, rules_.check_rules(cert->issuer_hash(), size, is_fec));
  return broadcast_check_result_max(r, r2);
}
466

467
// Returns a notready "duplicate broadcast" error when `hash` is already present in
// delivered_broadcasts_ or broadcasts_, OK otherwise.
td::Status OverlayImpl::check_delivered(BroadcastHash hash) {
  const bool already_seen = delivered_broadcasts_.count(hash) == 1 || broadcasts_.count(hash) == 1;
  if (!already_seen) {
    return td::Status::OK();
  }
  return td::Status::Error(ErrorCode::notready, "duplicate broadcast");
}
474

475
// Looks up the FEC broadcast stored under `hash`; returns a non-owning pointer, or nullptr when
// no such broadcast is stored.
BroadcastFec *OverlayImpl::get_fec_broadcast(BroadcastHash hash) {
  auto found = fec_broadcasts_.find(hash);
  if (found != fec_broadcasts_.end()) {
    return found->second.get();
  }
  return nullptr;
}
483

484
void OverlayImpl::register_fec_broadcast(std::unique_ptr<BroadcastFec> bcast) {
485
  auto hash = bcast->get_hash();
486
  bcast_fec_lru_.put(bcast.get());
487
  fec_broadcasts_.emplace(hash, std::move(bcast));
488
  bcast_gc();
489
}
490

491
// Produces a signed OverlayNode describing the local node.
//
// The keyring is asked to sign the node's to_sign() data with the local key; on success the
// signature and the full ADNL id built from the returned public key are stored into the node,
// which is then passed to `promise`. On failure the error is logged and forwarded to `promise`.
void OverlayImpl::get_self_node(td::Promise<OverlayNode> promise) {
  OverlayNode self{local_id_, overlay_id_};
  auto to_sign = self.to_sign();

  auto on_signed = td::PromiseCreator::lambda([oid = print_id(), self = std::move(self), promise = std::move(promise)](
                                                  td::Result<std::pair<td::BufferSlice, PublicKey>> R) mutable {
    if (R.is_error()) {
      auto S = R.move_as_error();
      LOG(ERROR) << oid << ": failed to get self node: " << S;
      promise.set_error(std::move(S));
      return;
    }
    auto signed_data = R.move_as_ok();
    self.update_signature(std::move(signed_data.first));
    self.update_adnl_id(adnl::AdnlNodeIdFull{signed_data.second});
    promise.set_value(std::move(self));
  });

  td::actor::send_closure(keyring_, &keyring::Keyring::sign_add_get_public_key, local_id_.pubkey_hash(),
                          std::move(to_sign), std::move(on_signed));
}
511

512
void OverlayImpl::send_new_fec_broadcast_part(PublicKeyHash local_id, Overlay::BroadcastDataHash data_hash,
513
                                              td::uint32 size, td::uint32 flags, td::BufferSlice part, td::uint32 seqno,
514
                                              fec::FecType fec_type, td::uint32 date) {
515
  auto S = OverlayFecBroadcastPart::create_new(this, actor_id(this), local_id, data_hash, size, flags, std::move(part),
516
                                               seqno, std::move(fec_type), date);
517
  if (S.is_error() && S.code() != ErrorCode::notready) {
518
    LOG(WARNING) << "failed to send broadcast part: " << S;
519
  }
520
}
521

522
// Hands a received broadcast from `source` to the user callback, tagged with this overlay's id.
void OverlayImpl::deliver_broadcast(PublicKeyHash source, td::BufferSlice data) {
  callback_->receive_broadcast(source, overlay_id_, std::move(data));
}
525

526
// Logs why a FEC broadcast could not be created: notready errors at DEBUG level, everything else
// at WARNING level.
void OverlayImpl::failed_to_create_fec_broadcast(td::Status reason) {
  const bool not_ready = reason.code() == ErrorCode::notready;
  if (!not_ready) {
    LOG(WARNING) << "failed to receive fec broadcast: " << reason;
    return;
  }
  LOG(DEBUG) << "failed to receive fec broadcast: " << reason;
}
533

534
// Attaches a freshly created FEC broadcast part to this overlay and runs it.
//
// Failures are logged as warnings, except for notready errors, which are ignored.
// `local_id` is not used.
void OverlayImpl::created_fec_broadcast(PublicKeyHash local_id, std::unique_ptr<OverlayFecBroadcastPart> bcast) {
  bcast->update_overlay(this);
  auto status = bcast->run();
  if (!status.is_error()) {
    return;
  }
  if (status.code() == ErrorCode::notready) {
    return;
  }
  LOG(WARNING) << "failed to send fec broadcast: " << status;
}
541

542
// Logs why a simple broadcast could not be created: notready errors at DEBUG level, everything
// else at WARNING level.
void OverlayImpl::failed_to_create_simple_broadcast(td::Status reason) {
  const bool not_ready = reason.code() == ErrorCode::notready;
  if (!not_ready) {
    LOG(WARNING) << "failed to send simple broadcast: " << reason;
    return;
  }
  LOG(DEBUG) << "failed to send simple broadcast: " << reason;
}
549

550
void OverlayImpl::created_simple_broadcast(std::unique_ptr<BroadcastSimple> bcast) {
551
  bcast->update_overlay(this);
552
  auto S = bcast->run();
553
  register_simple_broadcast(std::move(bcast));
554
  if (S.is_error() && S.code() != ErrorCode::notready) {
555
    LOG(WARNING) << "failed to receive fec broadcast: " << S;
556
  }
557
}
558

559
void OverlayImpl::register_simple_broadcast(std::unique_ptr<BroadcastSimple> bcast) {
560
  auto hash = bcast->get_hash();
561
  bcast_data_lru_.put(bcast.get());
562
  broadcasts_.emplace(hash, std::move(bcast));
563
  bcast_gc();
564
}
565

566
// Returns a non-owning pointer to an encryptor for `source`, creating and caching one when the
// key is not cached yet.
//
// Propagates the error from PublicKey::create_encryptor(). After inserting a new entry, entries
// returned by encryptor_lru_.get() are evicted until the cache holds at most max_encryptors().
// NOTE(review): a cache hit does not move the entry within encryptor_lru_, so eviction appears to
// follow insertion order rather than last use - confirm whether that is intended.
td::Result<Encryptor *> OverlayImpl::get_encryptor(PublicKey source) {
  auto key_id = source.compute_short_id();

  auto cached = encryptor_map_.find(key_id);
  if (cached != encryptor_map_.end()) {
    return cached->second->get();
  }

  TRY_RESULT(encryptor, source.create_encryptor());
  auto raw = encryptor.get();

  auto entry = std::make_unique<CachedEncryptor>(key_id, std::move(encryptor));
  encryptor_lru_.put(entry.get());
  encryptor_map_.emplace(key_id, std::move(entry));

  while (encryptor_map_.size() > max_encryptors()) {
    auto evicted = CachedEncryptor::from_list_node(encryptor_lru_.get());
    // Copy the id before erasing: the erase destroys the cache entry it was read from.
    auto evicted_id = evicted->id();
    encryptor_map_.erase(evicted_id);
  }
  return raw;
}
584

585
// Returns the certificate stored for `source`, or nullptr when none is stored.
std::shared_ptr<Certificate> OverlayImpl::get_certificate(PublicKeyHash source) {
  auto found = certs_.find(source);
  if (found == certs_.end()) {
    return nullptr;
  }
  return found->second;
}
589

590
// Replaces the overlay's privacy rules with `rules`.
void OverlayImpl::set_privacy_rules(OverlayPrivacyRules rules) {
  rules_ = std::move(rules);
}
593

594
// Asks the user callback to validate a broadcast from `src`; the verdict is delivered through
// `promise`.
void OverlayImpl::check_broadcast(PublicKeyHash src, td::BufferSlice data, td::Promise<td::Unit> promise) {
  callback_->check_broadcast(src, overlay_id_, std::move(data), std::move(promise));
}
597

598
// Increments the broadcast error counter of peer `peer_id`: the FEC counter when `is_fec` is set,
// the simple-broadcast counter otherwise. Unknown peers are ignored.
void OverlayImpl::update_peer_err_ctr(adnl::AdnlNodeIdShort peer_id, bool is_fec) {
  auto peer = peers_.get(peer_id);
  if (!peer) {
    return;
  }
  if (!is_fec) {
    peer->broadcast_errors++;
    return;
  }
  peer->fec_broadcast_errors++;
}
608

609
// Forwards the outcome of a broadcast check to the broadcast(s) stored under `hash`.
//
// Both the simple and the FEC broadcast map are consulted. `R` can only be consumed once, so the
// simple broadcast receives an equivalent copy (OK, or a clone of the error) and the FEC
// broadcast receives `R` itself. This way neither of them is handed a moved-from result when
// the hash is present in both maps.
void OverlayImpl::broadcast_checked(Overlay::BroadcastHash hash, td::Result<td::Unit> R) {
  {
    auto it = broadcasts_.find(hash);
    if (it != broadcasts_.end()) {
      auto copy = R.is_ok() ? td::Result<td::Unit>(td::Unit()) : td::Result<td::Unit>(R.error().clone());
      it->second->broadcast_checked(std::move(copy));
    }
  }
  {
    auto it = fec_broadcasts_.find(hash);
    if (it != fec_broadcasts_.end()) {
      it->second->broadcast_checked(std::move(R));
    }
  }
}
623

624
// Builds an engine.validator.overlayStats object for this overlay and passes it to `promise`.
//
// The object carries the local ADNL id, the short and full overlay ids, the scope, one node entry
// per peer (throughput figures, IP string, last query times, broadcast error counters) and a
// "neighbours_cnt" statistic holding the current number of neighbours.
void OverlayImpl::get_stats(td::Promise<tl_object_ptr<ton_api::engine_validator_overlayStats>> promise) {
  auto stats = create_tl_object<ton_api::engine_validator_overlayStats>();
  stats->adnl_id_ = local_id_.bits256_value();
  stats->overlay_id_ = overlay_id_.bits256_value();
  stats->overlay_id_full_ = id_full_.pubkey().tl();
  stats->scope_ = scope_;

  peers_.iterate([&](const adnl::AdnlNodeIdShort &peer_id, const OverlayPeer &peer) {
    auto entry = create_tl_object<ton_api::engine_validator_overlayStatsNode>();
    entry->adnl_id_ = peer_id.bits256_value();

    // Throughput rates as last computed by the periodic alarm.
    entry->t_out_bytes_ = peer.throughput_out_bytes;
    entry->t_in_bytes_ = peer.throughput_in_bytes;
    entry->t_out_pckts_ = peer.throughput_out_packets;
    entry->t_in_pckts_ = peer.throughput_in_packets;

    entry->ip_addr_ = peer.ip_addr_str;

    entry->last_in_query_ = static_cast<td::uint32>(peer.last_in_query_at.at_unix());
    entry->last_out_query_ = static_cast<td::uint32>(peer.last_out_query_at.at_unix());

    entry->bdcst_errors_ = peer.broadcast_errors;
    entry->fec_bdcst_errors_ = peer.fec_broadcast_errors;

    stats->nodes_.push_back(std::move(entry));
  });

  auto neighbours_stat =
      create_tl_object<ton_api::engine_validator_oneStat>("neighbours_cnt", PSTRING() << neighbours_.size());
  stats->stats_.push_back(std::move(neighbours_stat));

  promise.set_value(std::move(stats));
}
655

656
}  // namespace overlay
657

658
}  // namespace ton
659

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.