2
from typing import AsyncGenerator, Awaitable, Callable
4
from pydantic import BaseModel, ConfigDict, Field
6
from metagpt.logs import logger
7
from metagpt.roles import Role
8
from metagpt.schema import Message
11
class SubscriptionRunner(BaseModel):
    """A simple wrapper to manage subscription tasks for different roles using asyncio.

    Every subscribed role is driven by its own asyncio task, which is kept in
    ``tasks`` under the role it belongs to.

    Example:
        >>> import asyncio
        >>> from metagpt.address import SubscriptionRunner
        >>> from metagpt.roles import Searcher
        >>> from metagpt.schema import Message

        >>> async def trigger():
        ...     while True:
        ...         yield Message(content="the latest news about OpenAI")
        ...         await asyncio.sleep(3600 * 24)

        >>> async def callback(msg: Message):
        ...     print(msg.content)

        >>> async def main():
        ...     pb = SubscriptionRunner()
        ...     await pb.subscribe(Searcher(), trigger(), callback)
        ...     await pb.run()

        >>> asyncio.run(main())
    """

    # asyncio.Task is not a pydantic-native type, so arbitrary types must be allowed.
    model_config = ConfigDict(arbitrary_types_allowed=True)

    # Running subscription task for each subscribed role.
    tasks: dict[Role, asyncio.Task] = Field(default_factory=dict)

    async def subscribe(
        self,
        role: Role,
        trigger: AsyncGenerator[Message, None],
        callback: Callable[[Message], Awaitable[None]],
    ):
        """Subscribes a role to a trigger and sets up a callback to be called with the role's response.

        A task is created on the running event loop and stored in ``tasks``
        under ``role``. The task consumes ``trigger``, runs the role on each
        message and awaits ``callback`` with the role's response.

        Args:
            role: The role to subscribe.
            trigger: An asynchronous generator that yields Messages to be processed by the role.
            callback: An asynchronous function to be called with the response from the role.
        """
        loop = asyncio.get_running_loop()

        async def _start_role():
            async for msg in trigger:
                resp = await role.run(msg)
                await callback(resp)

        self.tasks[role] = loop.create_task(_start_role(), name=f"Subscription-{role}")

    async def unsubscribe(self, role: Role):
        """Unsubscribes a role from its trigger and cancels the associated task.

        Args:
            role: The role to unsubscribe.

        Raises:
            KeyError: If ``role`` has no entry in ``tasks``.
        """
        task = self.tasks.pop(role)
        task.cancel()

    async def run(self, raise_exception: bool = True):
        """Runs all subscribed tasks and handles their completion or exception.

        Polls ``tasks`` in an endless loop. A finished task is reported and
        removed from ``tasks``; when no task has finished, the loop sleeps for
        one second before checking again. The method only exits by raising.

        Args:
            raise_exception: If True, re-raise the exception of a failed task.
                If False, log the failure and keep monitoring. Defaults to True.

        Raises:
            Exception: The exception stored on a failed task, when
                ``raise_exception`` is True.
        """
        while True:
            for role, task in self.tasks.items():
                if not task.done():
                    continue
                # NOTE(review): Task.exception() raises CancelledError for a cancelled
                # task. unsubscribe() removes a task from `tasks` before cancelling it,
                # so that is only reachable if a task is cancelled from outside.
                error = task.exception()
                if error is not None:
                    if raise_exception:
                        raise error
                    logger.opt(exception=error).error(f"Task {task.get_name()} run error")
                else:
                    # NOTE(review): the logger method for this message is not visible in
                    # the extracted source; `warning` is assumed -- confirm.
                    logger.warning(
                        f"Task {task.get_name()} has completed. "
                        "If this is unexpected behavior, please check the trigger function."
                    )
                # The dict is mutated here, so iteration must stop immediately;
                # the enclosing while-loop restarts the scan.
                self.tasks.pop(role)
                break
            else:
                # No task finished during this pass; yield to the event loop.
                await asyncio.sleep(1)