MetaGPT

Форк
0
/
subscription.py 
100 строк · 3.2 Кб
1
import asyncio
2
from typing import AsyncGenerator, Awaitable, Callable
3

4
from pydantic import BaseModel, ConfigDict, Field
5

6
from metagpt.logs import logger
7
from metagpt.roles import Role
8
from metagpt.schema import Message
9

10

11
class SubscriptionRunner(BaseModel):
12
    """A simple wrapper to manage subscription tasks for different roles using asyncio.
13

14
    Example:
15
        >>> import asyncio
16
        >>> from metagpt.address import SubscriptionRunner
17
        >>> from metagpt.roles import Searcher
18
        >>> from metagpt.schema import Message
19

20
        >>> async def trigger():
21
        ...     while True:
22
        ...         yield Message(content="the latest news about OpenAI")
23
        ...         await asyncio.sleep(3600 * 24)
24

25
        >>> async def callback(msg: Message):
26
        ...     print(msg.content)
27

28
        >>> async def main():
29
        ...     pb = SubscriptionRunner()
30
        ...     await pb.subscribe(Searcher(), trigger(), callback)
31
        ...     await pb.run()
32

33
        >>> asyncio.run(main())
34
    """
35

36
    model_config = ConfigDict(arbitrary_types_allowed=True)
37

38
    tasks: dict[Role, asyncio.Task] = Field(default_factory=dict)
39

40
    async def subscribe(
41
        self,
42
        role: Role,
43
        trigger: AsyncGenerator[Message, None],
44
        callback: Callable[
45
            [
46
                Message,
47
            ],
48
            Awaitable[None],
49
        ],
50
    ):
51
        """Subscribes a role to a trigger and sets up a callback to be called with the role's response.
52

53
        Args:
54
            role: The role to subscribe.
55
            trigger: An asynchronous generator that yields Messages to be processed by the role.
56
            callback: An asynchronous function to be called with the response from the role.
57
        """
58
        loop = asyncio.get_running_loop()
59

60
        async def _start_role():
61
            async for msg in trigger:
62
                resp = await role.run(msg)
63
                await callback(resp)
64

65
        self.tasks[role] = loop.create_task(_start_role(), name=f"Subscription-{role}")
66

67
    async def unsubscribe(self, role: Role):
68
        """Unsubscribes a role from its trigger and cancels the associated task.
69

70
        Args:
71
            role: The role to unsubscribe.
72
        """
73
        task = self.tasks.pop(role)
74
        task.cancel()
75

76
    async def run(self, raise_exception: bool = True):
77
        """Runs all subscribed tasks and handles their completion or exception.
78

79
        Args:
80
            raise_exception: _description_. Defaults to True.
81

82
        Raises:
83
            task.exception: _description_
84
        """
85
        while True:
86
            for role, task in self.tasks.items():
87
                if task.done():
88
                    if task.exception():
89
                        if raise_exception:
90
                            raise task.exception()
91
                        logger.opt(exception=task.exception()).error(f"Task {task.get_name()} run error")
92
                    else:
93
                        logger.warning(
94
                            f"Task {task.get_name()} has completed. "
95
                            "If this is unexpected behavior, please check the trigger function."
96
                        )
97
                    self.tasks.pop(role)
98
                    break
99
            else:
100
                await asyncio.sleep(1)
101

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.