MetaGPT

Форк
0
/
async_helper.py 
37 строк · 1023.0 Байт
1
import asyncio
2
import threading
3
from typing import Any
4

5

6
def run_coroutine_in_new_loop(coroutine) -> Any:
7
    """Runs a coroutine in a new, separate event loop on a different thread.
8

9
    This function is useful when try to execute an async function within a sync function, but encounter the error `RuntimeError: This event loop is already running`.
10
    """
11
    new_loop = asyncio.new_event_loop()
12
    t = threading.Thread(target=lambda: new_loop.run_forever())
13
    t.start()
14

15
    future = asyncio.run_coroutine_threadsafe(coroutine, new_loop)
16

17
    try:
18
        return future.result()
19
    finally:
20
        new_loop.call_soon_threadsafe(new_loop.stop)
21
        t.join()
22
        new_loop.close()
23

24

25
class NestAsyncio:
26
    """Make asyncio event loop reentrant."""
27

28
    is_applied = False
29

30
    @classmethod
31
    def apply_once(cls):
32
        """Ensures `nest_asyncio.apply()` is called only once."""
33
        if not cls.is_applied:
34
            import nest_asyncio
35

36
            nest_asyncio.apply()
37
            cls.is_applied = True
38

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.