Ton

Форк
0
/
dht.cpp 
748 строк · 30.2 Кб
1
/*
2
    This file is part of TON Blockchain Library.
3

4
    TON Blockchain Library is free software: you can redistribute it and/or modify
5
    it under the terms of the GNU Lesser General Public License as published by
6
    the Free Software Foundation, either version 2 of the License, or
7
    (at your option) any later version.
8

9
    TON Blockchain Library is distributed in the hope that it will be useful,
10
    but WITHOUT ANY WARRANTY; without even the implied warranty of
11
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
    GNU Lesser General Public License for more details.
13

14
    You should have received a copy of the GNU Lesser General Public License
15
    along with TON Blockchain Library.  If not, see <http://www.gnu.org/licenses/>.
16

17
    Copyright 2017-2020 Telegram Systems LLP
18
*/
19
#include "dht.hpp"
20

21
#include "td/utils/tl_storers.h"
22
#include "td/utils/crypto.h"
23
#include "td/utils/Random.h"
24
#include "td/utils/base64.h"
25

26
#include "td/utils/format.h"
27

28
#include "td/db/RocksDb.h"
29

30
#include "auto/tl/ton_api.hpp"
31

32
#include "dht.h"
33
#include "dht-bucket.hpp"
34
#include "dht-query.hpp"
35
#include "dht-in.hpp"
36

