Ton

Форк
0
/
actors_core.cpp 
1105 строк · 34.0 Кб
1
/*
2
    This file is part of TON Blockchain Library.
3

4
    TON Blockchain Library is free software: you can redistribute it and/or modify
5
    it under the terms of the GNU Lesser General Public License as published by
6
    the Free Software Foundation, either version 2 of the License, or
7
    (at your option) any later version.
8

9
    TON Blockchain Library is distributed in the hope that it will be useful,
10
    but WITHOUT ANY WARRANTY; without even the implied warranty of
11
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
    GNU Lesser General Public License for more details.
13

14
    You should have received a copy of the GNU Lesser General Public License
15
    along with TON Blockchain Library.  If not, see <http://www.gnu.org/licenses/>.
16

17
    Copyright 2017-2020 Telegram Systems LLP
18
*/
19
#include "td/actor/core/ActorLocker.h"
20
#include "td/actor/actor.h"
21
#include "td/actor/PromiseFuture.h"
22

23
#include "td/utils/format.h"
24
#include "td/utils/logging.h"
25
#include "td/utils/port/thread.h"
26
#include "td/utils/Random.h"
27
#include "td/utils/Slice.h"
28
#include "td/utils/StringBuilder.h"
29
#include "td/utils/tests.h"
30
#include "td/utils/Time.h"
31

32
#include <array>
33
#include <atomic>
34
#include <deque>
35
#include <memory>
36

37
TEST(Actor2, signals) {
38
  using td::actor::core::ActorSignals;
39
  ActorSignals signals;
40
  signals.add_signal(ActorSignals::Wakeup);
41
  signals.add_signal(ActorSignals::Cpu);
42
  signals.add_signal(ActorSignals::Kill);
43
  signals.clear_signal(ActorSignals::Cpu);
44

45
  bool was_kill = false;
46
  bool was_wakeup = false;
47
  while (!signals.empty()) {
48
    auto s = signals.first_signal();
49
    if (s == ActorSignals::Kill) {
50
      was_kill = true;
51
    } else if (s == ActorSignals::Wakeup) {
52
      was_wakeup = true;
53
    } else {
54
      UNREACHABLE();
55
    }
56
    signals.clear_signal(s);
57
  }
58
  CHECK(was_kill && was_wakeup);
59
}
60

61
TEST(Actors2, flags) {
62
  using namespace td::actor::core;
63
  ActorState::Flags flags;
64
  CHECK(!flags.is_locked());
65
  flags.set_locked(true);
66
  CHECK(flags.is_locked());
67
  flags.set_locked(false);
68
  CHECK(!flags.is_locked());
69

70
  flags.set_scheduler_id(SchedulerId{123});
71

72
  auto signals = flags.get_signals();
73
  CHECK(signals.empty());
74
  signals.add_signal(ActorSignals::Cpu);
75
  signals.add_signal(ActorSignals::Kill);
76
  CHECK(signals.has_signal(ActorSignals::Cpu));
77
  CHECK(signals.has_signal(ActorSignals::Kill));
78
  flags.set_signals(signals);
79
  LOG_CHECK(flags.get_signals().raw() == signals.raw()) << flags.get_signals().raw() << " " << signals.raw();
80

81
  auto wakeup = ActorSignals{};
82
  wakeup.add_signal(ActorSignals::Wakeup);
83

84
  flags.add_signals(wakeup);
85
  signals.add_signal(ActorSignals::Wakeup);
86
  CHECK(flags.get_signals().raw() == signals.raw());
87

88
  flags.clear_signals();
89
  CHECK(flags.get_signals().empty());
90

91
  flags.add_signals(ActorSignals::one(ActorSignals::Pause));
92
  CHECK(flags.get_scheduler_id().value() == 123);
93
  CHECK(flags.get_signals().has_signal(ActorSignals::Pause));
94
}
95

