Ton

Форк
0
/
actors_promise.cpp 
570 строк · 14.7 Кб
1
/*
2
    This file is part of TON Blockchain Library.
3

4
    TON Blockchain Library is free software: you can redistribute it and/or modify
5
    it under the terms of the GNU Lesser General Public License as published by
6
    the Free Software Foundation, either version 2 of the License, or
7
    (at your option) any later version.
8

9
    TON Blockchain Library is distributed in the hope that it will be useful,
10
    but WITHOUT ANY WARRANTY; without even the implied warranty of
11
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
    GNU Lesser General Public License for more details.
13

14
    You should have received a copy of the GNU Lesser General Public License
15
    along with TON Blockchain Library.  If not, see <http://www.gnu.org/licenses/>.
16

17
    Copyright 2017-2020 Telegram Systems LLP
18
*/
19
#include "td/actor/actor.h"
20
#include "td/actor/PromiseFuture.h"
21
#include "td/actor/MultiPromise.h"
22
#include "td/utils/MovableValue.h"
23
#include "td/utils/tests.h"
24

25
template <class T>
26
class X {
27
 public:
28
  X() = default;
29
  X(X &&) = default;
30
  template <class S>
31
  X(S s) : t(s) {
32
  }
33
  T t;
34
};
35

36
TEST(Actor, promise) {
37
  using Int = td::MovableValue<int>;
38
  using td::Promise;
39
  using td::Result;
40

41
  auto set_int = [](td::Result<Int> &destination) {
42
    return [&destination](Int value) { destination = std::move(value); };
43
  };
44
  auto set_result_int = [](Result<Int> &destination) {
45
    return [&destination](Result<Int> value) { destination = std::move(value); };
46
  };
47

48
  {
49
    Result<Int> result{2};
50
    {
51
      Promise<Int> promise = set_int(result);
52
      promise.set_value(Int{3});
53
    }
54
    ASSERT_TRUE(result.is_ok());
55
    ASSERT_EQ(result.ok().get(), 3);
56
  }
57

58
  {
59
    Result<Int> result{2};
60
    {
61
      Promise<Int> promise = set_int(result);
62
      (void)promise;
63
      // will set Int{} on destruction
64
    }
65
    ASSERT_TRUE(result.is_ok());
66
    ASSERT_EQ(result.ok().get(), 0);
67
  }
68

69
  {
70
    Result<Int> result{2};
71
    {
72
      Promise<Int> promise = set_result_int(result);
73
      promise.set_value(Int{3});
74
    }
75
    ASSERT_TRUE(result.is_ok());
76
    ASSERT_EQ(result.ok().get(), 3);
77
  }
78

79
  {
80
    Result<Int> result{2};
81
    {
82
      Promise<Int> promise = set_result_int(result);
83
      (void)promise;
84
      // will set Status::Error() on destruction
85
    }
86
    ASSERT_TRUE(result.is_error());
87
  }
88

89
  {
90
    std::unique_ptr<int> res;
91
    Promise<td::Unit> x = [a = std::make_unique<int>(5), &res](td::Unit) mutable { res = std::move(a); };
92
    x(td::Unit());
93
    CHECK(*res == 5);
94
  }
95

96
  {//{
97
   //Promise<Int> promise;
98
   //std::tuple<Promise<Int> &&> f(std::move(promise));
99
   //std::tuple<Promise<Int>> x = std::move(f);
100
   //}
101

102
   {
103
       //using T = Result<int>;
104
       //using T = std::unique_ptr<int>;
105
       //using T = std::function<int()>;
106
       //using T = std::vector<int>;
107
       //using T = X<int>;
108
       ////using T = Promise<Int>;
109
       //T f;
110
       //std::tuple<T &&> g(std::move(f));
111
       //std::tuple<T> h = std::move(g);
112
   }}
113

114
  {
115
    int result = 0;
116
    auto promise = td::lambda_promise<int>([&](auto x) { result = x.move_as_ok(); });
117
    promise.set_value(5);
118
    ASSERT_EQ(5, result);
119

120
    Promise<int> promise2 = [&](auto x) { result = x.move_as_ok(); };
121
    promise2.set_value(6);
122
    ASSERT_EQ(6, result);
123
  }
124
}
125