37
namespace ton {
38

39
namespace dht {
40

41
td::actor::ActorOwn<DhtMember> DhtMember::create(adnl::AdnlNodeIdShort id, std::string db_root,
42
                                                 td::actor::ActorId<keyring::Keyring> keyring,
43
                                                 td::actor::ActorId<adnl::Adnl> adnl, td::int32 network_id,
44
                                                 td::uint32 k, td::uint32 a, bool client_only) {
45
  return td::actor::create_actor<DhtMemberImpl>("dht", id, db_root, keyring, adnl, network_id, k, a, client_only);
46
}
47

48
td::Result<td::actor::ActorOwn<Dht>> Dht::create(adnl::AdnlNodeIdShort id, std::string db_root,
49
                                                 std::shared_ptr<DhtGlobalConfig> conf,
50
                                                 td::actor::ActorId<keyring::Keyring> keyring,
51
                                                 td::actor::ActorId<adnl::Adnl> adnl) {
52
  CHECK(conf->get_k() > 0);
53
  CHECK(conf->get_a() > 0);
54

55
  auto D = DhtMember::create(id, db_root, keyring, adnl, conf->get_network_id(), conf->get_k(), conf->get_a());
56
  auto &nodes = conf->nodes();
57

58
  for (auto &node : nodes.list()) {
59
    auto key = node.get_key();
60
    td::actor::send_closure(D, &DhtMember::add_full_node, key, node.clone(), true);
61
  }
62
  return std::move(D);
63
}
64

65
td::Result<td::actor::ActorOwn<Dht>> Dht::create_client(adnl::AdnlNodeIdShort id, std::string db_root,
66
                                                        std::shared_ptr<DhtGlobalConfig> conf,
67
                                                        td::actor::ActorId<keyring::Keyring> keyring,
68
                                                        td::actor::ActorId<adnl::Adnl> adnl) {
69
  CHECK(conf->get_k() > 0);
70
  CHECK(conf->get_a() > 0);
71

72
  auto D = DhtMember::create(id, db_root, keyring, adnl, conf->get_network_id(), conf->get_k(), conf->get_a(), true);
73
  auto &nodes = conf->nodes();
74

75
  for (auto &node : nodes.list()) {
76
    auto key = node.get_key();
77
    td::actor::send_closure(D, &DhtMember::add_full_node, key, node.clone(), true);
78
  }
79
  return std::move(D);
80
}
81

82
void DhtMemberImpl::start_up() {
83
  std::vector<td::int32> methods = {ton_api::dht_getSignedAddressList::ID,
84
                                    ton_api::dht_findNode::ID,
85
                                    ton_api::dht_findValue::ID,
86
                                    ton_api::dht_store::ID,
87
                                    ton_api::dht_ping::ID,
88
                                    ton_api::dht_registerReverseConnection::ID,
89
                                    ton_api::dht_requestReversePing::ID,
90
                                    ton_api::dht_query::ID,
91
                                    ton_api::dht_message::ID,
92
                                    ton_api::dht_requestReversePingCont::ID};
93

94
  for (auto it : methods) {
95
    td::actor::send_closure(adnl_, &adnl::Adnl::subscribe, id_, adnl::Adnl::int_to_bytestring(it),
96
                            std::make_unique<Callback>(actor_id(this), id_));
97
  }
98
  alarm_timestamp() = td::Timestamp::in(1.0);
99

100
  if (!db_root_.empty()) {
101
    std::shared_ptr<td::KeyValue> kv = std::make_shared<td::RocksDb>(
102
        td::RocksDb::open(PSTRING() << db_root_ << "/dht-" << td::base64url_encode(id_.as_slice())).move_as_ok());
103
    for (td::uint32 bit = 0; bit < 256; bit++) {
104
      auto key = create_hash_tl_object<ton_api::dht_db_key_bucket>(bit);
105
      std::string value;
106
      auto R = kv->get(key.as_slice(), value);
107
      R.ensure();
108
      if (R.move_as_ok() == td::KeyValue::GetStatus::Ok) {
109
        auto V = fetch_tl_object<ton_api::dht_db_bucket>(td::BufferSlice{value}, true);
110
        V.ensure();
111
        auto nodes = std::move(V.move_as_ok()->nodes_);
112
        auto s = nodes->nodes_.size();
113
        DhtNodesList list{std::move(nodes), network_id_};
114
        CHECK(list.size() <= s);  // Some nodes can be dropped due to a wrong network id
115
        auto &B = buckets_[bit];
116
        for (auto &node : list.list()) {
117
          auto key = node.get_key();
118
          B.add_full_node(key, std::move(node), adnl_, id_, network_id_);
119
        }
120
      }
121
    }
122
    db_ = DbType{std::move(kv)};
123
  }
124
}
125

126
void DhtMemberImpl::tear_down() {
127
  std::vector<td::int32> methods = {ton_api::dht_getSignedAddressList::ID,
128
                                    ton_api::dht_findNode::ID,
129
                                    ton_api::dht_findValue::ID,
130
                                    ton_api::dht_store::ID,
131
                                    ton_api::dht_ping::ID,
132
                                    ton_api::dht_registerReverseConnection::ID,
133
                                    ton_api::dht_requestReversePing::ID,
134
                                    ton_api::dht_query::ID,
135
                                    ton_api::dht_message::ID,
136
                                    ton_api::dht_requestReversePingCont::ID};
137

138
  for (auto it : methods) {
139
    td::actor::send_closure(adnl_, &adnl::Adnl::unsubscribe, id_, adnl::Adnl::int_to_bytestring(it));
140
  }
141
}
142

143
void DhtMemberImpl::save_to_db() {
144
  if (db_root_.empty()) {
145
    return;
146
  }
147
  next_save_to_db_at_ = td::Timestamp::in(10.0);
148
  alarm_timestamp().relax(next_save_to_db_at_);
149

150
  td::uint32 bit = td::Random::fast(0, 255);
151
  auto &B = buckets_[bit];
152
  auto list = B.export_nodes();
153
  if (list.size() > 0) {
154
    auto key = create_hash_tl_object<ton_api::dht_db_key_bucket>(bit);
155
    auto value = create_serialize_tl_object<ton_api::dht_db_bucket>(list.tl());
156

157
    db_.set(key, std::move(value));
158
  }
159
}
160

161
DhtNodesList DhtMemberImpl::get_nearest_nodes(DhtKeyId id, td::uint32 k) {
162
  DhtNodesList vec;
163

164
  auto id_xor = id ^ key_;
165

166
  for (td::uint32 bit = 0; bit < 256; bit++) {
167
    if (id_xor.get_bit(bit)) {
168
      buckets_[bit].get_nearest_nodes(id, bit, vec, k);
169
      if (vec.size() >= k) {
170
        break;
171
      }
172
    }
173
  }
174

175
  for (auto &el : vec.list()) {
176
    CHECK((el.get_key() ^ id) < id_xor);
177
  }
178
  if (vec.size() < k) {
179
    for (td::uint32 bit = 255; bit != 256; bit = bit ? (bit - 1) : 256) {
180
      if (!id_xor.get_bit(bit)) {
181
        buckets_[bit].get_nearest_nodes(id, bit, vec, k);
182
        if (vec.size() >= k) {
183
          break;
184
        }
185
      }
186
    }
187
  }
188
  CHECK(vec.size() <= k);
189
  return vec;
190
}
191

192
td::uint32 DhtMemberImpl::distance(DhtKeyId key_id, td::uint32 max_value) {
193
  if (!max_value) {
194
    max_value = 2 * k_;
195
  }
196
  td::uint32 res = 0;
197
  auto id_xor = key_id ^ key_;
198

199
  for (td::uint32 bit = 0; bit < 256; bit++) {
200
    if (id_xor.get_bit(bit)) {
201
      res += buckets_[bit].active_cnt();
202
      if (res >= max_value) {
203
        return max_value;
204
      }
205
    }
206
  }
207
  return res;
208
}
209

210
void DhtMemberImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::dht_ping &query,
211
                                  td::Promise<td::BufferSlice> promise) {
212
  ping_queries_++;
213
  promise.set_value(create_serialize_tl_object<ton_api::dht_pong>(query.random_id_));
214
}
215

216
void DhtMemberImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::dht_findNode &query,
217
                                  td::Promise<td::BufferSlice> promise) {
218
  find_node_queries_++;
219
  auto k = static_cast<td::uint32>(query.k_);
220
  if (k > max_k()) {
221
    k = max_k();
222
  }
223
  auto R = get_nearest_nodes(DhtKeyId{query.key_}, k);
224
  promise.set_value(serialize_tl_object(R.tl(), true));
225
}
226