96
TEST(Actor2, locker) {
97
  using namespace td::actor::core;
98
  ActorState state;
99

100
  ActorSignals kill_signal;
101
  kill_signal.add_signal(ActorSignals::Kill);
102

103
  ActorSignals wakeup_signal;
104
  wakeup_signal.add_signal(ActorSignals::Wakeup);
105

106
  ActorSignals cpu_signal;
107
  cpu_signal.add_signal(ActorSignals::Cpu);
108

109
  {
110
    ActorLocker lockerA(&state);
111
    ActorLocker lockerB(&state);
112
    ActorLocker lockerC(&state);
113

114
    CHECK(lockerA.try_lock());
115
    CHECK(lockerA.own_lock());
116
    auto flagsA = lockerA.flags();
117
    CHECK(lockerA.try_unlock(flagsA));
118
    CHECK(!lockerA.own_lock());
119

120
    CHECK(lockerA.try_lock());
121
    CHECK(!lockerB.try_lock());
122
    CHECK(!lockerC.try_lock());
123

124
    CHECK(lockerB.try_add_signals(kill_signal));
125
    CHECK(!lockerC.try_add_signals(wakeup_signal));
126
    CHECK(lockerC.try_add_signals(wakeup_signal));
127
    CHECK(!lockerC.add_signals(cpu_signal));
128
    CHECK(!lockerA.flags().has_signals());
129
    CHECK(!lockerA.try_unlock(lockerA.flags()));
130
    {
131
      auto flags = lockerA.flags();
132
      auto signals = flags.get_signals();
133
      bool was_kill = false;
134
      bool was_wakeup = false;
135
      bool was_cpu = false;
136
      while (!signals.empty()) {
137
        auto s = signals.first_signal();
138
        if (s == ActorSignals::Kill) {
139
          was_kill = true;
140
        } else if (s == ActorSignals::Wakeup) {
141
          was_wakeup = true;
142
        } else if (s == ActorSignals::Cpu) {
143
          was_cpu = true;
144
        } else {
145
          UNREACHABLE();
146
        }
147
        signals.clear_signal(s);
148
      }
149
      CHECK(was_kill && was_wakeup && was_cpu);
150
      flags.clear_signals();
151
      CHECK(lockerA.try_unlock(flags));
152
    }
153
  }
154

155
  {
156
    ActorLocker lockerB(&state);
157
    CHECK(lockerB.try_lock());
158
    CHECK(lockerB.try_unlock(lockerB.flags()));
159
    CHECK(lockerB.add_signals(kill_signal));
160
    CHECK(lockerB.flags().get_signals().has_signal(ActorSignals::Kill));
161
    auto flags = lockerB.flags();
162
    flags.clear_signals();
163
    ActorLocker lockerA(&state);
164
    CHECK(!lockerA.add_signals(kill_signal));
165
    CHECK(!lockerB.try_unlock(flags));
166
    CHECK(!lockerA.add_signals(kill_signal));  // do not loose this signal!
167
    CHECK(!lockerB.try_unlock(flags));
168
    CHECK(lockerB.flags().get_signals().has_signal(ActorSignals::Kill));
169
    CHECK(lockerB.try_unlock(flags));
170
  }
171

172
  {
173
    ActorLocker lockerA(&state);
174
    CHECK(lockerA.try_lock());
175
    auto flags = lockerA.flags();
176
    flags.add_signals(ActorSignals::one(ActorSignals::Pause));
177
    CHECK(lockerA.try_unlock(flags));
178
    //We have to lock, though we can't execute.
179
    CHECK(lockerA.add_signals(wakeup_signal));
180
  }
181
}
182

183
#if !TD_THREAD_UNSUPPORTED
184
TEST(Actor2, locker_stress) {
185
  using namespace td::actor::core;
186
  ActorState state;
187

188
  constexpr size_t threads_n = 5;
189
  auto stage = [&](std::atomic<int> &value, int need) {
190
    value.fetch_add(1, std::memory_order_release);
191
    while (value.load(std::memory_order_acquire) < need) {
192
      td::this_thread::yield();
193
    }
194
  };
195

196
  struct Node {
197
    std::atomic<td::uint32> request{0};
198
    td::uint32 response = 0;
199
    char pad[64];
200
  };
201
  std::array<Node, threads_n> nodes;
202
  auto do_work = [&]() {
203
    for (auto &node : nodes) {
204
      auto query = node.request.load(std::memory_order_acquire);
205
      if (query) {
206
        node.response = query * query;
207
        node.request.store(0, std::memory_order_relaxed);
208
      }
209
    }
210
  };
211

212
  std::atomic<int> begin{0};
213
  std::atomic<int> ready{0};
214
  std::atomic<int> check{0};
215
  std::vector<td::thread> threads;
216
  for (size_t i = 0; i < threads_n; i++) {
217
    threads.push_back(td::thread([&, id = i] {
218
      for (size_t i = 1; i < 1000000; i++) {
219
        ActorLocker locker(&state);
220
        auto need = static_cast<int>(threads_n * i);
221
        auto query = static_cast<td::uint32>(id + need);
222
        stage(begin, need);
223
        nodes[id].request = 0;
224
        nodes[id].response = 0;
225
        stage(ready, need);
226
        if (locker.try_lock()) {
227
          nodes[id].response = query * query;
228
        } else {
229
          auto cpu = ActorSignals::one(ActorSignals::Cpu);
230
          nodes[id].request.store(query, std::memory_order_release);
231
          locker.add_signals(cpu);
232
        }
233
        while (locker.own_lock()) {
234
          auto flags = locker.flags();
235
          auto signals = flags.get_signals();
236
          if (!signals.empty()) {
237
            do_work();
238
          }
239
          flags.clear_signals();
240
          locker.try_unlock(flags);
241
        }
242

243
        stage(check, need);
244
        if (id == 0) {
245
          CHECK(locker.add_signals(ActorSignals{}));
246
          CHECK(!locker.flags().has_signals());
247
          CHECK(locker.try_unlock(locker.flags()));
248
          for (size_t thread_id = 0; thread_id < threads_n; thread_id++) {
249
            LOG_CHECK(nodes[thread_id].response ==
250
                      static_cast<td::uint32>(thread_id + need) * static_cast<td::uint32>(thread_id + need))
251
                << td::tag("thread", thread_id) << " " << nodes[thread_id].response << " "
252
                << nodes[thread_id].request.load();
253
          }
254
        }
255
      }
256
    }));
257
  }
258
  for (auto &thread : threads) {
259
    thread.join();
260
  }
261
}
262

263
namespace {
264
const size_t BUF_SIZE = 1024 * 1024;
265
char buf[BUF_SIZE];
266
td::StringBuilder sb(td::MutableSlice(buf, BUF_SIZE - 1));
267
}  // namespace
268