126
TEST(Actor, safe_promise) {
127
  int res = 0;
128
  {
129
    td::Promise<int> promise = td::PromiseCreator::lambda([&](int x) { res = x; });
130
    auto safe_promise = td::SafePromise<int>(std::move(promise), 2);
131
    promise = std::move(safe_promise);
132
    ASSERT_EQ(res, 0);
133
    auto safe_promise2 = td::SafePromise<int>(std::move(promise), 3);
134
  }
135
  ASSERT_EQ(res, 3);
136
}
137

138
TEST(Actor, split_promise) {
139
  using td::Promise;
140
  using td::Result;
141
  using td::split_promise;
142
  using td::SplitPromise;
143
  {
144
    td::optional<std::pair<int, double>> x;
145
    auto pair = [&](Result<std::pair<int, double>> res) { x = res.move_as_ok(); };
146
    static_assert(std::is_same<SplitPromise<decltype(pair)>::ArgT, std::pair<int, double>>::value, "A");
147
    static_assert(
148
        std::is_same<SplitPromise<decltype(pair)>::SplittedT, std::pair<Promise<int>, Promise<double>>>::value, "A");
149
    auto splitted = split_promise(pair);
150
    static_assert(std::is_same<decltype(splitted), std::pair<Promise<int>, Promise<double>>>::value, "A");
151

152
    splitted.first.set_value(1);
153
    splitted.second.set_value(2.0);
154
    CHECK(x.unwrap() == std::make_pair(1, 2.0));
155
  }  // namespace td
156
  {
157
    td::optional<std::tuple<int, double, std::string>> x;
158
    auto triple = [&](Result<std::tuple<int, double, std::string>> res) { x = res.move_as_ok(); };
159
    static_assert(std::is_same<SplitPromise<decltype(triple)>::ArgT, std::tuple<int, double, std::string>>::value, "A");
160
    static_assert(std::is_same<SplitPromise<decltype(triple)>::SplittedT,
161
                               std::tuple<Promise<int>, Promise<double>, Promise<std::string>>>::value,
162
                  "A");
163
    auto splitted = split_promise(triple);
164
    static_assert(
165
        std::is_same<decltype(splitted), std::tuple<Promise<int>, Promise<double>, Promise<std::string>>>::value, "A");
166
    std::get<0>(splitted).set_value(1);
167
    std::get<1>(splitted).set_value(2.0);
168
    std::get<2>(splitted).set_value("hello");
169
    CHECK(x.unwrap() == std::make_tuple(1, 2.0, "hello"));
170
  }
171
  {
172
    int code = 0;
173
    auto pair = [&](Result<std::pair<int, double>> res) {
174
      res.ensure_error();
175
      code = res.error().code();
176
    };
177
    auto splitted = split_promise(td::Promise<std::pair<int, double>>(pair));
178
    splitted.second.set_error(td::Status::Error(123, "123"));
179
    CHECK(code == 0);
180
    splitted.first.set_value(1);
181
    CHECK(code == 123);
182
  }
183
}
184

185
TEST(Actor, promise_future) {
186
  using td::make_promise_future;
187
  {
188
    auto pf = make_promise_future<int>();
189
    td::optional<int> res;
190
    pf.second.map([](int x) { return x * 2; }).map([](int x) { return x + 10; }).map([&](int x) {
191
      res = x;
192
      return td::Unit();
193
    });
194
    CHECK(!res);
195
    pf.first.set_value(6);
196
    ASSERT_EQ(22, res.unwrap());
197
  }
198
  {
199
    LOG(ERROR) << "Second test";
200
    td::optional<int> res;
201
    td::make_future(6)
202
        .map([](int x) { return x * 2; })
203
        .map([](int x) { return x + 10; })
204
        .fmap([&](int x) { return td::make_future(x * 2); })
205
        .finish([&](int x) { res = x; });
206
    ASSERT_EQ(44, res.unwrap());
207
  }
208
}
209

