翻译自https://blog.miguelgrinberg.com/post/using-celery-with-flask

http://www.cnblogs.com/jay54520/p/6212623.html

终于得到自己web程序要进行计算密集型的解决办法了,所以将此文翻译留作后用,也方便有需要的人。

正文

Working with Flask and Celery

Celery和Flask的整合是非常容易的,没有其他需要的扩展包。一个使用Celery的Flask应用需要这样初始化Celery客户端:

1
2
3
4
5
6
7
from flask import Flask
from celery import Celery
app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = ‘redis://localhost:6379/0’
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)

正如所看到的,Celery是通过创建Celery的实例初始化的,传递了应用的名称和连接URL作为消息代理,将CELERY_BROKER_URL作为键放入app.config字典。这个URL告诉代理服务在哪里运行。如果你运行的不是Redis,或者有在不同的机器上的代理,那么你需要相应的改变这个URL。

另外的可选配置选项可以直接从Flask的配置通过celery.conf.update()传递,CELERY_RESULT_BACKEND只有你需要存储任务状态和结果值的时候才是必须的。第一个例子不需要这个功能,但是第二个需要,所以最好从一开始配置好。

任何你想作为后台任务的函数都需要使用celery.task装饰器装饰,比如

1
2
3
@celery.task
def my_background_task(arg1, arg2):
return result

然后Flask应用就可以像如下请求后台任务的运行了:

1
task = my_background_task.delay(10, 20)

delay()方法是更加强大的apply_async()调用的一个缩略,这里是一个同等的调用使用apply_async():

1
task = my_background_task.apply_async(args=[10, 20])

当使用apply_async(),你可以给Celery关于后台任务如何被运行更加详细的指示,一个有用的选项是请求任务在将来的某个时候运行。例如,下面的语句将会使任务在大约1分钟后运行

task = my_background_task.apply_async(args=[10, 20], countdown=60)

delay()apply_async()的返回值都是一个代表任务的对象,这个对象被用来获取状态。我将在稍后的文章里展示如何完成的,但是现在keep it simple并且不要关心这些任务的结果。

Simple Example: Sending Asynchronous Emails

第一个例子将要展示的是一个非常普通的应用需求:不阻塞主应用去发送电子邮件。

作为示例我将使用Flask-Mail扩展,我已经在其他文章中详细的介绍了。我将假定你对这这个扩展很熟悉,因此如果你需要复习一下请看我的Flask book或者这篇介绍。

这个我将用于展示的示例应用包含有只有一个输入的简单web表单。用户被要求输入email地址在这个区域,然后提交,服务器将会向该地址发送一封email。表单含有两个提交按钮,一个是立即发送email,另外一个是等待一分钟后发送。截图最上面的部分展示了这个表单的样子。

这里是支持这个例子的HTML模板:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<html>
<head>
<title>Flask + Celery Examples</title>
</head>
<body>
<h1>Flask + Celery Examples</h1>
<h2>Example 1: Send Asynchronous Email</h2>
{% for message in get_flashed_messages() %}
<p style="color: red;">{{ message }}</p>
{% endfor %}
<form method="POST">
<p>Send test email to: <input type="text" name="email" value="{{ email }}"></p>
<input type="submit" name="submit" value="Send">
<input type="submit" name="submit" value="Send in 1 minute">
</form>
</body>
</html>

希望你没有觉得惊讶,只是常规的HTML表单,加上可以从Flask闪现消息的能力。

Flask-Mail扩展需要一些配置,特别是当发送email的时候使用的邮件服务器,我使用Gmail账户作为email服务器

1
2
3
4
5
6
7
# Flask-Mail configuration
app.config['MAIL_SERVER'] = 'smtp.googlemail.com'
app.config['MAIL_PORT'] = 587
app.config['MAIL_USE_TLS'] = True
app.config['MAIL_USERNAME'] = os.environ.get('MAIL_USERNAME')
app.config['MAIL_PASSWORD'] = os.environ.get('MAIL_PASSWORD')
app.config['MAIL_DEFAULT_SENDER'] = 'flask@example.com'

现在为了防止我的email账户处于风险中,我将它们设置成环境变量,然后导入到应用中。

