python之异步任务

news/2024/9/19 11:56:50/ 标签: python

在 Python 中,异步任务通常通过使用库如 Celery 来实现。Celery 是一个简单、灵活且可靠的分布式系统,用于处理大量消息,同时提供操作控制。

Celery 中,delayapply_async 是两种常用的方法来调度异步任务。

delay 方法

delayCelery 提供的一个快捷方法,用于简化任务的调用。它会自动将任务标记为异步执行。

python">from celery import Celeryapp = Celery('tasks', broker='pyamqp://guest@localhost//')@app.task
def add(x, y):return x + y# 使用 delay 方法调用任务
result = add.delay(4, 6)

apply_async 方法

apply_async 提供了更多的控制选项,例如可以指定任务的执行时间、重试策略等。

python">from celery import Celeryapp = Celery('tasks', broker='pyamqp://guest@localhost//')@app.task
def add(x, y):return x + y# 使用 apply_async 方法调用任务
result = add.apply_async((4, 6), countdown=10)  # 任务将在10秒后执行
参数说明
  • args:任务的参数,通常以元组形式传递。
  • kwargs:任务的关键字参数,以字典形式传递。
  • countdown:任务延迟执行的时间(以秒为单位)。
  • eta:任务的预计执行时间(datetime 对象)。
  • expires:任务的过期时间(datetime 对象或秒数)。
  • retry:是否在任务失败时自动重试。
  • retry_policy:重试策略,例如最大重试次数、重试间隔等。

示例1

使用 apply_async 方法来设置任务的各种参数:

python">from celery import Celery
from datetime import datetime, timedeltaapp = Celery('tasks', broker='pyamqp://guest@localhost//')@app.task(bind=True, max_retries=3)
def add(self, x, y):try:return x + yexcept Exception as exc:raise self.retry(exc=exc, countdown=5)# 使用 apply_async 方法调用任务
eta = datetime.utcnow() + timedelta(seconds=10)
result = add.apply_async((4, 6), eta=eta, expires=60, retry=True, retry_policy={'max_retries': 5,'interval_start': 0,'interval_step': 0.2,'interval_max': 0.2,
})

任务 add 被设置为在10秒后执行,并且在60秒后过期。如果任务失败,它会自动重试最多5次,每次重试间隔0.2秒。

  • delay 方法是 apply_async 的简化版本,适用于简单的异步任务调用。
  • apply_async 方法提供了更多的控制选项,适用于需要更复杂调度和重试策略的任务。

示例2

假设你有一个自定义的任务基类 CallbackTask,你可以这样定义一个任务:

python">from celery import Celery, Taskapp = Celery('tasks', broker='pyamqp://guest@localhost//')class CallbackTask(Task):def on_success(self, retval, task_id, args, kwargs):print(f'Task {task_id} succeeded with result: {retval}')def on_failure(self, exc, task_id, args, kwargs, einfo):print(f'Task {task_id} failed with exception: {exc}')@app.task(name='my_custom_task', base=CallbackTask, ignore_result=True)
def add(x, y):return x + y# 调用任务
result = add.delay(4, 6)
  1. 自定义任务基类 CallbackTask

    • on_success 方法:当任务成功完成时调用。
    • on_failure 方法:当任务失败时调用。
  2. 任务定义

    • @app.task(name='my_custom_task', base=CallbackTask, ignore_result=True)
      • name='my_custom_task':任务的自定义名称。
      • base=CallbackTask:任务的基类是 CallbackTask
      • ignore_result=True:任务的结果将不会被存储。
  3. 调用任务

    • result = add.delay(4, 6):异步调用任务 add,传递参数 46

Celery 中,任务的参数通常以元组或字典的形式传递,并且 Celery 会自动处理参数的序列化和反序列化。因此,你通常不需要手动将参数 JSON 化。

参数传递