210
TEST(Actor2, actor_lost_promise) {
211
  using namespace td::actor;
212
  using namespace td;
213
  Scheduler scheduler({1}, false, Scheduler::Paused);
214

215
  auto watcher = td::create_shared_destructor([] {
216
    LOG(ERROR) << "STOP";
217
    SchedulerContext::get()->stop();
218
  });
219
  scheduler.run_in_context([watcher = std::move(watcher)] {
220
    class B : public Actor {
221
     public:
222
      void start_up() override {
223
        stop();
224
      }
225
      uint32 query(uint32 x) {
226
        return x * x;
227
      }
228
    };
229
    class A : public Actor {
230
     public:
231
      A(std::shared_ptr<td::Destructor> watcher) : watcher_(std::move(watcher)) {
232
      }
233
      void start_up() {
234
        b_ = create_actor<B>(ActorOptions().with_name("B"));
235
        //send_closure(b_, &B::query, 2, [self = actor_id(this)](uint32 y) { send_closure(self, &A::on_result, 2, y); });
236
        send_closure_later(b_, &B::query, 2,
237
                           [self = actor_id(this), a = std::make_unique<int>()](Result<uint32> y) mutable {
238
                             LOG(ERROR) << "!";
239
                             CHECK(y.is_error());
240
                             send_closure(self, &A::finish);
241
                           });
242
        send_closure(b_, &B::query, 2, [self = actor_id(this), a = std::make_unique<int>()](Result<uint32> y) mutable {
243
          LOG(ERROR) << "!";
244
          CHECK(y.is_error());
245
          send_closure(self, &A::finish);
246
        });
247
      }
248
      void finish() {
249
        LOG(ERROR) << "FINISH";
250
        stop();
251
      }
252

253
     private:
254
      std::shared_ptr<td::Destructor> watcher_;
255
      td::actor::ActorOwn<B> b_;
256
    };
257
    create_actor<A>(ActorOptions().with_name("A").with_poll(), watcher).release();
258
  });
259
  scheduler.run();
260
}
261

