Flask使用线程异步执行耗时任务

news/2025/2/19 8:01:02/

1 问题说明

1.1 任务简述

在开发Flask应用中一定会遇到执行耗时任务,但是Flask是轻量级的同步框架,即在单个请求时服务会阻被塞,直到任务完成(注意:当前请求被阻塞不会影响到其他请求)。

解决异步问题有两种思路,一种是借助外部工具实现异步,例如:消息队列(RabbitMQ)、 异步任务队列(Celery+Redis);另一种借助Python中的进程、线程或协程解决异步。我的是小项目选择因此选择了第二种方法。

经过测试,我在Flask中使用协程(gevent)会被阻塞;使用进程(multiprocessing)不会被阻塞,操作数据库出了问题(有可能是我没操作正确的问题);最后选择使用线程(threading)。

注意:使用线程会出现线程安全问题,

1.2 注意的问题

(1)线程安全

"""
!!! 注意
1 由于flask-SQLAlchemy对SQLAlchemy进行了封装,所以是线程安全的,可以在线程中直接使用;
2 原生SQLAlchemy中# 2.1 直接使用Session是线程不安全的,不推荐使用self.session = Session(self.engine)# 2.2 直接使用scoped_session是线程安全的,推荐使用# 获取sessionmakersession_factory = sessionmaker(engine)# scoped_session是线程安全的# 注意,此session不能使用Query对象session = scoped_session(session_factory)!!!
"""

(2)使用数据库

# 注意:高版本的Flask-SQLAlchemy(我的版本Flask-SQLAlchemy==3.1.1,SQLAlchemy==2.0.16)必须使用”with app.app_context()“,本质原因是Flask关联的SQLAlchemy版本太高
# 否则无法插入数据库并报错,错误内容如下(低版本不会出现此问题):
"""
This typically means that you attempted to use functionality that needed
the current application. To solve this, set up an application context
with app.app_context(). See the documentation for more information.
"""

2 工程布局

项目布局如下:
在这里插入图片描述

3 源代码

(1)main.py

from blueprint import init_blueprint
from config_app import app
from config_db import init_mysql_db# 初始化MySQL
init_mysql_db()init_blueprint()if __name__ == '__main__':app.run(host='0.0.0.0', debug=True)

(2)config_app.py

from flask import Flask
from flask_cors import CORSdef create_app():flask_app = Flask(__name__)CORS(flask_app, supports_credentials=True)return flask_appapp = create_app()

(3)config_db.py


from flask_sqlalchemy import SQLAlchemy
from flask_marshmallow import Marshmallow# 添加pymysql驱动,连接MySQL数据库
import pymysqlfrom config_app import apppymysql.install_as_MySQLdb()"""
!!! 注意
1 由于flask-SQLAlchemy对SQLAlchemy进行了封装,所以是线程安全的,可以在线程中直接使用;
2 原生SQLAlchemy中# 2.1 直接使用Session是线程不安全的,不推荐使用self.session = Session(self.engine)# 2.2 直接使用scoped_session是线程安全的,推荐使用# 获取sessionmakersession_factory = sessionmaker(engine)# scoped_session是线程安全的# 注意,此session不能使用Query对象session = scoped_session(session_factory)!!!
"""# 创建MySQL单实例
mysql_db = SQLAlchemy()# 创建Schema
mysql_schema = Marshmallow()# 创建数据库
"""
create database async default character set utf8mb4 collate utf8mb4_unicode_ci;
"""class MysqlConf:acc = "root"pwd = "123456"host = "192.168.108.200"port = 3306db = "async"mysql_conf = MysqlConf()# 初始化MySQL数据库
def init_mysql_db():# 配置MySQL数据库urldb_url = "mysql://" + mysql_conf.acc + ":" + mysql_conf.pwd + "@" + mysql_conf.host + ":" + str(mysql_conf.port) + "/" + mysql_conf.dbapp.config["SQLALCHEMY_DATABASE_URI"] = db_url# 关闭sqlalchemy自动跟踪数据库app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False# 显示底层执行的SQL语句app.config['SQLALCHEMY_ECHO'] = True# 解决‘No application found. Either work inside a view function or push an application context.’app.app_context().push()# 初始化appmysql_db.init_app(app)# 初始化schemamysql_schema.init_app(app)# 初始化table
def init_table():# 删除表mysql_db.drop_all()# 创建表mysql_db.create_all()

(4)dao.py

import datetimefrom config_db import mysql_db as db# Create table of bm_record
class User(db.Model):# Record table__tablename__ = "as_user"id = db.Column("us_id", db.Integer, nullable=False, primary_key=True, autoincrement=True)name = db.Column("us_name", db.String(100))age = db.Column("us_age", db.Integer)create_time = db.Column("us_create_time", db.DateTime, default=datetime.datetime.now)# 插入数据
def insert_record_dao(user: User):# Add datadb.session.add(user)# Commit datadb.session.commit()