简单参数
python">from celery import Celeryapp = Celery('tasks', broker='pyamqp://guest@localhost//')@app.task
def add(x, y):return x + y# 使用 delay 方法调用任务,传递参数
result = add.delay(4, 6)

在这个示例中,参数 46 被传递给任务 addCelery 会自动处理这些参数的序列化和反序列化。

复杂参数

如果你需要传递更复杂的参数,例如嵌套的字典或列表,Celery 也能处理这些情况:

python">from celery import Celeryapp = Celery('tasks', broker='pyamqp://guest@localhost//')@app.task
def process_data(data):# 假设 data 是一个字典return data['key1'] + data['key2']# 使用 apply_async 方法调用任务,传递复杂参数
data = {'key1': 10, 'key2': 20}
result = process_data.apply_async((data,))

在这个示例中,data 是一个字典,Celery 会自动将其序列化并传递给任务 process_data

常见错误

Celery 中,如果尝试传递一个 Django 模型对象作为任务参数,而没有设置适当的序列化和反序列化方法,通常会遇到序列化错误。默认情况下,Celery 使用 JSON 作为序列化格式,而 JSON 不支持直接序列化 Django 模型对象。

如果直接传递一个 Django 模型对象作为任务参数,可能会遇到类似以下的错误:

kombu.exceptions.EncodeError: Object of type <YourModel> is not JSON serializable

这个错误表明 Celery 尝试将 Django 模型对象序列化为 JSON,但失败了,因为 JSON 序列化器不知道如何处理 Django 模型对象。

解决方法

  1. 传递模型对象的主键

    • 传递模型对象的主键(或其他简单类型)作为任务参数,然后在任务内部重新获取模型对象。
    python">from celery import Celery
    from myapp.models import MyModelapp = Celery('tasks', broker='pyamqp://guest@localhost//')@app.task
    def process_model_object(model_id):obj = MyModel.objects.get(id=model_id)# 处理对象print(obj)# 调用任务,传递模型对象的主键
    obj = MyModel.objects.first()
    process_model_object.delay(obj.id)
    
  2. 自定义序列化和反序列化

    • 自定义任务参数的序列化和反序列化方法,将模型对象转换为可序列化的格式(如字典)。
    python">from celery import Celery
    from myapp.models import MyModelapp = Celery('tasks', broker='pyamqp://guest@localhost//')@app.task
    def process_model_object(model_data):# 反序列化模型对象obj = MyModel(**model_data)# 处理对象print(obj)# 调用任务,传递模型对象的字典表示
    obj = MyModel.objects.first()
    model_data = {'id': obj.id,'field1': obj.field1,'field2': obj.field2,# 其他字段
    }
    process_model_object.delay(model_data)
    
  3. 使用 Pickle 序列化器

    • Celery 支持多种序列化器,包括 Pickle。Pickle 可以序列化几乎所有 Python 对象,但它有安全风险,不建议在不受信任的环境中使用。
    python">from celery import Celery
    from myapp.models import MyModelapp = Celery('tasks', broker='pyamqp://guest@localhost//')
    app.conf.update(task_serializer='pickle',accept_content=['pickle'],  # Ignore other contentresult_serializer='pickle',
    )@app.task
    def process_model_object(obj):# 处理对象print(obj)# 调用任务,传递模型对象
    obj = MyModel.objects.first()
    process_model_object.delay(obj)
    

总结

  • 传递模型对象的主键:这是最常见和推荐的方法,因为它简单且安全。
  • 自定义序列化和反序列化:适用于需要传递复杂对象的情况。
  • 使用 Pickle 序列化器:虽然方便,但有安全风险,不建议在不受信任的环境中使用。

http://www.ppmy.cn/news/1523332.html

相关文章

Harmony arkTs组件开发:ListItem控件中不能使用多个组件

如下代码&#xff1a; List(){ForEach(this.indexList, (item: number) >{ListItem(){Text("测试")Row(){Image(this.images[item]).width(20).height(20).margin({left:15})Text(this.arr[item])//.width(80%).fontSize(sizes.oneFontSize).fontColor(colors.Tit…