227
void DhtMemberImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::dht_findValue &query,
228
                                  td::Promise<td::BufferSlice> promise) {
229
  find_value_queries_++;
230
  auto it = values_.find(DhtKeyId{query.key_});
231
  if (it != values_.end() && it->second.expired()) {
232
    values_.erase(it);
233
    it = values_.end();
234
  }
235
  if (it != values_.end()) {
236
    promise.set_value(create_serialize_tl_object<ton_api::dht_valueFound>(it->second.tl()));
237
    return;
238
  }
239

240
  auto k = static_cast<td::uint32>(query.k_);
241
  if (k > max_k()) {
242
    k = max_k();
243
  }
244
  auto R = get_nearest_nodes(DhtKeyId{query.key_}, k);
245

246
  promise.set_value(create_serialize_tl_object<ton_api::dht_valueNotFound>(R.tl()));
247
}
248

249
td::Status DhtMemberImpl::store_in(DhtValue value) {
250
  if (value.expired()) {
251
    VLOG(DHT_INFO) << this << ": dropping expired value: " << value.key_id() << " expire_at = " << value.ttl();
252
    return td::Status::OK();
253
  }
254
  TRY_STATUS(value.check());
255

256
  auto key_id = value.key_id();
257

258
  auto dist = distance(key_id, k_ + 10);
259
  if (dist < k_ + 10) {
260
    auto it = values_.find(key_id);
261
    if (it != values_.end()) {
262
      it->second.update(std::move(value));
263
    } else {
264
      values_.emplace(key_id, std::move(value));
265
    }
266
  } else {
267
    VLOG(DHT_INFO) << this << ": dropping too remote value: " << value.key_id() << " distance = " << dist;
268
  }
269
  return td::Status::OK();
270
}
271

272
void DhtMemberImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::dht_store &query,
273
                                  td::Promise<td::BufferSlice> promise) {
274
  store_queries_++;
275
  auto V = DhtValue::create(std::move(query.value_), true);
276
  if (V.is_error()) {
277
    promise.set_error(td::Status::Error(ErrorCode::protoviolation,
278
                                        PSTRING() << "dropping invalid dht_store() value: " << V.error().to_string()));
279
    VLOG(DHT_INFO) << this << ": dropping invalid value: " << V.move_as_error();
280
    return;
281
  }
282
  auto b = store_in(V.move_as_ok());
283

284
  if (b.is_ok()) {
285
    promise.set_value(create_serialize_tl_object<ton_api::dht_stored>());
286
  } else {
287
    VLOG(DHT_INFO) << this << ": dropping store() query from " << src << ": " << b.move_as_error();
288
    promise.set_error(td::Status::Error(ErrorCode::protoviolation, "dropping dht_store() query"));
289
  }
290
}
291

292
void DhtMemberImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::dht_getSignedAddressList &query,
293
                                  td::Promise<td::BufferSlice> promise) {
294
  get_addr_list_queries_++;
295

296
  auto P = td::PromiseCreator::lambda([promise = std::move(promise)](td::Result<DhtNode> R) mutable {
297
    R.ensure();
298
    promise.set_value(serialize_tl_object(R.move_as_ok().tl(), true));
299
  });
300
  get_self_node(std::move(P));
301
}
302

