Celery是一个实时处理和任务调度的分布式任务队列。任务就是消息,消息中的有效载荷中包含要执行任务需要的全部数据。

这是其使用场景:

  1. web应用,需要较长时间完成的任务,就可以作为任务交给celery异步执行,执行完返回给用户。
  2. 网站的定时任务
  3. 异步执行的其他任务。比如清理/设置缓存

有以下特点:

  1. 任务执行情况
  2. 管理后台管理任务
  3. 任务和配置管理相关联
  4. 多进程,EventLet和Gevent三种模式并发执行
  5. 错误处理机制
  6. 任务原语,任务分组,拆分和调用链

Celery的架构

  1. Celery Beat,任务调度器,Beat进程会读取配置文件的内容,周期性的将配置中到期需要执行的任务发送给任务队列
  2. Celery Worker,执行任务的消费者
  3. Broker,消息代理,接受生产者的任务消息,存进队列然后按序发送给消费者
  4. Producer,定时任务或者调用了API产生任务交给任务队列进行处理
  5. Result Backend,任务处理完后保存状态信息和结果

整体机制就是,任务发布者(一般是web应用)或者任务调度,即定时任务将任务发布到消息代理上(使用Redis或者RabbitMQ),然后消息代理将任务按序发送给Worker执行,Worker执行完后将结果存储到Backend中,也可以用Redis。

Celery的序列化

一般使用json, yaml, msgpack,其中msgpack是一个二进制的json序列化方案,比json数据结构更小,更快。序列化的作用是在客户端和消费者之间传输数据的途中需要序列化和反序列化。

一个例子

  • 主程序(实例化Celery)

一种方式,直接进行,简单粗暴

1
2
3
from celery import Celery
from config import REDISHOST, REDISDB, REDISPORT
celery = Celery('backend', broker='redis://%s:%d/%d' % (REDISHOST, REDISPORT, REDISDB),backend='redis://%s:%d/%d' % (REDISHOST, REDISPORT, REDISDB))

另一种方式,使用配置文件,类似于Flask应用实例的方式吧

1
2
3
4
from celery import Celery
app = Celery('proj', include=['proj.tasks'])
app.config_from_object('proj.celeryconfig')
# celery.conf.update(app.config)

第三种方式,为了能在celery task中使用应用上下文,可以这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def make_celery(app):
celery = Celery(app.__name__, broker='redis://%s:%d/%d' % (REDIS_HOST, REDIS_PORT, REDIS_DB),
backend='redis://%s:%d/%d' % (REDIS_HOST, REDIS_PORT, REDIS_DB))
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery
celery = make_celery(app)
  • Celery配置文件

如果都在实例化的时候指定好了配置,那么就不需要了,如果需要指定额外的参数,那么就可以放在配置文件里,以下是几个常用的参数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# 导入tasks
from celery.schedules import crontab
from datetime import timedelta
# 防止内存泄漏
CELERYD_MAX_TASKS_PER_CHILD = 40
# 指定时区
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_IMPORTS = ("tasks", "graph_data_tasks")
# broker代理地址
BROKER_URL = ''
# backend地址
CELERY_RESULT_BACKEND = ''
CELERY_TASK_SERIALIZER = 'msgpack'
CELERY_RESULT_SERIALIZER = 'json'
# A value of None or 0 means results will never expire
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务结果过期时间
CELERY_ACCEPT_CONTENT = ['json', 'msgpack']
# 路由,指定队列
CELERY_ROUTES = {'backend.celery_tasks.calc_ability': {'queue': 'capacity'},
'backend.celery_tasks.graph_data_tasks': {'queue': 'celery'},
'backend.celery_tasks.calc_plat': {'queue': 'celery'},
'backend.celery_tasks.calc_sections': {'queue': 'guchong'},
'backend.celery_tasks.publish_data': {'queue': 'mysql'}}
# celery worker的并发数
CELERYD_CONCURRENCY = 2
# 使用任务调度,使用Beat进程自动生成任务
CELERYBEAT_SCHEDULE = {
'graph_data': {
'task': 'graph_data_tasks.sync_graph',
'schedule': timedelta(minutes=60),
'args': ()
},
truetrue'rank_for_guchong': {
'task': 'backend.celerytasks.rank_for_guchong.calc_ability_schedule',
'schedule': crontab(hour=11, minute=55),
'args': ()
}
}
  • Celery的tasks

将实例化的celery实例导入进来之后,使用celery的装饰器task()即可完成tasks的注册

1
2
3
4
5
6
7
8
# 指定bind为True就可以更新状态
@celery.task(bind=True)
def calc_capacity(strategy_id, start_date, end_date, assets, flows, stocks, output_path):
# meta可以指定用户数据
self.update_state(state='PROGRESS', meta={'status': 'running', 'percentage': 0.1, 'data': None})
c = CalcAbility(strategy_id, assets, flows, stocks, output_path)
c.run(start_date, end_date)
return {'current': 1, 'total': 1, 'status': 'success', 'reason': ''}
  • 使用tasks
