Ton

Форк
0
/
storage.cpp 
1364 строки · 45.9 Кб
1
/*
2
    This file is part of TON Blockchain Library.
3

4
    TON Blockchain Library is free software: you can redistribute it and/or modify
5
    it under the terms of the GNU Lesser General Public License as published by
6
    the Free Software Foundation, either version 2 of the License, or
7
    (at your option) any later version.
8

9
    TON Blockchain Library is distributed in the hope that it will be useful,
10
    but WITHOUT ANY WARRANTY; without even the implied warranty of
11
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
    GNU Lesser General Public License for more details.
13

14
    You should have received a copy of the GNU Lesser General Public License
15
    along with TON Blockchain Library.  If not, see <http://www.gnu.org/licenses/>.
16

17
    Copyright 2017-2020 Telegram Systems LLP
18
*/
19

20
#include "td/utils/benchmark.h"
21
#include "td/utils/crypto.h"
22
#include "td/utils/Container.h"
23
#include "td/utils/misc.h"
24
#include "td/utils/optional.h"
25
#include "td/utils/overloaded.h"
26
#include "td/utils/Status.h"
27
#include "td/utils/Span.h"
28
#include "td/utils/tests.h"
29
#include "td/utils/Timer.h"
30
#include "td/utils/Time.h"
31
#include "td/utils/tl_helpers.h"
32
#include "td/utils/UInt.h"
33
#include "td/utils/VectorQueue.h"
34
#include "td/utils/ThreadSafeCounter.h"
35

36
#include "td/utils/filesystem.h"
37
#include "td/utils/port/path.h"
38

39
#include "tl-utils/tl-utils.hpp"
40

41
#include "auto/tl/ton_api.h"
42
#include "auto/tl/ton_api.hpp"
43

44
#include "td/actor/actor.h"
45

46
#include "td/db/utils/CyclicBuffer.h"
47

48
#include "vm/boc.h"
49
#include "vm/cells.h"
50
#include "vm/cellslice.h"
51
#include "vm/cells/MerkleProof.h"
52
#include "vm/cells/CellString.h"
53

54
#include "fec/fec.h"
55

56
#include "rldp2/RldpConnection.h"
57
#include "rldp2/LossSender.h"
58

59
#include "Bitset.h"
60
#include "PeerState.h"
61
#include "Torrent.h"
62
#include "TorrentCreator.h"
63

64
#include "NodeActor.h"
65
#include "PeerActor.h"
66

67
#include "MerkleTree.h"
68

69
constexpr td::uint64 Byte = 1;
70
constexpr td::uint64 KiloByte = (1 << 10) * Byte;
71
constexpr td::uint64 MegaByte = (1 << 10) * KiloByte;
72

73
using namespace ton::rldp2;
74

75
extern "C" {
76
double ndtri(double y0);
77
double ndtri(double y0);
78
double nbdtr(int k, int n, double p);
79
double bdtr(int k, int n, double p);
80
double pdtr(int k, double y);
81
double pdtri(int k, double y);
82
}
83

84
BENCH(Loss, "Loss") {
85
  LossSender sender(0.5, 1e-10);
86
  td::uint64 res = 0;
87
  for (int i = 0; i < n; i++) {
88
    res += sender.send_n(100000);
89
  }
90
  td::do_not_optimize_away(res);
91
}
92

93
TEST(Rldp, Loss) {
94
  td::bench(LossBench());
95
  ASSERT_EQ(96, LossSender(0.1, 1e-10).send_n_exact(64));
96
  ASSERT_EQ(86, LossSender(0.05, 1e-10).send_n_exact(64));
97
  ASSERT_EQ(75, LossSender(0.01, 1e-10).send_n_exact(64));
98
  ASSERT_EQ(70, LossSender(0.001, 1e-10).send_n_exact(64));
99

100
  for (auto p1 : {1e-1, 1e-2, 1e-3, 1e-4, 1e-5, 1e-6, 1e-7, 1e-8, 1e-9, 1e-10}) {
101
    //CHECK(fabs(ndtri_fast(p1) - ndtri(1 - p1)) < 1e-6);
102
    for (auto loss : {0.5, 0.1, 0.01, 0.001, 0.0001}) {
103
      LossSender sender(loss, p1);
104
      for (auto n : {1, 10, 20, 50, 100, 250, 500, 1000, 2000, 4000, 8000, 16000, 32000}) {
105
        auto exact_m = sender.send_n_exact(n);
106
        auto approx_m = sender.send_n_approx_nbd(n);
107
        CHECK(!sender.has_good_approx() || std::abs(exact_m - approx_m) <= 1);
108
        //std::cerr << "p=" << loss << "\tS1=" << p1 << "\tn=" << n << "\tdiff=" << exact_m - approx_m << "\t" << exact_m
109
        //<< " " << approx_m << std::endl;
110
      }
111
    }
112
  }
113
}
114

115
TEST(Rldp, sub_or_zero) {
116
  ASSERT_EQ(10u, sub_or_zero(20, 10));
117
  ASSERT_EQ(0u, sub_or_zero(10, 20));
118
}
119

120
TEST(Rldp, RttStats) {
121
  RttStats stats;
122
  ASSERT_TRUE(stats.smoothed_rtt < 0);
123

124
  td::Timestamp now;
125
  stats.on_rtt_sample(-1, 0, now);
126
  ASSERT_TRUE(stats.smoothed_rtt < 0);
127
  stats.on_rtt_sample(1, -1, now);
128
  ASSERT_TRUE(stats.smoothed_rtt < 0);
129

130
  stats.on_rtt_sample(1, 0, now);
131
  stats.on_rtt_sample(2, 0, now);
132
  stats.on_rtt_sample(1, 0, now);
133
  stats.on_rtt_sample(2, 0, now);
134
  stats.on_rtt_sample(1, 0, now);
135
  stats.on_rtt_sample(2, 0, now);
136
  ASSERT_TRUE(fabs(stats.last_rtt - 2) < 1e-9);
137
  ASSERT_TRUE(fabs(stats.min_rtt - 1) < 1e-9);
138
  ASSERT_TRUE(1 < stats.smoothed_rtt && stats.smoothed_rtt < 2);
139
  ASSERT_TRUE(0.1 < stats.rtt_var && stats.rtt_var < 0.9);
140
}
141

142
TEST(Rldp, Ack) {
143
  Ack ack;
144
  ASSERT_TRUE(ack.on_got_packet(5));
145
  ASSERT_TRUE(!ack.on_got_packet(5));
146
  ASSERT_EQ(5u, ack.max_seqno);
147
  ASSERT_EQ(1u, ack.received_count);
148
  ASSERT_EQ(1u, ack.received_mask);
149

150
  ASSERT_TRUE(ack.on_got_packet(3));
151
  ASSERT_TRUE(!ack.on_got_packet(3));
152
  ASSERT_EQ(5u, ack.max_seqno);
153
  ASSERT_EQ(2u, ack.received_count);
154
  ASSERT_EQ(5u, ack.received_mask);
155

156
  ASSERT_TRUE(ack.on_got_packet(7));
157
  ASSERT_TRUE(!ack.on_got_packet(7));
158
  ASSERT_EQ(7u, ack.max_seqno);
159
  ASSERT_EQ(3u, ack.received_count);
160
  ASSERT_EQ(21u, ack.received_mask);
161

162
  ASSERT_TRUE(ack.on_got_packet(100));
163
  ASSERT_TRUE(!ack.on_got_packet(100));
164
  ASSERT_TRUE(!ack.on_got_packet(8));
165
  ASSERT_TRUE(!ack.on_got_packet(7));
166
  ASSERT_EQ(4u, ack.received_count);
167
  ASSERT_EQ(1u, ack.received_mask);
168
}
169