303
static td::BufferSlice register_reverse_connection_to_sign(adnl::AdnlNodeIdShort client, adnl::AdnlNodeIdShort dht_id,
304
                                                           td::uint32 ttl) {
305
  td::BufferSlice result(32 + 32 + 4);
306
  td::MutableSlice s = result.as_slice();
307
  s.copy_from(client.as_slice());
308
  s.remove_prefix(32);
309
  s.copy_from(dht_id.as_slice());
310
  s.remove_prefix(32);
311
  s.copy_from(std::string(reinterpret_cast<char *>(&ttl), 4));
312
  return result;
313
}
314

315
void DhtMemberImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::dht_registerReverseConnection &query,
316
                                  td::Promise<td::BufferSlice> promise) {
317
  td::uint32 ttl = query.ttl_, now = (td::uint32)td::Clocks::system();
318
  if (ttl <= now) {
319
    return;
320
  }
321
  PublicKey pub{query.node_};
322
  adnl::AdnlNodeIdShort client_id{pub.compute_short_id()};
323
  td::BufferSlice to_sign = register_reverse_connection_to_sign(client_id, src, ttl);
324
  TRY_RESULT_PROMISE(promise, encryptor, pub.create_encryptor());
325
  TRY_STATUS_PROMISE(promise, encryptor->check_signature(to_sign, query.signature_));
326
  DhtKeyId key_id = get_reverse_connection_key(client_id).compute_key_id();
327
  reverse_connections_[client_id] = ReverseConnection{src, key_id, td::Timestamp::at_unix(std::min(ttl, now + 300))};
328
  promise.set_value(create_serialize_tl_object<ton_api::dht_stored>());
329
}
330

331
void DhtMemberImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::dht_requestReversePing &query,
332
                                  td::Promise<td::BufferSlice> promise) {
333
  adnl::AdnlNodeIdShort client{query.client_};
334
  auto it = reverse_connections_.find(client);
335
  if (it != reverse_connections_.end()) {
336
    if (it->second.ttl_.is_in_past()) {
337
      reverse_connections_.erase(it);
338
    } else {
339
      PublicKey pub{query.target_->id_};
340
      TRY_RESULT_PROMISE(promise, encryptor, pub.create_encryptor());
341
      TRY_STATUS_PROMISE(promise,
342
                         encryptor->check_signature(serialize_tl_object(query.target_, true), query.signature_));
343
      td::actor::send_closure(adnl_, &adnl::Adnl::send_message, id_, it->second.dht_node_,
344
                              create_serialize_tl_object<ton_api::dht_requestReversePingCont>(
345
                                  std::move(query.target_), std::move(query.signature_), query.client_));
346
      promise.set_result(create_serialize_tl_object<ton_api::dht_reversePingOk>());
347
      return;
348
    }
349
  }
350
  auto k = static_cast<td::uint32>(query.k_);
351
  if (k > max_k()) {
352
    k = max_k();
353
  }
354
  auto R = get_nearest_nodes(get_reverse_connection_key(client).compute_key_id(), k);
355
  promise.set_value(create_serialize_tl_object<ton_api::dht_clientNotFound>(R.tl()));
356
}
357

358
void DhtMemberImpl::receive_query(adnl::AdnlNodeIdShort src, td::BufferSlice data,
359
                                  td::Promise<td::BufferSlice> promise) {
360
  if (client_only_) {
361
    return;
362
  }
363
  {
364
    auto R = fetch_tl_prefix<ton_api::dht_query>(data, true);
365
    if (R.is_ok()) {
366
      auto N = DhtNode::create(std::move(R.move_as_ok()->node_), network_id_);
367
      if (N.is_ok()) {
368
        auto node = N.move_as_ok();
369
        if (node.adnl_id().compute_short_id() == src) {
370
          auto key = node.get_key();
371
          add_full_node(key, std::move(node), true);
372
        } else {
373
          VLOG(DHT_WARNING) << this << ": dropping bad node: unexpected adnl id";
374
        }
375
      } else {
376
        VLOG(DHT_WARNING) << this << ": dropping bad node " << N.move_as_error();
377
      }
378
    }
379
  }
380
  auto R = fetch_tl_object<ton_api::Function>(std::move(data), true);
381

382
  if (R.is_error()) {
383
    VLOG(DHT_WARNING) << this << ": dropping unknown query to DHT node: " << R.move_as_error();
384
    promise.set_error(td::Status::Error(ErrorCode::protoviolation, "failed to parse dht query"));
385
    return;
386
  }
387

388
  auto Q = R.move_as_ok();
389
  if (td::Random::fast(0, 127) == 0) {
390
    VLOG(DHT_DEBUG) << this << ": ping=" << ping_queries_ << " fnode=" << find_node_queries_
391
                    << " fvalue=" << find_value_queries_ << " store=" << store_queries_
392
                    << " addrlist=" << get_addr_list_queries_;
393
    VLOG(DHT_DEBUG) << this << ": query to DHT from " << src << ": " << ton_api::to_string(Q);
394
  }
395

396
  VLOG(DHT_EXTRA_DEBUG) << this << ": query to DHT from " << src << ": " << ton_api::to_string(Q);
397

398
  ton_api::downcast_call(*Q, [&](auto &object) { this->process_query(src, object, std::move(promise)); });
399
}
400