这是一个支持这个例子的简单路由

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@app.route('/', methods=[’GET', 'POST'])
def index():
if request.method == 'GET':
return render_template('index.html', email=session.get('email', ''))
email = request.form['email']
session['email'] = email
msg = Message('Hello from flask', recipients=[request.form['email']])
msg.body = 'This is a test email sent from a background celery task'
if request.form['submit'] == 'Send':
send_async_email.delay(msg)
flash('Sending email to {0}'.format(email))
else:
# 使用了两种不同的调用方法
send_async_email.apply_async(args=[msg], countdown=60)
return redirect(url_for('index'))

同样的,这是一个标准的Flask,因为只是一个非常简单的表单,所以我没有使用任何扩展,我使用了request.methodrequest.form来做所有的管理。我在session中保存了用户在文本输入区输入的值,这样我就可以在页面重新载入之后记住它。

有意思的一点是发送email,它是被一个Celery任务叫send_async_email处理的, 通过delay()或者apply_async()来激发的

应用最后一部分是让任务完成的异步任务函数

1
2
3
4
@celery.task
def send_async_email(msg):
with app.app_context():
mail.send(msg)

这个函数被celery.task装饰器装饰,变成了后台任务。这个函数值得关注的是Flask-Mail需要应用上下文才能运行,所以在send()方法被调用之前首先应该获取应用上下文。

应该注意的是在这个异步调用中返回的值没有被保存,所以应用不会知道调用是否成功。当运行这个例子的时候,应该在发送邮件时候出现问题去查看Celery工作者的输出。

Complex Example: Showing Status Updates and Results

上面的例子很简单,后台任务启动后,应用就不用管了。大多数的Celery的web开发指南就到这里结束了,但是事实上对很多应用来说,管理后台任务和获取结果是很必要的。

我现在要做的是扩展上面的应用,这个例子展示了一个长时间运行的任务。用户可以通过点击按钮来启动一个或者多个任务,浏览器上的web页面通过ajax来获取这些任务的状态更新。对于每一个任务页面将会有一个状态条,完成的百分比和一个状态信息,当任务完成后,结果值也将会显示。

Background Tasks with Status Updates

第二个例子中使用的后台任务,这里就相当于计算任务,然后结束的时候返回状态,使用update_state来更新状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@celery.task(bind=True)
def long_task(self):
"""Background task that runs a long function with progress reports."""
verb = ['Starting up', 'Booting', 'Repairing', 'Loading', 'Checking']
adjective = ['master', 'radiant', 'silent', 'harmonic', 'fast']
noun = ['solar array', 'particle reshaper', 'cosmic ray', 'orbiter', 'bit']
message = ''
total = random.randint(10, 50)
for i in range(total):
if not message or random.random() < 0.25:
message = '{0} {1} {2}...'.format(random.choice(verb),
random.choice(adjective),
random.choice(noun))
self.update_state(state='PROGRESS',
meta={'current': i, 'total': total,
'status': message})
time.sleep(1)
return {'current': 100, 'total': 100, 'status': 'Task completed!',
'result': 42}

这个任务中我在Celery装饰器中添加了bind=True参数,用来传递一个self参数给这个函数,这样可以用来记录状态的更新。

既然这个任务没有做任何有用的事,我决定使用一些动词,形容词和名字的集合来创建一个有趣的状态消息。你可以在上面看到我所用到的词语。没有什么错但是有趣是吧?

函数有一个10到50的随机数字的循环,每一次运行任务都会有不同的时间。在第一次迭代的时候会创建一个不同的状态消息,在随后的循环中有25%的可能性改变这个消息。

self.update_state()调用使得Celery接受这些任务更新。那里有好几个内置状态,比如STARTED, SUCCESS等等,但是Celery也允许自定义状态。这里我用到了自定义状态PROGRESS。这个状态附带了一个附加的元数据,以Python字典的形式包含了当前和总的迭代数,还有一个随机的状态消息。客户端可以使用这些元素来形成一个进度条。每一次迭代休眠一秒,模拟一些完成的工作。

当循环退出的时候,Python字典作为函数返回值返回。字典包涵了更新的迭代计数,最终的状态信息和有趣的结果。

long_task()函数在一个Celery工作者中处理,下面是Flask应用的路由来启动后台任务

1
2
3
4
@app.route('/longtask', methods=['POST'])
def longtask():
task = long_task.apply_async()
return jsonify({}), 202, {'Location': url_for('taskstatus', task_id=task.id)}

客户端需要向/longtask提交一个POST请求来开始一个任务,服务器开始任务并且存储返回值。响应状态值用的是202,通常用在REST APIs来标志一个请求正在被处理。我也加上了Location头,里面的URL用户可以用来获取状态信息。这个URL指向了Flask路由taskstatus,使用task.id作为动态参数

Accessing Task Status from the Flask Application

taskstatus路由负责报告由后台任务提供的状态更新,这是这个路由的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@app.route('/status/<task_id>')
def taskstatus(task_id):
task = long_task.AsyncResult(task_id)
if task.state == 'PENDING':
response = {
'state': task.state,
'current': 0,
'total': 1,
'status': 'Pending...'
}
elif taks.state != 'FAILURE':
response = {
'state': task.state,
'current': task.info.get('current', 0),
'total': task.info.get('total', 1),
'status': task.info.get('status', '')
}
if 'result' in task.info:
response['result'] = task.info['result']
else:
# something went wrong in the background job
response = {
'state': task.state,
'current': 1,
'total': 1,
'status': str(task.info), # this is the exception raised
}
return jsonify(response)

路由生成了一个JSON响应,包涵了任务状态和所有我在update_state()中meta设置的值,这样客户端就可以用这个来创建一个进度条了。不幸的是,运行函数需要检查一些边界条件,因此它会运行一段时间。为了取得任务数据,我重新创建了任务对象,它是AsyncResult类的对象,使用在URL中传递的task id

防坑指南

以上内容是原文的主要内容,下面我要写一下自己在使用Celery中出现的问题和解决方法。我使用的是redis做为broker和backend,代码如下:

celery = Celery('backend', broker='redis://%s:%d/%d' % (REDISHOST, REDISPORT, REDISDB),backend='redis://%s:%d/%d' % (REDISHOST, REDISPORT, REDISDB))

这句话在celery.py文件里,该文件在应用目录最高层,和__init__.py是同一级的,所以在启动celery时候这样:celery --app=backend.celery worker --loglevel=DEBUG --config=celery_settings--app这个参数就不必解释了,最后一个--config参数,这个是导入celery的参数,其实有很多参数,我在应用中只用到了CELERY_IMPORTS = ("tasks", ),作用就是将tasks.py中的task都注册了。

其中tasks.py也是和celery.py同一级的,里面包涵了所有要计算的task函数,比如这样

1
2
3
4
5
6
@celery.task(bind=True)
def test_ing(self, a):
self.update_state(state='PROGRESS',
meta={'current': 10, 'total': 10, 'time': 10})
time.sleep(10)
return {'current': a, 'total': a, 'time': a / 60}

然后将这些函数导入到路由那块,然后启动就好了。

启动时候出现错误connection refused,这个原因应该是redis在这样的启动方式下无法连接,所以我使用inv web.gunicorn启动,inv就是invoke,这个是自定义启动方式的一个包,其实就是相当于gunicorn启动。启动之后就好了,可以正常运行了。

在Celery tasks里面使用多进程

这个可是极好的功能,我试了一下在我的程序里加入多进程,由一个Python进程中运行变成现在四个进程运行,运行时间从0.36分钟减少到0.13分钟,大概3倍,因为多进程调度有损耗,所以不会完全的是4倍,但是已经很好啦。如果是8核16线程的话,大概能有12倍左右的提升。

以下是一个使用例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from celery import Celery
import time
import multiprocessing as mp
app = Celery('proj', broker='amqp://admin:admin@ip:5672', include="tasks")
def test_func(i):
print "beg...:", i
time.sleep(5)
print "....end:", i
return i * 5
@app.task
def fun_1(n):
curr_proc = mp.current_process()
curr_proc.daemon = False
p = mp.Pool(mp.cpu_count())
curr_proc.daemon = True
for i in range(n):
p.apply_async(test_func, args=(i,))
p.close()
p.join()
return 1
if __name__ == "__main__":
app.start()

直接启动多进程是肯定不可以的,因为是守候进程curr_proc.daemon=True,所以启多进程之前主动设置为非守候进程curr_proc.daemon=False,启动了以后再设为守候进程

一些其他的总结

向docker里面传文件

cat invtasks/__init__.py | docker exec -i -u root beehive_ sh -c 'cat > /opt/beehive/invtasks/__init__.py'

运行

运行

docker run -d --net=host --name=ability_stable -p 8888:8888 -it ability:170304

shell

shell中单引号和双引号是有区别的,单引号不执行里面的命令,双引号执行里面的命令

Linux

时区有问题:cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime

Python模块包

sudo vim /usr/local/lib/python3.5/dist-packages/capacity/