celery初探
celery初探
资源
https://www.jianshu.com/p/9be4d8d30d8e
celery视频教程中用到的文档
实践
根据 celery视频教程中用到的文档 ,见 python/tasks_cel
旧的操作
疑问
celery -A appname
appname 如何定义、规则是啥? 貌似是指 任务的目录。
简单的测试例子:
新建一个目录celerytest,并建一个空的__init__.py文件
mkdir celerytest
cd celerytest
touch __init__.py
app_test.py 启动模块
from celery import Celery
app = Celery('celerytest', include=['celerytest.tasks'])
app.config_from_object('celerytest.celeryconfig')
if __name__ == '__main__':
app.start()
celeryconfig.py 存放配置。
BROKER_URL = 'redis://localhost:55034' # 使用Redis作为消息代理
CELERY_RESULT_BACKEND = 'redis://localhost:55034/0' # 把任务结果存在了Redis
CELERY_TASK_SERIALIZER = 'msgpack' # 任务序列化和反序列化使用msgpack方案
CELERY_RESULT_SERIALIZER = 'json' # 读取任务结果一般性能要求不高,所以使用了可读性更好的JSON
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间
CELERY_ACCEPT_CONTENT = ['json', 'msgpack'] # 指定接受的内容类型
tasks.py 具体的任务
import time
from celerytest.app_test import app
@app.task
def add(x, y):
time.sleep(1)
return x + y
diaoyong.py
如果跟上面的文件放到一个目录,则引用的时候用点.,放到上级目录时,用模块名称
from .tasks import add
import time
# 备注,引入当前模块下的模块时 使用.
t1 = time.time()
r1 = add.delay(1, 2)
r2 = add.delay(2, 4)
r3 = add.delay(3, 6)
r4 = add.delay(4, 8)
r5 = add.delay(5, 10)
r_list = [r1, r2, r3, r4, r5]
for r in r_list:
while not r.ready():
pass
print(r.result)
t2 = time.time()
print('共耗时:%s' % str(t2-t1))
说明:
上面的例子,需要提前准备好redis。我是安装redis,没有成功,使用了docker。
调用时候,其实并不是1秒完成的,而是5秒。原因是:我就起了一个进程。同个机器上,开多个进程见celery命令
celery命令
并发实现:
--concurrency 设置并发的数量
celery -A proj worker --loglevel=INFO --concurrency=10
可以在一台机器上启动多个woker,但是最好使用不同的名称来标识他们
celery -A proj worker --loglevel=INFO --concurrency=10 -n worker2@%h
celery -A proj worker --loglevel=INFO --concurrency=10 -n worker1@%h