【FastAPI】使用 FastAPI 实现后台多任务(BackgroundTasks和Celery两种方案)

server/2024/12/23 4:26:58/

在现代 Web 应用程序中,处理后台任务(如发送邮件、生成报告等)是非常常见的需求。FastAPI 作为一个高效、现代的 Python Web 框架,提供了处理后台任务的简单方式。在这篇文章中,我们将探讨如何使用 FastAPI 内置的 BackgroundTasks 处理简单的后台任务,以及如何通过 Celery 实现分布式的任务处理。

1. 为什么需要后台任务?

有时候我们不希望用户在等待某些繁重的操作时卡住,比如:

  • 处理文件上传和解析
  • 发送大量邮件
  • 执行耗时的数据分析
  • 与外部 API 交互并等待响应

这些任务如果在主线程中完成,会大大增加请求的响应时间,影响用户体验。而后台任务可以在返回响应后继续运行,用户无需等待,系统也能更好地处理这些任务。

2. 使用 FastAPI 内置的 BackgroundTasks

FastAPI 提供了一个内置的工具 BackgroundTasks,它允许我们在处理请求时轻松启动后台任务。BackgroundTasks 可以在返回 HTTP 响应之后执行操作,非常适合轻量级的异步任务。

2.1 安装 FastAPI

首先,确保你安装了 FastAPI 和 Uvicorn:

pip install fastapi uvicorn

2.2 示例:使用 BackgroundTasks

让我们来看一个简单的示例,演示如何使用 BackgroundTasks 执行后台任务。假设我们要在用户请求完成后写一条日志,而不是让用户等待日志写入完成。

from fastapi import FastAPI, BackgroundTasks
import timeapp = FastAPI()# 定义后台任务函数
def write_log(log_message: str):# 模拟耗时任务with open("log.txt", mode="a") as log_file:log_file.write(f"{log_message}\n")time.sleep(5)  # 模拟耗时# 启动后台任务
@app.post("/start-task/")
async def start_background_task(background_tasks: BackgroundTasks, message: str):# 将任务添加到后台任务中background_tasks.add_task(write_log, message)return {"message": "后台任务已启动,正在处理"}# 启动服务: uvicorn main:app --reload

2.3 解释

  • write_log: 这是一个简单的日志写入任务,我们使用 time.sleep(5) 模拟耗时操作。
  • start_background_task: 这个 API 端点启动了后台任务,并立即返回响应,用户无需等待日志写入完成。
  • background_tasks.add_task(write_log, message): 这会将日志任务添加到后台任务队列中。

2.4 测试

启动 FastAPI 应用后,使用以下命令发送 POST 请求:

curl -X 'POST' \'http://127.0.0.1:8000/start-task/' \-H 'Content-Type: application/json' \-d '{"message": "这是一个后台任务"}'

这会立即返回一个响应,但后台任务将继续运行,日志会在 log.txt 文件中写入。

2.5 适用场景

使用 BackgroundTasks 非常适合以下场景:

  • 发送通知或邮件
  • 日志记录
  • 轻量级的数据处理

但是,BackgroundTasks 只适用于单节点应用程序。如果你需要分布式任务队列,或者需要处理更复杂的任务调度,Celery 是更好的选择。

3. 使用 Celery 实现分布式后台任务

Celery 是一个功能强大的分布式任务队列系统,适用于需要更复杂任务处理的场景。它可以与消息队列(如 RabbitMQ 或 Redis)配合使用,以便在多个进程或机器之间分发任务。

3.1 安装 Celery 和 Redis

我们将使用 Redis 作为 Celery 的消息队列。首先,安装 Celery 和 Redis:

pip install celery[redis]

3.2 配置 Celery

接下来,我们需要配置 Celery 应用。创建一个 celery_app.py 文件,并配置 Celery:

from celery import Celery# 配置 Celery,使用 Redis 作为消息队列和结果存储
celery_app = Celery("tasks",broker="redis://localhost:6379/0",  # Redis 作为消息队列backend="redis://localhost:6379/0"  # Redis 作为结果存储
)# 定义异步任务
@celery_app.task
def long_task(message: str):import timetime.sleep(10)  # 模拟耗时任务return f"任务完成,信息: {message}"

3.3 在 FastAPI 中集成 Celery

现在,我们在 FastAPI 应用中集成 Celery。创建一个 main.py 文件:

