2
This file is part of TON Blockchain Library.
4
TON Blockchain Library is free software: you can redistribute it and/or modify
5
it under the terms of the GNU Lesser General Public License as published by
6
the Free Software Foundation, either version 2 of the License, or
7
(at your option) any later version.
9
TON Blockchain Library is distributed in the hope that it will be useful,
10
but WITHOUT ANY WARRANTY; without even the implied warranty of
11
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12
GNU Lesser General Public License for more details.
14
You should have received a copy of the GNU Lesser General Public License
15
along with TON Blockchain Library. If not, see <http://www.gnu.org/licenses/>.
17
Copyright 2017-2020 Telegram Systems LLP
19
#include "td/actor/actor.h"
20
#include "td/actor/PromiseFuture.h"
21
#include "td/actor/MultiPromise.h"
22
#include "td/utils/MovableValue.h"
23
#include "td/utils/tests.h"
37
using Int = td::MovableValue<int>;
41
auto set_int = [](td::Result<Int> &destination) {
42
return [&destination](Int value) { destination = std::move(value); };
44
auto set_result_int = [](Result<Int> &destination) {
45
return [&destination](Result<Int> value) { destination = std::move(value); };
49
Result<Int> result{2};
51
Promise<Int> promise = set_int(result);
52
promise.set_value(Int{3});
54
ASSERT_TRUE(result.is_ok());
55
ASSERT_EQ(result.ok().get(), 3);
59
Result<Int> result{2};
61
Promise<Int> promise = set_int(result);
63
// will set Int{} on destruction
65
ASSERT_TRUE(result.is_ok());
66
ASSERT_EQ(result.ok().get(), 0);
70
Result<Int> result{2};
72
Promise<Int> promise = set_result_int(result);
73
promise.set_value(Int{3});
75
ASSERT_TRUE(result.is_ok());
76
ASSERT_EQ(result.ok().get(), 3);
80
Result<Int> result{2};
82
Promise<Int> promise = set_result_int(result);
84
// will set Status::Error() on destruction
86
ASSERT_TRUE(result.is_error());
90
std::unique_ptr<int> res;
91
Promise<td::Unit> x = [a = std::make_unique<int>(5), &res](td::Unit) mutable { res = std::move(a); };
97
//Promise<Int> promise;
98
//std::tuple<Promise<Int> &&> f(std::move(promise));
99
//std::tuple<Promise<Int>> x = std::move(f);
103
//using T = Result<int>;
104
//using T = std::unique_ptr<int>;
105
//using T = std::function<int()>;
106
//using T = std::vector<int>;
108
////using T = Promise<Int>;
110
//std::tuple<T &&> g(std::move(f));
111
//std::tuple<T> h = std::move(g);
116
auto promise = td::lambda_promise<int>([&](auto x) { result = x.move_as_ok(); });
117
promise.set_value(5);
118
ASSERT_EQ(5, result);
120
Promise<int> promise2 = [&](auto x) { result = x.move_as_ok(); };
121
promise2.set_value(6);
122
ASSERT_EQ(6, result);
126
TEST(Actor, safe_promise) {
129
td::Promise<int> promise = td::PromiseCreator::lambda([&](int x) { res = x; });
130
auto safe_promise = td::SafePromise<int>(std::move(promise), 2);
131
promise = std::move(safe_promise);
133
auto safe_promise2 = td::SafePromise<int>(std::move(promise), 3);
138
TEST(Actor, split_promise) {
141
using td::split_promise;
142
using td::SplitPromise;
144
td::optional<std::pair<int, double>> x;
145
auto pair = [&](Result<std::pair<int, double>> res) { x = res.move_as_ok(); };
146
static_assert(std::is_same<SplitPromise<decltype(pair)>::ArgT, std::pair<int, double>>::value, "A");
148
std::is_same<SplitPromise<decltype(pair)>::SplittedT, std::pair<Promise<int>, Promise<double>>>::value, "A");
149
auto splitted = split_promise(pair);
150
static_assert(std::is_same<decltype(splitted), std::pair<Promise<int>, Promise<double>>>::value, "A");
152
splitted.first.set_value(1);
153
splitted.second.set_value(2.0);
154
CHECK(x.unwrap() == std::make_pair(1, 2.0));
157
td::optional<std::tuple<int, double, std::string>> x;
158
auto triple = [&](Result<std::tuple<int, double, std::string>> res) { x = res.move_as_ok(); };
159
static_assert(std::is_same<SplitPromise<decltype(triple)>::ArgT, std::tuple<int, double, std::string>>::value, "A");
160
static_assert(std::is_same<SplitPromise<decltype(triple)>::SplittedT,
161
std::tuple<Promise<int>, Promise<double>, Promise<std::string>>>::value,
163
auto splitted = split_promise(triple);
165
std::is_same<decltype(splitted), std::tuple<Promise<int>, Promise<double>, Promise<std::string>>>::value, "A");
166
std::get<0>(splitted).set_value(1);
167
std::get<1>(splitted).set_value(2.0);
168
std::get<2>(splitted).set_value("hello");
169
CHECK(x.unwrap() == std::make_tuple(1, 2.0, "hello"));
173
auto pair = [&](Result<std::pair<int, double>> res) {
175
code = res.error().code();
177
auto splitted = split_promise(td::Promise<std::pair<int, double>>(pair));
178
splitted.second.set_error(td::Status::Error(123, "123"));
180
splitted.first.set_value(1);
185
TEST(Actor, promise_future) {
186
using td::make_promise_future;
188
auto pf = make_promise_future<int>();
189
td::optional<int> res;
190
pf.second.map([](int x) { return x * 2; }).map([](int x) { return x + 10; }).map([&](int x) {
195
pf.first.set_value(6);
196
ASSERT_EQ(22, res.unwrap());
199
LOG(ERROR) << "Second test";
200
td::optional<int> res;
202
.map([](int x) { return x * 2; })
203
.map([](int x) { return x + 10; })
204
.fmap([&](int x) { return td::make_future(x * 2); })
205
.finish([&](int x) { res = x; });
206
ASSERT_EQ(44, res.unwrap());
210
TEST(Actor2, actor_lost_promise) {
211
using namespace td::actor;
213
Scheduler scheduler({1}, false, Scheduler::Paused);
215
auto watcher = td::create_shared_destructor([] {
216
LOG(ERROR) << "STOP";
217
SchedulerContext::get()->stop();
219
scheduler.run_in_context([watcher = std::move(watcher)] {
220
class B : public Actor {
222
void start_up() override {
225
uint32 query(uint32 x) {
229
class A : public Actor {
231
A(std::shared_ptr<td::Destructor> watcher) : watcher_(std::move(watcher)) {
234
b_ = create_actor<B>(ActorOptions().with_name("B"));
235
//send_closure(b_, &B::query, 2, [self = actor_id(this)](uint32 y) { send_closure(self, &A::on_result, 2, y); });
236
send_closure_later(b_, &B::query, 2,
237
[self = actor_id(this), a = std::make_unique<int>()](Result<uint32> y) mutable {
240
send_closure(self, &A::finish);
242
send_closure(b_, &B::query, 2, [self = actor_id(this), a = std::make_unique<int>()](Result<uint32> y) mutable {
245
send_closure(self, &A::finish);
249
LOG(ERROR) << "FINISH";
254
std::shared_ptr<td::Destructor> watcher_;
255
td::actor::ActorOwn<B> b_;
257
create_actor<A>(ActorOptions().with_name("A").with_poll(), watcher).release();
262
TEST(Actor2, MultiPromise) {
264
MultiPromise::Options fail_on_error;
265
fail_on_error.ignore_errors = false;
266
MultiPromise::Options ignore_errors;
267
ignore_errors.ignore_errors = true;
270
auto log = [&](Result<Unit> res) {
274
str += PSTRING() << "E" << res.error().code() << ";";
277
auto clear = [&] { str = ""; };
281
MultiPromise mp(ignore_errors);
283
auto mp_init = mp.init_guard();
284
mp_init.add_promise(log);
287
ASSERT_EQ("OK;", str);
292
MultiPromise mp(ignore_errors);
294
auto mp_init = mp.init_guard();
295
mp_init.add_promise(log);
296
mp_init.get_promise().set_error(Status::Error(1));
299
ASSERT_EQ("OK;", str);
304
MultiPromise mp(ignore_errors);
307
auto mp_init = mp.init_guard();
308
mp_init.add_promise(log);
309
promise = mp_init.get_promise();
313
auto mp_init = mp.add_promise_or_init(log);
314
ASSERT_TRUE(!mp_init);
316
promise.set_error(Status::Error(2));
317
ASSERT_EQ("OK;OK;", str);
320
auto mp_init = mp.add_promise_or_init(log);
321
ASSERT_TRUE(mp_init);
324
ASSERT_EQ("OK;", str);
329
MultiPromise mp(fail_on_error);
331
auto mp_init = mp.init_guard();
332
mp_init.get_promise().set_value(Unit());
333
mp_init.add_promise(log);
336
ASSERT_EQ("OK;", str);
341
MultiPromise mp(fail_on_error);
343
auto mp_init = mp.init_guard();
344
mp_init.get_promise().set_value(Unit());
345
mp_init.add_promise(log);
346
mp_init.get_promise().set_error(Status::Error(1));
347
ASSERT_EQ("E1;", str);
349
mp_init.get_promise().set_error(Status::Error(2));
351
mp_init.add_promise(log);
352
ASSERT_EQ("E1;", str);
354
ASSERT_EQ("E1;", str);
359
MultiPromise mp(fail_on_error);
362
auto mp_init = mp.init_guard();
363
mp_init.get_promise().set_value(Unit());
364
mp_init.add_promise(log);
365
promise = mp_init.get_promise();
369
auto mp_init = mp.add_promise_or_init(log);
370
ASSERT_TRUE(mp_init.empty());
372
promise.set_error(Status::Error(2));
373
ASSERT_EQ("E2;E2;", str);
377
auto mp_init = mp.add_promise_or_init(log);
378
ASSERT_TRUE(!mp_init.empty());
380
ASSERT_EQ("OK;", str);
384
#if TD_HAVE_COROUTINES
385
#include <experimental/coroutine>
387
template <class T = Unit>
389
struct final_awaiter {
390
bool await_ready() const noexcept {
394
std::experimental::coroutine_handle<> await_suspend(std::experimental::coroutine_handle<P> continuation) noexcept {
395
return continuation.promise().continuation_;
397
void await_resume() noexcept {
400
struct promise_type {
401
task get_return_object() {
404
std::experimental::suspend_always initial_suspend() {
407
final_awaiter final_suspend() {
408
return final_awaiter{};
410
void return_value(T v) {
414
return std::move(value_.value());
416
void unhandled_exception() {
420
std::experimental::coroutine_handle<> continuation_;
424
std::experimental::coroutine_handle<promise_type> coroutine_handle_;
425
task(task &&other) = default;
426
task(promise_type &promise)
427
: coroutine_handle_(std::experimental::coroutine_handle<promise_type>::from_promise(promise)) {
430
bool await_ready() const noexcept {
431
return !coroutine_handle_ || coroutine_handle_.done();
433
std::experimental::coroutine_handle<> await_suspend(std::experimental::coroutine_handle<> continuation) noexcept {
434
coroutine_handle_.promise().continuation_ = continuation;
435
return coroutine_handle_;
437
T await_resume() noexcept {
438
return coroutine_handle_.promise().move_value();
449
auto a = co_await f();
450
auto b = co_await g();
454
struct immediate_task {
455
struct promise_type {
456
immediate_task get_return_object() {
459
std::experimental::suspend_never initial_suspend() {
462
std::experimental::suspend_never final_suspend() {
467
void unhandled_exception() {
475
OnActor(T &&actor_id) : actor_id_(actor_id.as_actor_ref()) {
477
bool await_ready() const noexcept {
480
void await_suspend(std::experimental::coroutine_handle<> continuation) noexcept {
481
//TODO: destroy if lambda is lost
482
send_lambda(actor_id_, [continuation]() mutable { continuation.resume(); });
484
void await_resume() noexcept {
488
actor::detail::ActorRef actor_id_;
491
immediate_task check_h() {
492
LOG(ERROR) << "check_h: call h";
493
auto c = co_await h();
494
LOG(ERROR) << "check_h: after call h";
498
TEST(ActorCoro, Task) {
504
class Printer : public Actor {
515
class SampleActor : public Actor {
517
SampleActor(std::shared_ptr<td::Destructor> watcher) : watcher_(std::move(watcher)) {
521
std::shared_ptr<Destructor> watcher_;
522
ActorOwn<Printer> printer_;
523
void start_up() override {
524
printer_ = create_actor<Printer>("Printer");
527
task<Unit> print_a() {
528
auto self = actor_id(this);
529
LOG(ERROR) << "enter print_a";
530
co_await OnActor(printer_);
531
detail::current_actor<Printer>().print_a();
532
co_await OnActor(self);
533
LOG(ERROR) << "exit print_a";
536
task<Unit> print_b() {
537
auto self = actor_id(this);
538
LOG(ERROR) << "enter print_b";
539
co_await OnActor(printer_);
540
detail::current_actor<Printer>().print_b();
541
co_await OnActor(self);
542
LOG(ERROR) << "exit print_b";
546
immediate_task run_coroutine() {
554
TEST(ActorCoro, Simple) {
555
using namespace td::actor;
557
Scheduler scheduler({1});
559
auto watcher = td::create_shared_destructor([] {
560
LOG(ERROR) << "STOP";
561
SchedulerContext::get()->stop();
563
scheduler.run_in_context([watcher = std::move(watcher)] {
564
create_actor<actor::SampleActor>(ActorOptions().with_name("SampleActor").with_poll(), watcher).release();