【译】Celery文档1:First Steps with Celery——安装和配置Celery

ops/2025/2/13 0:54:08/

https://docs.celeryq.dev/en/stable/getting-started/first-steps-with-celery.html#first-steps

在这里插入图片描述

Celery的第一步

Celery时一个自带电池的任务队列
本教程内容:

  • 安装消息传输代理(broker)
  • 安装Celery并创建第一个任务(task)
  • 启动Celery工作进程(worker)并执行任务
  • 追踪任务的状态

选择Broker

Celery需要一个方法来发送和接受消息,这个方法被称为消息代理(message broker)。
Celery支持多种消息代理,如RabbitMQ、Redis等。

安装RabbitMQ
(推荐)在Dockers上运行RabbitMQ:

docker run -d -p 5672:5672 rabbitmq

或者在Ubuntu上安装RabbitMQ:

sudo apt-get install rabbitmq-server

运行Celery worker server

celery -A tasks worker --loglevel=INFO

Windows下有个坑:celery正常启动和接收任务但不能执行,报错:ValueError: not enough values to unpack (expected 3, got 0)。
需要借助eventlet
1.安装eventlet: pip install eventlet
2.借助eventlet启动celery: celery -A tasks worker --loglevel=INFO -P eventlet
参考1:https://www.cnblogs.com/qumogu/p/13284173.html
参考2:https://stackoverflow.com/questions/37255548/how-to-run-celery-on-windows
但这只是一个临时解决方案, celery对windows的支持很差,最好还是在Linux下运行。windows系统可以用WSL。

调用task

使用delay()方法调用task:

在Python shell中:

python">from tasks import add
add.delay(4, 4)

注:delay()方法是apply_async()方法的快捷方式。

然后,之前启动的worker进程会执行这个任务。可以在worker进程的日志中看到任务的执行情况:

[2024-04-10 21:58:25,217: INFO/MainProcess] Task tasks.add[987d2e18-0090-4b5b-bcb5-bd038b9690a3] received
[2024-04-10 21:58:25,221: INFO/MainProcess] Task tasks.add[987d2e18-0090-4b5b-bcb5-bd038b9690a3] succeeded in 0.0s: 8   

保留结果

如果要跟踪任务的状态, Celery需要将状态存储或发送到某个地方,如SQLAlchemy/Django ORM、MongoDB、Memcached、Redis、RPC(RabbitMQ/AMQP),并且可以自定义。

在此示例中,我们使用 rpc作为结果后端(result backend),它将状态作为暂时性消息发送回。Celery通过 backend参数 指定后端(如果选择使用配置模块,则通过result_backend设置指定)。因此,您可以在 tasks.py 文件中修改此行以启用 rpc:// 后端:

python">app = Celery('tasks', backend='rpc://', broker='pyamqp://')

或者,如果您想使用 Redis 作为结果后端,但仍然使用 RabbitMQ 作为消息代理(一种流行的组合):

python">app = Celery('tasks', backend='redis://localhost', broker='pyamqp://')

现在配置了结果后端,关闭当前 python 会话并再次导入 tasks 模块以使更改生效。这一次,您将保留调用任务时返回的 AsyncResult 实例:

python">from tasks import add   
result = add.delay(4, 4)

然后可以用ready()方法检查任务是否完成:

python">result.ready()

您可以等待结果完成,但很少使用,因为这会将异步调用转换为同步调用:

python">result.get(timeout=1)
8

Configuration

Celery就像家用电器一样,不需要太多配置。 只需要配置输入(连接到代理 broker)和输出(连接到结果后端)即可使用。但是,如果你仔细观察,你会发现有很多按钮。这就是配置选项
默认的配置通常是足够的,但是也可以通过修改配置让Celery更适合你的需求。

可以直接在app上修改配置:

python">app.conf.task_serializer = 'json'

如果一次性修改多个配置,可以使用update方法:

python">app.conf.update(task_serializer='json',accept_content=['json'],  # Ignore other contentresult_serializer='json',timezone='Europe/Oslo',enable_utc=True,
)

对于较大的项目,建议使用专用的配置模块。可以用app.config_from_object()告诉 Celery 使用配置模块:

python">app.config_from_object('celeryconfig')

配置模块名称通常是celeryconfig

该模块必须在当前目录可以访问,
celeryconfig.py:

python">broker_url = 'pyamqp://'
result_backend = 'rpc://'task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True

要验证配置文件是否正常工作且不包含任何语法错误,可以尝试导入它:
python -m celeryconfig

下面是两个配置示例:
将行为异常的任务路由到专用队列的方式

python">task_routes = {'tasks.add': 'low-priority',
}

对任务进行速率限制

python">task_annotations = {'tasks.add': {'rate_limit': '10/m'}
}

http://www.ppmy.cn/ops/30280.html

相关文章

Java设计模式 _结构型模式_桥接模式

一、桥接模式 1、桥接模式 桥接模式(Bridge Pattern)是一种结构型设计模式。用于把一个类中多个维度的抽象化与实现化解耦,使得二者可以独立变化。 2、实现思路 使用桥接模式,一定要找到这个类中两个变化的维度:如支…

【leetcode】缓存淘汰策略题目总结

146. LRU 缓存 我们使用了一个双向链表cache来存储数据,同时使用一个哈希表hash_map来映射键和链表节点的迭代器。当调用get(key)函数时,我们首先检查hash_map中是否存在该key,如果存在则将之前位置移到链表头部并返回值;当调用put(key, value)函数时,我们先检查hash_map…

深度学习避坑指南

安装设置了CUDA,但在anaconda的虚拟环境中,CUDA_HOME显示未设置,torch.cuda.is_available()输出未False。 原因:Cuda和torch版本不匹配。 解决办法:检查环境变量,并安装于CUDA适配的torch

前端请求没问题,后端正常运行,但查不出数据

写代码时写得快了些,Orders.的订单状态写错了CONFIRMED 改成COMPLETED

简单数据加解密,JS和JAVA同时实现

前端Vue调用Java后端接口中的数据进行加密,以避免敏感数据泄露。 现在实现一个高性能加密方法,用来对数据进行加密后传输。算法包括JS的加密和解密方法,也包括Java的加密解密方法。 可以在前端加密,后端解密。也可以在后端加密&…

vector的使用

1.构造函数 void test_vector1() {vector<int> v; //无参的构造函数vector<int> v2(10, 0);//n个value构造&#xff0c;初始化为10个0vector<int> v3(v2.begin(), v2.end());//迭代器区间初始化,可以用其他容器的区间初始化vector<int> v4(v3); //拷贝…

三种修改 Docker 镜像默认存储位置的方法

由于系统初始分区的原因&#xff0c;导致操作系统中对应 / 分区不会太大&#xff0c;通过 /var 目录不会单独分区。如果上面运行 Docker 服务&#xff0c;经过长时间的使用&#xff0c;会使原本就比较大的分区越来越不够用。如何更好地的处理这个问题呢&#xff1f; 1. 使用软…

labview强制转换的一个坑

32位整形强制转换成枚举的结果如何&#xff1f; 你以为的结果是 实际上的结果是 仔细看&#xff0c;枚举的数据类型是U16&#xff0c;"1"的数据类型是U32&#xff0c;所以转换产生了不可预期的结果。所以使用强制转换时一定要保证两个数据类型一致&#xff0c;否则…