262
TEST(Actor2, MultiPromise) {
263
  using namespace td;
264
  MultiPromise::Options fail_on_error;
265
  fail_on_error.ignore_errors = false;
266
  MultiPromise::Options ignore_errors;
267
  ignore_errors.ignore_errors = true;
268

269
  std::string str;
270
  auto log = [&](Result<Unit> res) {
271
    if (res.is_ok()) {
272
      str += "OK;";
273
    } else {
274
      str += PSTRING() << "E" << res.error().code() << ";";
275
    }
276
  };
277
  auto clear = [&] { str = ""; };
278

279
  {
280
    clear();
281
    MultiPromise mp(ignore_errors);
282
    {
283
      auto mp_init = mp.init_guard();
284
      mp_init.add_promise(log);
285
      ASSERT_EQ("", str);
286
    }
287
    ASSERT_EQ("OK;", str);
288
  }
289

290
  {
291
    clear();
292
    MultiPromise mp(ignore_errors);
293
    {
294
      auto mp_init = mp.init_guard();
295
      mp_init.add_promise(log);
296
      mp_init.get_promise().set_error(Status::Error(1));
297
      ASSERT_EQ("", str);
298
    }
299
    ASSERT_EQ("OK;", str);
300
  }
301

302
  {
303
    clear();
304
    MultiPromise mp(ignore_errors);
305
    Promise<> promise;
306
    {
307
      auto mp_init = mp.init_guard();
308
      mp_init.add_promise(log);
309
      promise = mp_init.get_promise();
310
    }
311
    ASSERT_EQ("", str);
312
    {
313
      auto mp_init = mp.add_promise_or_init(log);
314
      ASSERT_TRUE(!mp_init);
315
    }
316
    promise.set_error(Status::Error(2));
317
    ASSERT_EQ("OK;OK;", str);
318
    clear();
319
    {
320
      auto mp_init = mp.add_promise_or_init(log);
321
      ASSERT_TRUE(mp_init);
322
      ASSERT_EQ("", str);
323
    }
324
    ASSERT_EQ("OK;", str);
325
  }
326

327
  {
328
    clear();
329
    MultiPromise mp(fail_on_error);
330
    {
331
      auto mp_init = mp.init_guard();
332
      mp_init.get_promise().set_value(Unit());
333
      mp_init.add_promise(log);
334
      ASSERT_EQ("", str);
335
    }
336
    ASSERT_EQ("OK;", str);
337
  }
338

339
  {
340
    clear();
341
    MultiPromise mp(fail_on_error);
342
    {
343
      auto mp_init = mp.init_guard();
344
      mp_init.get_promise().set_value(Unit());
345
      mp_init.add_promise(log);
346
      mp_init.get_promise().set_error(Status::Error(1));
347
      ASSERT_EQ("E1;", str);
348
      clear();
349
      mp_init.get_promise().set_error(Status::Error(2));
350
      ASSERT_EQ("", str);
351
      mp_init.add_promise(log);
352
      ASSERT_EQ("E1;", str);
353
    }
354
    ASSERT_EQ("E1;", str);
355
  }
356

357
  {
358
    clear();
359
    MultiPromise mp(fail_on_error);
360
    Promise<> promise;
361
    {
362
      auto mp_init = mp.init_guard();
363
      mp_init.get_promise().set_value(Unit());
364
      mp_init.add_promise(log);
365
      promise = mp_init.get_promise();
366
    }
367
    ASSERT_EQ("", str);
368
    {
369
      auto mp_init = mp.add_promise_or_init(log);
370
      ASSERT_TRUE(mp_init.empty());
371
    }
372
    promise.set_error(Status::Error(2));
373
    ASSERT_EQ("E2;E2;", str);
374
    clear();
375

376
    {
377
      auto mp_init = mp.add_promise_or_init(log);
378
      ASSERT_TRUE(!mp_init.empty());
379
    }
380
    ASSERT_EQ("OK;", str);
381
  }
382
}
383

