2
This file is part of TON Blockchain Library.
4
TON Blockchain Library is free software: you can redistribute it and/or modify
5
it under the terms of the GNU Lesser General Public License as published by
6
the Free Software Foundation, either version 2 of the License, or
7
(at your option) any later version.
9
TON Blockchain Library is distributed in the hope that it will be useful,
10
but WITHOUT ANY WARRANTY; without even the implied warranty of
11
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12
GNU Lesser General Public License for more details.
14
You should have received a copy of the GNU Lesser General Public License
15
along with TON Blockchain Library. If not, see <http://www.gnu.org/licenses/>.
17
Copyright 2017-2020 Telegram Systems LLP
19
#include "td/actor/core/ActorLocker.h"
20
#include "td/actor/actor.h"
21
#include "td/actor/PromiseFuture.h"
23
#include "td/utils/format.h"
24
#include "td/utils/logging.h"
25
#include "td/utils/port/thread.h"
26
#include "td/utils/Random.h"
27
#include "td/utils/Slice.h"
28
#include "td/utils/StringBuilder.h"
29
#include "td/utils/tests.h"
30
#include "td/utils/Time.h"
37
TEST(Actor2, signals) {
38
using td::actor::core::ActorSignals;
40
signals.add_signal(ActorSignals::Wakeup);
41
signals.add_signal(ActorSignals::Cpu);
42
signals.add_signal(ActorSignals::Kill);
43
signals.clear_signal(ActorSignals::Cpu);
45
bool was_kill = false;
46
bool was_wakeup = false;
47
while (!signals.empty()) {
48
auto s = signals.first_signal();
49
if (s == ActorSignals::Kill) {
51
} else if (s == ActorSignals::Wakeup) {
56
signals.clear_signal(s);
58
CHECK(was_kill && was_wakeup);
62
using namespace td::actor::core;
63
ActorState::Flags flags;
64
CHECK(!flags.is_locked());
65
flags.set_locked(true);
66
CHECK(flags.is_locked());
67
flags.set_locked(false);
68
CHECK(!flags.is_locked());
70
flags.set_scheduler_id(SchedulerId{123});
72
auto signals = flags.get_signals();
73
CHECK(signals.empty());
74
signals.add_signal(ActorSignals::Cpu);
75
signals.add_signal(ActorSignals::Kill);
76
CHECK(signals.has_signal(ActorSignals::Cpu));
77
CHECK(signals.has_signal(ActorSignals::Kill));
78
flags.set_signals(signals);
79
LOG_CHECK(flags.get_signals().raw() == signals.raw()) << flags.get_signals().raw() << " " << signals.raw();
81
auto wakeup = ActorSignals{};
82
wakeup.add_signal(ActorSignals::Wakeup);
84
flags.add_signals(wakeup);
85
signals.add_signal(ActorSignals::Wakeup);
86
CHECK(flags.get_signals().raw() == signals.raw());
88
flags.clear_signals();
89
CHECK(flags.get_signals().empty());
91
flags.add_signals(ActorSignals::one(ActorSignals::Pause));
92
CHECK(flags.get_scheduler_id().value() == 123);
93
CHECK(flags.get_signals().has_signal(ActorSignals::Pause));
97
using namespace td::actor::core;
100
ActorSignals kill_signal;
101
kill_signal.add_signal(ActorSignals::Kill);
103
ActorSignals wakeup_signal;
104
wakeup_signal.add_signal(ActorSignals::Wakeup);
106
ActorSignals cpu_signal;
107
cpu_signal.add_signal(ActorSignals::Cpu);
110
ActorLocker lockerA(&state);
111
ActorLocker lockerB(&state);
112
ActorLocker lockerC(&state);
114
CHECK(lockerA.try_lock());
115
CHECK(lockerA.own_lock());
116
auto flagsA = lockerA.flags();
117
CHECK(lockerA.try_unlock(flagsA));
118
CHECK(!lockerA.own_lock());
120
CHECK(lockerA.try_lock());
121
CHECK(!lockerB.try_lock());
122
CHECK(!lockerC.try_lock());
124
CHECK(lockerB.try_add_signals(kill_signal));
125
CHECK(!lockerC.try_add_signals(wakeup_signal));
126
CHECK(lockerC.try_add_signals(wakeup_signal));
127
CHECK(!lockerC.add_signals(cpu_signal));
128
CHECK(!lockerA.flags().has_signals());
129
CHECK(!lockerA.try_unlock(lockerA.flags()));
131
auto flags = lockerA.flags();
132
auto signals = flags.get_signals();
133
bool was_kill = false;
134
bool was_wakeup = false;
135
bool was_cpu = false;
136
while (!signals.empty()) {
137
auto s = signals.first_signal();
138
if (s == ActorSignals::Kill) {
140
} else if (s == ActorSignals::Wakeup) {
142
} else if (s == ActorSignals::Cpu) {
147
signals.clear_signal(s);
149
CHECK(was_kill && was_wakeup && was_cpu);
150
flags.clear_signals();
151
CHECK(lockerA.try_unlock(flags));
156
ActorLocker lockerB(&state);
157
CHECK(lockerB.try_lock());
158
CHECK(lockerB.try_unlock(lockerB.flags()));
159
CHECK(lockerB.add_signals(kill_signal));
160
CHECK(lockerB.flags().get_signals().has_signal(ActorSignals::Kill));
161
auto flags = lockerB.flags();
162
flags.clear_signals();
163
ActorLocker lockerA(&state);
164
CHECK(!lockerA.add_signals(kill_signal));
165
CHECK(!lockerB.try_unlock(flags));
166
CHECK(!lockerA.add_signals(kill_signal)); // do not loose this signal!
167
CHECK(!lockerB.try_unlock(flags));
168
CHECK(lockerB.flags().get_signals().has_signal(ActorSignals::Kill));
169
CHECK(lockerB.try_unlock(flags));
173
ActorLocker lockerA(&state);
174
CHECK(lockerA.try_lock());
175
auto flags = lockerA.flags();
176
flags.add_signals(ActorSignals::one(ActorSignals::Pause));
177
CHECK(lockerA.try_unlock(flags));
178
//We have to lock, though we can't execute.
179
CHECK(lockerA.add_signals(wakeup_signal));
183
#if !TD_THREAD_UNSUPPORTED
184
TEST(Actor2, locker_stress) {
185
using namespace td::actor::core;
188
constexpr size_t threads_n = 5;
189
auto stage = [&](std::atomic<int> &value, int need) {
190
value.fetch_add(1, std::memory_order_release);
191
while (value.load(std::memory_order_acquire) < need) {
192
td::this_thread::yield();
197
std::atomic<td::uint32> request{0};
198
td::uint32 response = 0;
201
std::array<Node, threads_n> nodes;
202
auto do_work = [&]() {
203
for (auto &node : nodes) {
204
auto query = node.request.load(std::memory_order_acquire);
206
node.response = query * query;
207
node.request.store(0, std::memory_order_relaxed);
212
std::atomic<int> begin{0};
213
std::atomic<int> ready{0};
214
std::atomic<int> check{0};
215
std::vector<td::thread> threads;
216
for (size_t i = 0; i < threads_n; i++) {
217
threads.push_back(td::thread([&, id = i] {
218
for (size_t i = 1; i < 1000000; i++) {
219
ActorLocker locker(&state);
220
auto need = static_cast<int>(threads_n * i);
221
auto query = static_cast<td::uint32>(id + need);
223
nodes[id].request = 0;
224
nodes[id].response = 0;
226
if (locker.try_lock()) {
227
nodes[id].response = query * query;
229
auto cpu = ActorSignals::one(ActorSignals::Cpu);
230
nodes[id].request.store(query, std::memory_order_release);
231
locker.add_signals(cpu);
233
while (locker.own_lock()) {
234
auto flags = locker.flags();
235
auto signals = flags.get_signals();
236
if (!signals.empty()) {
239
flags.clear_signals();
240
locker.try_unlock(flags);
245
CHECK(locker.add_signals(ActorSignals{}));
246
CHECK(!locker.flags().has_signals());
247
CHECK(locker.try_unlock(locker.flags()));
248
for (size_t thread_id = 0; thread_id < threads_n; thread_id++) {
249
LOG_CHECK(nodes[thread_id].response ==
250
static_cast<td::uint32>(thread_id + need) * static_cast<td::uint32>(thread_id + need))
251
<< td::tag("thread", thread_id) << " " << nodes[thread_id].response << " "
252
<< nodes[thread_id].request.load();
258
for (auto &thread : threads) {
264
const size_t BUF_SIZE = 1024 * 1024;
266
td::StringBuilder sb(td::MutableSlice(buf, BUF_SIZE - 1));
269
TEST(Actor2, executor_simple) {
270
using namespace td::actor::core;
271
using namespace td::actor;
272
using td::actor::detail::ActorMessageCreator;
273
struct Dispatcher : public SchedulerDispatcher {
274
void add_to_queue(ActorInfoPtr ptr, SchedulerId scheduler_id, bool need_poll) override {
275
queue.push_back(std::move(ptr));
277
void set_alarm_timestamp(const ActorInfoPtr &actor_info_ptr) override {
280
SchedulerId get_scheduler_id() const override {
281
return SchedulerId{0};
283
std::deque<ActorInfoPtr> queue;
285
Dispatcher dispatcher;
287
class TestActor : public Actor {
294
void start_up() override {
297
void tear_down() override {
302
ActorInfoCreator actor_info_creator;
303
auto actor = actor_info_creator.create(
304
std::make_unique<TestActor>(), ActorInfoCreator::Options().on_scheduler(SchedulerId{0}).with_name("TestActor"));
305
dispatcher.add_to_queue(actor, SchedulerId{0}, false);
308
ActorExecutor executor(*actor, dispatcher, ActorExecutor::Options());
309
CHECK(!executor.is_closed());
310
CHECK(executor.can_send_immediate());
311
LOG_CHECK(sb.as_cslice() == "StartUp") << sb.as_cslice();
313
executor.send(ActorMessageCreator::lambda([&] { sb << "A"; }));
314
LOG_CHECK(sb.as_cslice() == "A") << sb.as_cslice();
316
auto big_message = ActorMessageCreator::lambda([&] { sb << "big"; });
317
big_message.set_big();
318
executor.send(std::move(big_message));
319
LOG_CHECK(sb.as_cslice() == "") << sb.as_cslice();
320
executor.send(ActorMessageCreator::lambda([&] { sb << "B"; }));
321
LOG_CHECK(sb.as_cslice() == "") << sb.as_cslice();
323
CHECK(dispatcher.queue.size() == 1);
324
{ ActorExecutor executor(*actor, dispatcher, ActorExecutor::Options().with_from_queue()); }
325
CHECK(dispatcher.queue.size() == 1);
326
dispatcher.queue.clear();
327
LOG_CHECK(sb.as_cslice() == "bigB") << sb.as_cslice();
330
ActorExecutor executor(*actor, dispatcher, ActorExecutor::Options());
332
ActorMessageCreator::lambda([&] { static_cast<TestActor &>(ActorExecuteContext::get()->actor()).close(); }));
334
LOG_CHECK(sb.as_cslice() == "TearDown") << sb.as_cslice();
336
CHECK(!actor->has_actor());
338
ActorExecutor executor(*actor, dispatcher, ActorExecutor::Options());
340
ActorMessageCreator::lambda([&] { static_cast<TestActor &>(ActorExecuteContext::get()->actor()).close(); }));
342
CHECK(dispatcher.queue.empty());
343
CHECK(sb.as_cslice() == "");
347
ActorInfoCreator actor_info_creator;
348
auto actor = actor_info_creator.create(
349
std::make_unique<TestActor>(), ActorInfoCreator::Options().on_scheduler(SchedulerId{0}).with_name("TestActor"));
350
dispatcher.add_to_queue(actor, SchedulerId{0}, false);
352
ActorExecutor executor(*actor, dispatcher, ActorExecutor::Options());
353
CHECK(!executor.is_closed());
354
CHECK(executor.can_send_immediate());
355
LOG_CHECK(sb.as_cslice() == "StartUp") << sb.as_cslice();
357
auto a_msg = ActorMessageCreator::lambda([&] {
359
ActorExecuteContext::get()->set_pause();
362
executor.send(std::move(a_msg));
363
executor.send(ActorMessageCreator::lambda([&] { sb << "A"; }));
364
LOG_CHECK(sb.as_cslice() == "") << sb.as_cslice();
367
CHECK(dispatcher.queue.size() == 1);
368
dispatcher.queue.clear();
369
ActorExecutor executor(*actor, dispatcher, ActorExecutor::Options().with_from_queue());
370
CHECK(!executor.is_closed());
371
CHECK(!executor.can_send_immediate());
372
LOG_CHECK(sb.as_cslice() == "big pause") << sb.as_cslice();
376
CHECK(dispatcher.queue.size() == 1);
377
dispatcher.queue.clear();
378
ActorExecutor executor(*actor, dispatcher, ActorExecutor::Options().with_from_queue());
379
CHECK(!executor.is_closed());
380
CHECK(executor.can_send_immediate());
381
LOG_CHECK(sb.as_cslice() == "A") << sb.as_cslice();
385
ActorExecutor executor(*actor, dispatcher, ActorExecutor::Options());
387
ActorMessageCreator::lambda([&] { static_cast<TestActor &>(ActorExecuteContext::get()->actor()).close(); }));
389
LOG_CHECK(sb.as_cslice() == "TearDown") << sb.as_cslice();
391
dispatcher.queue.clear();
395
using namespace td::actor;
397
static std::atomic<int> global_cnt;
398
class Worker : public Actor {
400
void query(uint32 x, core::ActorInfoPtr master);
405
class Master : public Actor {
407
void on_result(uint32 x, uint32 y) {
414
core::ActorInfoPtr worker;
415
void start_up() override {
416
worker = detail::create_actor<Worker>(ActorOptions().with_name("Master"));
419
void loop() override {
423
SchedulerContext::get()->stop();
425
detail::send_closure(*worker, &Worker::close);
429
detail::send_lambda(*worker,
430
[x = l, self = get_actor_info_ptr()] { detail::current_actor<Worker>().query(x, self); });
434
void Worker::query(uint32 x, core::ActorInfoPtr master) {
436
for (int i = 0; i < 100; i++) {
439
detail::send_lambda(*master, [result = y, x] { detail::current_actor<Master>().on_result(x, result); });
442
TEST(Actor2, scheduler_simple) {
443
auto group_info = std::make_shared<core::SchedulerGroupInfo>(1);
444
core::Scheduler scheduler{group_info, SchedulerId{0}, 2};
446
scheduler.run_in_context([] {
448
for (int i = 0; i < global_cnt; i++) {
449
detail::create_actor<Master>(ActorOptions().with_name("Master"));
452
while (scheduler.run(1000)) {
454
core::Scheduler::close_scheduler_group(*group_info);
457
TEST(Actor2, actor_id_simple) {
458
auto group_info = std::make_shared<core::SchedulerGroupInfo>(1);
459
core::Scheduler scheduler{group_info, SchedulerId{0}, 2};
463
scheduler.run_in_context([] {
464
class A : public Actor {
466
A(int value) : value_(value) {
474
if (--global_cnt <= 0) {
475
SchedulerContext::get()->stop();
483
auto id = create_actor<A>("A", 123);
484
CHECK(sb.as_cslice() == "A123");
486
send_closure(id, &A::hello);
488
while (scheduler.run(1000)) {
490
CHECK(sb.as_cslice() == "hello~A");
491
core::Scheduler::close_scheduler_group(*group_info);
495
TEST(Actor2, actor_creation) {
496
auto group_info = std::make_shared<core::SchedulerGroupInfo>(1);
497
core::Scheduler scheduler{group_info, SchedulerId{0}, 1};
500
scheduler.run_in_context([] {
502
class A : public Actor {
510
void start_up() override {
512
create_actor<B>("Simple", actor_id(this)).release();
516
auto &context = *SchedulerContext::get();
517
CHECK(context.has_poll());
521
void tear_down() override {
522
if (--global_cnt <= 0) {
523
SchedulerContext::get()->stop();
528
class B : public Actor {
530
B(ActorId<A> a) : a_(a) {
534
void start_up() override {
535
auto &context = *SchedulerContext::get();
536
CHECK(!context.has_poll());
537
send_closure(a_, &A::f);
540
void tear_down() override {
541
if (--global_cnt <= 0) {
542
SchedulerContext::get()->stop();
548
create_actor<A>(ActorOptions().with_name("Poll").with_poll()).release();
550
while (scheduler.run(1000)) {
553
core::Scheduler::close_scheduler_group(*group_info);
556
TEST(Actor2, actor_timeout_simple) {
557
auto group_info = std::make_shared<core::SchedulerGroupInfo>(1);
558
core::Scheduler scheduler{group_info, SchedulerId{0}, 2};
562
auto watcher = td::create_shared_destructor([] { SchedulerContext::get()->stop(); });
563
scheduler.run_in_context([watcher = std::move(watcher)] {
564
class A : public Actor {
566
A(std::shared_ptr<td::Destructor> watcher) : watcher_(std::move(watcher)) {
568
void start_up() override {
571
void alarm() override {
572
double diff = td::Time::now() - expected_timeout_;
573
LOG_CHECK(-0.001 < diff && diff < 0.1) << diff;
582
std::shared_ptr<td::Destructor> watcher_;
583
double expected_timeout_;
586
auto wakeup_timestamp = td::Timestamp::in(0.1);
587
expected_timeout_ = wakeup_timestamp.at();
588
alarm_timestamp() = wakeup_timestamp;
591
create_actor<A>(core::ActorInfoCreator::Options().with_name("A").with_poll(), watcher).release();
592
create_actor<A>(core::ActorInfoCreator::Options().with_name("B"), watcher).release();
595
while (scheduler.run(1000)) {
597
core::Scheduler::close_scheduler_group(*group_info);
601
TEST(Actor2, actor_timeout_simple2) {
602
auto group_info = std::make_shared<core::SchedulerGroupInfo>(1);
603
core::Scheduler scheduler{group_info, SchedulerId{0}, 2};
607
auto watcher = td::create_shared_destructor([] { SchedulerContext::get()->stop(); });
608
scheduler.run_in_context([watcher = std::move(watcher)] {
609
class A : public Actor {
611
A(std::shared_ptr<td::Destructor> watcher) : watcher_(std::move(watcher)) {
613
void start_up() override {
616
void alarm() override {
621
std::shared_ptr<td::Destructor> watcher_;
623
auto wakeup_timestamp = td::Timestamp::in(0.001);
624
alarm_timestamp() = wakeup_timestamp;
627
class B : public Actor {
629
B(std::shared_ptr<td::Destructor> watcher, ActorOwn<> actor_own)
630
: watcher_(std::move(watcher)), actor_own_(std::move(actor_own)) {
632
void start_up() override {
635
void alarm() override {
640
std::shared_ptr<td::Destructor> watcher_;
641
ActorOwn<> actor_own_;
643
auto wakeup_timestamp = td::Timestamp::in(0.005);
644
alarm_timestamp() = wakeup_timestamp;
647
auto actor_own = create_actor<A>(core::ActorInfoCreator::Options().with_name("A").with_poll(), watcher);
648
create_actor<B>(core::ActorInfoCreator::Options().with_name("B"), watcher, std::move(actor_own)).release();
651
while (scheduler.run(1000)) {
653
core::Scheduler::close_scheduler_group(*group_info);
657
TEST(Actor2, actor_function_result) {
658
auto group_info = std::make_shared<core::SchedulerGroupInfo>(1);
659
core::Scheduler scheduler{group_info, SchedulerId{0}, 2};
663
auto watcher = td::create_shared_destructor([] { SchedulerContext::get()->stop(); });
664
scheduler.run_in_context([watcher = std::move(watcher)] {
665
class B : public Actor {
667
uint32 query(uint32 x) {
670
void query_async(uint32 x, td::Promise<uint32> promise) {
674
class A : public Actor {
676
A(std::shared_ptr<td::Destructor> watcher) : watcher_(std::move(watcher)) {
678
void on_result(uint32 x, td::Result<uint32> r_y) {
679
auto y = r_y.move_as_ok();
680
LOG_CHECK(x * x == y) << x << " " << y;
686
b_ = create_actor<B>(ActorOptions().with_name("B"));
688
send_closure(b_, &B::query, 3, [a = std::make_unique<int>(), self = actor_id(this)](td::Result<uint32> y) {
689
LOG_IF(ERROR, y.is_error()) << y.error();
690
send_closure(self, &A::on_result, 3, y.ok());
692
send_closure(b_, &B::query_async, 2, [self = actor_id(this)](uint32 y) {
693
CHECK(!self.empty());
694
send_closure(self, &A::on_result, 2, y);
696
send_closure_later(b_, &B::query_async, 5, [self = actor_id(this)](uint32 y) {
697
CHECK(!self.empty());
698
send_closure(self, &A::on_result, 5, y);
700
auto future = future_send_closure(b_, &B::query, 7);
701
future.finish(td::promise_send_closure(actor_id(this), &A::on_result, 7));
702
//TODO: deduce Future type (i.e. Future<td::uint32>)
703
auto future2 = future_send_closure<td::uint32>(b_, &B::query_async, 7);
704
future2.finish(td::promise_send_closure(actor_id(this), &A::on_result, 7));
709
std::shared_ptr<td::Destructor> watcher_;
710
td::actor::ActorOwn<B> b_;
712
create_actor<A>(core::ActorInfoCreator::Options().with_name("A").with_poll(), watcher).release();
713
create_actor<A>(core::ActorInfoCreator::Options().with_name("B"), watcher).release();
716
while (scheduler.run(1000)) {
718
core::Scheduler::close_scheduler_group(*group_info);
722
TEST(Actor2, actor_ping_pong) {
723
Scheduler scheduler{{3}, false, Scheduler::Paused};
727
auto watcher = td::create_shared_destructor([] { SchedulerContext::get()->stop(); });
728
td::actor::set_debug(true);
729
for (int i = 0; i < 2000; i++) {
730
scheduler.run_in_context([watcher] {
731
class PingPong : public Actor {
733
PingPong(std::shared_ptr<td::Destructor> watcher) : watcher_(std::move(watcher)) {
735
void query(td::int32 left, ActorOwn<> data) {
736
if (td::Random::fast(0, 4) == 0) {
737
alarm_timestamp() = td::Timestamp::in(0.01 * td::Random::fast(0, 10));
742
auto dest = td::Random::fast(0, (int)next_.size() - 1);
743
if (td::Random::fast(0, 1) == 0) {
744
send_closure(next_[dest], &PingPong::query, left - 1, std::move(data));
746
send_closure_later(next_[dest], &PingPong::query, left - 1, std::move(data));
749
void add_next(ActorId<PingPong> p) {
750
next_.push_back(std::move(p));
752
void start_up() override {
754
void store_data(ActorOwn<> data) {
755
data_.push_back(std::move(data));
759
std::vector<ActorId<PingPong>> next_;
760
std::vector<ActorOwn<>> data_;
761
std::shared_ptr<td::Destructor> watcher_;
764
int N = td::Random::fast(2, 100);
766
std::vector<ActorOwn<PingPong>> actors;
767
for (int i = 0; i < N; i++) {
769
create_actor<PingPong>(core::ActorInfoCreator::Options().with_name(PSLICE() << "Worker#" << i), watcher));
771
for (int i = 0; i < N; i++) {
772
for (int j = 0; j < N; j++) {
773
send_closure(actors[i], &PingPong::add_next, actors[j].get());
776
int nn = td::Random::fast(1, N);
778
auto first = actors[0].get();
779
for (int i = 0; i < N; i++) {
780
auto to = actors[i].get();
782
send_closure(to, &PingPong::query, td::Random::fast(10, 1000), std::move(actors[i]));
784
send_closure(first, &PingPong::store_data, std::move(actors[i]));
790
while (scheduler.run(0.1)) {
791
//scheduler.get_debug().dump();
796
TEST(Actor2, Schedulers) {
797
for (auto mode : {Scheduler::Running, Scheduler::Paused}) {
798
for (auto start_count : {0, 1, 2}) {
799
for (auto run_count : {0, 1, 2}) {
800
for (auto stop_count : {0, 1, 2}) {
801
for (size_t threads : {0, 1}) {
802
Scheduler scheduler({threads}, false, mode);
803
for (int i = 0; i < start_count; i++) {
806
for (int i = 0; i < run_count; i++) {
809
for (int i = 0; i < stop_count; i++) {
818
TEST(Actor2, SchedulerZeroCpuThreads) {
819
Scheduler scheduler({0});
820
scheduler.run_in_context([] {
821
class A : public Actor {
822
void start_up() override {
823
SchedulerContext::get()->stop();
826
create_actor<A>(ActorOptions().with_name("A").with_poll(false)).release();
830
TEST(Actor2, SchedulerTwo) {
831
Scheduler scheduler({0, 0});
832
scheduler.run_in_context([] {
833
class B : public Actor {
835
void start_up() override {
836
CHECK(SchedulerContext::get()->get_scheduler_id() == SchedulerId{1});
839
CHECK(SchedulerContext::get()->get_scheduler_id() == SchedulerId{1});
840
SchedulerContext::get()->stop();
843
class A : public Actor {
844
void start_up() override {
845
CHECK(SchedulerContext::get()->get_scheduler_id() == SchedulerId{0});
847
create_actor<B>(ActorOptions().with_name("B").with_poll(false).on_scheduler(SchedulerId{1})).release();
848
send_closure(id, &B::close);
851
create_actor<A>(ActorOptions().with_name("A").with_poll(false).on_scheduler(SchedulerId{0})).release();
855
TEST(Actor2, ActorIdDynamicCast) {
856
Scheduler scheduler({0});
857
scheduler.run_in_context([] {
858
class A : public Actor {
861
CHECK(actor_id().actor_info_ptr() == get_actor_info_ptr());
862
SchedulerContext::get()->stop();
865
auto actor_own_a = create_actor<A>(ActorOptions().with_name("A").with_poll(false));
866
auto actor = &actor_own_a.get_actor_unsafe();
867
ActorOwn<> actor_own = actor_dynamic_cast<Actor>(std::move(actor_own_a));
868
CHECK(actor_own_a.empty());
869
actor_own_a = actor_dynamic_cast<A>(std::move(actor_own));
870
CHECK(actor_own.empty());
871
CHECK(&actor_own_a.get_actor_unsafe() == actor);
873
auto actor_id_a = actor_own_a.release();
874
ActorId<> actor_id = actor_dynamic_cast<Actor>(actor_id_a);
875
actor_id_a = actor_dynamic_cast<A>(actor_id);
876
CHECK(&actor_id_a.get_actor_unsafe() == actor);
878
auto actor_shared_a = ActorShared<A>(actor_id_a, 123);
879
ActorShared<> actor_shared = actor_dynamic_cast<Actor>(std::move(actor_shared_a));
880
CHECK(actor_shared_a.empty());
881
CHECK(actor_shared.token() == 123);
882
actor_shared_a = actor_dynamic_cast<A>(std::move(actor_shared));
883
CHECK(actor_shared.empty());
884
CHECK(&actor_shared_a.get_actor_unsafe() == actor);
885
CHECK(actor_shared_a.token() == 123);
887
send_closure(actor_shared_a, &A::close);
892
TEST(Actor2, send_vs_close) {
893
for (int it = 0; it < 100; it++) {
894
Scheduler scheduler({8});
896
auto watcher = td::create_shared_destructor([] { SchedulerContext::get()->stop(); });
897
scheduler.run_in_context([watcher = std::move(watcher)] {
898
class To : public Actor {
902
virtual ~Callback() {
904
virtual void on_closed(ActorId<To> to) = 0;
906
To(int cnt, std::shared_ptr<td::Destructor> watcher) : cnt_(cnt), watcher_(std::move(watcher)) {
913
void add_callback(std::unique_ptr<Callback> callback) {
914
callbacks_.push_back(std::move(callback));
916
void start_up() override {
917
alarm_timestamp() = td::Timestamp::in(td::Random::fast(0, 4) * 0.001);
919
void tear_down() override {
920
if (td::Random::fast(0, 4) == 0) {
921
send_closure(actor_id(this), &To::self_ref, actor_id(this));
923
for (auto &callback : callbacks_) {
924
callback->on_closed(actor_id(this));
927
void self_ref(ActorId<To>) {
929
void alarm() override {
935
std::shared_ptr<td::Destructor> watcher_;
936
std::vector<std::unique_ptr<Callback>> callbacks_;
938
class From : public Actor {
940
From(std::vector<ActorId<To>> to, std::shared_ptr<td::Destructor> watcher)
941
: to_(std::move(to)), watcher_(std::move(watcher)) {
943
void start_up() override {
946
void on_closed(ActorId<To>) {
948
void loop() override {
949
while (!to_.empty()) {
950
if (td::Random::fast(0, 3) == 0) {
953
auto id = to_.back();
955
if (td::Random::fast(0, 4) == 0) {
956
class Callback : public To::Callback {
958
Callback(ActorId<From> from) : from_(std::move(from)) {
960
void on_closed(ActorId<To> id) override {
961
send_closure(from_, &From::on_closed, std::move(id));
967
send_closure(id, &To::add_callback, std::make_unique<Callback>(actor_id(this)));
969
send_closure(id, &To::on_event);
979
std::vector<ActorId<To>> to_;
980
std::shared_ptr<td::Destructor> watcher_;
983
class Master : public Actor {
985
Master(std::shared_ptr<td::Destructor> watcher) : watcher_(std::move(watcher)) {
989
std::shared_ptr<td::Destructor> watcher_;
991
void loop() override {
997
std::vector<std::vector<ActorId<To>>> from(from_n);
998
for (int i = 0; i < to_n; i++) {
999
int cnt = td::Random::fast(1, 10);
1000
int to_cnt = td::Random::fast(1, cnt);
1002
td::actor::create_actor<To>(
1003
td::actor::ActorOptions().with_name(PSLICE() << "To#" << i).with_poll(td::Random::fast(0, 4) == 0),
1006
for (int j = 0; j < cnt; j++) {
1007
auto from_i = td::Random::fast(0, from_n - 1);
1008
from[from_i].push_back(to);
1011
for (int i = 0; i < from_n; i++) {
1012
td::actor::create_actor<From>(
1013
td::actor::ActorOptions().with_name(PSLICE() << "From#" << i).with_poll(td::Random::fast(0, 4) == 0),
1014
std::move(from[i]), watcher_)
1017
alarm_timestamp() = td::Timestamp::in(td::Random::fast(0, 10) * 0.01 / 30);
1020
td::actor::create_actor<Master>("Master", watcher).release();
1026
TEST(Actor2, send_vs_close2) {
1027
for (int it = 0; it < 100; it++) {
1028
Scheduler scheduler({8});
1030
auto watcher = td::create_shared_destructor([] { SchedulerContext::get()->stop(); });
1031
//std::shared_ptr<td::Destructor> watcher;
1032
scheduler.run_in_context([watcher = std::move(watcher)] {
1033
class To : public Actor {
1035
To(int cnt, std::shared_ptr<td::Destructor> watcher) : cnt_(cnt), watcher_(std::move(watcher)) {
1037
void start_up() override {
1038
alarm_timestamp() = td::Timestamp::in(td::Random::fast(0, 4) * 0.001 / 30);
1040
void alarm() override {
1046
std::shared_ptr<td::Destructor> watcher_;
1048
class From : public Actor {
1050
From(std::vector<ActorId<To>> to, std::shared_ptr<td::Destructor> watcher)
1051
: to_(std::move(to)), watcher_(std::move(watcher)) {
1053
void start_up() override {
1058
std::vector<ActorId<To>> to_;
1059
std::shared_ptr<td::Destructor> watcher_;
1062
class Master : public Actor {
1064
Master(std::shared_ptr<td::Destructor> watcher) : watcher_(std::move(watcher)) {
1068
std::shared_ptr<td::Destructor> watcher_;
1070
void loop() override {
1076
std::vector<std::vector<ActorId<To>>> from(from_n);
1077
for (int i = 0; i < to_n; i++) {
1078
int cnt = td::Random::fast(1, 2);
1079
int to_cnt = td::Random::fast(1, cnt);
1081
td::actor::create_actor<To>(
1082
td::actor::ActorOptions().with_name(PSLICE() << "To#" << i).with_poll(td::Random::fast(0, 4) == 0),
1085
for (int j = 0; j < cnt; j++) {
1086
auto from_i = td::Random::fast(0, from_n - 1);
1087
from[from_i].push_back(to);
1090
for (int i = 0; i < from_n; i++) {
1091
td::actor::create_actor<From>(
1092
td::actor::ActorOptions().with_name(PSLICE() << "From#" << i).with_poll(td::Random::fast(0, 4) == 0),
1093
std::move(from[i]), watcher_)
1096
alarm_timestamp() = td::Timestamp::in(td::Random::fast(0, 10) * 0.01 / 30);
1099
td::actor::create_actor<Master>("Master", watcher).release();
1105
#endif //!TD_THREAD_UNSUPPORTED