401
void DhtMemberImpl::add_full_node(DhtKeyId key, DhtNode node, bool set_active) {
402
  VLOG(DHT_EXTRA_DEBUG) << this << ": adding full node " << key;
403

404
  auto eid = key ^ key_;
405
  auto bit = eid.count_leading_zeroes();
406
#ifndef NDEBUG
407
  for (td::uint32 i = 0; i < bit; i++) {
408
    CHECK(key.get_bit(i) == key_.get_bit(i));
409
  }
410
#endif
411
  if (bit < 256) {
412
    CHECK(key.get_bit(bit) != key_.get_bit(bit));
413
    buckets_[bit].add_full_node(key, std::move(node), adnl_, id_, network_id_, set_active);
414
  } else {
415
    CHECK(key == key_);
416
  }
417
}
418

419
void DhtMemberImpl::receive_ping(DhtKeyId key, DhtNode result) {
420
  VLOG(DHT_EXTRA_DEBUG) << this << ": received ping from " << key;
421

422
  auto eid = key ^ key_;
423
  auto bit = eid.count_leading_zeroes();
424
  if (bit < 256) {
425
    buckets_[bit].receive_ping(key, std::move(result), adnl_, id_);
426
  } else {
427
    CHECK(key == key_);
428
  }
429
}
430

431
void DhtMemberImpl::receive_message(adnl::AdnlNodeIdShort src, td::BufferSlice data) {
432
  auto F = fetch_tl_object<ton_api::dht_requestReversePingCont>(data, true);
433
  if (F.is_ok()) {
434
    auto S = [&]() -> td::Status {
435
      auto f = F.move_as_ok();
436
      adnl::AdnlNodeIdShort client{f->client_};
437
      if (!our_reverse_connections_.count(client)) {
438
        return td::Status::Error(PSTRING() << ": unknown id for reverse ping: " << client);
439
      }
440
      TRY_RESULT_PREFIX(node, adnl::AdnlNode::create(f->target_), "failed to parse node: ");
441
      TRY_RESULT_PREFIX(encryptor, node.pub_id().pubkey().create_encryptor(), "failed to create encryptor: ");
442
      TRY_STATUS_PREFIX(encryptor->check_signature(serialize_tl_object(f->target_, true), f->signature_),
443
                        "invalid signature: ");
444
      VLOG(DHT_INFO) << this << ": sending reverse ping to " << node.compute_short_id();
445
      td::actor::send_closure(adnl_, &adnl::Adnl::add_peer, client, node.pub_id(), node.addr_list());
446
      td::actor::send_closure(adnl_, &adnl::Adnl::send_message, client, node.compute_short_id(), td::BufferSlice());
447
      return td::Status::OK();
448
    }();
449
    if (S.is_error()) {
450
      VLOG(DHT_INFO) << this << ": " << S;
451
    }
452
  }
453
}
454

455
void DhtMemberImpl::set_value(DhtValue value, td::Promise<td::Unit> promise) {
456
  auto S = value.check();
457
  if (S.is_error()) {
458
    promise.set_error(std::move(S));
459
    return;
460
  }
461
  auto h = value.key_id();
462
  our_values_.emplace(h, value.clone());
463

464
  send_store(std::move(value), std::move(promise));
465
}
466

467
void DhtMemberImpl::get_value_in(DhtKeyId key, td::Promise<DhtValue> result) {
468
  auto P = td::PromiseCreator::lambda([key, promise = std::move(result), SelfId = actor_id(this), print_id = print_id(),
469
                                       adnl = adnl_, list = get_nearest_nodes(key, k_ * 2), k = k_, a = a_,
470
                                       network_id = network_id_, id = id_,
471
                                       client_only = client_only_](td::Result<DhtNode> R) mutable {
472
    R.ensure();
473
    td::actor::create_actor<DhtQueryFindValue>("FindValueQuery", key, print_id, id, std::move(list), k, a, network_id,
474
                                               R.move_as_ok(), client_only, SelfId, adnl, std::move(promise))
475
        .release();
476
  });
477

478
  get_self_node(std::move(P));
479
}
480