269
TEST(Actor2, executor_simple) {
270
  using namespace td::actor::core;
271
  using namespace td::actor;
272
  using td::actor::detail::ActorMessageCreator;
273
  struct Dispatcher : public SchedulerDispatcher {
274
    void add_to_queue(ActorInfoPtr ptr, SchedulerId scheduler_id, bool need_poll) override {
275
      queue.push_back(std::move(ptr));
276
    }
277
    void set_alarm_timestamp(const ActorInfoPtr &actor_info_ptr) override {
278
      UNREACHABLE();
279
    }
280
    SchedulerId get_scheduler_id() const override {
281
      return SchedulerId{0};
282
    }
283
    std::deque<ActorInfoPtr> queue;
284
  };
285
  Dispatcher dispatcher;
286

287
  class TestActor : public Actor {
288
   public:
289
    void close() {
290
      stop();
291
    }
292

293
   private:
294
    void start_up() override {
295
      sb << "StartUp";
296
    }
297
    void tear_down() override {
298
      sb << "TearDown";
299
    }
300
  };
301
  {
302
    ActorInfoCreator actor_info_creator;
303
    auto actor = actor_info_creator.create(
304
        std::make_unique<TestActor>(), ActorInfoCreator::Options().on_scheduler(SchedulerId{0}).with_name("TestActor"));
305
    dispatcher.add_to_queue(actor, SchedulerId{0}, false);
306

307
    {
308
      ActorExecutor executor(*actor, dispatcher, ActorExecutor::Options());
309
      CHECK(!executor.is_closed());
310
      CHECK(executor.can_send_immediate());
311
      LOG_CHECK(sb.as_cslice() == "StartUp") << sb.as_cslice();
312
      sb.clear();
313
      executor.send(ActorMessageCreator::lambda([&] { sb << "A"; }));
314
      LOG_CHECK(sb.as_cslice() == "A") << sb.as_cslice();
315
      sb.clear();
316
      auto big_message = ActorMessageCreator::lambda([&] { sb << "big"; });
317
      big_message.set_big();
318
      executor.send(std::move(big_message));
319
      LOG_CHECK(sb.as_cslice() == "") << sb.as_cslice();
320
      executor.send(ActorMessageCreator::lambda([&] { sb << "B"; }));
321
      LOG_CHECK(sb.as_cslice() == "") << sb.as_cslice();
322
    }
323
    CHECK(dispatcher.queue.size() == 1);
324
    { ActorExecutor executor(*actor, dispatcher, ActorExecutor::Options().with_from_queue()); }
325
    CHECK(dispatcher.queue.size() == 1);
326
    dispatcher.queue.clear();
327
    LOG_CHECK(sb.as_cslice() == "bigB") << sb.as_cslice();
328
    sb.clear();
329
    {
330
      ActorExecutor executor(*actor, dispatcher, ActorExecutor::Options());
331
      executor.send(
332
          ActorMessageCreator::lambda([&] { static_cast<TestActor &>(ActorExecuteContext::get()->actor()).close(); }));
333
    }
334
    LOG_CHECK(sb.as_cslice() == "TearDown") << sb.as_cslice();
335
    sb.clear();
336
    CHECK(!actor->has_actor());
337
    {
338
      ActorExecutor executor(*actor, dispatcher, ActorExecutor::Options());
339
      executor.send(
340
          ActorMessageCreator::lambda([&] { static_cast<TestActor &>(ActorExecuteContext::get()->actor()).close(); }));
341
    }
342
    CHECK(dispatcher.queue.empty());
343
    CHECK(sb.as_cslice() == "");
344
  }
345

346
  {
347
    ActorInfoCreator actor_info_creator;
348
    auto actor = actor_info_creator.create(
349
        std::make_unique<TestActor>(), ActorInfoCreator::Options().on_scheduler(SchedulerId{0}).with_name("TestActor"));
350
    dispatcher.add_to_queue(actor, SchedulerId{0}, false);
351
    {
352
      ActorExecutor executor(*actor, dispatcher, ActorExecutor::Options());
353
      CHECK(!executor.is_closed());
354
      CHECK(executor.can_send_immediate());
355
      LOG_CHECK(sb.as_cslice() == "StartUp") << sb.as_cslice();
356
      sb.clear();
357
      auto a_msg = ActorMessageCreator::lambda([&] {
358
        sb << "big pause";
359
        ActorExecuteContext::get()->set_pause();
360
      });
361
      a_msg.set_big();
362
      executor.send(std::move(a_msg));
363
      executor.send(ActorMessageCreator::lambda([&] { sb << "A"; }));
364
      LOG_CHECK(sb.as_cslice() == "") << sb.as_cslice();
365
    }
366
    {
367
      CHECK(dispatcher.queue.size() == 1);
368
      dispatcher.queue.clear();
369
      ActorExecutor executor(*actor, dispatcher, ActorExecutor::Options().with_from_queue());
370
      CHECK(!executor.is_closed());
371
      CHECK(!executor.can_send_immediate());
372
      LOG_CHECK(sb.as_cslice() == "big pause") << sb.as_cslice();
373
      sb.clear();
374
    }
375
    {
376
      CHECK(dispatcher.queue.size() == 1);
377
      dispatcher.queue.clear();
378
      ActorExecutor executor(*actor, dispatcher, ActorExecutor::Options().with_from_queue());
379
      CHECK(!executor.is_closed());
380
      CHECK(executor.can_send_immediate());
381
      LOG_CHECK(sb.as_cslice() == "A") << sb.as_cslice();
382
      sb.clear();
383
    }
384
    {
385
      ActorExecutor executor(*actor, dispatcher, ActorExecutor::Options());
386
      executor.send(
387
          ActorMessageCreator::lambda([&] { static_cast<TestActor &>(ActorExecuteContext::get()->actor()).close(); }));
388
    }
389
    LOG_CHECK(sb.as_cslice() == "TearDown") << sb.as_cslice();
390
    sb.clear();
391
    dispatcher.queue.clear();
392
  }
393
}
394

