MetaGPT
1import asyncio
2import threading
3from typing import Any
4
5
6def run_coroutine_in_new_loop(coroutine) -> Any:
7"""Runs a coroutine in a new, separate event loop on a different thread.
8
9This function is useful when try to execute an async function within a sync function, but encounter the error `RuntimeError: This event loop is already running`.
10"""
11new_loop = asyncio.new_event_loop()
12t = threading.Thread(target=lambda: new_loop.run_forever())
13t.start()
14
15future = asyncio.run_coroutine_threadsafe(coroutine, new_loop)
16
17try:
18return future.result()
19finally:
20new_loop.call_soon_threadsafe(new_loop.stop)
21t.join()
22new_loop.close()
23
24
25class NestAsyncio:
26"""Make asyncio event loop reentrant."""
27
28is_applied = False
29
30@classmethod
31def apply_once(cls):
32"""Ensures `nest_asyncio.apply()` is called only once."""
33if not cls.is_applied:
34import nest_asyncio
35
36nest_asyncio.apply()
37cls.is_applied = True
38