170
TEST(Rldp, SenderPackets) {
171
  td::Random::Xorshift128plus rnd(123);
172

173
  for (int test_i = 0; test_i < 100; test_i++) {
174
    Ack ack;
175
    SenderPackets sender;
176
    std::vector<td::uint32> in_flight;
177
    std::set<td::uint32> in_flight_set;
178
    std::set<td::uint32> received;
179
    std::set<td::uint32> dropped;
180
    std::set<td::uint32> no_ack;
181

182
    td::int32 now = 0;
183
    td::uint32 last_seqno = 0;
184

185
    td::uint32 window = rnd.fast(1, 100);
186

187
    auto send_query = [&]() {
188
      if (sender.in_flight_count() > window) {
189
        return;
190
      }
191
      last_seqno++;
192
      auto seqno = sender.next_seqno();
193
      CHECK(seqno == last_seqno);
194
      SenderPackets::Packet packet;
195
      packet.is_in_flight = true;
196
      packet.sent_at = td::Timestamp::at(now);
197
      packet.seqno = seqno;
198
      packet.size = 0;
199
      sender.send(packet);
200

201
      in_flight.push_back(seqno);
202
      in_flight_set.insert(seqno);
203
    };
204

205
    auto extract_in_flight_query = [&]() -> td::optional<td::uint32> {
206
      if (in_flight_set.empty()) {
207
        return {};
208
      }
209
      while (true) {
210
        auto position = rnd.fast(0, (int)in_flight.size() - 1);
211
        std::swap(in_flight[position], in_flight.back());
212
        auto seqno = in_flight.back();
213
        in_flight.pop_back();
214
        if (!in_flight_set.count(seqno)) {
215
          continue;
216
        }
217
        in_flight_set.erase(seqno);
218
        return seqno;
219
      }
220
    };
221

222
    auto receive_query = [&]() {
223
      auto o_seqno = extract_in_flight_query();
224
      if (!o_seqno) {
225
        return;
226
      }
227
      auto seqno = o_seqno.unwrap();
228
      if (ack.on_got_packet(seqno)) {
229
        received.insert(seqno);
230
      }
231
      no_ack.insert(seqno);
232
    };
233

234
    auto drop_query = [&]() {
235
      auto o_seqno = extract_in_flight_query();
236
      if (!o_seqno) {
237
        return;
238
      }
239
      auto seqno = o_seqno.unwrap();
240
      dropped.insert(seqno);
241
    };
242

243
    auto send_ack = [&]() {
244
      sender.on_ack(ack);
245
      no_ack.clear();
246
      ASSERT_EQ(received.size(), sender.received_count());
247
      //ASSERT_EQ(no_ack.size() + in_flight_set.size() + dropped.size(), sender.in_flight_count());
248
      if (!received.empty()) {
249
        ASSERT_EQ(*received.rbegin(), sender.max_packet().seqno);
250
      }
251
    };
252

253
    auto apply_limits = [&]() {
254
      auto till_seqno = sub_or_zero(sender.max_packet().seqno, rnd.fast(3, 31));
255
      SenderPackets::Limits limits;
256
      limits.sent_at = td::Timestamp::at(0);
257
      limits.seqno = till_seqno;
258
      //ASSERT_EQ(no_ack.size() + in_flight_set.size() + dropped.size(), sender.in_flight_count());
259

260
      in_flight_set.erase(in_flight_set.begin(), in_flight_set.lower_bound(till_seqno));
261
      dropped.erase(dropped.begin(), dropped.lower_bound(till_seqno));
262
      no_ack.erase(no_ack.begin(), no_ack.lower_bound(till_seqno));
263

264
      sender.drop_packets(limits);
265
      //LOG(ERROR) << td::tag("max_seqno", sender.max_packet().seqno);
266
      //LOG(ERROR) << td::tag("till_seqno", till_seqno);
267
      //LOG(ERROR) << td::tag("no_ack", no_ack);
268
      //LOG(ERROR) << td::tag("in_flight", in_flight);
269
      //LOG(ERROR) << td::tag("dropped", dropped);
270
      ASSERT_EQ(no_ack.size() + in_flight_set.size() + dropped.size(), sender.in_flight_count());
271
    };
272

273
    std::vector<td::RandomSteps::Step> steps_vec{
274
        {send_query, 0}, {receive_query, 0}, {drop_query, 0}, {send_ack, 0}, {apply_limits, 0}};
275
    for (auto &step : steps_vec) {
276
      step.weight = rnd.fast(1, 10);
277
    }
278
    td::RandomSteps steps{std::move(steps_vec)};
279
    for (int i = 0; i < 1000; i++) {
280
      steps.step(rnd);
281
    }
282
  }
283
}
284

285
TEST(Rldp, FecHelper) {
286
  FecHelper helper;
287
  td::uint32 x = 5;
288
  td::uint32 y = 5;
289
  td::uint32 n = 10;
290
  helper.symbols_count = n;
291
  ASSERT_EQ(n + x, helper.get_fec_symbols_count());
292
  ASSERT_EQ(n + x, helper.get_left_fec_symbols_count());
293
  helper.received_symbols_count = n + 1;
294
  ASSERT_EQ(n + x, helper.get_fec_symbols_count());
295
  ASSERT_EQ(x - 1, helper.get_left_fec_symbols_count());
296
  helper.received_symbols_count = n + x;
297
  ASSERT_EQ(n + x + y, helper.get_fec_symbols_count());
298
  ASSERT_EQ(y, helper.get_left_fec_symbols_count());
299
  helper.received_symbols_count = n + x + 1;
300
  ASSERT_EQ(n + x + y, helper.get_fec_symbols_count());
301
  ASSERT_EQ(y - 1, helper.get_left_fec_symbols_count());
302
  helper.received_symbols_count = n + x + y;
303
  ASSERT_EQ(n + x + 2 * y, helper.get_fec_symbols_count());
304
  ASSERT_EQ(y, helper.get_left_fec_symbols_count());
305
};
306

307
TEST(Rldp2, Pacer) {
308
  Pacer::Options options;
309
  options.initial_capacity = 0;
310
  options.initial_speed = 100;
311
  options.max_capacity = 1;
312
  options.time_granularity = 0.1;
313
  CHECK(options.initial_speed * options.time_granularity > options.max_capacity * 4);
314

315
  Pacer pacer(options);
316

317
  // send 1000 packets
318
  auto now = td::Timestamp::at(123);
319
  auto start = now;
320
  for (auto it = 0; it < 1000; it++) {
321
    CHECK(pacer.wakeup_at().is_in_past(now));
322
    auto o_wakeup_at = pacer.send(1, now);
323
    if (o_wakeup_at) {
324
      now = td::Timestamp::in(td::Random::fast(0.001, 0.1), o_wakeup_at.unwrap());
325
    }
326
  }
327
  double passed = now.at() - start.at();
328
  LOG_CHECK(passed > 9.9 && passed < 10.1) << passed;
329
}
330

331
class Sleep : public td::actor::Actor {
332
 public:
333
  static void put_to_sleep(td::actor::ActorId<Sleep> sleep, td::Timestamp till, td::Promise<td::Unit> promise) {
334
    send_closure(sleep, &Sleep::do_put_to_sleep, till, std::move(promise));
335
  }
336

337
  static TD_WARN_UNUSED_RESULT auto create() {
338
    return td::actor::create_actor<Sleep>("Sleep");
339
  }
340

341
 private:
342
  std::multimap<td::Timestamp, td::Promise<td::Unit>> pending_;
343

344
  void do_put_to_sleep(td::Timestamp till, td::Promise<td::Unit> promise) {
345
    pending_.emplace(till, std::move(promise));
346
    alarm_timestamp() = pending_.begin()->first;
347
  }
348

349
  void loop() override {
350
    while (!pending_.empty() && pending_.begin()->first.is_in_past()) {
351
      pending_.begin()->second.set_value(td::Unit());
352
      pending_.erase(pending_.begin());
353
    }
354

355
    if (!pending_.empty()) {
356
      alarm_timestamp() = pending_.begin()->first;
357
    }
358
  }
359
};
360

