翻译自https://blog.miguelgrinberg.com/post/using-celery-with-flask
http://www.cnblogs.com/jay54520/p/6212623.html
终于得到自己web程序要进行计算密集型的解决办法了,所以将此文翻译留作后用,也方便有需要的人。
正文
Working with Flask and Celery
Celery和Flask的整合是非常容易的,没有其他需要的扩展包。一个使用Celery的Flask应用需要这样初始化Celery客户端:
|
|
正如所看到的,Celery是通过创建Celery的实例初始化的,传递了应用的名称和连接URL作为消息代理,将CELERY_BROKER_URL
作为键放入app.config
字典。这个URL告诉代理服务在哪里运行。如果你运行的不是Redis,或者有在不同的机器上的代理,那么你需要相应的改变这个URL。
另外的可选配置选项可以直接从Flask的配置通过celery.conf.update()
传递,CELERY_RESULT_BACKEND
只有你需要存储任务状态和结果值的时候才是必须的。第一个例子不需要这个功能,但是第二个需要,所以最好从一开始配置好。
任何你想作为后台任务的函数都需要使用celery.task
装饰器装饰,比如
|
|
然后Flask应用就可以像如下请求后台任务的运行了:
|
|
delay()
方法是更加强大的apply_async()
调用的一个缩略,这里是一个同等的调用使用apply_async()
:
|
|
当使用apply_async()
,你可以给Celery关于后台任务如何被运行更加详细的指示,一个有用的选项是请求任务在将来的某个时候运行。例如,下面的语句将会使任务在大约1分钟后运行
task = my_background_task.apply_async(args=[10, 20], countdown=60)
delay()
和apply_async()
的返回值都是一个代表任务的对象,这个对象被用来获取状态。我将在稍后的文章里展示如何完成的,但是现在keep it simple
并且不要关心这些任务的结果。
Simple Example: Sending Asynchronous Emails
第一个例子将要展示的是一个非常普通的应用需求:不阻塞主应用去发送电子邮件。
作为示例我将使用Flask-Mail
扩展,我已经在其他文章中详细的介绍了。我将假定你对这这个扩展很熟悉,因此如果你需要复习一下请看我的Flask book或者这篇介绍。
这个我将用于展示的示例应用包含有只有一个输入的简单web表单。用户被要求输入email地址在这个区域,然后提交,服务器将会向该地址发送一封email。表单含有两个提交按钮,一个是立即发送email,另外一个是等待一分钟后发送。截图最上面的部分展示了这个表单的样子。
这里是支持这个例子的HTML模板:
|
|
希望你没有觉得惊讶,只是常规的HTML表单,加上可以从Flask闪现消息的能力。
Flask-Mail扩展需要一些配置,特别是当发送email的时候使用的邮件服务器,我使用Gmail账户作为email服务器
|
|
现在为了防止我的email账户处于风险中,我将它们设置成环境变量,然后导入到应用中。
这是一个支持这个例子的简单路由
|
|
同样的,这是一个标准的Flask,因为只是一个非常简单的表单,所以我没有使用任何扩展,我使用了request.method
和request.form
来做所有的管理。我在session
中保存了用户在文本输入区输入的值,这样我就可以在页面重新载入之后记住它。
有意思的一点是发送email,它是被一个Celery
任务叫send_async_email
处理的, 通过delay()
或者apply_async()
来激发的
应用最后一部分是让任务完成的异步任务函数
|
|
这个函数被celery.task
装饰器装饰,变成了后台任务。这个函数值得关注的是Flask-Mail
需要应用上下文才能运行,所以在send()
方法被调用之前首先应该获取应用上下文。
应该注意的是在这个异步调用中返回的值没有被保存,所以应用不会知道调用是否成功。当运行这个例子的时候,应该在发送邮件时候出现问题去查看Celery工作者的输出。
Complex Example: Showing Status Updates and Results
上面的例子很简单,后台任务启动后,应用就不用管了。大多数的Celery的web开发指南就到这里结束了,但是事实上对很多应用来说,管理后台任务和获取结果是很必要的。
我现在要做的是扩展上面的应用,这个例子展示了一个长时间运行的任务。用户可以通过点击按钮来启动一个或者多个任务,浏览器上的web页面通过ajax来获取这些任务的状态更新。对于每一个任务页面将会有一个状态条,完成的百分比和一个状态信息,当任务完成后,结果值也将会显示。
Background Tasks with Status Updates
第二个例子中使用的后台任务,这里就相当于计算任务,然后结束的时候返回状态,使用update_state
来更新状态
|
|
这个任务中我在Celery装饰器中添加了bind=True
参数,用来传递一个self
参数给这个函数,这样可以用来记录状态的更新。
既然这个任务没有做任何有用的事,我决定使用一些动词,形容词和名字的集合来创建一个有趣的状态消息。你可以在上面看到我所用到的词语。没有什么错但是有趣是吧?
函数有一个10到50的随机数字的循环,每一次运行任务都会有不同的时间。在第一次迭代的时候会创建一个不同的状态消息,在随后的循环中有25%的可能性改变这个消息。
self.update_state()
调用使得Celery接受这些任务更新。那里有好几个内置状态,比如STARTED
, SUCCESS
等等,但是Celery也允许自定义状态。这里我用到了自定义状态PROGRESS
。这个状态附带了一个附加的元数据,以Python字典的形式包含了当前和总的迭代数,还有一个随机的状态消息。客户端可以使用这些元素来形成一个进度条。每一次迭代休眠一秒,模拟一些完成的工作。
当循环退出的时候,Python字典作为函数返回值返回。字典包涵了更新的迭代计数,最终的状态信息和有趣的结果。
long_task()
函数在一个Celery工作者中处理,下面是Flask应用的路由来启动后台任务
|
|
客户端需要向/longtask
提交一个POST请求来开始一个任务,服务器开始任务并且存储返回值。响应状态值用的是202,通常用在REST APIs来标志一个请求正在被处理。我也加上了Location
头,里面的URL用户可以用来获取状态信息。这个URL指向了Flask路由taskstatus,使用task.id
作为动态参数
Accessing Task Status from the Flask Application
taskstatus
路由负责报告由后台任务提供的状态更新,这是这个路由的实现
|
|
路由生成了一个JSON响应,包涵了任务状态和所有我在update_state()
中meta设置的值,这样客户端就可以用这个来创建一个进度条了。不幸的是,运行函数需要检查一些边界条件,因此它会运行一段时间。为了取得任务数据,我重新创建了任务对象,它是AsyncResult
类的对象,使用在URL中传递的task id
防坑指南
以上内容是原文的主要内容,下面我要写一下自己在使用Celery中出现的问题和解决方法。我使用的是redis做为broker和backend,代码如下:
celery = Celery('backend', broker='redis://%s:%d/%d' % (REDISHOST, REDISPORT, REDISDB),backend='redis://%s:%d/%d' % (REDISHOST, REDISPORT, REDISDB))
这句话在celery.py
文件里,该文件在应用目录最高层,和__init__.py
是同一级的,所以在启动celery时候这样:celery --app=backend.celery worker --loglevel=DEBUG --config=celery_settings
, --app
这个参数就不必解释了,最后一个--config
参数,这个是导入celery的参数,其实有很多参数,我在应用中只用到了CELERY_IMPORTS = ("tasks", )
,作用就是将tasks.py
中的task都注册了。
其中tasks.py
也是和celery.py
同一级的,里面包涵了所有要计算的task函数,比如这样
|
|
然后将这些函数导入到路由那块,然后启动就好了。
启动时候出现错误connection refused
,这个原因应该是redis在这样的启动方式下无法连接,所以我使用inv web.gunicorn
启动,inv
就是invoke
,这个是自定义启动方式的一个包,其实就是相当于gunicorn启动。启动之后就好了,可以正常运行了。
在Celery tasks里面使用多进程
这个可是极好的功能,我试了一下在我的程序里加入多进程,由一个Python进程中运行变成现在四个进程运行,运行时间从0.36分钟减少到0.13分钟,大概3倍,因为多进程调度有损耗,所以不会完全的是4倍,但是已经很好啦。如果是8核16线程的话,大概能有12倍左右的提升。
以下是一个使用例子:
|
|
直接启动多进程是肯定不可以的,因为是守候进程curr_proc.daemon=True
,所以启多进程之前主动设置为非守候进程curr_proc.daemon=False
,启动了以后再设为守候进程
一些其他的总结
向docker里面传文件
cat invtasks/__init__.py | docker exec -i -u root beehive_ sh -c 'cat > /opt/beehive/invtasks/__init__.py'
运行
运行
docker run -d --net=host --name=ability_stable -p 8888:8888 -it ability:170304
shell
shell中单引号和双引号是有区别的,单引号不执行里面的命令,双引号执行里面的命令
Linux
时区有问题:cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime
Python模块包
sudo vim /usr/local/lib/python3.5/dist-packages/capacity/