一、TimeLimit&SoftTimeLimit 的坑
celery 中 time limit 有两种, soft_time_limit 和 time_limit, 区别是 soft_time_limit 会在内部抛一个 Exception, task 可以 通过try...except 自行处理,time_limit则是中断woker重启没法被catch做一些操作。
python">from myapp import app
from celery.exceptions
import SoftTimeLimitExceeded
@app.task
def mytask():try: do_work() except SoftTimeLimitExceeded: clean_up_in_a_hurry()
经过本人反复实验, 只有celery pool 的默认模式prefork,同时支持上面两种超时处理,两种协程模式gevent和eventlet不支持soft_time_limit,最坑爹的solo模式怎么配置都毫无作用。
以下是 Celery 支持的一些常见并发模式:
-
solo:默认模式,工作进程在单个进程中顺序执行任务。
-
prefork:传统的 UNIX 方式,工作进程会创建多个子进程来并行执行任务。
-
eventlet:基于 Eventlet 库,使用协程来实现并发。
-
gevent:基于 Gevent 库,使用绿色线程来实现并发。
-
threads:使用 Python 的标准库
threading
模块来创建一个线程池。 -
processes:使用 Python 的
multiprocessing
模块来创建一个进程池。 -
aiopg:使用异步进程池,基于
asyncio
和multiprocessing
。
解决方案
自己实现task的超时捕捉异常返回的操作。
python常用定义超时异常的方法有func-timeout库
也可以通过自定义装饰器,或者魔法模块contextlib来实现。
我这里具体任务是处理视频,根据视频长度不同来设置不同的超时是最优化的方案,因此我用contextlib来实现超时捕获
python">import signal
from contextlib import contextmanager
# 自定义超时异常
class TimeoutError(Exception):def __init__(self, msg):super(TimeoutError, self).__init__()self.msg = msg
@contextmanager
def timeout(interval):def _handler(signum,frame):raise TimeoutError(f"任务超时!")try:signal.signal(signal.SIGALRM,_handler)signal.alarm(interval) # interval秒后向进程发送SIGALRM信号yield# except TimeoutError as e:# raise TimeoutError(e)finally:signal.alarm(0)
使用时
python">try:with timeout(limit_time):dowork()
except TimeoutError as e:clean_up_in_a_hurry()
值得注意的是,该方法是python在linux下的超时,windows下不支持signal库需要用别的方法。
目前用下来celery的坑还是不少的(包括监控flower的时间的坑),执行效率其实也不咋地。