翻译自 concurrent.futures 运行并行任务

估计今晚失眠,好烦啊。嗯,可以在战斗力的每个计算进程中调用多线程啊!加快速度。

concurrent.futures为异步调用提供了高层次的接口。

异步运行可以通过线程来运行,使用ThreadPoolExecutor。或者是独立的进程,使用ProcessPoolExecutor。每种实现都是相同的接口,都是通过抽象类Executor来定义的。

Executor Objects

Executor是一个抽象类,提供了异步调用的方法。不应该直接使用而是通过具体的子类来调用,其中包含了以下方法。

submit

submit(fn, *args, **kwargs)

排定任务,然后会返回一个Future对象,代表了要执行的可调用对象

1
2
3
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(pow, 323, 1235)
print(future.result())

map

map(func, *iterables, timeout=None, chunksize=1)

和Python中的map很像,但是是异步调用的并且一些调用将会是并发执行的。返回的迭代器当在timeout秒后没有得到结果,那么将会引发concurrent.futures.TimeoutError异常。如果timeout没有指定或者是None的话,将不会限制等待时间。

如果其中的一个调用引发了异常,那么当从迭代器中取数据的时候这个异常会引发。当使用ProcessPoolExecutor的时候,这个方法将会将迭代对象分成不同份提交给进程池来独立的运行。这些块的大小通过设置chunksize来指定。对于特别长的可迭代对象,给chunksize指定一个特别大的值可以有效的提高性能。使用ThreadPoolExecutor时候,chunksize是无效的。

shutdown

shutdown(wait=True)

当等待的任务结束后,给运行单元一个释放所有资源的信号。在shutdown之后调用Executor.submit()Executor.map()将会引发一个RuntimeError

如果wait是True,这个方法会等待所有的任务结束后,然后与这个运行单元相关的资源都释放了才返回。如果wait是False,那么这个方法会立即返回,并且在所有等待任务结束后释放资源。如果忽略这个参数,整个Python程序将一直到所有等待任务结束后才会退出。

你可以避免显式的取调用这个方法,如果你是用with语句的话,它会关闭这个执行器的。

1
2
3
4
5
6
import shutil
with ThreadPoolExecutor(max_workers=4) as e:
e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
e.submit(shutil.copy, 'src4.txt', 'dest4.txt')

ThreadPoolExecutor

ThreadPoolExecutorExecutor的一个子类,会使用一个线程池来异步执行调用。

在从可调用对象生成的Future中等待另一个Future的结果的时候,会发生死锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import time
def wait_on_b():
time.sleep(5)
print(b.result()) # b will never complete because it is waiting on a.
return 5
def wait_on_a():
time.sleep(5)
print(a.result()) # a will never complete because it is waiting on b.
return 6
executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)

还有这个,因为只有一个线程来执行

1
2
3
4
5
6
def wait_on_future():
f = executor.submit(pow, 5, 2)
print(f.result())
executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)

ThreadPoolExecutor

ThreadPoolExecutor(max_workers=None, thread_name_prefix=”)

Executor子类使用最多max_workers个线程去异步调用。

如果max_workers给定None或者没有给定,那么默认会执行当前机器核心数的5倍线程数,假定ThreadPoolExecutor会经常在I/O而不是CPU运算的时候,max_workers应该给定更高的值。

thread_name_prefix允许用户去控制线程,也比较容易debug

ThreadPoolExecutor Example

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import concurrent.futures
import urllib.request
URLS = ['http://www.foxnews.com/',
'http://www.cnn.com/',
'http://europe.wsj.com/',
'http://www.bbc.co.uk/',
'http://some-made-up-domain.com/']
# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
with urllib.request.urlopen(url, timeout=timeout) as conn:
return conn.read()
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# Start the load operations and mark each future with its URL
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (url, exc))
else:
print('%r page is %d bytes' % (url, len(data)))

ProcessPoolExecutor

这个类使用进程池的Executor子类,它使用了multiprocessing模块,这样就可以避开全局解释器锁,同时也意味这只有可序列化的对象可以被执行和返回。

__main__模块必须被子进程的worker导入,因此意味着ProcessPoolExecutor是不能在交互式解释器中运行的。

在提交到ProcessPoolExecutor可调用对象中调用Executor或者Future方法会导致死锁。

ProcessPoolExecutor(max_workers=None)

它会使用最多max_workers个进程来异步执行任务。如果max_workers没有给定或者是None,将会启动当前机器核心数相同的进程。如果max_workers数目小于等于0,将会引发ValueError

如果worker进程中的一个意外终止了,BrokenProcessPool错误将会引发。在之前,行为是不确定的,大多数情况是死锁。

ProcessPoolExecutor Example

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import concurrent.futures
import math
PRIMES = [
112272535095293,
112582705942171,
112272535095293,
115280095190773,
115797848077099,
1099726899285419]
def is_prime(n):
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True
def main():
with concurrent.futures.ProcessPoolExecutor() as executor:
for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
print('%d is prime: %s' % (number, prime))
if __name__ == '__main__':
main()

Future Objects

Future类压缩了异步调用的执行。通过Executor.submit()来创建一个Future实例,不应该直接实例化,下面是其中的方法

cancel()

尝试取消调用,如果调用已经被执行,那么是无法取消的,这个方法就会返回False。否则将会被取消,返回True

cancelled()

如果成功取消则返回True

running()

如果正在运行,并且不能取消,则返回True

done()

如果被成功取消或者完成运行后返回True

result(timeout=None)

返回调用的结果。如果调用没有完成,那么将会等待timeout秒。如果在timeout秒还没有完成,将会引发concurrent.futures.TimeoutError。如果没有设置或者是None,将不会有等待时间

如果future在完成前取消,cancelledError将会引发。

如果调用引发异常,这里会引发同样的异常。

  • exception(timeout=None)
  • add_done_callback(fn)
  • set_result(result)
  • set_exception(exception)

模块函数

concurrent.futures.wait

wait(fs, timeout=None, return_when=ALL_COMPLETED)

等待通过fs给定的Future实例完成运行,返回两个值,第一个值是在wait函数完成调用时,完成的Future。第二个是没有完成的

return_when表示了函数应该在什么时候返回,有以下几个参数

  • FIRST_COMPLETED
  • FIRST_EXCEPTION
  • ALL_COMPLETED

concurrent.futures.as_completed

as_completed(fs, timeout=None)

返回一个通过fs给定的Future实例的迭代器,可能是不同的Executor实例创建的,这个迭代器y有任务完成的时候产生future,因此这个必须迭代才能起到阻塞完成的效果。fs给定的任何future如果重复的话,只会返回一次。如果在as_completed之前返回的会最先产生。