395
using namespace td::actor;
396
using td::uint32;
397
static std::atomic<int> global_cnt;
398
class Worker : public Actor {
399
 public:
400
  void query(uint32 x, core::ActorInfoPtr master);
401
  void close() {
402
    stop();
403
  }
404
};
405
class Master : public Actor {
406
 public:
407
  void on_result(uint32 x, uint32 y) {
408
    loop();
409
  }
410

411
 private:
412
  uint32 l = 0;
413
  uint32 r = 1000;
414
  core::ActorInfoPtr worker;
415
  void start_up() override {
416
    worker = detail::create_actor<Worker>(ActorOptions().with_name("Master"));
417
    loop();
418
  }
419
  void loop() override {
420
    l++;
421
    if (l == r) {
422
      if (!--global_cnt) {
423
        SchedulerContext::get()->stop();
424
      }
425
      detail::send_closure(*worker, &Worker::close);
426
      stop();
427
      return;
428
    }
429
    detail::send_lambda(*worker,
430
                        [x = l, self = get_actor_info_ptr()] { detail::current_actor<Worker>().query(x, self); });
431
  }
432
};
433

434
void Worker::query(uint32 x, core::ActorInfoPtr master) {
435
  auto y = x;
436
  for (int i = 0; i < 100; i++) {
437
    y = y * y;
438
  }
439
  detail::send_lambda(*master, [result = y, x] { detail::current_actor<Master>().on_result(x, result); });
440
}
441

442
TEST(Actor2, scheduler_simple) {
443
  auto group_info = std::make_shared<core::SchedulerGroupInfo>(1);
444
  core::Scheduler scheduler{group_info, SchedulerId{0}, 2};
445
  scheduler.start();
446
  scheduler.run_in_context([] {
447
    global_cnt = 1000;
448
    for (int i = 0; i < global_cnt; i++) {
449
      detail::create_actor<Master>(ActorOptions().with_name("Master"));
450
    }
451
  });
452
  while (scheduler.run(1000)) {
453
  }
454
  core::Scheduler::close_scheduler_group(*group_info);
455
}
456

457
TEST(Actor2, actor_id_simple) {
458
  auto group_info = std::make_shared<core::SchedulerGroupInfo>(1);
459
  core::Scheduler scheduler{group_info, SchedulerId{0}, 2};
460
  sb.clear();
461
  scheduler.start();
462

463
  scheduler.run_in_context([] {
464
    class A : public Actor {
465
     public:
466
      A(int value) : value_(value) {
467
        sb << "A" << value_;
468
      }
469
      void hello() {
470
        sb << "hello";
471
      }
472
      ~A() {
473
        sb << "~A";
474
        if (--global_cnt <= 0) {
475
          SchedulerContext::get()->stop();
476
        }
477
      }
478

479
     private:
480
      int value_;
481
    };
482
    global_cnt = 1;
483
    auto id = create_actor<A>("A", 123);
484
    CHECK(sb.as_cslice() == "A123");
485
    sb.clear();
486
    send_closure(id, &A::hello);
487
  });
488
  while (scheduler.run(1000)) {
489
  }
490
  CHECK(sb.as_cslice() == "hello~A");
491
  core::Scheduler::close_scheduler_group(*group_info);
492
  sb.clear();
493
}
494

495
TEST(Actor2, actor_creation) {
496
  auto group_info = std::make_shared<core::SchedulerGroupInfo>(1);
497
  core::Scheduler scheduler{group_info, SchedulerId{0}, 1};
498
  scheduler.start();
499

500
  scheduler.run_in_context([] {
501
    class B;
502
    class A : public Actor {
503
     public:
504
      void f() {
505
        check();
506
        stop();
507
      }
508

509
     private:
510
      void start_up() override {
511
        check();
512
        create_actor<B>("Simple", actor_id(this)).release();
513
      }
514

515
      void check() {
516
        auto &context = *SchedulerContext::get();
517
        CHECK(context.has_poll());
518
        context.get_poll();
519
      }
520

521
      void tear_down() override {
522
        if (--global_cnt <= 0) {
523
          SchedulerContext::get()->stop();
524
        }
525
      }
526
    };
527

528
    class B : public Actor {
529
     public:
530
      B(ActorId<A> a) : a_(a) {
531
      }
532

533
     private:
534
      void start_up() override {
535
        auto &context = *SchedulerContext::get();
536
        CHECK(!context.has_poll());
537
        send_closure(a_, &A::f);
538
        stop();
539
      }
540
      void tear_down() override {
541
        if (--global_cnt <= 0) {
542
          SchedulerContext::get()->stop();
543
        }
544
      }
545
      ActorId<A> a_;
546
    };
547
    global_cnt = 2;
548
    create_actor<A>(ActorOptions().with_name("Poll").with_poll()).release();
549
  });
550
  while (scheduler.run(1000)) {
551
  }
552
  scheduler.stop();
553
  core::Scheduler::close_scheduler_group(*group_info);
554
}
555