361
class NetChannel : public td::actor::Actor {
362
 public:
363
  struct Options {
364
    double loss{0};
365
    double rtt{0.1};
366

367
    double buffer{128 * KiloByte};
368
    double speed{1 * MegaByte};
369

370
    double alive_begin = -1;
371
    double sleep_step = 0;
372
    double alive_step = 1;
373

374
    static constexpr double eps = 1e-9;
375

376
    bool is_sleeping(double now) {
377
      if (sleep_step < eps) {
378
        return false;
379
      }
380
      return alive_begin > now + eps;
381
    }
382

383
    double calc_data(double l, double r) {
384
      if (sleep_step < eps) {
385
        return (r - l) * speed;
386
      }
387

388
      if (alive_begin < 0) {
389
        alive_begin = l;
390
      }
391
      double res = 0;
392
      while (true) {
393
        double alive_end = alive_begin + alive_step;
394
        if (l < alive_begin) {
395
          l = alive_begin;
396
        }
397
        if (l + eps > r) {
398
          break;
399
        } else if (r < alive_begin + eps) {
400
          break;
401
        } else if (l > alive_end - eps) {
402
          alive_begin += alive_step + sleep_step;
403
        } else {
404
          double new_l = td::min(alive_end, r);
405
          res += (new_l - l) * speed;
406
          l = new_l;
407
        }
408
      }
409
      return res;
410
    }
411

412
    double calc_wait(double need, double now) {
413
      constexpr double eps = 1e-9;
414
      if (sleep_step < eps) {
415
        return need / speed;
416
      }
417
      if (now < alive_begin) {
418
        return alive_begin - now;
419
      }
420
      return need / speed;
421
    }
422

423
    Options with_loss(double loss) {
424
      this->loss = loss;
425
      return *this;
426
    }
427
    Options with_rtt(double rtt) {
428
      this->rtt = rtt;
429
      return *this;
430
    }
431
    Options with_speed(double speed) {
432
      this->speed = speed;
433
      return *this;
434
    }
435
    Options with_buffer(double buffer) {
436
      this->buffer = buffer;
437
      return *this;
438
    }
439
    Options with_sleep_alive(double sleep, double alive) {
440
      this->sleep_step = sleep;
441
      this->alive_step = alive;
442
      return *this;
443
    }
444

445
    static Options perfect_net() {
446
      return NetChannel::Options().with_buffer(300 * MegaByte).with_loss(0).with_rtt(0.01).with_speed(100 * MegaByte);
447
    }
448
    static Options lossy_perfect_net() {
449
      return perfect_net().with_loss(0.1);
450
    }
451
    static Options bad_net() {
452
      return NetChannel::Options().with_buffer(128 * KiloByte).with_loss(0.1).with_rtt(0.2).with_speed(128 * KiloByte);
453
    }
454
  };
455

456
  static TD_WARN_UNUSED_RESULT td::actor::ActorOwn<NetChannel> create(Options options,
457
                                                                      td::actor::ActorId<Sleep> sleep) {
458
    return td::actor::create_actor<NetChannel>("NetChannel", options, std::move(sleep));
459
  }
460

461
  NetChannel(Options options, td::actor::ActorId<Sleep> sleep) : options_(options), sleep_(std::move(sleep)) {
462
  }
463

464
  td::uint64 total_sent() const {
465
    return total_sent_;
466
  }
467

468
  void send(size_t size, td::Promise<td::Unit> promise) {
469
    total_sent_ += size;
470
    if (total_size_ + (double)size > options_.buffer) {
471
      LOG(ERROR) << "OVERFLOW";
472
      promise.set_error(td::Status::Error("buffer overflow"));
473
      return;
474
    }
475
    if (td::Random::fast(0.0, 1.0) < options_.loss) {
476
      //LOG(ERROR) << "LOST";
477
      promise.set_error(td::Status::Error("lost"));
478
      return;
479
    }
480
    in_cnt_++;
481
    queue_.push(Query{size, std::move(promise)});
482
    total_size_ += (double)size;
483
    //auto span = queue_.as_mutable_span();
484
    //std::swap(span[td::Random::fast(0, (int)span.size() - 1)], span.back());
485
    yield();
486
  }
487

488
 private:
489
  struct Query {
490
    size_t size;
491
    td::Promise<td::Unit> promise;
492
  };
493
  Options options_;
494
  td::VectorQueue<Query> queue_;
495
  double total_size_{0};
496

497
  td::uint64 total_sent_{0};
498

499
  td::uint64 in_cnt_{0};
500
  td::uint64 out_cnt_{0};
501

502
  double got_{0};
503
  td::Timestamp got_at_{};
504

505
  td::actor::ActorId<Sleep> sleep_;
506

507
  void loop() override {
508
    auto now = td::Timestamp::now();
509
    if (got_at_) {
510
      got_ += options_.calc_data(got_at_.at(), now.at());
511
    }
512
    got_at_ = now;
513

514
    if (options_.is_sleeping(now.at())) {
515
      queue_ = {};
516
    }
517

518
    while (!queue_.empty() && (double)queue_.front().size < got_) {
519
      auto query = queue_.pop();
520
      got_ -= (double)query.size;
521
      total_size_ -= (double)query.size;
522
      out_cnt_++;
523
      Sleep::put_to_sleep(sleep_, td::Timestamp::in(options_.rtt), std::move(query.promise));
524
    }
525

526
    if (queue_.empty()) {
527
      got_at_ = {};
528
      got_ = 0;
529
      return;
530
    }
531

532
    auto wait_bytes = ((double)queue_.front().size - got_);
533
    auto wait_duration = options_.calc_wait(wait_bytes, now.at());
534
    //LOG(ERROR) << "Wait " << td::format::as_size((td::size_t)wait_bytes) << " " << td::format::as_time(wait_duration)
535
    //<< " " << in_cnt_ << " " << out_cnt_ << " " << ok;
536
    alarm_timestamp() = td::Timestamp::in(wait_duration);
537
  }
538
};
539