384
#if TD_HAVE_COROUTINES
385
#include <experimental/coroutine>
386
namespace td {
387
template <class T = Unit>
388
struct task {
389
  struct final_awaiter {
390
    bool await_ready() const noexcept {
391
      return false;
392
    }
393
    template <class P>
394
    std::experimental::coroutine_handle<> await_suspend(std::experimental::coroutine_handle<P> continuation) noexcept {
395
      return continuation.promise().continuation_;
396
    }
397
    void await_resume() noexcept {
398
    }
399
  };
400
  struct promise_type {
401
    task get_return_object() {
402
      return task{*this};
403
    }
404
    std::experimental::suspend_always initial_suspend() {
405
      return {};
406
    }
407
    final_awaiter final_suspend() {
408
      return final_awaiter{};
409
    }
410
    void return_value(T v) {
411
      value_ = v;
412
    }
413
    T move_value() {
414
      return std::move(value_.value());
415
    }
416
    void unhandled_exception() {
417
    }
418

419
    optional<T> value_;
420
    std::experimental::coroutine_handle<> continuation_;
421
  };
422

423
  // awaiter
424
  std::experimental::coroutine_handle<promise_type> coroutine_handle_;
425
  task(task &&other) = default;
426
  task(promise_type &promise)
427
      : coroutine_handle_(std::experimental::coroutine_handle<promise_type>::from_promise(promise)) {
428
  }
429

430
  bool await_ready() const noexcept {
431
    return !coroutine_handle_ || coroutine_handle_.done();
432
  }
433
  std::experimental::coroutine_handle<> await_suspend(std::experimental::coroutine_handle<> continuation) noexcept {
434
    coroutine_handle_.promise().continuation_ = continuation;
435
    return coroutine_handle_;
436
  }
437
  T await_resume() noexcept {
438
    return coroutine_handle_.promise().move_value();
439
  }
440
};
441

442
task<int> f() {
443
  co_return 1;
444
}
445
task<int> g() {
446
  co_return 2;
447
}
448
task<int> h() {
449
  auto a = co_await f();
450
  auto b = co_await g();
451
  co_return a + b;
452
}
453

454
struct immediate_task {
455
  struct promise_type {
456
    immediate_task get_return_object() {
457
      return {};
458
    }
459
    std::experimental::suspend_never initial_suspend() {
460
      return {};
461
    }
462
    std::experimental::suspend_never final_suspend() {
463
      return {};
464
    }
465
    void return_void() {
466
    }
467
    void unhandled_exception() {
468
    }
469
  };
470
};
471

472
struct OnActor {
473
 public:
474
  template <class T>
475
  OnActor(T &&actor_id) : actor_id_(actor_id.as_actor_ref()) {
476
  }
477
  bool await_ready() const noexcept {
478
    return false;
479
  }
480
  void await_suspend(std::experimental::coroutine_handle<> continuation) noexcept {
481
    //TODO: destroy if lambda is lost
482
    send_lambda(actor_id_, [continuation]() mutable { continuation.resume(); });
483
  }
484
  void await_resume() noexcept {
485
  }
486

487
 private:
488
  actor::detail::ActorRef actor_id_;
489
};
490

491
immediate_task check_h() {
492
  LOG(ERROR) << "check_h: call h";
493
  auto c = co_await h();
494
  LOG(ERROR) << "check_h: after call h";
495
  ASSERT_EQ(3, c);
496
}
497

498
TEST(ActorCoro, Task) {
499
  check_h();
500
}
501
namespace actor {
502
class AsyncQuery {};
503

504
class Printer : public Actor {
505
 public:
506
  void f();
507
  void print_a() {
508
    LOG(ERROR) << "a";
509
  }
510
  void print_b() {
511
    LOG(ERROR) << "b";
512
  }
513
};
514

515
class SampleActor : public Actor {
516
 public:
517
  SampleActor(std::shared_ptr<td::Destructor> watcher) : watcher_(std::move(watcher)) {
518
  }
519

520
 private:
521
  std::shared_ptr<Destructor> watcher_;
522
  ActorOwn<Printer> printer_;
523
  void start_up() override {
524
    printer_ = create_actor<Printer>("Printer");
525
    run_coroutine();
526
  }
527
  task<Unit> print_a() {
528
    auto self = actor_id(this);
529
    LOG(ERROR) << "enter print_a";
530
    co_await OnActor(printer_);
531
    detail::current_actor<Printer>().print_a();
532
    co_await OnActor(self);
533
    LOG(ERROR) << "exit print_a";
534
    co_return {};
535
  }
536
  task<Unit> print_b() {
537
    auto self = actor_id(this);
538
    LOG(ERROR) << "enter print_b";
539
    co_await OnActor(printer_);
540
    detail::current_actor<Printer>().print_b();
541
    co_await OnActor(self);
542
    LOG(ERROR) << "exit print_b";
543
    co_return {};
544
  }
545

546
  immediate_task run_coroutine() {
547
    co_await print_a();
548
    co_await print_b();
549
    stop();
550
  }
551
};
552
}  // namespace actor
553

554
TEST(ActorCoro, Simple) {
555
  using namespace td::actor;
556
  using namespace td;
557
  Scheduler scheduler({1});
558

559
  auto watcher = td::create_shared_destructor([] {
560
    LOG(ERROR) << "STOP";
561
    SchedulerContext::get()->stop();
562
  });
563
  scheduler.run_in_context([watcher = std::move(watcher)] {
564
    create_actor<actor::SampleActor>(ActorOptions().with_name("SampleActor").with_poll(), watcher).release();
565
  });
566
  scheduler.run();
567
}
568

569
}  // namespace td
570
#endif
571

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.