556
TEST(Actor2, actor_timeout_simple) {
557
  auto group_info = std::make_shared<core::SchedulerGroupInfo>(1);
558
  core::Scheduler scheduler{group_info, SchedulerId{0}, 2};
559
  sb.clear();
560
  scheduler.start();
561

562
  auto watcher = td::create_shared_destructor([] { SchedulerContext::get()->stop(); });
563
  scheduler.run_in_context([watcher = std::move(watcher)] {
564
    class A : public Actor {
565
     public:
566
      A(std::shared_ptr<td::Destructor> watcher) : watcher_(std::move(watcher)) {
567
      }
568
      void start_up() override {
569
        set_timeout();
570
      }
571
      void alarm() override {
572
        double diff = td::Time::now() - expected_timeout_;
573
        LOG_CHECK(-0.001 < diff && diff < 0.1) << diff;
574
        if (cnt_-- > 0) {
575
          set_timeout();
576
        } else {
577
          stop();
578
        }
579
      }
580

581
     private:
582
      std::shared_ptr<td::Destructor> watcher_;
583
      double expected_timeout_;
584
      int cnt_ = 5;
585
      void set_timeout() {
586
        auto wakeup_timestamp = td::Timestamp::in(0.1);
587
        expected_timeout_ = wakeup_timestamp.at();
588
        alarm_timestamp() = wakeup_timestamp;
589
      }
590
    };
591
    create_actor<A>(core::ActorInfoCreator::Options().with_name("A").with_poll(), watcher).release();
592
    create_actor<A>(core::ActorInfoCreator::Options().with_name("B"), watcher).release();
593
  });
594
  watcher.reset();
595
  while (scheduler.run(1000)) {
596
  }
597
  core::Scheduler::close_scheduler_group(*group_info);
598
  sb.clear();
599
}
600

601
TEST(Actor2, actor_timeout_simple2) {
602
  auto group_info = std::make_shared<core::SchedulerGroupInfo>(1);
603
  core::Scheduler scheduler{group_info, SchedulerId{0}, 2};
604
  sb.clear();
605
  scheduler.start();
606

607
  auto watcher = td::create_shared_destructor([] { SchedulerContext::get()->stop(); });
608
  scheduler.run_in_context([watcher = std::move(watcher)] {
609
    class A : public Actor {
610
     public:
611
      A(std::shared_ptr<td::Destructor> watcher) : watcher_(std::move(watcher)) {
612
      }
613
      void start_up() override {
614
        set_timeout();
615
      }
616
      void alarm() override {
617
        set_timeout();
618
      }
619

620
     private:
621
      std::shared_ptr<td::Destructor> watcher_;
622
      void set_timeout() {
623
        auto wakeup_timestamp = td::Timestamp::in(0.001);
624
        alarm_timestamp() = wakeup_timestamp;
625
      }
626
    };
627
    class B : public Actor {
628
     public:
629
      B(std::shared_ptr<td::Destructor> watcher, ActorOwn<> actor_own)
630
          : watcher_(std::move(watcher)), actor_own_(std::move(actor_own)) {
631
      }
632
      void start_up() override {
633
        set_timeout();
634
      }
635
      void alarm() override {
636
        stop();
637
      }
638

639
     private:
640
      std::shared_ptr<td::Destructor> watcher_;
641
      ActorOwn<> actor_own_;
642
      void set_timeout() {
643
        auto wakeup_timestamp = td::Timestamp::in(0.005);
644
        alarm_timestamp() = wakeup_timestamp;
645
      }
646
    };
647
    auto actor_own = create_actor<A>(core::ActorInfoCreator::Options().with_name("A").with_poll(), watcher);
648
    create_actor<B>(core::ActorInfoCreator::Options().with_name("B"), watcher, std::move(actor_own)).release();
649
  });
650
  watcher.reset();
651
  while (scheduler.run(1000)) {
652
  }
653
  core::Scheduler::close_scheduler_group(*group_info);
654
  sb.clear();
655
}
656

657
TEST(Actor2, actor_function_result) {
658
  auto group_info = std::make_shared<core::SchedulerGroupInfo>(1);
659
  core::Scheduler scheduler{group_info, SchedulerId{0}, 2};
660
  sb.clear();
661
  scheduler.start();
662

663
  auto watcher = td::create_shared_destructor([] { SchedulerContext::get()->stop(); });
664
  scheduler.run_in_context([watcher = std::move(watcher)] {
665
    class B : public Actor {
666
     public:
667
      uint32 query(uint32 x) {
668
        return x * x;
669
      }
670
      void query_async(uint32 x, td::Promise<uint32> promise) {
671
        promise(x * x);
672
      }
673
    };
674
    class A : public Actor {
675
     public:
676
      A(std::shared_ptr<td::Destructor> watcher) : watcher_(std::move(watcher)) {
677
      }
678
      void on_result(uint32 x, td::Result<uint32> r_y) {
679
        auto y = r_y.move_as_ok();
680
        LOG_CHECK(x * x == y) << x << " " << y;
681
        if (--cnt_ == 0) {
682
          stop();
683
        }
684
      }
685
      void start_up() {
686
        b_ = create_actor<B>(ActorOptions().with_name("B"));
687
        cnt_ = 5;
688
        send_closure(b_, &B::query, 3, [a = std::make_unique<int>(), self = actor_id(this)](td::Result<uint32> y) {
689
          LOG_IF(ERROR, y.is_error()) << y.error();
690
          send_closure(self, &A::on_result, 3, y.ok());
691
        });
692
        send_closure(b_, &B::query_async, 2, [self = actor_id(this)](uint32 y) {
693
          CHECK(!self.empty());
694
          send_closure(self, &A::on_result, 2, y);
695
        });
696
        send_closure_later(b_, &B::query_async, 5, [self = actor_id(this)](uint32 y) {
697
          CHECK(!self.empty());
698
          send_closure(self, &A::on_result, 5, y);
699
        });
700
        auto future = future_send_closure(b_, &B::query, 7);
701
        future.finish(td::promise_send_closure(actor_id(this), &A::on_result, 7));
702
        //TODO: deduce Future type (i.e. Future<td::uint32>)
703
        auto future2 = future_send_closure<td::uint32>(b_, &B::query_async, 7);
704
        future2.finish(td::promise_send_closure(actor_id(this), &A::on_result, 7));
705
      }
706

707
     private:
708
      int cnt_{0};
709
      std::shared_ptr<td::Destructor> watcher_;
710
      td::actor::ActorOwn<B> b_;
711
    };
712
    create_actor<A>(core::ActorInfoCreator::Options().with_name("A").with_poll(), watcher).release();
713
    create_actor<A>(core::ActorInfoCreator::Options().with_name("B"), watcher).release();
714
  });
715
  watcher.reset();
716
  while (scheduler.run(1000)) {
717
  }
718
  core::Scheduler::close_scheduler_group(*group_info);
719
  sb.clear();
720
}
721

