Celery 是一个简单,灵活且可靠的分布式系统,可以处理大量消息,同时为操作提供维护该系统所需的工具。这是一个任务队列,着重于实时处理,同时还支持任务调度。
Celery 通过消息进行通信,通常使用经纪人在 clients 和 workers 之间进行调解。要启动一个任务,客户端会在队列中放入一条消息,然后经纪人将消息传递给工人。
一个 Celery 系统可以由多个 worker 和 broker 组成,从而实现高可用性和横向扩展。
在web 项目中,通常因为服务器http超时时间的限制,任务无法在规定时间完成,这就需要使用异步来实现。以下就如何使用celery 改造现有Django项目。
1、安装依赖
pip install celery redis
pip 安装的redis 无法在项目中使用,因为需要在环境中安装,方法如下:
wget http://download.redis.io/releases/redis-3.2.8.tar.gz -o /home/admin/package/
cd /home/admin/package/
tar -zxvf redis-3.2.8.tar.gz
mv ./redis-3.2.8 /usr/local/redis/
cd /usr/local/redis/
make
make install
cd utils
echo | /bin/bash install_server.sh
2、配置celery
2.1、settings.py
在 Django 项目的 settings.py 文件中添加以下配置:
# 配置 Celery
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'# 配置 redis
REDIS_HOST = [("127.0.0.1", 6379)]
REDIS_PASSWORD = ""
2.2、项目主目录下配置
2.2.1、__init__.py
#coding=utf-8
from __future__ import absolute_import, unicode_literalsfrom django.conf import settings
from .celery import app as celery_app
from ._redis import TaskRedisredis_client = TaskRedis(settings.REDIS_HOST,settings.REDIS_PASSWORD,db=1
)__all__ = ['celery_app']
2.2.2、新建celery 配置文件celery.py
#!/usr/bin/env python
# coding=utf-8
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery, platformsfrom django.conf import settingsfrom celery.schedules import crontab
from datetime import timedeltaos.environ.setdefault('DJANGO_SETTINGS_MODULE', '项目名称.settings')
platforms.C_FORCE_ROOT = True
app = Celery('项目名称', backend='redis://localhost:6379/1', broker='redis://localhost:6379/0')# 将settings对象作为参数传入
app.config_from_object('django.conf:settings', )# celery 自动发现任务: APP/tasks.py
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)# 设置定时参数
app.conf.update(# 并发数CELERYD_CONCURRENCY=4,# 每个worker 执行多少任务就killCELERYD_MAX_TASKS_PER_CHILD=50,# celery 路径CELERY_ROUTES={"项目名称.tasks.add": {"queue": "q1"}, # 把add任务放入q1队列},# celery beat 调度CELERYBEAT_SCHEDULE={"asy-update-info": {"task": "get_info","schedule": crontab(minute=0,hour=1), # 定时同步aone信息},# 定时执行 # "schedule": crontab(minute=0, hour=3), # 每日早上3点# "schedule": crontab(minute=0, hour=1),# "schedule": crontab(minute=0, hour=1), # "schedule": crontab(minute=0, hour="*/1") # 每隔1小时执行一次# "schedule": crontab(minute=0, hour="*/12") # 每隔十小时执行一次# "schedule": crontab(minute=0, hour=3) # 每日凌晨3点# "schedule": crontab(minute=0, hour=3, day_of_week=2) # 每周二凌晨3点# "schedule": crontab(minute=0, hour=3, day_of_week=1) # 每周一凌晨3点# "schedule": crontab(minute=0, hour=3, day_of_week=2) # 每周二凌晨3点# "schedule": crontab(minute=0, hour=3, day_of_week=2) # 每周二凌晨3点# "schedule": crontab(minute=0, hour=4, day_of_week='1,4') # 每周一、四凌晨4点#
)@app.task(bind=True)
def debug_task(self):print('Request: {0!r}'.format(self.request))
2.2.3、项目目录下创建redis存储的队列信息方法文件_redis.py
#coding:utf-8import datetime
import jsonfrom redis import Redis, ConnectionPoolclass RedisInit(object):def __init__(self, redis_config, password, master=None, connect_type='direct', db=0):""":param redis_config::param password::param master::param connect_type: 连接方式 direct: 直连 sentinel: 哨兵"""self.redis_config = redis_configself.password = passwordself.master = masterself.connect_type = connect_typeself.redis = Noneself.host, self.port = self.redis_config[0]self.db = dbself._connect()def _connect(self):# 直接if self.connect_type == 'direct':self.pool = ConnectionPool(host=self.host, port=self.port, password=self.password, db=self.db,max_connections=200, socket_keepalive=True, decode_responses=True)self.redis = Redis(connection_pool=self.pool)# 哨兵else:# 哨兵连接池self.sentinel = Sentinel(self.redis_config, password=self.password, db=self.db,socket_keepalive=True, decode_responses=True)self.redis = self.sentinel.master_for(self.master, max_connections=200)class TaskRedis(RedisInit):def __init__(self, *args, **kwargs):super(TaskRedis, self).__init__(*args, **kwargs)def _set_task(self, key, task_id):"""缓存 task 的celery对象:param key 键:param task_id celery task id:return"""self.redis.set(key, task_id)self.redis.expire(key, 7200)def _get_task_id(self, key):"""获取celery的task id"""return self.redis.get(key)
3、任务异步改造
3.1、修改任务函数
Django 中task.py
from celery import task
from celery import shared_task@shared_task
def add(x, y):sleep(5)return x + y
3.2、触发任务
Django 中view.py
,只需要给函数加 delay()
方法:
from .tasks import addresult = add.delay(4, 4)
4、启动 celery 项目
项目启动时使用如下命令启动celery
# 在Django项目主目录下
celery -A '项目名称' worker -l info Please specify a different user using the -u option.User information: uid=0 euid=0 gid=0 egid=0uid=uid, euid=euid, gid=gid, egid=egid,-------------- celery@x.x.x.x v3.1.25 (Cipater)
---- **** -----
--- * *** * -- Linux-5.10.84-004x86_64-x86_64-with-redhat-7.2
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: '项目名称':0x7feac79c3390
- ** ---------- .> transport: redis://localhost:6379/0
- ** ---------- .> results: redis://localhost:6379/1
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ----
--- ***** ----- [queues]-------------- .> celery exchange=celery(direct) key=celery[tasks]. add[2023-04-24 06:47:56,647: INFO/MainProcess] Connected to redis://localhost:6379/0
[2023-04-24 06:47:56,652: INFO/MainProcess] mingle: searching for neighbors
[2023-04-24 06:47:57,656: INFO/MainProcess] mingle: all alone
[2023-04-24 06:47:57,662: WARNING/MainProcess] /usr/lib/python2.7/site-packages/celery/fixups/django.py:265: UserWarning: Using settings.DEBUG leads to a memory leak, never use this setting in production environments!warnings.warn('Using settings.DEBUG leads to a memory leak, never '
[2023-04-24 06:47:57,663: WARNING/MainProcess] celery@x.x.x.x ready.
保持后台运行命令
celery -A '项目名称' worker -l info --detach
输出日志到日志文件
celery -A '项目名称' worker -l info --loglevel=debug --logfile=tasks.log
参考文档
1、https://www.celerycn.io/ru-men/celery-jian-jie
2、https://www.cnblogs.com/fisherbook/p/11089243.html