构建高性能异步任务引擎:FastAPI + Celery + Redis

server/2024/12/24 2:24:01/

在现代应用开发中,异步任务处理是一个常见的需求。无论是数据处理、图像生成,还是复杂的计算任务,异步执行都能显著提升系统的响应速度和吞吐量。今天,我们将通过一个实际项目,探索如何使用 FastAPICeleryRedis 构建一个高性能的异步任务引擎。

项目背景

技术栈介绍

  • FastAPI:一个现代、高性能的 Web 框架,基于 Python 3.7+ 的异步编程特性构建。它支持自动生成 OpenAPI 文档和 Swagger UI,能够快速构建 RESTful API,并且具有极低的延迟和高并发处理能力。
  • Celery:一个分布式任务队列系统,主要用于处理异步任务和定时任务。它支持多种消息传输机制,能够将任务分发到多个工作节点上并行处理,从而提高系统的吞吐量和响应速度。
  • Redis:一个高性能的键值存储系统,常用于缓存、消息队列和分布式锁等场景。在 Celery 中,Redis 通常作为消息代理(Broker)和结果存储(Backend),负责任务的分发和结果的持久化。

项目目标

通过 FastAPI、Celery 和 Redis 的结合,构建一个能够高效处理用户提交的 Python 代码的异步任务引擎。用户可以通过 API 提交代码,系统会异步执行代码,并返回任务的执行结果。

项目目录结构

project/
├── main.py
├── utils.py
├── schemas.py
└── app/├── __init__.py├── config.py└── tasks/├── __init__.py└── tasks.py

代码功能深度解析

1. main.py:FastAPI 应用的核心

main.py 是项目的核心入口文件,负责定义 FastAPI 应用的接口和逻辑。

FastAPI 应用初始化
python">app = FastAPI(title="Async Task API", description="", version="1.0.0")

这里我们创建了一个 FastAPI 应用,命名为 Async Task API,版本为 1.0.0

自定义 Swagger UI
python">def swagger_monkey_patch(*args, **kwargs):return get_swagger_ui_html(*args,**kwargs,swagger_js_url="https://cdn.bootcdn.net/ajax/libs/swagger-ui/5.6.2/swagger-ui-bundle.js",swagger_css_url="https://cdn.bootcdn.net/ajax/libs/swagger-ui/5.6.2/swagger-ui.min.css",)
applications.get_swagger_ui_html = swagger_monkey_patch

通过 Monkey Patch 的方式,我们自定义了 Swagger UI 的资源加载路径,使用了国内的 CDN 加速资源,提升文档加载速度。

全局异常处理
python">@app.exception_handler(Exception)
def validation_exception_handler(request, err):base_error_message = f"Failed to execute: {request.method}: {request.url}"return JSONResponse(status_code=400, content={"message": f"{base_error_message}. Detail: {err}"})

我们定义了一个全局异常处理器,捕获所有未处理的异常,并返回一个包含错误信息的 JSON 响应。

HTTP 中间件:计算请求处理时间
python">@app.middleware("http")
async def add_process_time_header(request, call_next):start_time = time.time()response = await call_next(request)process_time = time.time() - start_timeresponse.headers["X-Process-Time"] = str(f'{process_time:0.4f} sec')return response

这个中间件用于计算每个请求的处理时间,并将处理时间添加到响应头 X-Process-Time 中,方便调试和性能优化。

创建任务的 API
python">@app.post('/tasks')
def create_pytask(task: schemas.PyTask):code = task.codetime_limit = task.time_limitexpires = task.expiresresult = execute_python_code.apply_async(args=(code,), time_limit=time_limit, expires=expires)return JSONResponse(content={"task_id": result.id})

用户可以通过 /tasks 接口提交 Python 代码,代码会被异步执行。任务的执行结果可以通过 /tasks/{task_id} 接口查询。

查询任务结果的 API
python">@app.get('/tasks/{task_id}', response_model=schemas.PyTaskResult)
def get_task_result(task_id: str):return get_task_info(task_id)

用户可以通过 /tasks/{task_id} 接口查询任务的执行结果和状态。

2. utils.py:任务信息获取工具

utils.py 文件定义了一个工具函数 get_task_info,用于获取 Celery 任务的状态和结果。

python">def get_task_info(task_id):task_result = AsyncResult(task_id, app=app)result = {"task_id": task_id,"task_status": task_result.status,"task_result": task_result.result}return result

通过 AsyncResult,我们可以获取任务的当前状态(如 PENDINGSUCCESSFAILURE 等)和执行结果。

3. schemas.py:数据模型定义

schemas.py 文件定义了 Pydantic 模型,用于验证和序列化请求和响应的数据。

任务请求模型
python">class PyTask(BaseModel):code: strexpires: Optional[int] = Nonetime_limit: Optional[int] = None