722
TEST(Actor2, actor_ping_pong) {
723
  Scheduler scheduler{{3}, false, Scheduler::Paused};
724
  sb.clear();
725
  scheduler.start();
726

727
  auto watcher = td::create_shared_destructor([] { SchedulerContext::get()->stop(); });
728
  td::actor::set_debug(true);
729
  for (int i = 0; i < 2000; i++) {
730
    scheduler.run_in_context([watcher] {
731
      class PingPong : public Actor {
732
       public:
733
        PingPong(std::shared_ptr<td::Destructor> watcher) : watcher_(std::move(watcher)) {
734
        }
735
        void query(td::int32 left, ActorOwn<> data) {
736
          if (td::Random::fast(0, 4) == 0) {
737
            alarm_timestamp() = td::Timestamp::in(0.01 * td::Random::fast(0, 10));
738
          }
739
          if (left <= 0) {
740
            return;
741
          }
742
          auto dest = td::Random::fast(0, (int)next_.size() - 1);
743
          if (td::Random::fast(0, 1) == 0) {
744
            send_closure(next_[dest], &PingPong::query, left - 1, std::move(data));
745
          } else {
746
            send_closure_later(next_[dest], &PingPong::query, left - 1, std::move(data));
747
          }
748
        }
749
        void add_next(ActorId<PingPong> p) {
750
          next_.push_back(std::move(p));
751
        }
752
        void start_up() override {
753
        }
754
        void store_data(ActorOwn<> data) {
755
          data_.push_back(std::move(data));
756
        }
757

758
       private:
759
        std::vector<ActorId<PingPong>> next_;
760
        std::vector<ActorOwn<>> data_;
761
        std::shared_ptr<td::Destructor> watcher_;
762
      };
763

764
      int N = td::Random::fast(2, 100);
765
      //N = 2;
766
      std::vector<ActorOwn<PingPong>> actors;
767
      for (int i = 0; i < N; i++) {
768
        actors.push_back(
769
            create_actor<PingPong>(core::ActorInfoCreator::Options().with_name(PSLICE() << "Worker#" << i), watcher));
770
      }
771
      for (int i = 0; i < N; i++) {
772
        for (int j = 0; j < N; j++) {
773
          send_closure(actors[i], &PingPong::add_next, actors[j].get());
774
        }
775
      }
776
      int nn = td::Random::fast(1, N);
777
      //nn = 2;
778
      auto first = actors[0].get();
779
      for (int i = 0; i < N; i++) {
780
        auto to = actors[i].get();
781
        if (i < nn) {
782
          send_closure(to, &PingPong::query, td::Random::fast(10, 1000), std::move(actors[i]));
783
        } else {
784
          send_closure(first, &PingPong::store_data, std::move(actors[i]));
785
        }
786
      }
787
    });
788
  }
789
  watcher.reset();
790
  while (scheduler.run(0.1)) {
791
    //scheduler.get_debug().dump();
792
  }
793
  sb.clear();
794
}
795

