估计今晚失眠,好烦啊。嗯,可以在战斗力的每个计算进程中调用多线程啊!加快速度。
concurrent.futures
为异步调用提供了高层次的接口。
异步运行可以通过线程来运行,使用ThreadPoolExecutor
。或者是独立的进程,使用ProcessPoolExecutor
。每种实现都是相同的接口,都是通过抽象类Executor
来定义的。
Executor Objects
Executor
是一个抽象类,提供了异步调用的方法。不应该直接使用而是通过具体的子类来调用,其中包含了以下方法。
submit
submit(fn, *args, **kwargs)
排定任务,然后会返回一个Future
对象,代表了要执行的可调用对象
|
|
map
map(func, *iterables, timeout=None, chunksize=1)
和Python中的map很像,但是是异步调用的并且一些调用将会是并发执行的。返回的迭代器当在timeout
秒后没有得到结果,那么将会引发concurrent.futures.TimeoutError
异常。如果timeout没有指定或者是None的话,将不会限制等待时间。
如果其中的一个调用引发了异常,那么当从迭代器中取数据的时候这个异常会引发。当使用ProcessPoolExecutor
的时候,这个方法将会将迭代对象分成不同份提交给进程池来独立的运行。这些块的大小通过设置chunksize
来指定。对于特别长的可迭代对象,给chunksize
指定一个特别大的值可以有效的提高性能。使用ThreadPoolExecutor
时候,chunksize
是无效的。
shutdown
shutdown(wait=True)
当等待的任务结束后,给运行单元一个释放所有资源的信号。在shutdown之后调用Executor.submit()
和Executor.map()
将会引发一个RuntimeError
如果wait
是True,这个方法会等待所有的任务结束后,然后与这个运行单元相关的资源都释放了才返回。如果wait
是False,那么这个方法会立即返回,并且在所有等待任务结束后释放资源。如果忽略这个参数,整个Python程序将一直到所有等待任务结束后才会退出。
你可以避免显式的取调用这个方法,如果你是用with语句的话,它会关闭这个执行器的。
|
|
ThreadPoolExecutor
ThreadPoolExecutor
是Executor
的一个子类,会使用一个线程池来异步执行调用。
在从可调用对象生成的Future中等待另一个Future的结果的时候,会发生死锁。
|
|
还有这个,因为只有一个线程来执行
|
|
ThreadPoolExecutor
ThreadPoolExecutor(max_workers=None, thread_name_prefix=”)
Executor
子类使用最多max_workers
个线程去异步调用。
如果max_workers
给定None或者没有给定,那么默认会执行当前机器核心数的5倍线程数,假定ThreadPoolExecutor
会经常在I/O
而不是CPU运算的时候,max_workers
应该给定更高的值。
thread_name_prefix
允许用户去控制线程,也比较容易debug
ThreadPoolExecutor Example
|
|
ProcessPoolExecutor
这个类使用进程池的Executor
子类,它使用了multiprocessing
模块,这样就可以避开全局解释器锁,同时也意味这只有可序列化的对象可以被执行和返回。
__main__
模块必须被子进程的worker导入,因此意味着ProcessPoolExecutor
是不能在交互式解释器中运行的。
在提交到ProcessPoolExecutor
可调用对象中调用Executor
或者Future
方法会导致死锁。
ProcessPoolExecutor(max_workers=None)
它会使用最多max_workers
个进程来异步执行任务。如果max_workers
没有给定或者是None,将会启动当前机器核心数相同的进程。如果max_workers
数目小于等于0,将会引发ValueError
如果worker进程中的一个意外终止了,BrokenProcessPool
错误将会引发。在之前,行为是不确定的,大多数情况是死锁。
ProcessPoolExecutor Example
|
|
Future Objects
Future类压缩了异步调用的执行。通过Executor.submit()
来创建一个Future实例,不应该直接实例化,下面是其中的方法
cancel()
尝试取消调用,如果调用已经被执行,那么是无法取消的,这个方法就会返回False。否则将会被取消,返回True
cancelled()
如果成功取消则返回True
running()
如果正在运行,并且不能取消,则返回True
done()
如果被成功取消或者完成运行后返回True
result(timeout=None)
返回调用的结果。如果调用没有完成,那么将会等待timeout秒。如果在timeout秒还没有完成,将会引发concurrent.futures.TimeoutError
。如果没有设置或者是None,将不会有等待时间
如果future在完成前取消,cancelledError
将会引发。
如果调用引发异常,这里会引发同样的异常。
exception(timeout=None)
add_done_callback(fn)
set_result(result)
set_exception(exception)
模块函数
concurrent.futures.wait
wait(fs, timeout=None, return_when=ALL_COMPLETED)
等待通过fs给定的Future实例完成运行,返回两个值,第一个值是在wait函数完成调用时,完成的Future。第二个是没有完成的
return_when
表示了函数应该在什么时候返回,有以下几个参数
FIRST_COMPLETED
FIRST_EXCEPTION
ALL_COMPLETED
concurrent.futures.as_completed
as_completed(fs, timeout=None)
返回一个通过fs给定的Future实例的迭代器,可能是不同的Executor
实例创建的,这个迭代器y有任务完成的时候产生future,因此这个必须迭代才能起到阻塞完成的效果。fs给定的任何future如果重复的话,只会返回一次。如果在as_completed
之前返回的会最先产生。