用户提交的任务请求包含以下字段:

  • code: 任务的 Python 代码。
  • expires: 任务的过期时间(可选)。
  • time_limit: 任务的时间限制(可选)。
任务结果模型
python">class PyProgramResult(BaseModel):status: stroutput: Optional[str] = Noneerror: Optional[str] = None

任务的执行结果包含以下字段:

  • status: 任务的执行状态(如 successfailure)。
  • output: 任务的标准输出(可选)。
  • error: 任务的错误输出(可选)。
任务结果响应模型
python">class PyTaskResult(BaseModel):task_id: strtask_status: strtask_result: Optional[PyProgramResult] = None

任务的查询结果包含以下字段:

  • task_id: 任务的 ID。
  • task_status: 任务的状态(如 PENDINGSUCCESS 等)。
  • task_result: 任务的执行结果(可选)。

4. app/__init__.py:Celery 应用初始化

app/__init__.py 文件是 Celery 应用的初始化文件,主要用于配置 Celery 应用和任务的自动发现。

创建 Celery 应用
python">app = Celery('my_celery_project')

我们创建了一个名为 my_celery_project 的 Celery 应用。

加载配置
python">app.config_from_object('app.config')

app.config 文件中加载 Celery 的配置。

自动发现任务
python">app.autodiscover_tasks(['app.tasks'])

自动发现 app.tasks 模块中的任务。

Worker 和 Beat 初始化
python">@worker_init.connect
def worker_initialization(**kwargs):print("Worker 初始化开始")@beat_init.connect
def beat_initialization(**kwargs):print("Beat 初始化开始")

定义了 Worker 和 Beat 的初始化函数,分别在 Worker 和 Beat 启动时执行。

5. app/config.py:Celery 配置

app/config.py 文件定义了 Celery 的配置。

消息代理和结果存储
python">broker_url = 'redis://:redisisthebest@redis:6379/0'
result_backend = 'redis://:redisisthebest@redis:6379/0'

使用 Redis 作为消息代理和结果存储。

任务结果过期时间
python">result_expires = 3600

任务结果在 Redis 中保存 1 小时后过期。

序列化配置
python">task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']

使用 JSON 作为任务和结果的序列化格式。

时区配置
python">timezone = 'Asia/Shanghai'
enable_utc = True

设置时区为 Asia/Shanghai,并启用 UTC 时间。

6. app/tasks/tasks.py:任务执行逻辑

app/tasks/tasks.py 文件定义了一个 Celery 任务 execute_python_code,用于执行用户提交的 Python 代码。

python">@app.task
def execute_python_code(code_string):temp_file = "temp_code.py"with open(temp_file, "w") as f:f.write(code_string)try:result = subprocess.run(["python3", temp_file],stdout=subprocess.PIPE,stderr=subprocess.PIPE,text=True)if result.stderr:return {"status": "failure", "error": result.stderr}else:return {"status": "success", "output": result.stdout}finally:if os.path.exists(temp_file):os.remove(temp_file)

该任务将用户提交的代码字符串保存为临时文件,然后使用 subprocess.run 执行该文件,捕获标准输出和错误输出。如果执行成功,返回 success 状态和标准输出;如果执行失败,返回 failure 状态和错误输出。最后,删除临时文件。

部署分析

version: '3.8'services:fastapi:image: lab:python-packagescontainer_name: fastapiports:- 8080:8080volumes:- D:\dockerMount\code\celery:/home/codeworking_dir: /home/codecommand: python3 main.pyrestart: unless-stoppednetworks:- mynetcelery-worker:image: lab:python-packagescontainer_name: celery-workervolumes:- D:\dockerMount\code\celery:/home/celeryworking_dir: /home/celerycommand: celery -A app worker --concurrency=4 --loglevel=inforestart: unless-stoppednetworks:- mynetcelery-flower:image: lab:python-packagescontainer_name: celery-flowerports:- 5555:5555volumes:- D:\dockerMount\code\celery:/home/celeryworking_dir: /home/celerycommand: celery -A app flower --port=5555restart: unless-stoppednetworks:- mynetredis:image: bitnami/redis:7.2.4-debian-12-r16container_name: redisenvironment:- REDIS_PASSWORD=redisisthebestnetworks:- mynetnetworks:mynet:external: false

在这个 Docker Compose 配置中,我们定义了三个服务:

  • fastapi:FastAPI 应用,负责接收用户请求并分发任务。
  • celery-worker:Celery 工作节点,负责执行异步任务。
  • celery-flower:Celery 的监控工具,提供任务执行的可视化界面。
  • redis:Redis 服务,作为 Celery 的消息代理和结果存储。

代码的功能和价值

