Django项目异步改造--Celery

news/2024/11/24 8:05:33/

Celery 是一个简单,灵活且可靠的分布式系统,可以处理大量消息,同时为操作提供维护该系统所需的工具。这是一个任务队列,着重于实时处理,同时还支持任务调度。

Celery 通过消息进行通信,通常使用经纪人在 clients 和 workers 之间进行调解。要启动一个任务,客户端会在队列中放入一条消息,然后经纪人将消息传递给工人。

一个 Celery 系统可以由多个 worker 和 broker 组成,从而实现高可用性和横向扩展。

在web 项目中,通常因为服务器http超时时间的限制,任务无法在规定时间完成,这就需要使用异步来实现。以下就如何使用celery 改造现有Django项目。

1、安装依赖

pip install celery redis

pip 安装的redis 无法在项目中使用,因为需要在环境中安装,方法如下:

wget http://download.redis.io/releases/redis-3.2.8.tar.gz -o /home/admin/package/
cd /home/admin/package/ 
tar -zxvf redis-3.2.8.tar.gz 
mv ./redis-3.2.8 /usr/local/redis/ 
cd /usr/local/redis/ 
make 
make install 
cd utils 
echo | /bin/bash install_server.sh  

2、配置celery

2.1、settings.py

在 Django 项目的 settings.py 文件中添加以下配置:

# 配置 Celery
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'# 配置 redis
REDIS_HOST = [("127.0.0.1", 6379)]
REDIS_PASSWORD = ""

2.2、项目主目录下配置

2.2.1、__init__.py

#coding=utf-8
from __future__ import absolute_import, unicode_literalsfrom django.conf import settings
from .celery import app as celery_app
from ._redis import TaskRedisredis_client = TaskRedis(settings.REDIS_HOST,settings.REDIS_PASSWORD,db=1
)__all__ = ['celery_app']

2.2.2、新建celery 配置文件celery.py

#!/usr/bin/env python
# coding=utf-8
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery, platformsfrom django.conf import settingsfrom celery.schedules import crontab
from datetime import timedeltaos.environ.setdefault('DJANGO_SETTINGS_MODULE', '项目名称.settings')
platforms.C_FORCE_ROOT = True
app = Celery('项目名称', backend='redis://localhost:6379/1', broker='redis://localhost:6379/0')# 将settings对象作为参数传入
app.config_from_object('django.conf:settings', )# celery 自动发现任务: APP/tasks.py
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)# 设置定时参数
app.conf.update(# 并发数CELERYD_CONCURRENCY=4,# 每个worker 执行多少任务就killCELERYD_MAX_TASKS_PER_CHILD=50,# celery 路径CELERY_ROUTES={"项目名称.tasks.add": {"queue": "q1"},  # 把add任务放入q1队列},# celery beat 调度CELERYBEAT_SCHEDULE={"asy-update-info": {"task": "get_info","schedule": crontab(minute=0,hour=1),                       # 定时同步aone信息},# 定时执行    #    "schedule": crontab(minute=0, hour=3),  # 每日早上3点#    "schedule": crontab(minute=0, hour=1),#    "schedule": crontab(minute=0, hour=1), #    "schedule": crontab(minute=0, hour="*/1") # 每隔1小时执行一次#    "schedule": crontab(minute=0, hour="*/12") # 每隔十小时执行一次#    "schedule": crontab(minute=0, hour=3) # 每日凌晨3点#    "schedule": crontab(minute=0, hour=3, day_of_week=2) # 每周二凌晨3点#    "schedule": crontab(minute=0, hour=3, day_of_week=1)  # 每周一凌晨3点#    "schedule": crontab(minute=0, hour=3, day_of_week=2) # 每周二凌晨3点#    "schedule": crontab(minute=0, hour=3, day_of_week=2) # 每周二凌晨3点#    "schedule": crontab(minute=0, hour=4, day_of_week='1,4')  # 每周一、四凌晨4点#     
)@app.task(bind=True)
def debug_task(self):print('Request: {0!r}'.format(self.request))

2.2.3、项目目录下创建redis存储的队列信息方法文件_redis.py

#coding:utf-8import datetime
import jsonfrom redis import Redis, ConnectionPoolclass RedisInit(object):def __init__(self, redis_config, password, master=None, connect_type='direct', db=0):""":param redis_config::param password::param master::param connect_type: 连接方式 direct: 直连 sentinel: 哨兵"""self.redis_config = redis_configself.password = passwordself.master = masterself.connect_type = connect_typeself.redis = Noneself.host, self.port = self.redis_config[0]self.db = dbself._connect()def _connect(self):# 直接if self.connect_type == 'direct':self.pool = ConnectionPool(host=self.host, port=self.port, password=self.password, db=self.db,max_connections=200, socket_keepalive=True, decode_responses=True)self.redis = Redis(connection_pool=self.pool)# 哨兵else:# 哨兵连接池self.sentinel = Sentinel(self.redis_config, password=self.password, db=self.db,socket_keepalive=True, decode_responses=True)self.redis = self.sentinel.master_for(self.master, max_connections=200)class TaskRedis(RedisInit):def __init__(self, *args, **kwargs):super(TaskRedis, self).__init__(*args, **kwargs)def _set_task(self, key, task_id):"""缓存 task 的celery对象:param key 键:param task_id celery task id:return"""self.redis.set(key, task_id)self.redis.expire(key, 7200)def _get_task_id(self, key):"""获取celery的task id"""return self.redis.get(key)