540
class Rldp : public td::actor::Actor, public ConnectionCallback {
541
 public:
542
  struct Stats {
543
    td::uint64 received_bytes{0};
544
    td::uint64 sent_bytes{0};
545
    td::Timestamp last_received_packet_at{};
546
    td::Timestamp last_sent_packet_at{};
547
  };
548

549
  void receive_raw(td::BufferSlice raw) {
550
    stats_->received_bytes += raw.size();
551
    connection_.receive_raw(std::move(raw));
552
    yield();
553
  }
554

555
  void send(td::BufferSlice data, td::Promise<td::Unit> promise) {
556
    TransferId transfer_id;
557
    td::Random::secure_bytes(as_slice(transfer_id));
558
    connection_.send(transfer_id, std::move(data));
559
    queries_[transfer_id] = std::move(promise);
560
    yield();
561
  }
562

563
  void add_peer(td::actor::ActorId<Rldp> peer) {
564
    peer_ = peer;
565
    yield();
566
  }
567

568
  void send_raw(td::BufferSlice data) override {
569
    auto size = data.size();
570
    stats_->sent_bytes += size;
571
    send_closure(net_channel_, &NetChannel::send, size,
572
                 [data = std::move(data), peer = peer_](td::Result<td::Unit> res) mutable {
573
                   if (res.is_ok()) {
574
                     send_closure(peer, &Rldp::receive_raw, std::move(data));
575
                   }
576
                 });
577
  }
578
  void receive(TransferId, td::Result<td::BufferSlice> data) override {
579
    CHECK(data.is_ok());
580
    stats_->last_received_packet_at = td::Timestamp::now();
581
    //LOG(ERROR) << "GOT ";
582
  }
583

584
  void on_sent(TransferId query_id, td::Result<td::Unit> state) override {
585
    stats_->last_sent_packet_at = td::Timestamp::now();
586
    //LOG(ERROR) << "SENT " << query_id;
587
    auto it = queries_.find(query_id);
588
    CHECK(queries_.end() != it);
589
    it->second.set_result(std::move(state));
590
    queries_.erase(it);
591
  }
592

593
  explicit Rldp(td::actor::ActorOwn<NetChannel> net_channel, Stats *stats)
594
      : net_channel_(std::move(net_channel)), stats_(stats) {
595
    CHECK(stats_);
596
    connection_.set_default_mtu(1 << 31);
597
  }
598

599
 private:
600
  RldpConnection connection_;
601
  td::actor::ActorOwn<NetChannel> net_channel_;
602
  td::actor::ActorId<Rldp> peer_;
603
  std::map<TransferId, td::Promise<td::Unit>> queries_;
604
  Stats *stats_;
605

606
  void loop() override {
607
    alarm_timestamp() = connection_.run(*this);
608
  }
609
};
610

611
struct RldpBasicTest {
612
  struct Options {
613
    size_t count{10};
614
    size_t query_size{1000 * Byte};
615
    NetChannel::Options net_options;
616
    size_t concurrent_queries{1};
617

618
    Options with_concurrent_queries(size_t concurrent_queries) {
619
      this->concurrent_queries = concurrent_queries;
620
      return *this;
621
    }
622

623
    static Options create(size_t count, size_t query_size, NetChannel::Options net_options) {
624
      Options options;
625
      options.count = count;
626
      options.query_size = query_size;
627
      options.net_options = net_options;
628
      return options;
629
    }
630
  };
631

632
  class Test : public td::actor::Actor {
633
   public:
634
    Test(Options options, td::actor::ActorOwn<Rldp> alice, td::actor::ActorOwn<Rldp> bob,
635
         td::actor::ActorOwn<Sleep> sleep, Rldp::Stats *alice_stats, Rldp::Stats *bob_stats)
636
        : options_(options)
637
        , alice_(std::move(alice))
638
        , bob_(std::move(bob))
639
        , sleep_(std::move(sleep))
640
        , alice_stats_(alice_stats)
641
        , bob_stats_(bob_stats) {
642
    }
643

644
   private:
645
    Options options_;
646
    td::actor::ActorOwn<Rldp> alice_;
647
    td::actor::ActorOwn<Rldp> bob_;
648
    td::actor::ActorOwn<Sleep> sleep_;
649

650
    Rldp::Stats *alice_stats_;
651
    Rldp::Stats *bob_stats_;
652
    td::Timestamp start_at_;
653
    td::Timestamp last_query_at_;
654

655
    size_t query_id_{0};
656
    size_t got_query_id_{0};
657

658
    int cnt_{0};
659
    void close(td::actor::ActorOwn<td::actor::Actor> actor) {
660
      auto actor_copy = actor.get();
661
      actor.reset();
662
      send_lambda(actor_copy,
663
                  [x = td::create_destructor([self = actor_id(this)] { send_closure(self, &Test::on_closed); })]() {});
664
    }
665
    void on_closed() {
666
      cnt_--;
667
      if (cnt_ == 0) {
668
        td::actor::SchedulerContext::get()->stop();
669
        //LOG(ERROR) << "STOP";
670
        stop();
671
      }
672
    }
673

674
    void start_up() override {
675
      start_at_ = td::Timestamp::now();
676
      for (size_t i = 0; i < options_.concurrent_queries; i++) {
677
        try_send_query();
678
      }
679
    }
680

681
    void tear_down() override {
682
      td::StringBuilder sb;
683
      sb << "\n";
684
      sb << "Sent " << options_.count << " * " << td::format::as_size(options_.query_size) << " = "
685
         << td::format::as_size(options_.query_size * options_.count) << "\n";
686
      sb << "Time: " << td::format::as_time(alice_stats_->last_sent_packet_at.at() - start_at_.at()) << "\n";
687
      sb << "Extra time: "
688
         << td::format::as_time(alice_stats_->last_sent_packet_at.at() - bob_stats_->last_received_packet_at.at())
689
         << "\n";
690
      sb << "Data overhead: " << alice_stats_->sent_bytes - (options_.query_size * options_.count) << "\n";
691
      sb << "Data overhead: " << (double)alice_stats_->sent_bytes / (double)(options_.query_size * options_.count)
692
         << "\n";
693
      LOG(ERROR) << sb.as_cslice();
694
    }
695

696
    void try_send_query(td::Result<td::Unit> = {}) {
697
      if (query_id_ >= options_.count) {
698
        return;
699
      }
700
      query_id_++;
701
      //LOG(ERROR) << "Create " << query_id_;
702
      last_query_at_ = td::Timestamp::now();
703
      td::BufferSlice query(options_.query_size);
704
      query.as_slice().fill('A');
705
      //LOG(ERROR) << "SEND";
706
      send_closure(alice_, &Rldp::send, std::move(query),
707
                   [self = actor_id(this)](auto x) { send_closure(self, &Test::on_query_finished); });
708
    }
709
    void on_query_finished() {
710
      try_send_query();
711
      //Sleep::put_to_sleep(sleep_.get(), td::Timestamp::in(20),
712
      //td::promise_send_closure(actor_id(this), &Test::try_send_query));
713
      got_query_id_++;
714
      //LOG(ERROR) << "Finished " << got_query_id_;
715
      if (got_query_id_ < options_.count) {
716
        return;
717
      }
718
      if (cnt_ == 0) {
719
        cnt_ = 3;
720
        close(std::move(alice_));
721
        close(std::move(bob_));
722
        close(std::move(sleep_));
723
      }
724
      return;
725
    }
726
  };
727

728
  static void run(Options options) {
729
    td::actor::Scheduler scheduler({0}, true);
730
    auto alice_stats = std::make_unique<Rldp::Stats>();
731
    auto bob_stats = std::make_unique<Rldp::Stats>();
732

733
    scheduler.run_in_context([&] {
734
      auto sleep = Sleep::create();
735
      auto alice_to_bob = NetChannel::create(options.net_options, sleep.get());
736
      auto bob_to_alice = NetChannel::create(options.net_options, sleep.get());
737

738
      auto alice = td::actor::create_actor<Rldp>("Alice", std::move(alice_to_bob), alice_stats.get());
739
      auto bob = td::actor::create_actor<Rldp>("Bob", std::move(bob_to_alice), bob_stats.get());
740
      send_closure(alice, &Rldp::add_peer, bob.get());
741
      send_closure(bob, &Rldp::add_peer, alice.get());
742
      td::actor::create_actor<Test>("Test", options, std::move(alice), std::move(bob), std::move(sleep),
743
                                    alice_stats.get(), bob_stats.get())
744
          .release();
745
    });
746
    scheduler.run();
747
  }
748
};
749