(5)blueprint.py

# 构建蓝本
import threading
import time
from concurrent.futures import ThreadPoolExecutorfrom flask import Blueprint, jsonifyfrom config_app import app
from config_db import init_table
from dao import insert_record_dao, Useruser = Blueprint("user", __name__)# 注册蓝本
def init_blueprint():app.register_blueprint(user, url_prefix='/user')@user.route("/initdb")
def init_db():init_table()return jsonify("success")@user.route("/add")
def add_user():user: User = User()user.name = "zhangsan"user.age = 12time.sleep(10)insert_record_dao(user)return jsonify(user.id)executor = ThreadPoolExecutor(3)
@user.route("/thread_pool")
def run_task_thread_pool():executor.submit(_run_thread_pool)return jsonify("success")# 执行线程
def _run_thread_pool():print("Run thread pool")time.sleep(2)user: User = User()user.name = "thread pool"user.age = 12# 注意:必须使用”with app.app_context()“,否则无法插入数据库,并且不会报错with app.app_context():insert_record_dao(user)print("End thread pool")pass@user.route("/thread")
def run_task_thread():task = threading.Thread(target=_run_thread, name='thread-01')task.start()return jsonify("success")# 执行线程
def _run_thread():print("Run thread")time.sleep(2)user: User = User()user.name = "thread"user.age = 12# 注意:高版本的Flask-SQLAlchemy(我的版本Flask-SQLAlchemy==3.1.1,SQLAlchemy==2.0.16)必须使用”with app.app_context()“,本质原因是Flask关联的SQLAlchemy版本太高# 否则无法插入数据库并报错,错误内容如下(低版本不会出现此问题):"""This typically means that you attempted to use functionality that neededthe current application. To solve this, set up an application contextwith app.app_context(). See the documentation for more information."""with app.app_context():insert_record_dao(user)print("End thread")pass

4 请求截图

(1)初始化数据库

在这里插入图片描述

(2)添加用户

在这里插入图片描述

(3)使用线程池

在这里插入图片描述

(4)使用线程

在这里插入图片描述


http://www.ppmy.cn/news/1255545.html

相关文章

1.1、Autosar_CP软件集群设计与集成指南说明

目录 前言 1、介绍 1.1、目标 1.2、范围 2、缩略语 3、相关文档

读书笔记:《Effective Modern C++(C++14)》

Effective Modern C(C14) GitHub - CnTransGroup/EffectiveModernCppChinese: 《Effective Modern C》- 完成翻译 Deducing Types 模版类型推导: 引用,const,volatile被忽略数组名和函数名退化为指针通用引用&#…

DevEco Studio 调整开发工具中的字体大小与行高

我们打开编辑器 选择 左上角 File 下的 Settings 将左侧菜单栏 编辑 展开 我们在编辑下面 选择 Font 然后 如下图指向的两个位置 我们可以调整它的字体大小和行高 设置好之后 右下角 点击 Apply 应用 然后点击 OK即可 当然 你按着 Ctrl 然后鼠标滚动 也可以像浏览器那样 拉…

SSM项目实战-登录验证成功并路由到首页面,Vue3+Vite+Axios+Element-Plus技术

1、util/request.js import axios from "axios";let request axios.create({baseURL: "http://localhost:8080",timeout: 50000 });export default request 2、api/sysUser.js import request from "../util/request.js";export const login (…

【鸿蒙应用ArkTS开发系列】- 选择图片、文件和拍照功能实现

文章目录 前言创建多媒体Demo工程创建MediaBean 实体类创建MediaHelper工具类API标记弃用问题动态申请多媒体访问权限实现选择图片显示功能打包测试 前言 在使用App的时候,我们经常会在一些社交软件中聊天时发一些图片或者文件之类的多媒体文件,那在鸿蒙…

西南科技大学模拟电子技术实验四(集成运算放大器的线性应用)预习报告

一、计算/设计过程 说明:本实验是验证性实验,计算预测验证结果。是设计性实验一定要从系统指标计算出元件参数过程,越详细越好。用公式输入法完成相关公式内容,不得贴手写图片。(注意:从抽象公式直接得出结果,不得分,页数可根据内容调整) 反相比例运算电路(1)实验…

【读书笔记】微习惯

周日晚上尝试速读一本书《微习惯》,共七章看了下目录结构并不复杂,计划每章7-8分钟读完, 从20:15-21:00。读的时候,订下闹钟,催促着自己的进度。边读边记了一些要点和微信读书里面的划线。 第六章实践内容最为丰富&…

12.1平衡树(splay),旋转操作及代码

平衡树 变量定义 tot表示结点数量,rt表示根的编号 v[i]表示结点i的权值 fa[i]表示结点i的父亲节点 chi[i][2]表示结点i的左右孩子 cnt[i]表示结点i的权值存在数量,如1123,v[3]1,则cnt[3]2;就是说i3的三号结点的权值为1&…