最近在Celery上折磨了好一阵子,最近终于差不多代码也比较稳定了,所以在这里把这几天踩的坑都写一写,不仅是方便自己,也是方便别人。

使用RabbitMQ而不是Redis做Broker

这是第一点,也是保证后面的一些程序正常运行的基础。使用Redis做Broker的时候,Celery的好些原语都不能正常起作用,至于为什么,并没有深究,这块就是作为经验而言吧。

另外,使用RabbitMQ的时候,实例化celery的时候,Broker应该使用pyamqp: celery = Celery(app.__name__, broker='pyamqp://{}:{}@{}:{}/{}'.format(RM_USER, RM_PWD, RM_HOST, RM_PORT, RM_VHOST))

RabbitMQ的安装

参考:Using RabbitMQ

  • MacOS:
1
2
3
brew install rabbitmq
# 设置PATH
PATH=$PATH:/usr/local/sbin
  • Ubuntu
1
sudo apt-get install rabbitmq-server
  • docker
1
2
# management是带管理界面的,latest没有管理界面
docker pull hub.c.163.com/library/rabbitmq:3.6.10-management

RabbitMQ的配置

  • 普通
1
2
3
4
5
6
7
sudo rabbitmqctl add_user celery celerypwd
sudo rabbitmqctl add_vhost celery_host
sudo rabbitmqctl set_permissions -p celery_host celery ".*" ".*" ".*"
# 启动
sudo rabbitmq-server
sudo rabbitmq-server -detached
sudo rabbitmqctl stop
  • docker
1
2
# http://localhost:15672/ 是管理界面
docker run -d --name=rabbitmq_ability --net=host -e RABBITMQ_DEFAULT_USER=celery -e RABBITMQ_DEFAULT_VHOST=celery_host -e RABBITMQ_DEFAULT_PASS=celerypwd -p 5672:5672 -p 15672:15672 -it hub.c.163.com/library/rabbitmq:3.6.10-management

Celery中的Signature

subtask

Broker是Redis,在一个函数中,如果重复调用一个task执行apply_async,第一个可以执行,后面的都是NotRegisted,但是如果改成subtask就可以正常运行。subtask的意义和Signature相似的,感觉是Signature的前身吧。下面是几个用法,这个subtask概念理解起来很简单。

1
2
3
tasks.add.subtask((2,2), countdown=10)
test_print_cp = subtask(test_print, app=celery_current_app)
test_print_cp.apply_async()

子任务的调用和普通任务都是一样的

Signature

作用就是为了可以将一个任务调用的签名传递给其他处理函数或者是作为其他函数的参数,signature包含了参数,关键字参数和单个任务调用的执行选项,这样就可以传递给函数或者是序列化传递给其他地方。functools里面有一个partial和这个概念很像,以下是几个例子

1
2
3
4
5
from celery import signature
signature('tasks.add', args=(2, 2), countdown=10)
# 也可以这么调用
tasks.add.signature((2, 2), countdown=10)
tasks.add.s(2, 2)
  • add.s() 可变signature

上面这些只是创建了签名但是并不会调用,注意在后面的概念理解上,需要明确的区分哪些是signature,哪些是result,这很关键,这样可以清楚的区分一长串的原语是怎么构成的。如果要调用的话,需要使用调用API,比如delay, apply_async也可以直接调用,比如

1
2
3
tasks.add.s(2, 2).delay()
tasks.add.s(2, 2)()
tasks.add.s(2, 2).apply_async()
  • partial signature

上面这些都返回的是AsyncResult的实例,这里面包含了许多控制task还有获取结果的方法。除了在创建signature的时候将参数传递进去,也可以传递一部分参数,然后在调用的时候再传递剩余的,比如这样

1
2
3
4
5
6
7
partial = add.s(2)
partial.delay(4)
partial.apply_async((4,))
# 可以在task上面调用clone方法,根据之前的task创建目前的task
s = add.s(2)
s.clone(args=(4,), kwargs={'debug': True})
# 等同于 add.s(4, 2, debug=True)
  • 不可变的signature

就是在创建了这个signature之后,signature的参数就是不可变的了,partial这种会用到callbacks或者其他的工作流上,就会动态的给其signature传递参数,但是使用不可变的signature,就不会传参数了。这个特性是很经常用到的。

1
add.si(2, 2)

原语

想起一句话May the force be with you,原力,感觉原语就和原力一样强大

group

group可以使得一组任务并行,比如这样

1
2
3
4
5
6
7
8
# task集合必须是signature或者子任务的集合,也就是不会立即启动的任务
tasks = [distribute_calc.si(fundId, start, end, stype) for fundId in fundIds_group]
# 返回的是group对象
grp = group(tasks)
# 调用后返回的是GroupResult对象,就可以使用相应的方法来处理了
grp()
# 可以使用任何调用API来调用
grp.apply_async()

如果后面要在其他地方可以取到GroupResult,必须在调用后获得GroupResult对象后,调用save方法

1
2
res = grp.apply_async()
res.save()

在其他地方调用restore()方法:

