目录
- 第一个示例
- 第二个示例
- 完成的功能
下面两个示例展示了如何使用 AutoGen 库中的
ClosureAgent
来创建和使用代理。
ClosureAgent
允许你使用闭包(即一个没有定义类的函数)来定义代理,并从运行时中提取值。代码中展示了两个示例:
第一个示例
import asyncio
from dataclasses import dataclassfrom autogen_core import (ClosureAgent,ClosureContext,DefaultSubscription,DefaultTopicId,MessageContext,SingleThreadedAgentRuntime,
)
"""
The closure agent allows you to define an agent using a closure, or function without needing to define a class. It allows values to be extracted out of the runtime.The closure can define the type of message which is expected, or `Any` can be used to accept any type of message."""
@dataclass
class MyMessage:content: strasync def main():queue = asyncio.Queue[MyMessage]()async def output_result(_ctx: ClosureContext, message: MyMessage, ctx: MessageContext) -> None:await queue.put(message)runtime = SingleThreadedAgentRuntime()await ClosureAgent.register_closure(runtime, "output_result", output_result, subscriptions=lambda: [DefaultSubscription()])runtime.start()await runtime.publish_message(MyMessage("Hello, world!"), DefaultTopicId())await runtime.stop_when_idle()result = await queue.get()print(result)await main()
MyMessage(content='Hello, world!')
这个示例定义了一个简单的代理,它接收一个消息并将其放入一个 asyncio 队列中。
- 定义了一个名为
MyMessage
的数据类,用于存储消息内容。 - 创建了一个名为
output_result
的异步函数,它接收一个ClosureContext
、一个MyMessage
和一个MessageContext
,然后将消息放入队列中。 - 创建了一个
SingleThreadedAgentRuntime
实例。 - 使用
register_closure
方法注册了output_result
函数作为一个代理。它订阅了默认的主题。 - 启动运行时,发布一个
MyMessage
消息,并在空闲时停止运行时。 - 从队列中获取结果并打印。
这个示例展示了如何注册一个闭包代理,并在运行时中发送和接收消息。
第二个示例
@dataclass
class Message:content: strasync def test_register_receives_publish() -> None:runtime = SingleThreadedAgentRuntime()queue = asyncio.Queue[tuple[str, str]]()async def log_message(closure_ctx: ClosureContext, message: Message, ctx: MessageContext) -> None:key = closure_ctx.id.keyawait queue.put((key, message.content))await ClosureAgent.register_closure(runtime, "name", log_message, subscriptions=lambda: [DefaultSubscription()])runtime.start()await runtime.publish_message(Message("first message"), topic_id=DefaultTopicId())await runtime.publish_message(Message("second message"), topic_id=DefaultTopicId())await runtime.publish_message(Message("third message"), topic_id=DefaultTopicId())await runtime.stop_when_idle()assert queue.qsize() == 3assert queue.get_nowait() == ("default", "first message")assert queue.get_nowait() == ("default", "second message")assert queue.get_nowait() == ("default", "third message")assert queue.empty()test_register_receives_publish()
<coroutine object test_register_receives_publish at 0x00000150ABEAA640>
这个示例定义了一个测试函数,用于测试注册代理、接收和发布消息的功能。
- 定义了一个名为
Message
的数据类,用于存储消息内容。 - 创建了一个名为
test_register_receives_publish
的异步测试函数。 - 在这个函数中,创建了一个
SingleThreadedAgentRuntime
实例和一个 asyncio 队列。 - 定义了一个名为
log_message
的异步函数,它接收一个ClosureContext
、一个Message
和一个MessageContext
,然后将代理的 ID 和消息内容作为一个元组放入队列中。 - 使用
register_closure
方法注册了log_message
函数作为一个代理,并订阅了默认的主题。 - 启动运行时,并发布三个
Message
消息。 - 等待运行时空闲后,检查队列中的消息数量和内容,确保有三个消息,并且它们的顺序和内容与发布的消息一致。
这个示例展示了如何测试代理的注册、消息的接收和发布功能。
完成的功能
这两个示例展示了如何使用 ClosureAgent
和 SingleThreadedAgentRuntime
来创建、注册和使用闭包代理。它们展示了如何发送和接收消息,以及如何测试代理的功能。这些示例是理解和学习如何使用 AutoGen 库来构建代理和消息传递系统的基础。