481
void DhtMemberImpl::register_reverse_connection(adnl::AdnlNodeIdFull client, td::Promise<td::Unit> promise) {
482
  auto client_short = client.compute_short_id();
483
  td::uint32 ttl = (td::uint32)td::Clocks::system() + 300;
484
  our_reverse_connections_.insert(client_short);
485
  auto key_id = get_reverse_connection_key(client_short).compute_key_id();
486
  td::actor::send_closure(keyring_, &keyring::Keyring::sign_message, client_short.pubkey_hash(),
487
                          register_reverse_connection_to_sign(client_short, id_, ttl),
488
                          [=, print_id = print_id(), list = get_nearest_nodes(key_id, k_ * 2), SelfId = actor_id(this),
489
                           promise = std::move(promise)](td::Result<td::BufferSlice> R) mutable {
490
                            TRY_RESULT_PROMISE_PREFIX(promise, signature, std::move(R), "Failed to sign: ");
491
                            td::actor::send_closure(SelfId, &DhtMemberImpl::get_self_node,
492
                                                    [=, list = std::move(list), signature = std::move(signature),
493
                                                     promise = std::move(promise)](td::Result<DhtNode> R) mutable {
494
                                                      R.ensure();
495
                                                      td::actor::create_actor<DhtQueryRegisterReverseConnection>(
496
                                                          "RegisterReverseQuery", key_id, std::move(client), ttl,
497
                                                          std::move(signature), print_id, id_, std::move(list), k_, a_,
498
                                                          network_id_, R.move_as_ok(), client_only_, SelfId, adnl_,
499
                                                          std::move(promise))
500
                                                          .release();
501
                                                    });
502
                          });
503
}
504

505
void DhtMemberImpl::request_reverse_ping(adnl::AdnlNode target, adnl::AdnlNodeIdShort client,
506
                                         td::Promise<td::Unit> promise) {
507
  auto pubkey_hash = target.compute_short_id().pubkey_hash();
508
  td::BufferSlice to_sign = serialize_tl_object(target.tl(), true);
509
  td::actor::send_closure(keyring_, &keyring::Keyring::sign_message, pubkey_hash, std::move(to_sign),
510
                          [SelfId = actor_id(this), promise = std::move(promise), target = std::move(target),
511
                           client](td::Result<td::BufferSlice> R) mutable {
512
                            TRY_RESULT_PROMISE(promise, signature, std::move(R));
513
                            td::actor::send_closure(SelfId, &DhtMemberImpl::request_reverse_ping_cont,
514
                                                    std::move(target), std::move(signature), client,
515
                                                    std::move(promise));
516
                          });
517
}
518

519
void DhtMemberImpl::request_reverse_ping_cont(adnl::AdnlNode target, td::BufferSlice signature,
520
                                              adnl::AdnlNodeIdShort client, td::Promise<td::Unit> promise) {
521
  auto it = reverse_connections_.find(client);
522
  if (it != reverse_connections_.end()) {
523
    if (it->second.ttl_.is_in_past()) {
524
      reverse_connections_.erase(it);
525
    } else {
526
      td::actor::send_closure(adnl_, &adnl::Adnl::send_message, id_, it->second.dht_node_,
527
                              create_serialize_tl_object<ton_api::dht_requestReversePingCont>(
528
                                  target.tl(), std::move(signature), client.bits256_value()));
529
      promise.set_result(td::Unit());
530
      return;
531
    }
532
  }
533
  auto key_id = get_reverse_connection_key(client).compute_key_id();
534
  get_self_node([=, target = std::move(target), signature = std::move(signature), promise = std::move(promise),
535
                 SelfId = actor_id(this), print_id = print_id(), list = get_nearest_nodes(key_id, k_ * 2),
536
                 client_only = client_only_](td::Result<DhtNode> R) mutable {
537
    R.ensure();
538
    td::actor::create_actor<DhtQueryRequestReversePing>(
539
        "RequestReversePing", client, std::move(target), std::move(signature), print_id, id_, std::move(list), k_, a_,
540
        network_id_, R.move_as_ok(), client_only, SelfId, adnl_, std::move(promise))
541
        .release();
542
  });
543
}
544