Web 原生组件化方案:Web Components

你好&#xff0c;我是沐爸&#xff0c;欢迎点赞、收藏、评论和关注。 Web 组件化是一种将Web应用的UI部分拆分成可复用的独立组件的架构方法。这种方法有助于提高代码的可维护性、可重用性和可测试性。 而Web Components 标准则提供了一套原生的API&#xff0c;允许开发者创建…

【阿雄不会写代码】全国职业院校技能大赛GZ036第十套

也不说那么多了&#xff0c;要用到这篇博客&#xff0c;肯定也知道他是干嘛的&#xff0c;给博主点点关注点点赞&#xff01;&#xff01;&#xff01;这样博主才能更新更多免费的教程&#xff0c;不然就直接丢付费专栏里了&#xff0c;需要相关文件请私聊

Java进阶13讲__第12讲_2/2

线程安全问题 线程同步方案 线程池 线程通信 理论补充 1. 线程安全问题 1.1 举例说明 1.2 代码实现 package com.itheima.a_线程安全;/* 线程安全:多个线程同时修改同一个资源取钱案例小明和小红是一对夫妻&#xff0c;他们有一个共同的账户&#xff0c;余额是10万元如…

《React Native 应用开发最佳实践》

⭐️React Native 应用开发最佳实践⭐️ 近年来&#xff0c;React Native 应用开发因其能够使用 JavaScript 构建原生移动应用的能力而大受欢迎。它提供了跨平台兼容性、更快的开发时间以及更易于维护的特性&#xff0c;成为了许多开发者的首选。然而&#xff0c;要确保 React…

免费的 Mac 应用清理工具Pearcleaner v3.8.6

免费的 Mac 应用清理工具。这是一款免费开源的 Mac 应用清理工具&#xff0c;能够彻底卸载应用并清理残留文件。它采用 SwiftUI 开发&#xff0c;提供了简单易用的界面&#xff0c;支持右键卸载、迷你模式和 Homebrew 清理等功能。 下载链接&#xff1a;https://pan.quark.cn/s…

计算机网络(三) —— 简单Udp网络程序

目录 一&#xff0c;初始化服务器 1.0 辅助文件 1.1 socket函数 1.2 填充sockaddr结构体 1.3 bind绑定函数 1.4 字符串IP和整数IP的转换 二&#xff0c;运行服务器 2.1 接收 2.2 处理 2.3 返回 三&#xff0c;客户端实现 3.1 UdpClient.cc 实现 3.2 Main.cc 实现 …

前端面试热点题目——typescript篇

在TypeScript面试中&#xff0c;面试官通常会考察你对TypeScript特性的理解、类型系统的掌握、以及在实际项目中的应用能力。以下是一些热点题目及其相应的代码示例&#xff0c;旨在帮助你准备TypeScript相关的面试。 1. 类型别名与接口的区别及使用场景 问题&#xff1a;请解…

react js 笔记 3

起因&#xff0c; 目的: 专注。 学习 react js 的时候&#xff0c; 就专注这一方面 &#xff0c;其他都不要碰。 比如&#xff0c; python, C语言&#xff0c; R, 都不看。 只看 js.专注&#xff0c;减少来回切换。 重复。 自己写的笔记&#xff0c;需要反复多看几遍&#xff…

java开发后端

1.BeanUtils.toBean 方法 它是一个常见的 Java 工具方法&#xff0c;用于将一个 JavaBean 对象转换为另一个 JavaBean 对象 FlowOrderDO flowOrder BeanUtils.toBean(createReqVO, FlowOrderDO.class); 这行代码使用了 BeanUtils.toBean 方法&#xff0c;它是一个常见的 Ja…

MySQL笔记2(DQL查询语言【条件、分组、排序、限制、子查询、左右连接、内连接、联合查询】)

DQL数据查询语言与项目高级查询实战 先安装数据库并创建一个库 并创建以下数据 /*创建部门表*/CREATE TABLE dept( deptnu INT PRIMARY KEY comment 部门编号, dname VARCHAR(50) comment 部门名称, addr VARCHAR(50) comment 部门地址 );/*某个公司的员工表*/ CREATE TABLE…

