在现代 Web 应用程序中,处理后台任务(如发送邮件、生成报告等)是非常常见的需求。FastAPI 作为一个高效、现代的 Python Web 框架,提供了处理后台任务的简单方式。在这篇文章中,我们将探讨如何使用 FastAPI 内置的 BackgroundTasks
处理简单的后台任务,以及如何通过 Celery 实现分布式的任务处理。
1. 为什么需要后台任务?
有时候我们不希望用户在等待某些繁重的操作时卡住,比如:
- 处理文件上传和解析
- 发送大量邮件
- 执行耗时的数据分析
- 与外部 API 交互并等待响应
这些任务如果在主线程中完成,会大大增加请求的响应时间,影响用户体验。而后台任务可以在返回响应后继续运行,用户无需等待,系统也能更好地处理这些任务。
2. 使用 FastAPI 内置的 BackgroundTasks
FastAPI 提供了一个内置的工具 BackgroundTasks
,它允许我们在处理请求时轻松启动后台任务。BackgroundTasks
可以在返回 HTTP 响应之后执行操作,非常适合轻量级的异步任务。
2.1 安装 FastAPI
首先,确保你安装了 FastAPI 和 Uvicorn:
pip install fastapi uvicorn
2.2 示例:使用 BackgroundTasks
让我们来看一个简单的示例,演示如何使用 BackgroundTasks
执行后台任务。假设我们要在用户请求完成后写一条日志,而不是让用户等待日志写入完成。
from fastapi import FastAPI, BackgroundTasks
import timeapp = FastAPI()# 定义后台任务函数
def write_log(log_message: str):# 模拟耗时任务with open("log.txt", mode="a") as log_file:log_file.write(f"{log_message}\n")time.sleep(5) # 模拟耗时# 启动后台任务
@app.post("/start-task/")
async def start_background_task(background_tasks: BackgroundTasks, message: str):# 将任务添加到后台任务中background_tasks.add_task(write_log, message)return {"message": "后台任务已启动,正在处理"}# 启动服务: uvicorn main:app --reload
2.3 解释
write_log
: 这是一个简单的日志写入任务,我们使用time.sleep(5)
模拟耗时操作。start_background_task
: 这个 API 端点启动了后台任务,并立即返回响应,用户无需等待日志写入完成。background_tasks.add_task(write_log, message)
: 这会将日志任务添加到后台任务队列中。
2.4 测试
启动 FastAPI 应用后,使用以下命令发送 POST 请求:
curl -X 'POST' \'http://127.0.0.1:8000/start-task/' \-H 'Content-Type: application/json' \-d '{"message": "这是一个后台任务"}'
这会立即返回一个响应,但后台任务将继续运行,日志会在 log.txt
文件中写入。
2.5 适用场景
使用 BackgroundTasks
非常适合以下场景:
- 发送通知或邮件
- 日志记录
- 轻量级的数据处理
但是,BackgroundTasks
只适用于单节点应用程序。如果你需要分布式任务队列,或者需要处理更复杂的任务调度,Celery
是更好的选择。
3. 使用 Celery 实现分布式后台任务
Celery 是一个功能强大的分布式任务队列系统,适用于需要更复杂任务处理的场景。它可以与消息队列(如 RabbitMQ 或 Redis)配合使用,以便在多个进程或机器之间分发任务。
3.1 安装 Celery 和 Redis
我们将使用 Redis 作为 Celery 的消息队列。首先,安装 Celery 和 Redis:
pip install celery[redis]
3.2 配置 Celery
接下来,我们需要配置 Celery 应用。创建一个 celery_app.py
文件,并配置 Celery:
from celery import Celery# 配置 Celery,使用 Redis 作为消息队列和结果存储
celery_app = Celery("tasks",broker="redis://localhost:6379/0", # Redis 作为消息队列backend="redis://localhost:6379/0" # Redis 作为结果存储
)# 定义异步任务
@celery_app.task
def long_task(message: str):import timetime.sleep(10) # 模拟耗时任务return f"任务完成,信息: {message}"
3.3 在 FastAPI 中集成 Celery
现在,我们在 FastAPI 应用中集成 Celery。创建一个 main.py
文件:
from fastapi import FastAPI
from celery.result import AsyncResult
from celery_app import long_taskapp = FastAPI()# 启动任务
@app.post("/start-task/")
async def start_task(message: str):task = long_task.delay(message) # 使用 Celery 异步执行任务return {"task_id": task.id, "message": "任务已提交"}# 查询任务状态
@app.get("/task-status/{task_id}")
async def get_task_status(task_id: str):task_result = AsyncResult(task_id)if task_result.state == "PENDING":return {"task_id": task_id, "status": "任务处理中"}elif task_result.state == "SUCCESS":return {"task_id": task_id, "status": "任务完成", "result": task_result.result}else:return {"task_id": task_id, "status": task_result.state}
3.4 启动 Celery Worker 和 FastAPI
首先,启动 Celery Worker:
celery -A celery_app worker --loglevel=info
然后启动 FastAPI:
uvicorn main:app --reload
3.5 测试
发送 POST 请求启动任务:
curl -X 'POST' \'http://127.0.0.1:8000/start-task/' \-H 'Content-Type: application/json' \-d '{"message": "这是一个长时间运行的任务"}'
然后可以通过 /task-status/{task_id}
查询任务状态:
curl http://127.0.0.1:8000/task-status/{task_id}
4. 总结
FastAPI 提供了灵活的异步任务处理能力。对于简单的任务,可以使用内置的 BackgroundTasks
,它非常适合单节点的轻量级任务处理。如果你的应用需要更复杂的任务调度或者分布式处理,Celery 是一个强大的选择。
什么时候使用 BackgroundTasks
?
- 当任务较轻量,且不需要分布式支持时。
- 简单的任务,例如发送邮件、记录日志等。
什么时候使用 Celery?
- 当你需要分布式任务队列时。
- 任务较为复杂且需要任务状态管理或排队处理时。
希望这篇文章能帮助你更好地理解如何使用 FastAPI 来实现后台任务。如果你对更高级的任务处理有兴趣,可以进一步探索 Celery 的任务调度、链式任务等功能。