545
void DhtMemberImpl::check() {
546
  VLOG(DHT_INFO) << this << ": ping=" << ping_queries_ << " fnode=" << find_node_queries_
547
                 << " fvalue=" << find_value_queries_ << " store=" << store_queries_
548
                 << " addrlist=" << get_addr_list_queries_;
549
  for (auto &bucket : buckets_) {
550
    bucket.check(client_only_, adnl_, actor_id(this), id_);
551
  }
552
  if (next_save_to_db_at_.is_in_past()) {
553
    save_to_db();
554
  }
555

556
  if (values_.size() > 0) {
557
    auto it = values_.lower_bound(last_check_key_);
558
    if (it != values_.end() && it->first == last_check_key_) {
559
      it++;
560
    }
561
    if (it == values_.end()) {
562
      it = values_.begin();
563
    }
564

565
    td::uint32 cnt = 0;
566
    auto s = last_check_key_;
567
    while (values_.size() > 0 && cnt < 1 && it->first != s) {
568
      last_check_key_ = it->first;
569
      cnt++;
570
      if (it->second.expired()) {
571
        it = values_.erase(it);
572

573
        // do not republish soon-to-be-expired values
574
      } else if (it->second.ttl() > td::Clocks::system() + 60) {
575
        auto dist = distance(it->first, k_ + 10);
576

577
        if (dist == 0) {
578
          if (it->second.key().update_rule()->need_republish()) {
579
            auto P = td::PromiseCreator::lambda([print_id = print_id()](td::Result<td::Unit> R) {
580
              if (R.is_error()) {
581
                VLOG(DHT_INFO) << print_id << ": failed to store: " << R.move_as_error();
582
              }
583
            });
584
            send_store(it->second.clone(), std::move(P));
585
          }
586
          it++;
587
        } else if (dist >= k_ + 10) {
588
          it = values_.erase(it);
589
        } else {
590
          it++;
591
        }
592
      } else {
593
        it++;
594
      }
595
      if (values_.size() == 0) {
596
        break;
597
      }
598
      if (it == values_.end()) {
599
        it = values_.begin();
600
      }
601
    }
602
  }
603
  if (reverse_connections_.size() > 0) {
604
    auto it = reverse_connections_.upper_bound(last_check_reverse_conn_);
605
    if (it == reverse_connections_.end()) {
606
      it = reverse_connections_.begin();
607
    }
608
    last_check_reverse_conn_ = it->first;
609
    if (it->second.ttl_.is_in_past()) {
610
      reverse_connections_.erase(it);
611
    }
612
  }
613

614
  if (republish_att_.is_in_past()) {
615
    auto it = our_values_.lower_bound(last_republish_key_);
616
    if (it != our_values_.end() && it->first == last_republish_key_) {
617
      it++;
618
    }
619
    if (it == our_values_.end()) {
620
      it = our_values_.begin();
621
    }
622
    if (it != our_values_.end()) {
623
      if (it->second.ttl() > td::Clocks::system() + 60) {
624
        auto P = td::PromiseCreator::lambda([print_id = print_id()](td::Result<td::Unit> R) {
625
          if (R.is_error()) {
626
            VLOG(DHT_INFO) << print_id << ": failed to store: " << R.move_as_error();
627
          }
628
        });
629
        send_store(it->second.clone(), std::move(P));
630
      }
631
      last_republish_key_ = it->first;
632
    }
633
    republish_att_ = td::Timestamp::in(10.0 + td::Random::fast(0, 1000) * 0.001);
634
  }
635

636
  if (fill_att_.is_in_past()) {
637
    auto promise = td::PromiseCreator::lambda([](td::Result<DhtNodesList> R) {
638
      if (R.is_error()) {
639
        VLOG(DHT_WARNING) << "failed find self query: " << R.move_as_error();
640
      }
641
    });
642

643
    td::Bits256 x;
644

645
    td::uint32 t = td::Random::fast(0, 6);
646
    td::uint32 b = 64 - td::Random::fast(0, 1 << t);
647
    td::Random::secure_bytes(x.as_slice());
648
    for (td::uint32 i = 0; i < b; i++) {
649
      x.bits()[i] = key_.get_bit(i);
650
    }
651

652
    DhtKeyId key{x};
653
    auto P = td::PromiseCreator::lambda([key, promise = std::move(promise), SelfId = actor_id(this),
654
                                         print_id = print_id(), adnl = adnl_, list = get_nearest_nodes(key, k_ * 2),
655
                                         k = k_, a = a_, network_id = network_id_, id = id_,
656
                                         client_only = client_only_](td::Result<DhtNode> R) mutable {
657
      R.ensure();
658
      td::actor::create_actor<DhtQueryFindNodes>("FindNodesQuery", key, print_id, id, std::move(list), k, a, network_id,
659
                                                 R.move_as_ok(), client_only, SelfId, adnl, std::move(promise))
660
          .release();
661
    });
662

663
    get_self_node(std::move(P));
664

665
    fill_att_ = td::Timestamp::in(10.0 + td::Random::fast(0, 100) * 0.1);
666
  }
667
}
668