750
TEST(Rldp, Main) {
751
  using Options = RldpBasicTest::Options;
752
  RldpBasicTest::run(Options::create(10, 10 * MegaByte, NetChannel::Options::perfect_net()));
753
  RldpBasicTest::run(Options::create(10 * 80, 10 * MegaByte / 80, NetChannel::Options::perfect_net()));
754
  RldpBasicTest::run(
755
      Options::create(10 * 80, 10 * MegaByte / 80, NetChannel::Options::perfect_net()).with_concurrent_queries(20));
756
  return;
757

758
  RldpBasicTest::run(
759
      Options::create(10, 10 * MegaByte, NetChannel::Options::perfect_net()).with_concurrent_queries(10));
760
  RldpBasicTest::run(Options::create(10, 10 * MegaByte, NetChannel::Options::perfect_net()));
761
  return;
762
  RldpBasicTest::run(Options::create(10, 10 * MegaByte, NetChannel::Options::bad_net()));
763
  RldpBasicTest::run(Options::create(10, 10 * MegaByte, NetChannel::Options::bad_net()).with_concurrent_queries(10));
764
  //RldpBasicTest::run(Options::create(10, 100 * MegaByte, NetChannel::Options::perfect_net().with_sleep_alive(10, 1)));
765
  return;
766

767
  RldpBasicTest::run(Options::create(1000, 1 * Byte, NetChannel::Options::lossy_perfect_net()));
768
  RldpBasicTest::run(Options::create(1, 100 * MegaByte, NetChannel::Options::lossy_perfect_net()));
769

770
  RldpBasicTest::run(Options::create(100, 1 * MegaByte, NetChannel::Options::bad_net()));
771

772
  RldpBasicTest::run(Options::create(1, 1 * Byte, NetChannel::Options::perfect_net()));
773
  RldpBasicTest::run(Options::create(1, 1 * MegaByte, NetChannel::Options::perfect_net()));
774

775
  RldpBasicTest::run(Options::create(1, 100 * MegaByte, NetChannel::Options::perfect_net()));
776
}
777
/*
778
TEST(MerkleTree, Manual) {
779
  td::Random::Xorshift128plus rnd(123);
780
  // create big random file
781
  size_t chunk_size = 768;
782
  // for simplicity numer of chunks in a file is a power of two
783
  size_t chunks_count = (1 << 16) + 1;
784
  size_t file_size = chunk_size * chunks_count;
785
  td::Timer timer;
786
  LOG(INFO) << "Generate random string";
787
  const auto file = td::rand_string('a', 'z', td::narrow_cast<int>(file_size));
788
  LOG(INFO) << timer;
789

790
  timer = {};
791
  LOG(INFO) << "Calculate all hashes";
792
  std::vector<td::Bits256> hashes(chunks_count);
793
  td::Bits256 bad_hash{};
794
  for (size_t i = 0; i < chunks_count; i++) {
795
    td::sha256(td::Slice(file).substr(i * chunk_size, chunk_size), hashes[i].as_slice());
796
  }
797
  LOG(INFO) << timer;
798

799
  timer = {};
800
  LOG(INFO) << "Init merkle tree";
801
  size_t i = 0;
802
  ton::MerkleTree tree(td::transform(hashes, [&i](auto &x) { return ton::MerkleTree::Piece{i++, x}; }));
803
  LOG(INFO) << timer;
804

805
  auto root_proof = tree.gen_proof(0, chunks_count - 1).move_as_ok();
806
  auto root_hash = tree.get_root_hash();
807

808
  // first download each chunk one by one
809

810
  for (size_t stride : {1 << 6, 1}) {
811
    timer = {};
812
    LOG(INFO) << "Gen all proofs, stride = " << stride;
813
    for (size_t i = 0; i < chunks_count; i += stride) {
814
      tree.gen_proof(i, i + stride - 1).move_as_ok();
815
    }
816
    LOG(INFO) << timer;
817
    timer = {};
818
    LOG(INFO) << "Proof size: " << vm::std_boc_serialize(tree.gen_proof(0, stride - 1).move_as_ok()).ok().size();
819
    LOG(INFO) << "Download file, stride = " << stride;
820
    {
821
      ton::MerkleTree new_tree(chunks_count, root_hash);
822
      ton::MerkleTree other_new_tree(chunks_count, root_hash);
823
      for (size_t i = 0; i < chunks_count; i += stride) {
824
        new_tree.gen_proof(i, i + stride - 1).ignore();
825
        new_tree.add_proof(tree.gen_proof(i, i + stride - 1).move_as_ok()).ensure();
826
        other_new_tree.add_proof(tree.gen_proof(i, i + stride - 1).move_as_ok()).ensure();
827
        other_new_tree.gen_proof(i, i + stride - 1).ensure();
828
        other_new_tree.get_root(2);
829
        std::vector<ton::MerkleTree::Piece> chunks;
830
        for (size_t j = 0; j < stride && i + j < chunks_count; j++) {
831
          chunks.push_back({i + j, hashes.at(i + j)});
832
        }
833
        new_tree.try_add_pieces(chunks).ensure();
834
      }
835

836
      if (stride == 1) {
837
        std::vector<ton::MerkleTree::Piece> chunks;
838

839
        for (size_t i = 0; i < chunks_count; i++) {
840
          if (rnd.fast(0, 1) == 1) {
841
            chunks.push_back({i, hashes[i]});
842
          } else {
843
            chunks.push_back({i, bad_hash});
844
          }
845
        }
846
        td::Bitset bitmask;
847
        other_new_tree.add_pieces(chunks, bitmask);
848
        for (size_t i = 0; i < chunks_count; i++) {
849
          auto expected = chunks[i].hash == hashes[i];
850
          auto got = bitmask.get(i);
851
          LOG_CHECK(expected == got) << expected << " " << got << " " << i;
852
        }
853
      }
854
    }
855
    LOG(INFO) << timer;
856
  }
857
}
858

859
TEST(MerkleTree, Stress) {
860
  td::Random::Xorshift128plus rnd(123);
861

862
  for (int t = 0; t < 100; t++) {
863
    td::Bits256 bad_hash{};
864
    size_t chunks_count = rnd.fast(5, 10);
865
    std::vector<td::Bits256> hashes(chunks_count);
866
    for (auto &hash : hashes) {
867
      char x = (char)rnd.fast(0, 255);
868
      for (auto &c : hash.as_slice()) {
869
        c = x;
870
      }
871
    }
872
    size_t i = 0;
873
    ton::MerkleTree tree(td::transform(hashes, [&i](auto &x) { return ton::MerkleTree::Piece{i++, x}; }));
874
    for (int t2 = 0; t2 < 1000; t2++) {
875
      std::vector<ton::MerkleTree::Piece> chunks;
876

877
      int mask = rnd.fast(0, (1 << chunks_count) - 1);
878
      for (size_t i = 0; i < chunks_count; i++) {
879
        if ((mask >> i) & 1) {
880
          chunks.push_back({i, hashes[i]});
881
        } else {
882
          chunks.push_back({i, bad_hash});
883
        }
884
      }
885
      td::Bitset bitmask_strict;
886
      td::Bitset bitmask;
887
      ton::MerkleTree new_tree(chunks_count, tree.get_root(rnd.fast(1, 5)));
888
      tree.add_pieces(chunks, bitmask_strict);
889
      new_tree.add_pieces(chunks, bitmask);
890
      for (size_t i = 0; i < chunks_count; i++) {
891
        auto expected = chunks[i].hash == hashes[i];
892
        auto strict_got = bitmask_strict.get(i);
893
        LOG_CHECK(strict_got == expected) << expected << " " << strict_got << " " << i;
894
        auto got = bitmask.get(i);
895
        // got => expected
896
        LOG_CHECK(!got || expected) << expected << " " << got << " " << i;
897
      }
898
    }
899
  }
900
};*/
901

902
struct TorrentMetas {
903
  td::optional<ton::Torrent> torrent;
904
  struct File {
905
    std::string name;
906
    td::BlobView buffer;
907
  };
908
  std::vector<File> files;
909
};
910