1
2
# res就是一个GroupResult对象
res = celery_result.GroupResult.restore(task_id, app=celery_current_app, backend=celery_app.backend)

chain

顾名思义,这个就是将一串signature串在一起运行,一个接一个,本质是组成了一组回调函数,chain会将前一个函数的结果传递到后一个函数来,不过也可以使用si()方法来拒绝传递,chain会返回最后一个调用的task的结果

1
2
3
4
5
6
7
8
9
10
11
# 返回的是chain对象
task = chain(add.s(4,4) | mul.s(8))
# 任意方式调用
res = task.apply_async()
# 调用后返回的是AsyncResult
res.get()
# 也可以这么调用
(add.s(2, 2) | add.s(4) | add.s(8))
# 也可以使用partial
c1 = (add.s(4) | mul.s(8))
res = c1(16)

与group的连接,如果group在前面,会导致变成一个chord,也就是这个group运行完成后,会执行后面的函数作为回调,如果group在后面,则group的每一个任务都会使用前面的task的结果作为参数,可以使用si来拒绝参数

1
2
3
4
5
6
# 调用后返回的还是AsyncResult
c3 = (group(add.s(i, i) for i in xrange(10)) | xsum.s())
# group在后面
new_user_workflow = (create_user.s() | group(import_contacts.s(), send_welcome_email.s()))
new_user_workflow.delay(username='artv', first='Art', last='Vandelay', email='art@vandelay.com')

如何在先调用group,然后调用普通任务的情况下,获得GroupResult,这个还没有好的方法。

group([(group(add.s(i, i) for i in xrange(10)) | xsum.s())])这个呢?

首先外部是group对象,没毛病,但是内部tasks集合就是一个chain对象,在调用的时候先调用group,会形成并发,然后完成之后调用xsum.s(),最后会返回一个AsyncResult,这种情况,计算是可以正常完成的!获取状态也比较好弄,因为可以获取外部group的结果,可以使用res.children[0].parent来获取里面的group的状态,即GroupResult。可以使用获取的这个GroupResult来对task进行操作,res.revoke(terminated=True)

P.S.

  • 可能需要以下的参数配置

task_sent event 当一个任务消息被发布并且CELERY_TASK_SEND_SENT_EVENT配置有效的时候的时候发送这个event

CELERY_TASK_SEND_SENT_EVENT 如果启用,task_sent event 将会发送给每一个task,这样在task被worker运行之前tasks可以被追踪到

CELERY_SEND_EVENTS # 发送task相关的event,这样task就可以被比如flower之类的进行控制了

  • terminated

这里Celery使用的是SIGTERM,所以会关闭进程相关的程序,也就是等待进程释放自己的资源,然后再停止,如果程序再等待I/O,可能就不会立马作出反应。也就是说,SIGTERM多半是会被阻塞的,忽略的。

如果使用SIGKILL,就会强制杀死进程,所以可能导致孤儿进程的产生。

chord

chord可以使得你完成group所有的task之后来调用一个回调函数,这个方法经常会在易并行(embarrassingly parallel)的场景使用,关于这个embarrassingly…What is “embarrassing” about an embarrassingly parallel problem?

1
2
3
4
from celery import chord
# 这样会导致前面的十个任务并行处理,并且将结果组成一个list,传递给后面的xsum
# 当然后面的也可以不用前面的结果,使用si()就行了
res = chord((add.s(i, i) for i in xrange(10)), xsum.s())()

下面这个是我自己写的一个并发处理,并且还能拿到状态用于后续的控制和状态读取的例子

1
2
3
4
5
6
7
8
task = group([chord(group(gen_group_tasks.si(start, end, stype)()))(publish.si(start, end, stype))]).apply_async()
# first 这个会返回所有的要并行的tasks列表
gen_group_tasks.si(start, end, stype)()
# 然后形成并行group
group(gen_group_tasks.si(start, end, stype)())
# 然后是chord,增加回调函数,回调函数不接受参数
chord(group(gen_group_tasks.si(start, end, stype)()))(publish.si(start, end, stype))
# 然后将这个chord组建成为一个group,可以获取到GroupResult,否则就只能得到publish的AsyncResult

实现的机理是:有一个循环任务不断的(间隔1秒)去检测group任务是否完成,当完成的时候调用signature回调。但是在Redis和 Memcached: 它们实现了一个计数器,在每个并行的任务完成后给计数器增加,直到计数器的数字等于并行任务的数目时候调用回调函数。

另外:chord不能在redis 2.2版本以前正常工作,最好升级到2.2以上。

尽可能的不要用chord因为它是一个同步操作,但是如果并行任务后面的同步操作是必须的,那么chord很有用,只有header里面的所有task任务被执行完成后并且返回,回调函数才可以被调用。header里面的每一步都是并行处理的,在不同的节点。然后header里面每个任务的返回值会传给回调函数,然后执行。返回的task id是回调函数的,所以可以等待最终的返回值。

map

starmap

chunks

Result

AsyncResult

GroupResult

Configuration

小结