Using the concurrent.futures Module in Python
Knowledge gained from books always feels shallow; to truly understand something, you must practice it yourself.
In multithreaded or multiprocess programming we inevitably end up calling start, join, and similar methods, and more complex cases need one or two queues to get the job done. Without a good design, the code becomes harder and harder to manage as it grows. Is there something that abstracts these steps away, so we can stop worrying about the details and travel light?
- futures - executing tasks asynchronously
- futures - official documentation
- Pools of concurrent tasks managed by futures
- Making your requests asynchronous and non-blocking with tornado
- Concurrent programming in Python - the PoolExecutor part
1. How It Works
Core idea: with ProcessPoolExecutor, concurrent.futures runs multiple Python interpreters in parallel as child processes (via multiprocessing), which lets a Python program use multiple CPU cores to speed up execution. Because each child process runs its own interpreter, separate from the main process, and has its own global interpreter lock, every child process can fully occupy one CPU core, achieving true parallel computation.
Usage notes
- Since Python 3.2, concurrent.futures has been part of the standard library, so no manual installation is needed. On Python 2 you have to install and import the third-party futures package yourself.
# Required on Python 2
$ pip install futures
How it works
- The comments in the module's source code are full of useful, clearly written information. The data-flow diagram there is especially important for understanding how the module works; pay particular attention to what the Future object is for and how it is used.
- In traditional concurrent programming a function call is synchronous: you can only handle other work after the call returns. In the future pattern the call becomes asynchronous, so the caller can use the time it would otherwise spend waiting to do other things (a minimal sketch follows this list).
- The two most important classes in concurrent.futures are Executor and Future. An Executor accepts an asynchronous task, a callable together with its arguments, and returns a Future that represents the execution of that task.
- The module provides two concrete executors: ThreadPoolExecutor (threads) and ProcessPoolExecutor (processes). They are high-level abstractions over threading and multiprocessing that expose a single uniform interface, making them convenient for developers.
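To make the pattern concrete, here is a minimal sketch (the slow_io helper and its sleep duration are made up for illustration): submit returns a Future immediately, the caller keeps working, and result() is only collected at the end.

from concurrent.futures import ThreadPoolExecutor
import time

def slow_io(n):
    # a stand-in for a slow, blocking call
    time.sleep(1)
    return n * 2

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(slow_io, 21)    # returns immediately with a Future
    print('doing other work while the task runs...')
    print('result:', future.result())        # blocks only here, prints 42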
Let's analyze this with the source code and the data-flow diagram below.
Diagram walkthrough
- [Step 1]: executor.map or executor.submit creates a _WorkItem object (and a corresponding entry in "Work Ids") for each task; each _WorkItem holds a newly created Future object.
- [Step 2]: Each _WorkItem is put into a dict called "Work Items", keyed by its work id.
- [Step 3]: A "Local worker thread" is created to manage the "Work Ids" queue; it does two things.
- [Job 1]: It takes a work id from the "Work Ids" queue and looks up the corresponding _WorkItem in "Work Items". If the item has been cancelled it is removed from "Work Items"; otherwise it is repackaged as a _CallItem and put on the "Call Q" queue. The executor's worker processes take _CallItems from that queue, execute them, wrap the outcomes as _ResultItems and put them on the "Result Q" queue.
- [Job 2]: It takes _ResultItems from the "Result Q" queue, updates the corresponding Future in "Work Items", and deletes the entry.
- [Summary]: Overall this is a producer/consumer model. Note, however, that the worker processes never talk to the task and result queues directly on behalf of the caller; everything goes through the intermediate "Local worker thread".
- [Summary]: Imagine a piece of code that submits a request and expects a reply, but the service handling the request may be slow. In a traditional single-threaded environment the call is synchronous, meaning nothing else can happen until the service returns. In the Future pattern the call is asynchronous, and the time that would have been spent waiting can be used by the caller to handle other work.
# ProcessPoolExecutor data-flow through the system

|======================= In-process =====================|== Out-of-process ==|

+----------+     +----------+       +--------+     +-----------+    +---------+
|          |  => | Work Ids |    => |        |  => | Call Q    | => |         |
|          |     +----------+       |        |     +-----------+    |         |
|          |     | ...      |       |        |     | ...       |    |         |
|          |     | 6        |       |        |     | 5, call() |    |         |
|          |     | 7        |       |        |     | ...       |    |         |
| Process  |     | ...      |       | Local  |     +-----------+    | Process |
|  Pool    |     +----------+       | Worker |                      |  #1..n  |
| Executor |                        | Thread |                      |         |
|          |     +----------- +     |        |     +-----------+    |         |
|          | <=> | Work Items | <=> |        | <=  | Result Q  | <= |         |
|          |     +------------+     |        |     +-----------+    |         |
|          |     | 6: call()  |     |        |     | ...       |    |         |
|          |     |    future  |     |        |     | 4, result |    |         |
|          |     | ...        |     |        |     | 3, except |    |         |
+----------+     +------------+     +--------+     +-----------+    +---------+
2. Performance
Use multithreading for IO-bound tasks and multiprocessing for CPU-bound tasks.
Plain sequential computation
- Here we use the interpreter's built-in map, which performs better than a plain loop and reads more elegantly.
from time import time

NUMBERS = range(30, 36)

def fib(n):
    if n <= 2:
        return 1
    return fib(n-1) + fib(n-2)

time_start = time()
print(list(map(fib, NUMBERS)))
time_end = time()
print(f'COST: {time_end - time_start}')
$ python futures.py
[832040, 1346269, 2178309, 3524578, 5702887, 9227465]
COST: 6.766396999359131
Computing with multiple threads
- fib is a CPU-bound function, so because of the GIL, multithreading cannot speed it up. On top of that, starting threads and communicating with the thread pool both carry overhead, which is why the multithreaded version of this program is actually slower.
from time import time
from concurrent.futures import ThreadPoolExecutor

NUMBERS = range(30, 36)

def fib(n):
    if n <= 2:
        return 1
    else:
        return fib(n-1) + fib(n-2)

time_start = time()
executor = ThreadPoolExecutor(max_workers=2)
print(list(executor.map(fib, NUMBERS)))
time_end = time()
print(f'COST: {time_end - time_start}')
$ python futures.py
[832040, 1346269, 2178309, 3524578, 5702887, 9227465]
COST: 6.824882984161377
Computing with multiple processes
- On a multi-core machine, the multiprocess version is faster than the other two, and by a wide margin. That is because ProcessPoolExecutor is built on the machinery provided by the multiprocessing module.
- Note, however, that all communication between the main process and the child processes must be serialized and deserialized, and with large amounts of data the memory consumption becomes significant, so the overhead of multiprocessing is relatively high.
Step-by-step execution
- [1] Each item of the NUMBERS list is passed to the map method of the multiprocess executor.
- [2] The data is serialized into binary form with the pickle module (see the sketch after these steps).
- [3] The serialized data is sent over a local socket from the process hosting the main interpreter to the process hosting a child interpreter.
- [4] In the child process, pickle deserializes the binary data back into Python objects.
- [5] The Python module containing the fib function is imported.
- [6] The child processes compute their respective inputs in parallel.
- [7] Each child process serializes its result back into bytes.
- [8] Those bytes are copied back to the main process through the socket.
- [9] The main process deserializes the bytes back into Python objects.
- [10] Finally the results computed by the child processes are merged into a single list and returned to the caller.
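As referenced in step [2], here is a small, purely illustrative sketch of the pickle round trip behind steps [2], [4], [7] and [9]; it is not the module's real IPC code, and the square function is invented for the example. It also shows why the submitted callable itself must be picklable.

import pickle
from concurrent.futures import ProcessPoolExecutor

def square(x):
    return x * x

if __name__ == '__main__':
    # roughly what travels to a worker and back
    payload = pickle.dumps((square, 21))            # main process serializes the call
    func, arg = pickle.loads(payload)               # a worker deserializes it
    print(pickle.loads(pickle.dumps(func(arg))))    # the result travels back the same way, prints 441

    with ProcessPoolExecutor(max_workers=1) as executor:
        print(executor.submit(square, 21).result())   # fine: a module-level function pickles by reference
        # executor.submit(lambda x: x * x, 21)        # would fail: lambdas cannot be pickled

The actual multi-process version of the fib example follows.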
from time import time
from concurrent.futures import ProcessPoolExecutor

NUMBERS = range(30, 36)

def fib(n):
    if n <= 2:
        return 1
    return fib(n-1) + fib(n-2)

time_start = time()
with ProcessPoolExecutor(max_workers=3) as executor:
    for num, result in zip(NUMBERS, executor.map(fib, NUMBERS)):
        print(f'fib({num}) = {result}')
time_end = time()
print(f'COST: {time_end - time_start}')
$ python futures.py
fib(30) = 832040
fib(31) = 1346269
fib(32) = 2178309
fib(33) = 3524578
fib(34) = 5702887
fib(35) = 9227465
COST: 5.294888973236084
3. The API
Although the concurrent package contains only the futures module, that module is quite powerful.
Interface overview
- The two main classes, ThreadPoolExecutor and ProcessPoolExecutor, both inherit from Executor and are used to create thread pools and process pools respectively.
- Conveniently, Executor implements __enter__ and __exit__, so executor objects can be used as context managers with the with statement.
Executor is an abstract class that provides methods for executing calls asynchronously.
Executor Objects
- map(func, *iterables, timeout=None, chunksize=1)
- Returns the results of the concurrent calls, in the same order as the input *iterables.
- When the executor is used with the with statement, shutdown is invoked automatically once the tasks finish, so no explicit cleanup code is needed.
- submit(fn, *args, **kwargs)
- Submits a single callable to be run concurrently and returns a Future instance.
- The Future object represents the asynchronous execution in that thread/process, which will complete at some point in the future.
- shutdown(wait=True)
- Roughly equivalent to calling close followed by join on a thread/process pool.
- __enter__() / __exit__(exc_type, exc_val, exc_tb)
- Allow the executor to be used as a context manager with the with statement.
# 【1】map
from time import time
from concurrent.futures import ProcessPoolExecutor

NUMBERS = range(30, 36)

def fib(n):
    if n <= 2:
        return 1
    return fib(n-1) + fib(n-2)

time_start = time()
with ProcessPoolExecutor(max_workers=2) as executor:
    results = list(executor.map(fib, NUMBERS))
    print(f'results: {results}')
time_end = time()
print(f'COST: {time_end - time_start}')
# 【2】submit
from time import time
from concurrent.futures import ProcessPoolExecutor

futures = []
NUMBERS = range(30, 36)

def fib(n):
    if n <= 2:
        return 1
    return fib(n-1) + fib(n-2)

time_start = time()
with ProcessPoolExecutor(max_workers=2) as executor:
    for num in NUMBERS:
        future = executor.submit(fib, num)
        futures.append(future)
print(f'Results: {[future.result() for future in futures]}')
time_end = time()
print(f'COST: {time_end - time_start}')
The Future class encapsulates an asynchronous operation; Executor.submit returns a Future object.
Future Objects
- cancel()
- Attempts to cancel the task; it returns True only if the task has not started running yet and could be cancelled (a small example follows the two snippets below).
- cancelled()
- Returns True if the task was successfully cancelled.
- running()
- Returns True if the task is currently running.
- done()
- Returns True if the task has finished running or been cancelled.
- result(timeout=None)
- Returns the task's return value, waiting up to timeout seconds if it is not done yet.
- exception(timeout=None)
- Returns the exception raised by the task, or None if it completed without raising, waiting up to timeout seconds; raises TimeoutError if the task does not finish in time.
- add_done_callback(fn)
- Attaches a callable that receives the future as its only argument and is invoked when the future completes or is cancelled.
#【1】exception
from concurrent.futures import ProcessPoolExecutor

NUMBERS = range(30, 36)

def fib(n):
    if n <= 2:
        return 1
    return fib(n-1) + fib(n-2)

executor = ProcessPoolExecutor(max_workers=3)
for num in NUMBERS:
    executor.submit(fib, num).exception(timeout=10)
executor.shutdown(wait=True)
#【2】add_done_callback
from concurrent.futures import ProcessPoolExecutor

NUMBERS = range(30, 36)

def fib(n):
    if n <= 2:
        return 1
    return fib(n-1) + fib(n-2)

def get_result(future):
    print(f'>>> {future.result()}')

executor = ProcessPoolExecutor(max_workers=3)
for num in NUMBERS:
    executor.submit(fib, num).add_done_callback(get_result)
executor.shutdown(wait=True)
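For completeness, here is a minimal cancel()/cancelled() sketch; the slow helper and the single-worker pool are invented so that queued tasks actually exist and can be cancelled.

#【3】cancel / cancelled (illustrative sketch)
from time import sleep
from concurrent.futures import ThreadPoolExecutor

def slow(n):
    sleep(1)
    return n

executor = ThreadPoolExecutor(max_workers=1)             # one worker keeps later tasks queued
futures = [executor.submit(slow, i) for i in range(5)]
print(futures[-1].cancel())      # True: the last task has not started yet, so it can be cancelled
print(futures[-1].cancelled())   # True
print(futures[0].cancel())       # usually False: the first task is most likely already running
executor.shutdown(wait=True)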
Commonly used functions and classes provided at the concurrent.futures module level.
Module-level interface
- ThreadPoolExecutor(max_workers)
- Creates a thread pool.
- ProcessPoolExecutor(max_workers)
- Creates a process pool.
- wait(fs, timeout=None, return_when=ALL_COMPLETED)
- wait returns a named tuple of two sets: one with the completed futures and one with the uncompleted ones.
- wait also gives you more control through its return_when parameter, which accepts FIRST_COMPLETED (wait until the first future finishes, then return whatever has completed so far), FIRST_EXCEPTION (wait until the first future raises an exception, then return whatever has completed so far) and ALL_COMPLETED (wait until every future has finished); the default is ALL_COMPLETED. (A FIRST_COMPLETED sketch follows the wait example below.)
- as_completed(fs, timeout=None)
- This function takes an iterable of futures and a timeout.
- With the default timeout=None it blocks until the tasks finish and yields each future as it completes; the iterator is implemented with yield.
- With a timeout greater than 0, it waits at most timeout seconds; if some futures are still unfinished when the timeout expires, it stops and raises TimeoutError.
#【1】as_completed
from time import time
from concurrent.futures import ProcessPoolExecutor, as_completed

NUMBERS = range(30, 36)

def fib(n):
    if n <= 2:
        return 1
    return fib(n-1) + fib(n-2)

time_start = time()
with ProcessPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(fib, num) for num in NUMBERS]
    for future in futures:
        print(f'running: {future.running()}, done: {future.done()}')
    print('#### separator ####')
    for future in as_completed(futures, timeout=2):
        print(f'running: {future.running()}, done: {future.done()}')
time_end = time()
print(f'COST: {time_end - time_start}')
$ python futures.py
running: True, done: False
running: True, done: False
running: True, done: False
running: True, done: False
running: True, done: False
running: True, done: False
#### separator ####
running: False, done: True
running: False, done: True
running: False, done: True
----------------------------------------------------------------
TimeoutError                              Traceback (most recent call last)
~/Escape/MorePractise/test/func.py in <module>()
     16     print(f'running: {future.running()}, done: {future.done()}')
     17 print('#### separator ####')
---> 18 for future in as_completed(futures, timeout=2):
     19     print(f'running: {future.running()}, done: {future.done()}')
     20 time_end = time()

~/.pyenv/versions/3.6.4/lib/python3.6/concurrent/futures/_base.py in as_completed(fs, timeout)
    236                 raise TimeoutError(
    237                     '%d (of %d) futures unfinished' % (
--> 238                     len(pending), total_futures))
    239
    240             waiter.event.wait(wait_timeout)

TimeoutError: 3 (of 6) futures unfinished
#【2】wait
from time import time
from concurrent.futures import ProcessPoolExecutor, wait, ALL_COMPLETED

NUMBERS = range(30, 36)

def fib(n):
    if n <= 2:
        return 1
    return fib(n-1) + fib(n-2)

time_start = time()
with ProcessPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(fib, num) for num in NUMBERS]
    print('>>> Start process...')
    for future in futures:
        print(f'running: {future.running()}, done: {future.done()}')
    print('###### separator ######')
    done, unfinished = wait(futures, timeout=2, return_when=ALL_COMPLETED)
    print('>>> done process...')
    for task in done:
        print(f'running: {task.running()}, done: {task.done()}')
    print('>>> unfinished process...')
    for task in unfinished:
        print(f'running: {task.running()}, done: {task.done()}')
time_end = time()
print(f'COST: {time_end - time_start}')
$ python futures.py
>>> Start process...
running: True, done: False
running: True, done: False
running: True, done: False
running: True, done: False
running: True, done: False
running: False, done: False
###### separator ######
>>> done process...
running: False, done: True
running: False, done: True
running: False, done: True
>>> unfinished process...
running: True, done: False
running: True, done: False
running: True, done: False
COST: 4.277174949645996
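As mentioned in the wait description above, return_when also accepts FIRST_COMPLETED; a minimal sketch, assuming the same fib as in the previous examples:

#【3】wait with FIRST_COMPLETED (illustrative sketch)
from concurrent.futures import ProcessPoolExecutor, wait, FIRST_COMPLETED

NUMBERS = range(30, 36)

def fib(n):
    if n <= 2:
        return 1
    return fib(n-1) + fib(n-2)

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(fib, num) for num in NUMBERS]
        # returns as soon as the first future finishes
        done, not_done = wait(futures, return_when=FIRST_COMPLETED)
        print(f'done: {len(done)}, not done: {len(not_done)}')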
Common exception classes
- The module defines the following exception classes, which may be raised while working with futures (or raised deliberately in your own code).
exception concurrent.futures.CancelledError
    Raised when a future is cancelled.

exception concurrent.futures.TimeoutError
    Raised when a future operation exceeds the given timeout.

exception concurrent.futures.BrokenExecutor
    Derived from RuntimeError, this exception class is raised when an executor is
    broken for some reason, and cannot be used to submit or execute new tasks.
    New in version 3.7.

exception concurrent.futures.thread.BrokenThreadPool
    Derived from BrokenExecutor, this exception class is raised when one of the
    workers of a ThreadPoolExecutor has failed initializing.
    New in version 3.7.

exception concurrent.futures.process.BrokenProcessPool
    Derived from BrokenExecutor (formerly RuntimeError), this exception class is
    raised when one of the workers of a ProcessPoolExecutor has terminated in a
    non-clean fashion (for example, if it was killed from the outside).
    New in version 3.3.
4. Practical Usage
When you need to run a task concurrently, should you choose map or submit?
[1] Choosing between map and submit
- If every task calls the same function, map is the natural choice. If the tasks call different functions, or the calls may raise exceptions, use submit instead.
- With map, an exception raised during execution propagates immediately and the remaining steps are never processed. With submit each task can be handled separately: we can use as_completed to check whether it succeeded; if it did we get its return value, and if not, the exception it raised (including one we defined ourselves) is surfaced for us to handle.
- In the code below, future_to_num is a dict that maps each submitted future to its input value. Iterating over it with as_completed yields each future as it finishes; when a future raised an exception, the except branch prints the message we prepared for it.
from concurrent.futures import ThreadPoolExecutor, as_completed

NUMBERS = range(10, 20)

def fib(n):
    if n == 17:
        raise Exception("Don't do this")
    if n <= 2:
        return 1
    return fib(n-1) + fib(n-2)

with ThreadPoolExecutor(max_workers=3) as executor:
    future_to_num = {executor.submit(fib, num): num for num in NUMBERS}
    for future in as_completed(future_to_num):
        num = future_to_num[future]
        try:
            result = future.result()
        except Exception as e:
            print(f'raise an exception: {e}')
        else:
            print(f'fib({num}) = {result}')

with ThreadPoolExecutor(max_workers=3) as executor:
    for num, result in zip(NUMBERS, executor.map(fib, NUMBERS)):
        print(f'fib({num}) = {result}')
$ python futures.py
fib(10) = 55
fib(11) = 89
fib(12) = 144
fib(13) = 233
fib(14) = 377
fib(15) = 610
fib(16) = 987
raise an exception: Don't do this
raise an exception: Don't do this
raise an exception: Don't do this
fib(10) = 55
fib(11) = 89
fib(12) = 144
fib(13) = 233
fib(14) = 377
fib(15) = 610
fib(16) = 987
Traceback (most recent call last):
  ...
Exception: Don't do this
Should you use multiprocessing's Pool or concurrent.futures' PoolExecutor?
[2] Choosing between Pool and PoolExecutor
- Future is a very common pattern in concurrent design, and you will find the same solution in other languages. A Future object stands for a result that is not ready yet and will become available at some point "in the future", at which moment it can be retrieved. In the example above we want to run several fib calls with different arguments concurrently and collect all the results. The traditional approach is to block on queue.get, which is synchronous; in the Future pattern the call becomes asynchronous, and thanks to the "Local worker thread" the time that would otherwise be spent waiting can be used for other work.
- map, as used above, looks very much like a process or thread pool, and we saw that the multiprocess version actually took less time. This is because concurrent.futures still calls the threading and multiprocessing modules under the hood; it is essentially a wrapper that hides the internal details and makes them easier for developers to use.
- Which one to pick comes down to your requirements and habits; I prefer concurrent.futures. Because PoolExecutor uses the future design pattern, each answer is emitted as soon as it is ready, line by line, whereas multiprocessing.Pool computes everything and returns all the results at once. The first experience is nicer (see the comparison sketch after the Pool example below).
- The architecture of concurrent.futures is clearly more elaborate, but that makes it easier to write efficient, asynchronous, non-blocking parallel code; ThreadPool/Pool is more of a black box that you just use, with the details hidden and little room for customization.
- The concurrent.futures interface is also simpler. The ThreadPool/Pool API exposes parameters such as processes, initializer, initargs, maxtasksperchild and context, which can easily confuse newcomers, whereas concurrent.futures essentially has only max_workers.
import time
from multiprocessing.pool import Pool

NUMBERS = range(10, 20)

def fib(n):
    if n <= 2:
        return 1
    return fib(n-1) + fib(n-2)

start = time.time()
pool = Pool(3)
for num, result in zip(NUMBERS, pool.map(fib, NUMBERS)):
    print(f'fib({num}) = {result}')
print(f'COST: {time.time() - start}')
$ python futures.py
fib(10) = 55
fib(11) = 89
fib(12) = 144
fib(13) = 233
fib(14) = 377
fib(15) = 610
fib(16) = 987
fib(17) = 1597
fib(18) = 2584
fib(19) = 4181
COST: 0.020013211613326435
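For comparison, here is a minimal sketch of the concurrent.futures version that prints each answer as soon as its future completes (it assumes the same fib as above); this is the streaming behaviour described in the list before the Pool example.

import time
from concurrent.futures import ProcessPoolExecutor, as_completed

NUMBERS = range(10, 20)

def fib(n):
    if n <= 2:
        return 1
    return fib(n-1) + fib(n-2)

if __name__ == '__main__':
    start = time.time()
    with ProcessPoolExecutor(max_workers=3) as executor:
        future_to_num = {executor.submit(fib, num): num for num in NUMBERS}
        for future in as_completed(future_to_num):    # yields futures in completion order
            print(f'fib({future_to_num[future]}) = {future.result()}')
    print(f'COST: {time.time() - start}')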
[3] Caveats
- When the Python version is below 3.5 and there are many tasks to process, concurrent.futures should not be the chosen approach. The author's blog explains that multiprocessing.pool submits tasks to the workers in batches, which saves IPC (inter-process communication) overhead, whereas PoolExecutor used to submit one task at a time, leading to a large performance gap.
- This was fixed in Python 3.5 by adding a chunksize parameter to the map method, so when our code has to process a large number of tasks, we just pass a suitably large chunksize.
multiprocessing.Pool.map outperforms ProcessPoolExecutor.map. Note that the performance difference is very small per work item, so you'll probably only notice a large performance difference if you're using map on a very large iterable. The reason for the performance difference is that multiprocessing.Pool will batch the iterable passed to map into chunks, and then pass the chunks to the worker processes, which reduces the overhead of IPC between the parent and children. ProcessPoolExecutor always passes one item from the iterable at a time to the children, which can lead to much slower performance with large iterables, due to the increased IPC overhead. The good news is this issue will be fixed in Python 3.5, as a chunksize keyword argument has been added to ProcessPoolExecutor.map, which can be used to specify a larger chunk size if you know you're dealing with large iterables. See this bug for more info.
import time
from multiprocessing.pool import Pool
from concurrent.futures import ProcessPoolExecutor

K = 50
NUMBERS = range(1, 100000)

def f(x):
    r = 0
    for k in range(1, K+2):
        r += x ** (1 / k**1.5)
    return r

print('multiprocessing.pool.Pool:\n')
start = time.time()
l = []
pool = Pool(3)
for num, result in zip(NUMBERS, pool.map(f, NUMBERS)):
    l.append(result)
print(len(l))
print('COST: {}'.format(time.time() - start))

print('ProcessPoolExecutor without chunksize:\n')
start = time.time()
l = []
with ProcessPoolExecutor(max_workers=3) as executor:
    for num, result in zip(NUMBERS, executor.map(f, NUMBERS)):
        l.append(result)
print(len(l))
print('COST: {}'.format(time.time() - start))

print('ProcessPoolExecutor with chunksize:\n')
start = time.time()
l = []
with ProcessPoolExecutor(max_workers=3) as executor:
    # keep the same default chunksize as multiprocessing.pool
    chunksize, extra = divmod(len(NUMBERS), executor._max_workers * 4)
    for num, result in zip(NUMBERS, executor.map(f, NUMBERS, chunksize=chunksize)):
        l.append(result)
print(len(l))
print('COST: {}'.format(time.time() - start))
5. Application Scenarios
- Concurrent computation
from concurrent import futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n % 2 == 0:
        return False
    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()
- Web crawling
from concurrent import futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

def load_url(url, timeout):
    return urllib.request.urlopen(url, timeout=timeout).read()

def main():
    with futures.ThreadPoolExecutor(max_workers=5) as executor:
        future_to_url = dict(
            (executor.submit(load_url, url, 60), url) for url in URLS)
        for future in futures.as_completed(future_to_url):
            url = future_to_url[future]
            try:
                print('%r page is %d bytes' % (
                    url, len(future.result())))
            except Exception as e:
                print('%r generated an exception: %s' % (
                    url, e))

if __name__ == '__main__':
    main()
6. The Source Code
As mentioned earlier, the most important classes in the concurrent.futures module are Executor and Future: an Executor accepts an asynchronous task with its callable and arguments, and returns a Future that represents the execution of that task.
- Executor
- Executor is an abstract class that provides methods for executing calls asynchronously.
class Executor(object):
    """This is an abstract base class for concrete asynchronous executors."""

    def submit(self, fn, *args, **kwargs):
        """Submits a callable to be executed with the given arguments.

        Schedules the callable to be executed as fn(*args, **kwargs) and returns
        a Future instance representing the execution of the callable.

        Returns:
            A Future representing the given call.
        """
        raise NotImplementedError()

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """Returns an iterator equivalent to map(fn, iter).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.
            chunksize: The size of the chunks the iterable will be broken into
                before being passed to a child process. This argument is only
                used by ProcessPoolExecutor; it is ignored by ThreadPoolExecutor.

        Returns:
            An iterator equivalent to: map(func, *iterables) but the calls may
            be evaluated out-of-order.

        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            Exception: If fn(*args) raises for any values.
        """
        if timeout is not None:
            end_time = timeout + time.time()

        fs = [self.submit(fn, *args) for args in zip(*iterables)]

        # Yield must be hidden in closure so that the futures are submitted
        # before the first iterator value is required.
        def result_iterator():
            try:
                # reverse to keep finishing order
                fs.reverse()
                while fs:
                    # Careful not to keep a reference to the popped future
                    if timeout is None:
                        yield fs.pop().result()
                    else:
                        yield fs.pop().result(end_time - time.time())
            finally:
                for future in fs:
                    future.cancel()
        return result_iterator()

    def shutdown(self, wait=True):
        """Clean-up the resources associated with the Executor.

        It is safe to call this method several times. Otherwise, no other
        methods can be called after this one.

        Args:
            wait: If True then shutdown will not return until all running
                futures have finished executing and the resources used by the
                executor have been reclaimed.
        """
        pass

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.shutdown(wait=True)
        return False
- Future
- The Future class encapsulates the asynchronous execution of a callable; Executor.submit returns a Future instance.
class Future(object):
    """Represents the result of an asynchronous computation."""

    def __init__(self):
        """Initializes the future. Should not be called by clients."""
        self._condition = threading.Condition()
        self._state = PENDING
        self._result = None
        self._exception = None
        self._waiters = []
        self._done_callbacks = []

    def _invoke_callbacks(self):
        for callback in self._done_callbacks:
            try:
                callback(self)
            except Exception:
                LOGGER.exception('exception calling callback for %r', self)

    def __repr__(self):
        with self._condition:
            if self._state == FINISHED:
                if self._exception:
                    return '<%s at %#x state=%s raised %s>' % (
                        self.__class__.__name__,
                        id(self),
                        _STATE_TO_DESCRIPTION_MAP[self._state],
                        self._exception.__class__.__name__)
                else:
                    return '<%s at %#x state=%s returned %s>' % (
                        self.__class__.__name__,
                        id(self),
                        _STATE_TO_DESCRIPTION_MAP[self._state],
                        self._result.__class__.__name__)
            return '<%s at %#x state=%s>' % (
                self.__class__.__name__,
                id(self),
                _STATE_TO_DESCRIPTION_MAP[self._state])

    def cancel(self):
        """Cancel the future if possible.

        Returns True if the future was cancelled, False otherwise. A future
        cannot be cancelled if it is running or has already completed.
        """
        with self._condition:
            if self._state in [RUNNING, FINISHED]:
                return False

            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                return True

            self._state = CANCELLED
            self._condition.notify_all()

        self._invoke_callbacks()
        return True

    def cancelled(self):
        """Return True if the future was cancelled."""
        with self._condition:
            return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]

    def running(self):
        """Return True if the future is currently executing."""
        with self._condition:
            return self._state == RUNNING

    def done(self):
        """Return True of the future was cancelled or finished executing."""
        with self._condition:
            return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]

    def __get_result(self):
        if self._exception:
            raise self._exception
        else:
            return self._result

    def add_done_callback(self, fn):
        """Attaches a callable that will be called when the future finishes.

        Args:
            fn: A callable that will be called with this future as its only
                argument when the future completes or is cancelled. The callable
                will always be called by a thread in the same process in which
                it was added. If the future has already completed or been
                cancelled then the callable will be called immediately. These
                callables are called in the order that they were added.
        """
        with self._condition:
            if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
                self._done_callbacks.append(fn)
                return
        fn(self)

    def result(self, timeout=None):
        """Return the result of the call that the future represents.

        Args:
            timeout: The number of seconds to wait for the result if the future
                isn't done. If None, then there is no limit on the wait time.

        Returns:
            The result of the call that the future represents.

        Raises:
            CancelledError: If the future was cancelled.
            TimeoutError: If the future didn't finish executing before the given
                timeout.
            Exception: If the call raised then that exception will be raised.
        """
        with self._condition:
            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self.__get_result()

            self._condition.wait(timeout)

            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self.__get_result()
            else:
                raise TimeoutError()

    def exception(self, timeout=None):
        """Return the exception raised by the call that the future represents.

        Args:
            timeout: The number of seconds to wait for the exception if the
                future isn't done. If None, then there is no limit on the wait
                time.

        Returns:
            The exception raised by the call that the future represents or None
            if the call completed without raising.

        Raises:
            CancelledError: If the future was cancelled.
            TimeoutError: If the future didn't finish executing before the given
                timeout.
        """
        with self._condition:
            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self._exception

            self._condition.wait(timeout)

            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self._exception
            else:
                raise TimeoutError()

    # The following methods should only be used by Executors and in tests.
    def set_running_or_notify_cancel(self):
        """Mark the future as running or process any cancel notifications.

        Should only be used by Executor implementations and unit tests.

        If the future has been cancelled (cancel() was called and returned
        True) then any threads waiting on the future completing (though calls
        to as_completed() or wait()) are notified and False is returned.

        If the future was not cancelled then it is put in the running state
        (future calls to running() will return True) and True is returned.

        This method should be called by Executor implementations before
        executing the work associated with this future. If this method returns
        False then the work should not be executed.

        Returns:
            False if the Future was cancelled, True otherwise.

        Raises:
            RuntimeError: if this method was already called or if set_result()
                or set_exception() was called.
        """
        with self._condition:
            if self._state == CANCELLED:
                self._state = CANCELLED_AND_NOTIFIED
                for waiter in self._waiters:
                    waiter.add_cancelled(self)
                # self._condition.notify_all() is not necessary because
                # self.cancel() triggers a notification.
                return False
            elif self._state == PENDING:
                self._state = RUNNING
                return True
            else:
                LOGGER.critical('Future %s in unexpected state: %s',
                                id(self),
                                self._state)
                raise RuntimeError('Future in unexpected state')

    def set_result(self, result):
        """Sets the return value of work associated with the future.

        Should only be used by Executor implementations and unit tests.
        """
        with self._condition:
            self._result = result
            self._state = FINISHED
            for waiter in self._waiters:
                waiter.add_result(self)
            self._condition.notify_all()
        self._invoke_callbacks()

    def set_exception(self, exception):
        """Sets the result of the future as being the given exception.

        Should only be used by Executor implementations and unit tests.
        """
        with self._condition:
            self._exception = exception
            self._state = FINISHED
            for waiter in self._waiters:
                waiter.add_exception(self)
            self._condition.notify_all()
        self._invoke_callbacks()