Autogen_core: ClosureAgent使用与测试

ops/2025/1/31 7:13:19/

目录

      • 第一个示例
      • 第二个示例
      • 完成的功能

下面两个示例展示了如何使用 AutoGen 库中的 ClosureAgent 来创建和使用代理。 ClosureAgent 允许你使用闭包(即一个没有定义类的函数)来定义代理,并从运行时中提取值。代码中展示了两个示例:

第一个示例

import asyncio
from dataclasses import dataclassfrom autogen_core import (ClosureAgent,ClosureContext,DefaultSubscription,DefaultTopicId,MessageContext,SingleThreadedAgentRuntime,
)
"""
The closure agent allows you to define an agent using a closure, or function without needing to define a class. It allows values to be extracted out of the runtime.The closure can define the type of message which is expected, or `Any` can be used to accept any type of message."""
@dataclass
class MyMessage:content: strasync def main():queue = asyncio.Queue[MyMessage]()async def output_result(_ctx: ClosureContext, message: MyMessage, ctx: MessageContext) -> None:await queue.put(message)runtime = SingleThreadedAgentRuntime()await ClosureAgent.register_closure(runtime, "output_result", output_result, subscriptions=lambda: [DefaultSubscription()])runtime.start()await runtime.publish_message(MyMessage("Hello, world!"), DefaultTopicId())await runtime.stop_when_idle()result = await queue.get()print(result)await main()
MyMessage(content='Hello, world!')

这个示例定义了一个简单的代理,它接收一个消息并将其放入一个 asyncio 队列中。

  1. 定义了一个名为 MyMessage 的数据类,用于存储消息内容。
  2. 创建了一个名为 output_result 的异步函数,它接收一个 ClosureContext、一个 MyMessage 和一个 MessageContext,然后将消息放入队列中。
  3. 创建了一个 SingleThreadedAgentRuntime 实例。
  4. 使用 register_closure 方法注册了 output_result 函数作为一个代理。它订阅了默认的主题。
  5. 启动运行时,发布一个 MyMessage 消息,并在空闲时停止运行时。
  6. 从队列中获取结果并打印。

这个示例展示了如何注册一个闭包代理,并在运行时中发送和接收消息。

第二个示例

@dataclass
class Message:content: strasync def test_register_receives_publish() -> None:runtime = SingleThreadedAgentRuntime()queue = asyncio.Queue[tuple[str, str]]()async def log_message(closure_ctx: ClosureContext, message: Message, ctx: MessageContext) -> None:key = closure_ctx.id.keyawait queue.put((key, message.content))await ClosureAgent.register_closure(runtime, "name", log_message, subscriptions=lambda: [DefaultSubscription()])runtime.start()await runtime.publish_message(Message("first message"), topic_id=DefaultTopicId())await runtime.publish_message(Message("second message"), topic_id=DefaultTopicId())await runtime.publish_message(Message("third message"), topic_id=DefaultTopicId())await runtime.stop_when_idle()assert queue.qsize() == 3assert queue.get_nowait() == ("default", "first message")assert queue.get_nowait() == ("default", "second message")assert queue.get_nowait() == ("default", "third message")assert queue.empty()test_register_receives_publish()
<coroutine object test_register_receives_publish at 0x00000150ABEAA640>

这个示例定义了一个测试函数,用于测试注册代理、接收和发布消息的功能。

  1. 定义了一个名为 Message 的数据类,用于存储消息内容。
  2. 创建了一个名为 test_register_receives_publish 的异步测试函数。
  3. 在这个函数中,创建了一个 SingleThreadedAgentRuntime 实例和一个 asyncio 队列。
  4. 定义了一个名为 log_message 的异步函数,它接收一个 ClosureContext、一个 Message 和一个 MessageContext,然后将代理的 ID 和消息内容作为一个元组放入队列中。
  5. 使用 register_closure 方法注册了 log_message 函数作为一个代理,并订阅了默认的主题。
  6. 启动运行时,并发布三个 Message 消息。
  7. 等待运行时空闲后,检查队列中的消息数量和内容,确保有三个消息,并且它们的顺序和内容与发布的消息一致。