911
TorrentMetas create_random_torrent(td::Random::Xorshift128plus &rnd, td::int64 total_size = 0,
912
                                   td::int32 piece_size = 0) {
913
  ton::Torrent::Creator::Options options;
914
  if (piece_size == 0) {
915
    options.piece_size = rnd.fast(1, 1024);
916
  } else {
917
    options.piece_size = piece_size;
918
  }
919
  if (total_size == 0) {
920
    total_size = rnd.fast(100, 40000);
921
  }
922
  ton::Torrent::Creator creator{options};
923

924
  TorrentMetas res;
925
  auto files_n = rnd.fast(0, 40);
926
  for (int i = 0; i < files_n; i++) {
927
    auto name = PSTRING() << "#" << i << ".txt";
928
    td::int64 n = 0;
929
    auto left = files_n - i;
930
    if (left == 1) {
931
      n = total_size;
932
    } else {
933
      n = rnd.fast64(total_size / (left * 2), 2 * total_size / left);
934
    }
935
    total_size -= n;
936
    LOG(INFO) << i << "/" << files_n << " " << n;
937
    std::string data;
938
    size_t len = td::min(n, td::int64(1027));
939
    data.reserve(len);
940
    for (size_t i = 0; i < len; i++) {
941
      data += static_cast<char>(rnd.fast('a', 'z'));
942
    }
943
    res.files.emplace_back(TorrentMetas::File{name, td::CycicBlobView::create(td::BufferSlice(data), n).move_as_ok()});
944
    creator.add_blob(name, td::CycicBlobView::create(td::BufferSlice(data), n).move_as_ok()).ensure();
945
  }
946
  LOG(INFO) << "Finalize...";
947
  res.torrent = creator.finalize().move_as_ok();
948
  ton::Torrent::GetMetaOptions opt;
949
  LOG(INFO) << "Meta size (full): " << res.torrent.value().get_meta_str(ton::Torrent::GetMetaOptions()).size();
950
  LOG(INFO) << "Meta size (only proof): "
951
            << res.torrent.value().get_meta_str(ton::Torrent::GetMetaOptions().without_header()).size();
952
  LOG(INFO) << "Meta size (only small proof): "
953
            << res.torrent.value()
954
                   .get_meta_str(ton::Torrent::GetMetaOptions().without_header().with_proof_depth_limit(10))
955
                   .size();
956
  LOG(INFO) << "Meta size (only header): "
957
            << res.torrent.value().get_meta_str(ton::Torrent::GetMetaOptions().without_proof()).size();
958
  LOG(INFO) << "Meta size (min): "
959
            << res.torrent.value().get_meta_str(ton::Torrent::GetMetaOptions().without_proof().without_header()).size();
960
  return res;
961
}
962

963
TEST(Torrent, Meta) {
964
  td::Random::Xorshift128plus rnd(123);
965
  for (int test_i = 0; test_i < 100; test_i++) {
966
    auto torrent_files = create_random_torrent(rnd);
967
    auto torrent = torrent_files.torrent.unwrap();
968
    auto files = std::move(torrent_files.files);
969

970
    // test TorrentMeta
971
    auto torrent_str = torrent.get_meta_str();
972

973
    auto torrent_file = ton::TorrentMeta::deserialize(torrent_str).move_as_ok();
974
    CHECK(torrent_file.serialize() == torrent_str);
975
    torrent_str.back()++;
976
    ton::TorrentMeta::deserialize(torrent_str).ensure_error();
977
    CHECK(torrent.get_info().get_hash() == torrent_file.info.get_hash());
978

979
    ton::Torrent::Options options;
980
    options.in_memory = true;
981
    torrent_file.header = {};
982
    torrent_file.root_proof = {};
983
    auto new_torrent = ton::Torrent::open(options, torrent_file).move_as_ok();
984
    new_torrent.enable_write_to_files();
985

986
    std::vector<size_t> order;
987
    for (size_t i = 0; i < torrent.get_info().pieces_count(); i++) {
988
      order.push_back(i);
989
    }
990
    CHECK(!new_torrent.is_completed());
991
    auto header_parts =
992
        (torrent.get_info().header_size + torrent.get_info().piece_size - 1) / torrent.get_info().piece_size;
993
    random_shuffle(td::MutableSpan<size_t>(order).substr(header_parts), rnd);
994
    random_shuffle(td::MutableSpan<size_t>(order).truncate(header_parts + 10), rnd);
995
    for (auto piece_i : order) {
996
      auto piece_data = torrent.get_piece_data(piece_i).move_as_ok();
997
      auto piece_proof = torrent.get_piece_proof(piece_i).move_as_ok();
998
      new_torrent.add_piece(piece_i, std::move(piece_data), std::move(piece_proof)).ensure();
999
    }
1000
    CHECK(new_torrent.is_completed());
1001
    new_torrent.validate();
1002
    CHECK(new_torrent.is_completed());
1003
    for (auto &name_data : files) {
1004
      ASSERT_EQ(name_data.buffer.to_buffer_slice().move_as_ok(),
1005
                new_torrent.read_file(name_data.name).move_as_ok().as_slice());
1006
    }
1007
  }
1008
};
1009

1010
TEST(Torrent, OneFile) {
1011
  td::rmrf("first").ignore();
1012
  td::rmrf("second").ignore();
1013

1014
  td::mkdir("first").ensure();
1015
  td::mkdir("second").ensure();
1016

1017
  td::write_file("first/hello.txt", "Hello world!").ensure();
1018
  ton::Torrent::Creator::Options options;
1019
  //options.dir_name = "first/";
1020
  options.piece_size = 1024;
1021
  auto torrent = ton::Torrent::Creator::create_from_path(options, "first/hello.txt").move_as_ok();
1022
  auto meta = ton::TorrentMeta::deserialize(torrent.get_meta().serialize()).move_as_ok();
1023
  CHECK(torrent.is_completed());
1024

1025
  {
1026
    ton::Torrent::Options options;
1027
    options.root_dir = "first/";
1028
    auto other_torrent = ton::Torrent::open(options, meta).move_as_ok();
1029
    CHECK(!other_torrent.is_completed());
1030
    other_torrent.validate();
1031
    CHECK(other_torrent.is_completed());
1032
    CHECK(td::read_file("first/hello.txt").move_as_ok() == "Hello world!");
1033
  }
1034

1035
  {
1036
    ton::Torrent::Options options;
1037
    options.root_dir = "second/";
1038
    auto other_torrent = ton::Torrent::open(options, meta).move_as_ok();
1039
    other_torrent.enable_write_to_files();
1040
    CHECK(!other_torrent.is_completed());
1041
    other_torrent.add_piece(0, torrent.get_piece_data(0).move_as_ok(), torrent.get_piece_proof(0).move_as_ok())
1042
        .ensure();
1043
    CHECK(other_torrent.is_completed());
1044
    CHECK(td::read_file("second/hello.txt").move_as_ok() == "Hello world!");
1045
  }
1046
};
1047

1048
TEST(Torrent, PartsHelper) {
1049
  int parts_count = 100;
1050
  ton::PartsHelper parts(parts_count);
1051

1052
  auto a_token = parts.register_peer(1);
1053
  auto b_token = parts.register_peer(2);
1054
  auto c_token = parts.register_peer(3);
1055

1056
  parts.on_peer_part_ready(a_token, 1);
1057
  parts.on_peer_part_ready(a_token, 2);
1058
  parts.on_peer_part_ready(a_token, 3);
1059
  parts.on_peer_part_ready(b_token, 1);
1060
  parts.on_peer_part_ready(b_token, 2);
1061
  parts.on_peer_part_ready(c_token, 1);
1062
  ASSERT_EQ(0u, parts.get_rarest_parts(10).size());
1063

1064
  parts.set_peer_limit(a_token, 1);
1065
  ASSERT_EQ(1u, parts.get_rarest_parts(10).size());
1066
  parts.set_peer_limit(a_token, 2);
1067
  ASSERT_EQ(2u, parts.get_rarest_parts(10).size());
1068
  parts.set_peer_limit(a_token, 3);
1069
  ASSERT_EQ(3u, parts.get_rarest_parts(10).size());
1070
}
1071