796
TEST(Actor2, Schedulers) {
797
  for (auto mode : {Scheduler::Running, Scheduler::Paused}) {
798
    for (auto start_count : {0, 1, 2}) {
799
      for (auto run_count : {0, 1, 2}) {
800
        for (auto stop_count : {0, 1, 2}) {
801
          for (size_t threads : {0, 1}) {
802
            Scheduler scheduler({threads}, false, mode);
803
            for (int i = 0; i < start_count; i++) {
804
              scheduler.start();
805
            }
806
            for (int i = 0; i < run_count; i++) {
807
              scheduler.run(0);
808
            }
809
            for (int i = 0; i < stop_count; i++) {
810
              scheduler.stop();
811
            }
812
          }
813
        }
814
      }
815
    }
816
  }
817
}
818
TEST(Actor2, SchedulerZeroCpuThreads) {
819
  Scheduler scheduler({0});
820
  scheduler.run_in_context([] {
821
    class A : public Actor {
822
      void start_up() override {
823
        SchedulerContext::get()->stop();
824
      }
825
    };
826
    create_actor<A>(ActorOptions().with_name("A").with_poll(false)).release();
827
  });
828
  scheduler.run();
829
}
830
TEST(Actor2, SchedulerTwo) {
831
  Scheduler scheduler({0, 0});
832
  scheduler.run_in_context([] {
833
    class B : public Actor {
834
     public:
835
      void start_up() override {
836
        CHECK(SchedulerContext::get()->get_scheduler_id() == SchedulerId{1});
837
      }
838
      void close() {
839
        CHECK(SchedulerContext::get()->get_scheduler_id() == SchedulerId{1});
840
        SchedulerContext::get()->stop();
841
      }
842
    };
843
    class A : public Actor {
844
      void start_up() override {
845
        CHECK(SchedulerContext::get()->get_scheduler_id() == SchedulerId{0});
846
        auto id =
847
            create_actor<B>(ActorOptions().with_name("B").with_poll(false).on_scheduler(SchedulerId{1})).release();
848
        send_closure(id, &B::close);
849
      }
850
    };
851
    create_actor<A>(ActorOptions().with_name("A").with_poll(false).on_scheduler(SchedulerId{0})).release();
852
  });
853
  scheduler.run();
854
}
855
TEST(Actor2, ActorIdDynamicCast) {
856
  Scheduler scheduler({0});
857
  scheduler.run_in_context([] {
858
    class A : public Actor {
859
     public:
860
      void close() {
861
        CHECK(actor_id().actor_info_ptr() == get_actor_info_ptr());
862
        SchedulerContext::get()->stop();
863
      }
864
    };
865
    auto actor_own_a = create_actor<A>(ActorOptions().with_name("A").with_poll(false));
866
    auto actor = &actor_own_a.get_actor_unsafe();
867
    ActorOwn<> actor_own = actor_dynamic_cast<Actor>(std::move(actor_own_a));
868
    CHECK(actor_own_a.empty());
869
    actor_own_a = actor_dynamic_cast<A>(std::move(actor_own));
870
    CHECK(actor_own.empty());
871
    CHECK(&actor_own_a.get_actor_unsafe() == actor);
872

873
    auto actor_id_a = actor_own_a.release();
874
    ActorId<> actor_id = actor_dynamic_cast<Actor>(actor_id_a);
875
    actor_id_a = actor_dynamic_cast<A>(actor_id);
876
    CHECK(&actor_id_a.get_actor_unsafe() == actor);
877

878
    auto actor_shared_a = ActorShared<A>(actor_id_a, 123);
879
    ActorShared<> actor_shared = actor_dynamic_cast<Actor>(std::move(actor_shared_a));
880
    CHECK(actor_shared_a.empty());
881
    CHECK(actor_shared.token() == 123);
882
    actor_shared_a = actor_dynamic_cast<A>(std::move(actor_shared));
883
    CHECK(actor_shared.empty());
884
    CHECK(&actor_shared_a.get_actor_unsafe() == actor);
885
    CHECK(actor_shared_a.token() == 123);
886

887
    send_closure(actor_shared_a, &A::close);
888
  });
889
  scheduler.run();
890
}
891

