celery初探

celery初探

资源

https://www.jianshu.com/p/9be4d8d30d8e
celery视频教程中用到的文档

官网文档

实践

根据 celery视频教程中用到的文档 ,见 python/tasks_cel

旧的操作

疑问

celery -A  appname

appname 如何定义、规则是啥?   貌似是指   任务的目录。

简单的测试例子:

新建一个目录celerytest,并建一个空的__init__.py文件

mkdir celerytest
cd celerytest
touch __init__.py

app_test.py 启动模块

from celery import Celery

app = Celery('celerytest', include=['celerytest.tasks'])
app.config_from_object('celerytest.celeryconfig')

if __name__ == '__main__':
    app.start()

celeryconfig.py 存放配置。

BROKER_URL = 'redis://localhost:55034' # 使用Redis作为消息代理

CELERY_RESULT_BACKEND = 'redis://localhost:55034/0' # 把任务结果存在了Redis

CELERY_TASK_SERIALIZER = 'msgpack' # 任务序列化和反序列化使用msgpack方案

CELERY_RESULT_SERIALIZER = 'json' # 读取任务结果一般性能要求不高,所以使用了可读性更好的JSON

CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间

CELERY_ACCEPT_CONTENT = ['json', 'msgpack'] # 指定接受的内容类型

tasks.py 具体的任务

import time
from celerytest.app_test import app

@app.task
def add(x, y):
    time.sleep(1)
    return x + y

diaoyong.py

如果跟上面的文件放到一个目录,则引用的时候用点.,放到上级目录时,用模块名称

from .tasks import add
import time

# 备注,引入当前模块下的模块时   使用.  

t1 = time.time()

r1 = add.delay(1, 2)
r2 = add.delay(2, 4)
r3 = add.delay(3, 6)
r4 = add.delay(4, 8)
r5 = add.delay(5, 10)

r_list = [r1, r2, r3, r4, r5]
for r in r_list:
    while not r.ready():
        pass
    print(r.result)

t2 = time.time()

print('共耗时:%s' % str(t2-t1))

说明:

上面的例子,需要提前准备好redis。我是安装redis,没有成功,使用了docker。

调用时候,其实并不是1秒完成的,而是5秒。原因是:我就起了一个进程。同个机器上,开多个进程见celery命令

celery命令

并发实现:
        --concurrency  设置并发的数量
        celery -A proj worker --loglevel=INFO --concurrency=10
 
        可以在一台机器上启动多个woker,但是最好使用不同的名称来标识他们
        celery -A proj worker --loglevel=INFO --concurrency=10 -n worker2@%h
        celery -A proj worker --loglevel=INFO --concurrency=10 -n worker1@%h