1072
void print_debug(ton::Torrent *torrent) {
1073
  LOG(ERROR) << torrent->get_stats_str();
1074
}
1075

1076
TEST(Torrent, Peer) {
1077
  class PeerManager : public td::actor::Actor {
1078
   public:
1079
    void send_query(ton::PeerId src, ton::PeerId dst, td::BufferSlice query, td::Promise<td::BufferSlice> promise) {
1080
      send_closure(get_outbound_channel(src), &NetChannel::send, query.size(),
1081
                   promise.send_closure(actor_id(this), &PeerManager::do_send_query, src, dst, std::move(query)));
1082
    }
1083

1084
    void do_send_query(ton::PeerId src, ton::PeerId dst, td::BufferSlice query, td::Result<td::Unit> res,
1085
                       td::Promise<td::BufferSlice> promise) {
1086
      TRY_RESULT_PROMISE(promise, x, std::move(res));
1087
      (void)x;
1088
      send_closure(get_inbound_channel(dst), &NetChannel::send, query.size(),
1089
                   promise.send_closure(actor_id(this), &PeerManager::execute_query, src, dst, std::move(query)));
1090
    }
1091

1092
    void execute_query(ton::PeerId src, ton::PeerId dst, td::BufferSlice query, td::Result<td::Unit> res,
1093
                       td::Promise<td::BufferSlice> promise) {
1094
      TRY_RESULT_PROMISE(promise, x, std::move(res));
1095
      (void)x;
1096
      promise = promise.send_closure(actor_id(this), &PeerManager::send_response, src, dst);
1097
      auto it = peers_.find(std::make_pair(dst, src));
1098
      if (it == peers_.end()) {
1099
        LOG(ERROR) << "No such peer";
1100
        auto node_it = nodes_.find(dst);
1101
        if (node_it == nodes_.end()) {
1102
          LOG(ERROR) << "Unknown query destination";
1103
          promise.set_error(td::Status::Error("Unknown query destination"));
1104
          return;
1105
        }
1106
        send_closure(node_it->second, &ton::NodeActor::start_peer, src,
1107
                     [promise = std::move(promise),
1108
                      query = std::move(query)](td::Result<td::actor::ActorId<ton::PeerActor>> r_peer) mutable {
1109
                       TRY_RESULT_PROMISE(promise, peer, std::move(r_peer));
1110
                       send_closure(peer, &ton::PeerActor::execute_query, std::move(query), std::move(promise));
1111
                     });
1112
        return;
1113
      }
1114
      send_closure(it->second, &ton::PeerActor::execute_query, std::move(query), std::move(promise));
1115
    }
1116

1117
    void send_response(ton::PeerId src, ton::PeerId dst, td::Result<td::BufferSlice> r_response,
1118
                       td::Promise<td::BufferSlice> promise) {
1119
      TRY_RESULT_PROMISE(promise, response, std::move(r_response));
1120
      send_closure(get_outbound_channel(dst), &NetChannel::send, response.size(),
1121
                   promise.send_closure(actor_id(this), &PeerManager::do_send_response, src, dst, std::move(response)));
1122
    }
1123

1124
    void do_send_response(ton::PeerId src, ton::PeerId dst, td::BufferSlice response, td::Result<td::Unit> res,
1125
                          td::Promise<td::BufferSlice> promise) {
1126
      TRY_RESULT_PROMISE(promise, x, std::move(res));
1127
      (void)x;
1128
      send_closure(
1129
          get_inbound_channel(src), &NetChannel::send, response.size(),
1130
          promise.send_closure(actor_id(this), &PeerManager::do_execute_response, src, dst, std::move(response)));
1131
    }
1132

1133
    void do_execute_response(ton::PeerId src, ton::PeerId dst, td::BufferSlice response, td::Result<td::Unit> res,
1134
                             td::Promise<td::BufferSlice> promise) {
1135
      TRY_RESULT_PROMISE(promise, x, std::move(res));
1136
      (void)x;
1137
      promise.set_value(std::move(response));
1138
    }
1139

1140
    void register_peer(ton::PeerId src, ton::PeerId dst, td::actor::ActorId<ton::PeerActor> peer) {
1141
      peers_[std::make_pair(src, dst)] = std::move(peer);
1142
    }
1143

1144
    void register_node(ton::PeerId src, td::actor::ActorId<ton::NodeActor> node) {
1145
      nodes_[src] = std::move(node);
1146
    }
1147
    ~PeerManager() {
1148
      for (auto &it : inbound_channel_) {
1149
        LOG(ERROR) << it.first << " received " << td::format::as_size(it.second.get_actor_unsafe().total_sent());
1150
      }
1151
      for (auto &it : outbound_channel_) {
1152
        LOG(ERROR) << it.first << " sent " << td::format::as_size(it.second.get_actor_unsafe().total_sent());
1153
      }
1154
    }
1155

1156
   private:
1157
    std::map<std::pair<ton::PeerId, ton::PeerId>, td::actor::ActorId<ton::PeerActor>> peers_;
1158
    std::map<ton::PeerId, td::actor::ActorId<ton::NodeActor>> nodes_;
1159
    std::map<ton::PeerId, td::actor::ActorOwn<NetChannel>> inbound_channel_;
1160
    std::map<ton::PeerId, td::actor::ActorOwn<NetChannel>> outbound_channel_;
1161

1162
    td::actor::ActorOwn<Sleep> sleep_;
1163
    void start_up() override {
1164
      sleep_ = Sleep::create();
1165
    }
1166

1167
    td::actor::ActorId<NetChannel> get_outbound_channel(ton::PeerId peer_id) {
1168
      auto &res = outbound_channel_[peer_id];
1169
      if (res.empty()) {
1170
        NetChannel::Options options;
1171
        options.speed = 1000 * MegaByte;
1172
        options.buffer = 1000 * MegaByte;
1173
        options.rtt = 0;
1174
        res = NetChannel::create(options, sleep_.get());
1175
      }
1176
      return res.get();
1177
    }
1178
    td::actor::ActorId<NetChannel> get_inbound_channel(ton::PeerId peer_id) {
1179
      auto &res = inbound_channel_[peer_id];
1180
      if (res.empty()) {
1181
        NetChannel::Options options;
1182
        options.speed = 1000 * MegaByte;
1183
        options.buffer = 1000 * MegaByte;
1184
        options.rtt = 0;
1185
        res = NetChannel::create(options, sleep_.get());
1186
      }
1187
      return res.get();
1188
    }
1189
  };
1190

1191
  class PeerCreator : public ton::NodeActor::NodeCallback {
1192
   public:
1193
    PeerCreator(td::actor::ActorId<PeerManager> peer_manager, ton::PeerId self_id, std::vector<ton::PeerId> peers)
1194
        : peer_manager_(std::move(peer_manager)), peers_(std::move(peers)), self_id_(self_id) {
1195
    }
1196
    void get_peers(ton::PeerId src, td::Promise<std::vector<ton::PeerId>> promise) override {
1197
      auto peers = peers_;
1198
      promise.set_value(std::move(peers));
1199
    }
1200
    void register_self(td::actor::ActorId<ton::NodeActor> self) override {
1201
      self_ = self;
1202
      send_closure(peer_manager_, &PeerManager::register_node, self_id_, self_);
1203
    }
1204
    td::actor::ActorOwn<ton::PeerActor> create_peer(ton::PeerId self_id, ton::PeerId peer_id,
1205
                                                    std::shared_ptr<ton::PeerState> state) override {
1206
      class PeerCallback : public ton::PeerActor::Callback {
1207
       public:
1208
        PeerCallback(ton::PeerId self_id, ton::PeerId peer_id, td::actor::ActorId<PeerManager> peer_manager)
1209
            : self_id_{self_id}, peer_id_{peer_id}, peer_manager_(peer_manager) {
1210
        }
1211
        void register_self(td::actor::ActorId<ton::PeerActor> self) override {
1212
          self_ = std::move(self);
1213
          send_closure(peer_manager_, &PeerManager::register_peer, self_id_, peer_id_, self_);
1214
        }
1215
        void send_query(td::uint64 query_id, td::BufferSlice query) override {
1216
          CHECK(!self_.empty());
1217
          class X : public td::actor::Actor {
1218
           public:
1219
            void start_up() override {
1220
              //LOG(ERROR) << "start";
1221
              alarm_timestamp() = td::Timestamp::in(4);
1222
            }
1223
            void tear_down() override {
1224
              //LOG(ERROR) << "finish";
1225
            }
1226
            void alarm() override {
1227
              //LOG(FATAL) << "WTF?";
1228
              alarm_timestamp() = td::Timestamp::in(4);
1229
            }
1230
          };
1231
          send_closure(
1232
              peer_manager_, &PeerManager::send_query, self_id_, peer_id_, std::move(query),
1233
              [self = self_, query_id,
1234
               tmp = td::actor::create_actor<X>(PSLICE() << self_id_ << "->" << peer_id_ << " : " << query_id)](
1235
                  auto x) { promise_send_closure(self, &ton::PeerActor::on_query_result, query_id)(std::move(x)); });
1236
        }
1237

1238
       private:
1239
        ton::PeerId self_id_;
1240
        ton::PeerId peer_id_;
1241
        td::actor::ActorId<ton::PeerActor> self_;
1242
        td::actor::ActorId<PeerManager> peer_manager_;
1243
      };
1244

1245
      return td::actor::create_actor<ton::PeerActor>(PSLICE() << "ton::PeerActor " << self_id << "->" << peer_id,
1246
                                                     td::make_unique<PeerCallback>(self_id, peer_id, peer_manager_),
1247
                                                     std::move(state));
1248
    }
1249

1250
   private:
1251
    td::actor::ActorId<PeerManager> peer_manager_;
1252
    std::vector<ton::PeerId> peers_;
1253
    ton::PeerId self_id_;
1254
    td::actor::ActorId<ton::NodeActor> self_;
1255
  };
1256

1257
  class TorrentCallback : public ton::NodeActor::Callback {
1258
   public:
1259
    TorrentCallback(std::shared_ptr<td::Destructor> stop_watcher, std::shared_ptr<td::Destructor> complete_watcher)
1260
        : stop_watcher_(stop_watcher), complete_watcher_(complete_watcher) {
1261
    }
1262

1263
    void on_completed() override {
1264
      complete_watcher_.reset();
1265
    }
1266

1267
    void on_closed(ton::Torrent torrent) override {
1268
      CHECK(torrent.is_completed());
1269
      //TODO: validate torrent
1270
      stop_watcher_.reset();
1271
    }
1272

1273
   private:
1274
    std::shared_ptr<td::Destructor> stop_watcher_;
1275
    std::shared_ptr<td::Destructor> complete_watcher_;
1276
  };
1277

1278
  size_t peers_n = 20;
1279
  td::uint64 file_size = 200 * MegaByte;
1280
  td::Random::Xorshift128plus rnd(123);
1281
  LOG(INFO) << "Start create random_torrent of size " << file_size;
1282
  auto torrent = create_random_torrent(rnd, file_size, 128 * KiloByte).torrent.unwrap();
1283
  LOG(INFO) << "Random torrent is created";
1284

1285
  std::vector<ton::PeerId> peers;
1286
  for (size_t i = 1; i <= peers_n; i++) {
1287
    peers.push_back(i);
1288
  }
1289
  auto gen_peers = [&](size_t self_id, size_t n) {
1290
    std::vector<ton::PeerId> peers;
1291
    if (n > peers_n - 1) {
1292
      n = peers_n - 1;
1293
    }
1294
    while (n != 0) {
1295
      size_t id = rnd.fast(1, td::narrow_cast<int>(peers_n));
1296
      if (id == self_id) {
1297
        continue;
1298
      }
1299
      if (std::find(peers.begin(), peers.end(), id) != peers.end()) {
1300
        continue;
1301
      }
1302
      n--;
1303
      peers.push_back(id);
1304
    }
1305
    return peers;
1306
  };
1307

1308
  struct StatsActor : public td::actor::Actor {
1309
   public:
1310
    StatsActor(td::actor::ActorId<ton::NodeActor> node_actor) : node_actor_(node_actor) {
1311
    }
1312

1313
   private:
1314
    td::actor::ActorId<ton::NodeActor> node_actor_;
1315
    void start_up() override {
1316
      alarm_timestamp() = td::Timestamp::in(1);
1317
    }
1318
    void alarm() override {
1319
      send_closure(node_actor_, &ton::NodeActor::with_torrent, [](td::Result<ton::NodeActor::NodeState> r_state) {
1320
        if (r_state.is_error()) {
1321
          return;
1322
        }
1323
        print_debug(&r_state.ok().torrent);
1324
      });
1325
      alarm_timestamp() = td::Timestamp::in(4);
1326
    }
1327
  };
1328

1329
  auto info = torrent.get_info();
1330

1331
  auto stop_watcher = td::create_shared_destructor([] { td::actor::SchedulerContext::get()->stop(); });
1332
  auto guard = std::make_shared<std::vector<td::actor::ActorOwn<>>>();
1333
  auto complete_watcher = td::create_shared_destructor([guard] {});
1334

1335
  td::actor::Scheduler scheduler({0}, true);
1336

1337
  scheduler.run_in_context([&] {
1338
    auto peer_manager = td::actor::create_actor<PeerManager>("PeerManager");
1339
    guard->push_back(td::actor::create_actor<ton::NodeActor>(
1340
        "Node#1", 1, std::move(torrent),
1341
        td::make_unique<TorrentCallback>(stop_watcher, complete_watcher),
1342
        td::make_unique<PeerCreator>(peer_manager.get(), 1, gen_peers(1, 2)), nullptr, ton::SpeedLimiters{}));
1343
    for (size_t i = 2; i <= peers_n; i++) {
1344
      ton::Torrent::Options options;
1345
      options.in_memory = true;
1346
      auto other_torrent = ton::Torrent::open(options, ton::TorrentMeta(info)).move_as_ok();
1347
      auto node_actor = td::actor::create_actor<ton::NodeActor>(
1348
          PSLICE() << "Node#" << i, i, std::move(other_torrent),
1349
          td::make_unique<TorrentCallback>(stop_watcher, complete_watcher),
1350
          td::make_unique<PeerCreator>(peer_manager.get(), i, gen_peers(i, 2)),
1351
          nullptr, ton::SpeedLimiters{});
1352

1353
      if (i == 3) {
1354
        td::actor::create_actor<StatsActor>("StatsActor", node_actor.get()).release();
1355
      }
1356
      guard->push_back(std::move(node_actor));
1357
    }
1358
    guard->push_back(std::move(peer_manager));
1359
  });
1360
  stop_watcher.reset();
1361
  guard.reset();
1362
  complete_watcher.reset();
1363
  scheduler.run();
1364
}
1365

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.