from fastapi import FastAPI
from celery.result import AsyncResult
from celery_app import long_taskapp = FastAPI()# 启动任务
@app.post("/start-task/")
async def start_task(message: str):task = long_task.delay(message)  # 使用 Celery 异步执行任务return {"task_id": task.id, "message": "任务已提交"}# 查询任务状态
@app.get("/task-status/{task_id}")
async def get_task_status(task_id: str):task_result = AsyncResult(task_id)if task_result.state == "PENDING":return {"task_id": task_id, "status": "任务处理中"}elif task_result.state == "SUCCESS":return {"task_id": task_id, "status": "任务完成", "result": task_result.result}else:return {"task_id": task_id, "status": task_result.state}

3.4 启动 Celery Worker 和 FastAPI

首先,启动 Celery Worker:

celery -A celery_app worker --loglevel=info

然后启动 FastAPI:

uvicorn main:app --reload

3.5 测试

发送 POST 请求启动任务:

curl -X 'POST' \'http://127.0.0.1:8000/start-task/' \-H 'Content-Type: application/json' \-d '{"message": "这是一个长时间运行的任务"}'

然后可以通过 /task-status/{task_id} 查询任务状态:

curl http://127.0.0.1:8000/task-status/{task_id}

4. 总结

FastAPI 提供了灵活的异步任务处理能力。对于简单的任务,可以使用内置的 BackgroundTasks,它非常适合单节点的轻量级任务处理。如果你的应用需要更复杂的任务调度或者分布式处理,Celery 是一个强大的选择。

什么时候使用 BackgroundTasks

  • 当任务较轻量,且不需要分布式支持时。
  • 简单的任务,例如发送邮件、记录日志等。

什么时候使用 Celery?

  • 当你需要分布式任务队列时。
  • 任务较为复杂且需要任务状态管理或排队处理时。

希望这篇文章能帮助你更好地理解如何使用 FastAPI 来实现后台任务。如果你对更高级的任务处理有兴趣,可以进一步探索 Celery 的任务调度、链式任务等功能。


http://www.ppmy.cn/server/124916.html

相关文章

redis和mysql端口修改

因为之前有过被删库勒索的情况所以,今天记录一下怎么修改端口。 redis 要修改Redis的端口,您需要编辑Redis配置文件,通常名为redis.conf。 找到Redis配置文件: 在Linux系统上,该文件通常位于/etc/redis/redis.conf…

el-table中根据状态改单元格样式

需求&#xff1a;el-table中&#xff0c;有一列需要根据状态不同&#xff0c;显示的样式也不通。 解决方法&#xff1a; :cell-style"cellStyle" <el-table:data"tableData"style"width: 100%"size"mini"border:header-cell-style…

vue + echarts 快速入门

vue echarts 快速入门 本案例即有nodejs和vue的基础&#xff0c;又在vue的基础上整合了echarts Nodejs基础 1、Node简介 1.1、为什么学习Nodejs(了解) 轻量级、高性能、可伸缩web服务器前后端JavaScript同构开发简洁高效的前端工程化 1.2、Nodejs能做什么(了解) Node 打破了…

go dlv idea 远程调试-入门级

一&#xff0c;准备工作 linux 安装dlv git clone https://github.com/go-delve/delve.git $GOPATH/src/github.com/go-delve/delve cd $GOPATH/src/github.com/go-delve/delve make installecho export PATH$PATH:$GOPATH/bin >> ~/.bashrc## 测试是否安装成功 dlv ve…

支持云边协同的「物联网平台+边缘计算底座」

2024年9月20日&#xff0c;工信部发布《工业重点行业领域设备更新和技术改造指南》&#xff0c;旨在指导工业领域设备更新和技术改造工作。该指南设定目标&#xff0c;到2027年完成约200万套工业软件和80万台工业操作系统的更新换代任务。此外&#xff0c;计划实现80%规模以上制…

pdf页面尺寸裁减

1、编辑pdf 2、点击裁减页面&#xff0c;并在空白区域双击裁减 3、输入裁减数据&#xff1a;

C++学习笔记----8、掌握类与对象(一)---- 对象中的动态内存分配(7)

2.4.8、使用Move语法实现Swap函数 move语法提升性能的又一个例子&#xff0c;使用swap()函数交换两个对象。下面的swapCopy()的实现没有使用move语法&#xff1a; void swapCopy(Object& a, Object& b) {Object temp { a };a b;b temp; } 首先&#xff0c;a被拷贝到…

8621 二分查找

**思路&#xff1a;** 1. 读取输入的元素个数 n。 2. 读取有序数组 ST。 3. 读取要查找的关键字 key。 4. 使用折半查找法&#xff08;即二分查找&#xff09;在数组 ST 中查找 key 的位置。 5. 如果找到 key&#xff0c;输出其位置&#xff1b;如果未找到&#xff0c;输出 &qu…