669
void DhtMemberImpl::dump(td::StringBuilder &sb) const {
670
  for (auto &B : buckets_) {
671
    B.dump(sb);
672
  }
673
}
674

675
void DhtMemberImpl::send_store(DhtValue value, td::Promise<td::Unit> promise) {
676
  value.check().ensure();
677
  auto key_id = value.key_id();
678

679
  auto P = td::PromiseCreator::lambda([value = std::move(value), print_id = print_id(), id = id_,
680
                                       client_only = client_only_, list = get_nearest_nodes(key_id, k_ * 2), k = k_,
681
                                       a = a_, network_id = network_id_, SelfId = actor_id(this), adnl = adnl_,
682
                                       promise = std::move(promise)](td::Result<DhtNode> R) mutable {
683
    R.ensure();
684
    td::actor::create_actor<DhtQueryStore>("StoreQuery", std::move(value), print_id, id, std::move(list), k, a,
685
                                           network_id, R.move_as_ok(), client_only, SelfId, adnl, std::move(promise))
686
        .release();
687
  });
688

689
  get_self_node(std::move(P));
690
}
691

692
void DhtMemberImpl::get_self_node(td::Promise<DhtNode> promise) {
693
  auto P = td::PromiseCreator::lambda([promise = std::move(promise), print_id = print_id(), id = id_,
694
                                       keyring = keyring_, client_only = client_only_,
695
                                       network_id = network_id_](td::Result<adnl::AdnlNode> R) mutable {
696
    R.ensure();
697
    auto node = R.move_as_ok();
698
    auto version = static_cast<td::int32>(td::Clocks::system());
699
    td::BufferSlice B = serialize_tl_object(
700
        DhtNode{node.pub_id(), node.addr_list(), version, network_id, td::BufferSlice{}}.tl(), true);
701
    if (!client_only) {
702
      CHECK(node.addr_list().size() > 0);
703
    }
704
    auto P = td::PromiseCreator::lambda([promise = std::move(promise), node = std::move(node), version,
705
                                         network_id](td::Result<td::BufferSlice> R) mutable {
706
      R.ensure();
707
      DhtNode n{node.pub_id(), node.addr_list(), version, network_id, R.move_as_ok()};
708
      promise.set_result(std::move(n));
709
    });
710
    td::actor::send_closure(keyring, &keyring::Keyring::sign_message, id.pubkey_hash(), std::move(B), std::move(P));
711
  });
712
  td::actor::send_closure(adnl_, &adnl::Adnl::get_self_node, id_, std::move(P));
713
}
714

715
td::Result<std::shared_ptr<DhtGlobalConfig>> Dht::create_global_config(tl_object_ptr<ton_api::dht_config_Global> conf) {
716
  td::uint32 k = 0, a = 0;
717
  td::int32 network_id = -1;
718
  tl_object_ptr<ton_api::dht_nodes> static_nodes;
719
  ton_api::downcast_call(*conf, td::overloaded(
720
                                    [&](ton_api::dht_config_global &f) {
721
                                      k = f.k_;
722
                                      a = f.a_;
723
                                      network_id = -1;
724
                                      static_nodes = std::move(f.static_nodes_);
725
                                    },
726
                                    [&](ton_api::dht_config_global_v2 &f) {
727
                                      k = f.k_;
728
                                      a = f.a_;
729
                                      network_id = f.network_id_;
730
                                      static_nodes = std::move(f.static_nodes_);
731
                                    }));
732
  if (k == 0) {
733
    k = DhtMember::default_k();
734
  } else if (k > DhtMember::max_k()) {
735
    return td::Status::Error(ErrorCode::protoviolation, PSTRING() << "bad value k=" << k);
736
  }
737
  if (a == 0) {
738
    a = DhtMember::default_a();
739
  } else if (a > DhtMember::max_a()) {
740
    return td::Status::Error(ErrorCode::protoviolation, PSTRING() << "bad value a=" << a);
741
  }
742
  DhtNodesList l{std::move(static_nodes), network_id};
743
  return std::make_shared<DhtGlobalConfig>(k, a, network_id, std::move(l));
744
}
745

746
}  // namespace dht
747

748
}  // namespace ton
749

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.