功能

  1. 异步任务执行

    • 用户可以通过 /tasks 接口提交 Python 代码,代码会被异步执行。
    • 任务的执行结果可以通过 /tasks/{task_id} 接口查询。
  2. 任务状态管理

    • 任务的状态(如 PENDINGSUCCESSFAILURE 等)可以通过 /tasks/{task_id} 接口查询。
  3. 高性能和可扩展性

    • 使用 FastAPI 和 Celery 构建的异步任务引擎能够处理高并发的任务请求。
    • Celery 的分布式特性使得系统可以轻松扩展以应对更多的任务。
  4. 安全性

    • 通过设置 time_limitexpires,可以限制任务的执行时间和过期时间,防止恶意代码的长时间执行。
  5. 易用性

    • FastAPI 自动生成的 Swagger UI 使得 API 的使用和调试更加方便。
    • Pydantic 模型确保了请求和响应数据的类型安全。

价值

  1. 高效的任务处理

    • 该系统能够高效地处理大量异步任务,适用于需要异步执行代码的场景,如在线代码执行、数据处理、图像处理等。
  2. 可扩展性

    • 通过 Celery 的分布式任务队列,系统可以轻松扩展以处理更多的任务,适合高并发场景。
  3. 安全性

    • 通过限制任务的执行时间和过期时间,系统能够有效防止恶意代码的滥用。
  4. 易用性

    • FastAPI 和 Pydantic 的结合使得 API 的开发和维护更加简单,同时提供了自动生成的文档和类型检查。
  5. 灵活性

    • 系统支持自定义任务的执行逻辑,可以根据业务需求扩展任务类型和功能。

总结

通过 FastAPI、Celery 和 Redis 的结合,我们构建了一个高性能、可扩展的分布式异步任务引擎。它能够高效地处理用户提交的 Python 代码,并提供任务状态查询功能。该系统适用于需要异步执行代码的场景,具有高效、安全、易用和灵活的特点。

无论是构建一个在线代码执行平台,还是处理复杂的计算任务,这个项目都为你提供了一个强大的基础。希望这篇文章能为你带来启发,让你在异步任务处理的道路上走得更远!

附图

发送任务
在这里插入图片描述
查询结果
在这里插入图片描述


http://www.ppmy.cn/server/152636.html

相关文章

go语言学习之错误记录-1、GOPROXY

go语言下载包时报错,更改代码仓库下载地址 go: golang.org/x/netv0.0.0-20210929193557-e81a3d93ecf6: Get “https://proxy.golang.org/golang.org/x/net/v/v0.0.0-20210929193557-e81a3d93ecf6.mod”: dial tcp 142.250.217.113:443: connectex: A connection at…

STM32中ADC模数转换器

一、ADC简介 ADC模拟-数字转换器 ADC可以将引脚连续变化的模拟电压转换为内存中存储的数字变量,建立模拟电路到数字电路的桥梁 12位逐次逼近型ADC,1us转换时间 输入电压范围: 0~3.3V,转换结果范围:0~4095 18个输入…

【大数据】Flink + Kafka 实现通用流式数据处理详解

目录 一、前言 二、流式数据处理场景介绍 2.1 流式数据处理概述 2.1.1 流式数据处理场景介绍 2.2 流式数据处理技术栈 2.2.1 数据采集 2.2.2 数据处理 2.2.3 数据存储 2.2.4 数据展示 2.3 流式数据处理场景面临的问题和挑战 三、通用的流式数据处理场景解决方案 3.1…

【报错】node:internal/modules/cjs/loader:936

报错问题: 当执行npm run dev后,出现下面错误 这个错误一般是由于Node.js无法找到所需的模块而引起的,解决此问题的一种方法就是重新安装所需的模块。 解决办法: 删除npm install 所下载在项目里的node_modules文件执行操作&…

Android Studio版本升级那些事

Android Studio版本升级那些事 文章目录 Android Studio版本升级那些事一、前言二、Android Studio版本相关知识1、Android13 签名应用无法在Android Studio 编译运行解决(1)无法编译运行前的尝试 2、Android Studio 的历史版本介绍4、Android Studio Gr…

二百八十、ClickHouse——用Kettle对DWD层补全的清洗数据进行记录

一、目的 在对DWD层清洗数据进行补全后,需要生成相应的补全记录,作为数据的标记 二、实施步骤 2.1 建表 create table if not exists hurys_jw.dwd_data_correction_record(data_type Int32 comment 数据类型 1:转向比,2:统计,3:评价,4…

解锁移动设备管理新技能-RayLink远程控制手机

在这个忙碌的现代社会中,智能手机已经成为我们生活的重要组成部分,它们不再仅仅是通讯工具,而是我们日常生活的核心。随着这种变化,远程控制手机的技术应运而生,为我们开启了一个全新的移动设备管理时代。今天&#xf…

websocket 局域网 webrtc 一对一 多对多 视频通话 的示例

基本介绍 WebRTC(Web Real-Time Communications)是一项实时通讯技术,它允许网络应用或者站点,在不借助中间媒介的情况下,建立浏览器之间点对点(Peer-to-Peer)的连接,实现视频流和&am…