Autogen_core源码:_cancellation_token.py

devtools/2025/2/2 23:07:21/

目录

    • _cancellation_token.py代码
    • 代码解释
      • 类的初始化
      • 取消操作
      • 检查取消状态
      • 添加回调函数
      • 关联异步`Future`对象
      • 总结
    • 代码示例
      • 示例 1:基本的取消操作
      • 示例 2:添加回调函数
      • 示例 3:检查令牌是否已取消

_cancellation_token.py代码

import threading
from asyncio import Future
from typing import Any, Callable, Listclass CancellationToken:"""A token used to cancel pending async calls"""def __init__(self) -> None:self._cancelled: bool = Falseself._lock: threading.Lock = threading.Lock()self._callbacks: List[Callable[[], None]] = []def cancel(self) -> None:"""Cancel pending async calls linked to this cancellation token."""with self._lock:if not self._cancelled:self._cancelled = Truefor callback in self._callbacks:callback()def is_cancelled(self) -> bool:"""Check if the CancellationToken has been used"""with self._lock:return self._cancelleddef add_callback(self, callback: Callable[[], None]) -> None:"""Attach a callback that will be called when cancel is invoked"""with self._lock:if self._cancelled:callback()else:self._callbacks.append(callback)def link_future(self, future: Future[Any]) -> Future[Any]:"""Link a pending async call to a token to allow its cancellation"""with self._lock:if self._cancelled:future.cancel()else:def _cancel() -> None:future.cancel()self._callbacks.append(_cancel)return future

代码解释

这段Python代码定义了一个名为CancellationToken的类,其主要功能是提供一种机制,用于取消挂起的异步调用。下面详细解释代码的逻辑和功能:

类的初始化

def __init__(self) -> None:self._cancelled: bool = Falseself._lock: threading.Lock = threading.Lock()self._callbacks: List[Callable[[], None]] = []
  • _cancelled:一个布尔类型的私有变量,用于标记该取消令牌是否已经被使用(即是否已经调用了cancel方法),初始值为False
  • _lock:一个线程锁对象,用于确保在多线程环境下对共享资源(如_cancelled_callbacks)的访问是线程安全的。
  • _callbacks:一个列表,用于存储当调用cancel方法时需要执行的回调函数。

取消操作

def cancel(self) -> None:"""Cancel pending async calls linked to this cancellation token."""with self._lock:if not self._cancelled:self._cancelled = Truefor callback in self._callbacks:callback()
  • cancel方法用于取消与该取消令牌关联的所有挂起的异步调用。
  • 使用with self._lock语句确保在修改_cancelled状态和执行回调函数时不会出现竞态条件。
  • 只有当_cancelledFalse时,才会将其设置为True,并依次执行_callbacks列表中的所有回调函数。

检查取消状态

def is_cancelled(self) -> bool:"""Check if the CancellationToken has been used"""with self._lock:return self._cancelled
  • is_cancelled方法用于检查该取消令牌是否已经被使用。
  • 使用with self._lock语句确保在读取_cancelled状态时不会出现竞态条件。
  • 返回_cancelled的值。

添加回调函数

def add_callback(self, callback: Callable[[], None]) -> None:"""Attach a callback that will be called when cancel is invoked"""with self._lock:if self._cancelled:callback()else:self._callbacks.append(callback)
  • add_callback方法用于添加一个回调函数,当调用cancel方法时,该回调函数将被执行。
  • 使用with self._lock语句确保在检查_cancelled状态和修改_callbacks列表时不会出现竞态条件。
  • 如果_cancelledTrue,说明取消操作已经发生,直接执行回调函数;否则,将回调函数添加到_callbacks列表中。

关联异步Future对象

def link_future(self, future: Future[Any]) -> Future[Any]:"""Link a pending async call to a token to allow its cancellation"""with self._lock:if self._cancelled:future.cancel()else:def _cancel() -> None:future.cancel()self._callbacks.append(_cancel)return future
  • link_future方法用于将一个异步Future对象与该取消令牌关联起来,以便可以取消该异步调用。
  • 使用with self._lock语句确保在检查_cancelled状态和修改_callbacks列表时不会出现竞态条件。
  • 如果_cancelledTrue,说明取消操作已经发生,直接取消Future对象;否则,定义一个内部函数_cancel,用于取消Future对象,并将其添加到_callbacks列表中。
  • 最后返回Future对象。

总结

CancellationToken类提供了一种机制,允许用户在需要时取消挂起的异步调用。通过添加回调函数和关联Future对象,当调用cancel方法时,所有与该取消令牌关联的操作都将被取消。同时,使用线程锁确保了在多线程环境下的线程安全。

代码示例

示例 1:基本的取消操作