这个示例展示了如何测试代理的注册、消息的接收和发布功能。

完成的功能

这两个示例展示了如何使用 ClosureAgentSingleThreadedAgentRuntime 来创建、注册和使用闭包代理。它们展示了如何发送和接收消息,以及如何测试代理的功能。这些示例是理解和学习如何使用 AutoGen 库来构建代理和消息传递系统的基础。


http://www.ppmy.cn/ops/154418.html

相关文章

MySQL(高级特性篇) 13 章——事务基础知识

一、数据库事务概述 事务是数据库区别于文件系统的重要特性之一 &#xff08;1&#xff09;存储引擎支持情况 SHOW ENGINES命令来查看当前MySQL支持的存储引擎都有哪些&#xff0c;以及这些存储引擎是否支持事务能看出在MySQL中&#xff0c;只有InnoDB是支持事务的 &#x…

CF 764B.Timofey and cubes(Java实现)

题目分析 输入n个数字&#xff0c;首尾交换&#xff0c;奇数对换&#xff0c;偶数对不换 思路分析 存入数组&#xff0c;遍历时判断i%20时(数组下标0开始&#xff0c;所以题目分析没有错)&#xff0c;对换 代码 import java.util.*;public class Main {public static void ma…

第十四讲 JDBC数据库

1. 什么是JDBC JDBC&#xff08;Java Database Connectivity&#xff0c;Java数据库连接&#xff09;&#xff0c;它是一套用于执行SQL语句的Java API。应用程序可通过这套API连接到关系型数据库&#xff0c;并使用SQL语句来完成对数据库中数据的查询、新增、更新和删除等操作…

5分钟带你获取deepseek api并搭建简易问答应用

目录 1、获取api 2、获取base_url和chat_model 3、配置模型参数 方法一&#xff1a;终端中临时将加入 方法二&#xff1a;创建.env文件 4、 配置client 5、利用deepseek大模型实现简易问答 deepseek-v3是截止博文撰写之日&#xff0c;无论是国内还是国际上发布的大模型中…

《DeepSeek-R1:使用说明译文》

官网&#xff1a;deepseek-ai/DeepSeek-R1 1. 引言 我们介绍了我们的第一代推理模型 DeepSeek-R1-Zero 和 DeepSeek-R1。 DeepSeek-R1-Zero 是一种通过大规模强化学习 &#xff08;RL&#xff09; 训练的模型&#xff0c;没有监督微调 &#xff08;SFT&#xff09; 作为初步步骤…

uniapp商城之商品分类

文章目录 前言一、新建分类组件1.轮播图效果二、一级分类与Tab交互1.封装API接口2.初始化调用3.定义类型4.渲染一级分类5.Tab交互三、二级分类与商品渲染1.提取当前二级分类数据2.渲染二级分类2.渲染商品四、骨架屏1.微信开发者工具2.生成骨架屏3.调整为vue组件方便开发的操作前…

未来无线技术的发展方向

未来无线技术的发展趋势呈现出多样化、融合化的特点&#xff0c;涵盖速度、覆盖范围、应用领域、频段利用、安全性等多个方面。这些趋势将深刻改变人们的生活和社会的运行方式。 传输速度提升&#xff1a;Wi-Fi 技术迭代加快&#xff0c;如 Wi-Fi7 理论峰值速率达 46Gbps&#…

Kafka 压缩算法详细介绍

文章目录 一 、Kafka 压缩算法概述二、Kafka 压缩的作用2.1 降低网络带宽消耗2.2 提高 Kafka 生产者和消费者吞吐量2.3 减少 Kafka 磁盘存储占用2.4 减少 Kafka Broker 负载2.5 降低跨数据中心同步成本 三、Kafka 压缩的原理3.1 Kafka 压缩的基本原理3.2. Kafka 压缩的工作流程…