html备忘录

备忘录 网站收藏数据&#xff1a; 网站收藏.js const webLinks [{ title: "智能翻译", src: "https://fanyi.baidu.com" },{ title: "哔哩哔哩", src: "https://www.bilibili.com" },{ title: "百度一下&#xff0c;你就知道&…

漫谈设计模式 [9]:外观模式

引导性开场 菜鸟&#xff1a;老鸟&#xff0c;我最近在做一个项目&#xff0c;感觉代码越来越复杂&#xff0c;我都快看不懂了。尤其是有好几个子系统&#xff0c;它们之间的调用关系让我头疼。 老鸟&#xff1a;复杂的代码确实让人头疼。你有没有考虑过使用设计模式来简化你…

微信支付开发避坑指南

1 微信支付的坑 1.1 不能用前端传递过来的金额 订单的商品金额要从数据库获取&#xff0c;前端只传商品 id。 1.2 交易类型trade type字段不要传错 v2版API&#xff0c;不同交易类型&#xff0c;要调用的支付方式也不同。 1.3 二次签名 下单时&#xff0c;在拿到预支付交…

记录深度学习量化操作

0. 简介 深度学习中做量化提升运行速度是最常用的方法&#xff0c;尤其是大模型这类非常吃GPU显存的方法。一般是高精度浮点数表示的网络权值以及激活值用低精度&#xff08;例如8比特定点&#xff09;来近似表示达到模型轻量化&#xff0c;加速深度学习模型推理&#xff0c;目…

MySQL表的操作与数据类型

目录 前言 一、表的操作 1.创建一个表 2.查看表的结构 3.修改表 4.删除一个表 二、 MySQL的数据类型 0.数据类型一览&#xff1a; 1.整数类型 2.位类型 3.小数类型 4.字符类型 前言 在MySQL库的操作一文中介绍了有关MySQL库的操作&#xff0c;本节要讲解的是由库管理的结构——…

TinyWebSever源码逐行注释(三)_ thread_pool.cpp

前言 项目源码地址 项目详细介绍 项目简介&#xff1a; Linux下C轻量级Web服务器&#xff0c;助力初学者快速实践网络编程&#xff0c;搭建属于自己的服务器. 使用 线程池 非阻塞socket epoll(ET和LT均实现) 事件处理(Reactor和模拟Proactor均实现) 的并发模型使用状态机…

python基础语法四-数据可视化

书接上回&#xff1a; python基础语法一-基本数据类型 python基础语法二-多维数据类型 python基础语法三-类 1. plot函数绘制简单折线图 (1)需要的模块&#xff1a;matplotlib.pyplot (2)语法&#xff1a;matplotlib.pyplot.plot(x, y, format_string, **kwargs) x: x轴数…

C语言程序设计-练习篇

不知道结果仍义无反顾地才是勇士。 三&#xff0c;打印整数二进制的奇数位和偶数位 题目内容&#xff1a; 获取一个整数二进制序列中所有的奇数位和偶数位&#xff0c;分别打印出二进制序列 #include <stdio.h>//打印整数二进制的奇数位和偶数位 int main() {int i 0…

C语言从头学55——学习头文件errno.h、float.h

1、头文件 errno.h 中的变量 errno 的使用 在 errno.h 定义了一个 int 类型的变量 errno&#xff08;错误码&#xff09;&#xff0c;如果发现这个变量出现非零值&#xff0c;表示已经执行的函数发生了错误。这个变量一般多用于检查数学函数运算过程中发生的错误。 …