1
2
3
4
5
6
7
8
9
10
11
12
@be_data.route('/data/graph', methods=['GET'])
# @token_required
def gen_megapool_from_summary():
'''直接从summary文件计算这些汇总信息'''
pie_tuple = Cache.get_summary_data(name='summary_data', data_type=float)
meta = {}
if not pie_tuple:
task = sync_graph.apply_async(args=[])
return gen_response(data=meta, message="success", errorcode=0), 200
elif pie_tuple[11] != db_clients.hsize('summary_ability'):
sync_graph.apply_async(args=[])
return gen_response(data=meta, message="success", errorcode=0), 200

另外可以通过task.id来获取task的运行状态

1
2
3
4
5
6
7
8
9
10
task = calc_ability.AsyncResult(task_id)
response = {
'current': task.info.get('current', 0),
'total': task.info.get('total', 1),
'time': task.info.get('time', 0),
'start_time': start_time,
'fundid_nums': calc_task.fundid_nums,
'status': task.info.get('status', ''),
'reason': task.info.get('reason', '')
}

也可以指定task_id和app以及backend来获取状态

1
2
3
4
5
6
from celery import result as celery_result, current_app as celery_current_app
res = celery_result.AsyncResult(task_id, app=celery_current_app, backend=celery.backend)
res.state
res.status
# 这个就是用户数据或者返回时候return的数据
res.result
  • Celery的执行

celery --app=backend.celery worker --loglevel=DEBUG --config=celery_settings --beat

  • 指定队列

Celery通常使用默认名为celery的队列来存放任务,可以通过CELERY_DEFAULT_QUEUE修改,可以使用优先级不同的队列来确保高优先级的任务不需要等待就可以得到相应。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from kombu import Queue
CELERY_QUEUES = (
Queue('default', routing_key='task.#'),
trueQueue('web_tasks', routing_key='web.#')
)
# 默认交换机名字为tasks
CELERY_DEFAULT_EXCHANGE = 'tasks'
# 交换类型是topic
CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'
# 默认的路由键是task.default
CELERY_DEFAULT_ROUTING_KEY = 'task.default'
CELERY_ROUTES = {
'projq.tasks.add': {
'queue': 'web_tasks',
'routing_key': 'web.add',
true}
}

指定队列方式启动消费者进程:celery -A projq worker -Q web_tasks -l info

celery中的任务绑定

任务可以通过 app.task 装饰器进行注册,需要注意的一点是,当函数有多个装饰器时,为了保证 Celery 的正常运行,app.task 装饰器需要在最外层。其中有一个bind 参数,当设置了 bind 参数,则会为这个任务绑定一个 Task 实例,通过第一个 self 参数传入,可以通过这个 self 参数访问到 Task 对象的所有属性。绑定任务用于尝试重新执行任务(使用app.Task.retry()),绑定了任务就可以访问当前请求的任务信息和任何你添加到指定任务基类中的方法。也可以使用self.update_state()方法来更新状态信息

1
2
3
4
5
6
7
@celery.task(bind=True)
def calc_ability(self, fundIds, start_date, end_date, increment_end_date, maxsize):
start_date = parse_date(start_date)
end_date = parse_date(end_date)
truetrueself.update_state(state='PROGRESS',
meta={'current': index, 'total': total,
'time': used_time.used_time() / 60, 'status': 'running', 'reason': ''})

停止任务

1
2
3
4
5
6
7
8
9
10
11
rs = add.delay(1, 2)
rs.revoke() # 只是撤销,如果任务已经在执行,则撤销无效
rs.task_id # 任务id
app.control.revoke(rs.task_id) # 通过task_id撤销
app.control.revoke(rs.task_id, terminate=True) # 撤销正在执行的任务,默认使用TERM信号
app.control.revoke(rs.task_id, terminate=True, signal='SIGKILL') # 撤销正在执行的任务,使用KILL信号
# 而在最新的celery3版本中,这样停止一个任务
from .. import celery # 就是创建的celery实例
celery.control.terminate(task_id)
# 其实本质还是调用了revoke
return self.revoke(task_id, destination=destination, terminate=True, signal=signal, **kwargs)

Celery监控工具Flower

pip install flower

需要在celery_settings中指定CELERY_SEND_TASK_SENT_EVENT = True,然后和启动celery同样的目录下运行flower -A backend.celery --port=5555,即可看到管理界面了。访问http://localhost:5555

使用自动扩展

celery -A proj worker -l info --autoscale=6,3表示平时保持3个进程,最大并发进程数可以达到6个。

在Celery tasks里面使用多进程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from celery import Celery
import time
import multiprocessing as mp
app = Celery('proj', broker='amqp://admin:admin@ip:5672', include="tasks")
def test_func(i):
print "beg...:", i
time.sleep(5)
print "....end:", i
return i * 5
@app.task
def fun_1(n):
curr_proc = mp.current_process()
curr_proc.daemon = False
p = mp.Pool(mp.cpu_count())
curr_proc.daemon = True
for i in range(n):
p.apply_async(test_func, args=(i,))
p.close()
p.join()
return 1
if __name__ == "__main__":
app.start()

直接启动多进程是肯定不可以的,因为是守候进程curr_proc.daemon=True,所以启多进程之前主动设置为非守候进程curr_proc.daemon=False,启动了以后再设为守候进程

参考自:Python Web 开发实战 – 董伟明