import asyncio
from typing import Any, Callable, List
import threading
from asyncio import Futureclass CancellationToken:"""A token used to cancel pending async calls"""def __init__(self) -> None:self._cancelled: bool = Falseself._lock: threading.Lock = threading.Lock()self._callbacks: List[Callable[[], None]] = []def cancel(self) -> None:"""Cancel pending async calls linked to this cancellation token."""with self._lock:if not self._cancelled:self._cancelled = Truefor callback in self._callbacks:callback()def is_cancelled(self) -> bool:"""Check if the CancellationToken has been used"""with self._lock:return self._cancelleddef add_callback(self, callback: Callable[[], None]) -> None:"""Attach a callback that will be called when cancel is invoked"""with self._lock:if self._cancelled:callback()else:self._callbacks.append(callback)def link_future(self, future: Future[Any]) -> Future[Any]:"""Link a pending async call to a token to allow its cancellation"""with self._lock:if self._cancelled:future.cancel()else:def _cancel() -> None:future.cancel()self._callbacks.append(_cancel)return futureasync def long_running_task():print("Task started")await asyncio.sleep(5)print("Task completed")async def main():token = CancellationToken()task = asyncio.create_task(long_running_task())token.link_future(task)# 模拟一段时间后取消任务await asyncio.sleep(2)token.cancel()try:await taskexcept asyncio.CancelledError:print("Task was cancelled")await main()
Task started
Task was cancelled

示例 2:添加回调函数

def callback_function():print("Callback function was called")async def main():token = CancellationToken()token.add_callback(callback_function)# 取消令牌token.cancel()await main()
Callback function was called

示例 3:检查令牌是否已取消


async def main():token = CancellationToken()print(f"Is cancelled before cancel: {token.is_cancelled()}")token.cancel()print(f"Is cancelled after cancel: {token.is_cancelled()}")await main()
Is cancelled before cancel: False
Is cancelled after cancel: True

http://www.ppmy.cn/devtools/155561.html

相关文章

基于 NodeJs 一个后端接口的创建过程及其规范 -- 【elpis全栈项目】

基于 NodeJs 一个后端接口的创建过程及其规范 一个接口的诞生: #mermaid-svg-46HXZKI3fdnO0rKV {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-46HXZKI3fdnO0rKV .error-icon{fill:#552222;}#mermaid-sv…

22.Word:小张-经费联审核结算单❗【16】

目录 NO1.2 NO3.4​ NO5.6.7 NO8邮件合并 MS搜狗输入法 NO1.2 用ms打开文件,而不是wps❗不然后面都没分布局→页面设置→页面大小→页面方向→上下左右:页边距→页码范围:多页:拼页光标处于→布局→分隔符:分节符…

【RAG】SKLearnVectorStore 避免使用gpt4all会connection err

gpt4all 列表中包含了多个开源的大模型,如 Qwen2.5、Llama 3、DeepSeek、Mistral 等,但 不包含 OpenAI 的 GPT-4o。GPT-4o 是 OpenAI 提供的闭源模型,目前只能通过 OpenAI API 或 ChatGPT 官方应用(网页版、移动端)访问,并不支持本地运行,也没有 GGUF 量化格式的模型文件…

wx043基于springboot+vue+uniapp的智慧物流小程序

开发语言:Java框架:springbootuniappJDK版本:JDK1.8服务器:tomcat7数据库:mysql 5.7(一定要5.7版本)数据库工具:Navicat11开发软件:eclipse/myeclipse/ideaMaven包&#…

SOME/IP--协议英文原文讲解3

前言 SOME/IP协议越来越多的用于汽车电子行业中,关于协议详细完全的中文资料却没有,所以我将结合工作经验并对照英文原版协议做一系列的文章。基本分三大块: 1. SOME/IP协议讲解 2. SOME/IP-SD协议讲解 3. python/C举例调试讲解 Note: Thi…

【数据结构】_复杂度

目录 1. 算法效率 2. 时间复杂度 2.1 时间复杂度概念 2.2 准确的时间复杂度函数式 2.3 大O渐进表示法 2.4 时间复杂度的常见量级 2.5 时间复杂度示例 3. 空间复杂度 3.1 空间复杂度概念 3.2 空间复杂度示例 1. 算法效率 一般情况下,衡量一个算法的好坏是…

基于 STM32 的智能电梯控制系统

1. 引言 随着城市化进程的加速,高层建筑日益增多,电梯作为垂直交通工具的重要性愈发凸显。传统电梯控制系统在运行效率、安全性和智能化程度上已难以满足现代需求。智能电梯控制系统能够实时监测电梯的运行状态、乘客需求,并根据这些信息优化…

Vue3的el-table-column增加跳转其他页面

效果图 既不影响显示内容&#xff0c;也不影响页面跳转 el-table-column写法 <el-table-columnlabel"系统单号"align"center"prop"systematicReceipt"width"180" ><template #default"scope"><el-link t…