3、任务异步改造

3.1、修改任务函数

Django 中task.py

from celery import task
from celery import shared_task@shared_task
def add(x, y):sleep(5)return x + y

3.2、触发任务

Django 中view.py,只需要给函数加 delay() 方法:

from .tasks import addresult = add.delay(4, 4)

4、启动 celery 项目

项目启动时使用如下命令启动celery

# 在Django项目主目录下
celery -A '项目名称' worker -l info Please specify a different user using the -u option.User information: uid=0 euid=0 gid=0 egid=0uid=uid, euid=euid, gid=gid, egid=egid,-------------- celery@x.x.x.x v3.1.25 (Cipater)
---- **** ----- 
--- * ***  * -- Linux-5.10.84-004x86_64-x86_64-with-redhat-7.2
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         '项目名称':0x7feac79c3390
- ** ---------- .> transport:   redis://localhost:6379/0
- ** ---------- .> results:     redis://localhost:6379/1
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- 
--- ***** ----- [queues]-------------- .> celery           exchange=celery(direct) key=celery[tasks]. add[2023-04-24 06:47:56,647: INFO/MainProcess] Connected to redis://localhost:6379/0
[2023-04-24 06:47:56,652: INFO/MainProcess] mingle: searching for neighbors
[2023-04-24 06:47:57,656: INFO/MainProcess] mingle: all alone
[2023-04-24 06:47:57,662: WARNING/MainProcess] /usr/lib/python2.7/site-packages/celery/fixups/django.py:265: UserWarning: Using settings.DEBUG leads to a memory leak, never use this setting in production environments!warnings.warn('Using settings.DEBUG leads to a memory leak, never '
[2023-04-24 06:47:57,663: WARNING/MainProcess] celery@x.x.x.x ready.

保持后台运行命令

celery -A '项目名称' worker -l info --detach

输出日志到日志文件

celery -A '项目名称' worker -l info --loglevel=debug --logfile=tasks.log

参考文档

1、https://www.celerycn.io/ru-men/celery-jian-jie

2、https://www.cnblogs.com/fisherbook/p/11089243.html


http://www.ppmy.cn/news/53562.html

相关文章

计算机网络学习07(DNS域名系统详解)

DNS(Domain Name System)域名管理系统,是当用户使用浏览器访问网址之后,使用的第一个重要协议。DNS 要解决的是域名和 IP 地址的映射问题。 在实际使用中,有一种情况下,浏览器是可以不必动用 DNS 就可以获知…

Python小姿势 - import requests

import requests Python中使用requests模块发送POST请求 在使用Python进行开发时,经常会遇到需要向某个网址发送POST请求的情况。这时候就需要使用到requests模块了。 requests模块是Python的一个标准模块,可以直接使用pip安装。 安装完成后,…

python与大数据

Python与大数据 随着互联网和物联网的快速发展,数据已经成为了一个非常重要的资源。人们需要对这些数据进行采集、存储、处理和分析,从而获取有价值的信息和洞见。而这些数据往往是非常大的,需要使用一些特殊的技术和工具来处理。这就是大数…

MySQL高级第十五篇:MVCC多版本并发控制原理剖析

MySQL高级第十五篇:MVCC多版本并发控制原理剖析 一、什么是MVCC?二、快照读与当前读?1. 快照读2. 当前读 三、MVCC实现原理(ReadView)1. 隐藏字段2. Read View3. 思路设计4. ReadView使用规则5. MVCC整体操作流程 四、…

5.1劳动节,致敬最可爱的人!Cocos社区杰出贡献者出炉

Cocos 引擎的生态建设与繁荣,离不开社区开发者的辛勤付出。 2022.5 ~ 2023.5 年度期间,有这样一批 Cocos 社区开发者,他们使用 Cocos Creaor 引擎创作内容与产品、分享技术和经验,为 Cocos 社区默默贡献自己的一份力量&#xff0c…

软件杯龙源风电赛题培训!千万分钟数据和全流程基线等你来战

‍‍ “中国软件杯”大学生软件设计大赛是一项面向中国在校学生的公益性赛事,大赛由国家工业和信息化部、教育部、江苏省人民政府共同主办,是全国软件行业规格最高、最具影响力的国家级一类赛事。其中,作为重点赛题的龙源风电赛,上…

JavaWeb02(Servlet页面跳转方式表单提交方式)

目录 一.servlet 1.1 什么是servlet? 1.2 实现接口,初始代码 1.3 学会配置和映射 1.4 掌握servlet的生命周期 生命周期的各个阶段 1.5 获取servlet初始化参数和上下文参数 1.5.1 初始代码 推荐使用 1.5.2 初始化参数 1.5.3 上下文参数 1.6 servlet应用:处理用户登…

终于把 vue-router 运行原理讲明白了(二)!!!

一、vue-router路由变化侦测 1.1 上一遍文章中,介绍了vue-router 的install 函数的内部实现,知道了能在this中访问$router 和视图更新的机制,文章链接终于把 vue-router 运行原理讲明白了(一)!&#xff01…