892
TEST(Actor2, send_vs_close) {
893
  for (int it = 0; it < 100; it++) {
894
    Scheduler scheduler({8});
895

896
    auto watcher = td::create_shared_destructor([] { SchedulerContext::get()->stop(); });
897
    scheduler.run_in_context([watcher = std::move(watcher)] {
898
      class To : public Actor {
899
       public:
900
        class Callback {
901
         public:
902
          virtual ~Callback() {
903
          }
904
          virtual void on_closed(ActorId<To> to) = 0;
905
        };
906
        To(int cnt, std::shared_ptr<td::Destructor> watcher) : cnt_(cnt), watcher_(std::move(watcher)) {
907
        }
908
        void on_event() {
909
          if (--cnt_ <= 0) {
910
            stop();
911
          }
912
        }
913
        void add_callback(std::unique_ptr<Callback> callback) {
914
          callbacks_.push_back(std::move(callback));
915
        }
916
        void start_up() override {
917
          alarm_timestamp() = td::Timestamp::in(td::Random::fast(0, 4) * 0.001);
918
        }
919
        void tear_down() override {
920
          if (td::Random::fast(0, 4) == 0) {
921
            send_closure(actor_id(this), &To::self_ref, actor_id(this));
922
          }
923
          for (auto &callback : callbacks_) {
924
            callback->on_closed(actor_id(this));
925
          }
926
        }
927
        void self_ref(ActorId<To>) {
928
        }
929
        void alarm() override {
930
          stop();
931
        }
932

933
       private:
934
        int cnt_;
935
        std::shared_ptr<td::Destructor> watcher_;
936
        std::vector<std::unique_ptr<Callback>> callbacks_;
937
      };
938
      class From : public Actor {
939
       public:
940
        From(std::vector<ActorId<To>> to, std::shared_ptr<td::Destructor> watcher)
941
            : to_(std::move(to)), watcher_(std::move(watcher)) {
942
        }
943
        void start_up() override {
944
          yield();
945
        }
946
        void on_closed(ActorId<To>) {
947
        }
948
        void loop() override {
949
          while (!to_.empty()) {
950
            if (td::Random::fast(0, 3) == 0) {
951
              break;
952
            }
953
            auto id = to_.back();
954
            to_.pop_back();
955
            if (td::Random::fast(0, 4) == 0) {
956
              class Callback : public To::Callback {
957
               public:
958
                Callback(ActorId<From> from) : from_(std::move(from)) {
959
                }
960
                void on_closed(ActorId<To> id) override {
961
                  send_closure(from_, &From::on_closed, std::move(id));
962
                }
963

964
               private:
965
                ActorId<From> from_;
966
              };
967
              send_closure(id, &To::add_callback, std::make_unique<Callback>(actor_id(this)));
968
            }
969
            send_closure(id, &To::on_event);
970
          }
971
          if (to_.empty()) {
972
            stop();
973
          } else {
974
            yield();
975
          }
976
        }
977

978
       private:
979
        std::vector<ActorId<To>> to_;
980
        std::shared_ptr<td::Destructor> watcher_;
981
      };
982

983
      class Master : public Actor {
984
       public:
985
        Master(std::shared_ptr<td::Destructor> watcher) : watcher_(std::move(watcher)) {
986
        }
987

988
       private:
989
        std::shared_ptr<td::Destructor> watcher_;
990
        int cnt_ = 10;
991
        void loop() override {
992
          if (cnt_-- < 0) {
993
            return stop();
994
          }
995
          int from_n = 5;
996
          int to_n = 5;
997
          std::vector<std::vector<ActorId<To>>> from(from_n);
998
          for (int i = 0; i < to_n; i++) {
999
            int cnt = td::Random::fast(1, 10);
1000
            int to_cnt = td::Random::fast(1, cnt);
1001
            auto to =
1002
                td::actor::create_actor<To>(
1003
                    td::actor::ActorOptions().with_name(PSLICE() << "To#" << i).with_poll(td::Random::fast(0, 4) == 0),
1004
                    to_cnt, watcher_)
1005
                    .release();
1006
            for (int j = 0; j < cnt; j++) {
1007
              auto from_i = td::Random::fast(0, from_n - 1);
1008
              from[from_i].push_back(to);
1009
            }
1010
          }
1011
          for (int i = 0; i < from_n; i++) {
1012
            td::actor::create_actor<From>(
1013
                td::actor::ActorOptions().with_name(PSLICE() << "From#" << i).with_poll(td::Random::fast(0, 4) == 0),
1014
                std::move(from[i]), watcher_)
1015
                .release();
1016
          }
1017
          alarm_timestamp() = td::Timestamp::in(td::Random::fast(0, 10) * 0.01 / 30);
1018
        }
1019
      };
1020
      td::actor::create_actor<Master>("Master", watcher).release();
1021
    });
1022

1023
    scheduler.run();
1024
  }
1025
}
1026
TEST(Actor2, send_vs_close2) {
1027
  for (int it = 0; it < 100; it++) {
1028
    Scheduler scheduler({8});
1029

1030
    auto watcher = td::create_shared_destructor([] { SchedulerContext::get()->stop(); });
1031
    //std::shared_ptr<td::Destructor> watcher;
1032
    scheduler.run_in_context([watcher = std::move(watcher)] {
1033
      class To : public Actor {
1034
       public:
1035
        To(int cnt, std::shared_ptr<td::Destructor> watcher) : cnt_(cnt), watcher_(std::move(watcher)) {
1036
        }
1037
        void start_up() override {
1038
          alarm_timestamp() = td::Timestamp::in(td::Random::fast(0, 4) * 0.001 / 30);
1039
        }
1040
        void alarm() override {
1041
          stop();
1042
        }
1043

1044
       private:
1045
        int cnt_;
1046
        std::shared_ptr<td::Destructor> watcher_;
1047
      };
1048
      class From : public Actor {
1049
       public:
1050
        From(std::vector<ActorId<To>> to, std::shared_ptr<td::Destructor> watcher)
1051
            : to_(std::move(to)), watcher_(std::move(watcher)) {
1052
        }
1053
        void start_up() override {
1054
          stop();
1055
        }
1056

1057
       private:
1058
        std::vector<ActorId<To>> to_;
1059
        std::shared_ptr<td::Destructor> watcher_;
1060
      };
1061

1062
      class Master : public Actor {
1063
       public:
1064
        Master(std::shared_ptr<td::Destructor> watcher) : watcher_(std::move(watcher)) {
1065
        }
1066

1067
       private:
1068
        std::shared_ptr<td::Destructor> watcher_;
1069
        int cnt_ = 5;
1070
        void loop() override {
1071
          if (cnt_-- < 0) {
1072
            return stop();
1073
          }
1074
          int from_n = 2;
1075
          int to_n = 2;
1076
          std::vector<std::vector<ActorId<To>>> from(from_n);
1077
          for (int i = 0; i < to_n; i++) {
1078
            int cnt = td::Random::fast(1, 2);
1079
            int to_cnt = td::Random::fast(1, cnt);
1080
            auto to =
1081
                td::actor::create_actor<To>(
1082
                    td::actor::ActorOptions().with_name(PSLICE() << "To#" << i).with_poll(td::Random::fast(0, 4) == 0),
1083
                    to_cnt, watcher_)
1084
                    .release();
1085
            for (int j = 0; j < cnt; j++) {
1086
              auto from_i = td::Random::fast(0, from_n - 1);
1087
              from[from_i].push_back(to);
1088
            }
1089
          }
1090
          for (int i = 0; i < from_n; i++) {
1091
            td::actor::create_actor<From>(
1092
                td::actor::ActorOptions().with_name(PSLICE() << "From#" << i).with_poll(td::Random::fast(0, 4) == 0),
1093
                std::move(from[i]), watcher_)
1094
                .release();
1095
          }
1096
          alarm_timestamp() = td::Timestamp::in(td::Random::fast(0, 10) * 0.01 / 30);
1097
        }
1098
      };
1099
      td::actor::create_actor<Master>("Master", watcher).release();
1100
    });
1101

1102
    scheduler.run();
1103
  }
1104
}
1105
#endif  //!TD_THREAD_UNSUPPORTED
1106

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.