Python中使用concurrent类

纸上得来终觉浅,绝知此事要躬行。

Python中使用concurrent类

在多线程或多进程编程中,不可避免的需要使用start、join等方法,复杂的话还需要使用一到两个队列才能完成要求。如果没有一个良好的设计,随着代码量越来越多,会变得越来越复杂。而没有没有什么东西,可以将上述这些步骤抽象一下,让我们不关注这些细节轻装上阵呢?

1. 原理介绍

核心原理:concurrent.futures会以子进程multiprocessing的形式,平行的运行多个Python解释器,从而令Python程序可以利用多核CPU来提升执行速度。由于子进程与主进程的Python解释器是相对分离,且它们的全局解释器锁也是相互独立的,所以每个子进程都能够完整的使用一个CPU内核,实现真正的平行计算。

使用说明

  • 从Python3.2开始,这个concurrent.futures模块已经被划到标准库,所以不需要手动安装。而在Python2中,则需要自行安装和引入第三方库futures才能够使用。
# Python2需要安装
$ pip install futures

原理解释

  • 在它的源码注释的内容中,干货很多,表达也很清晰。需要多多理解这个数据流图,对于理解该模块的原理是非常重要。我们需要注意一下这里面的future的用途和作用。
  • 在传统的并发编程中,调用函数是同步的,也就是只能等待请求返回之后才能够处理其他的工作。而在future的这种模式下,调用方式改为了异步,而原先等待返回的时间段,在主调动函数里面就可以拥有处理其他事物的能力了。
  • 在concurrent.futures模块中,最为重要的就是Executor和Future这两个核心类,Executor接收一个包含带有回调及参数的异步的任务请求,返回一个Future去执行该请求任务。
  • 这个模块主要包含两个核心类,分别是多线程的ThreadPoolExecutor和多进程的ProcessPoolExecutor。它们就是对threading和multiprocessing进行了高级别的抽象,暴露出统一的接口,方便开发者使用。

我们结合源码和下面的数据流分析一下

图示说明

  • [步骤 1]:executor.map或executor.submit会创建多个_WorkItem对象和对应的任务编号Work Ids,每个对象都传入了新创建的一个Future对象。
  • [步骤 2]:然后把每个_WorkItem对象放进一个叫做「Work Items」的dict中,键是不同的「Work Ids」。
  • [步骤 3]:创建一个管理「Work Ids」队列的线程「Local worker thread」,它能做两件事情。
    • [事情 1]:从「Work Ids」队列中获取Work Id, 通过「Work Items」找到对应的_WorkItem。如果这个Item被取消了,就从「Work Items」里面把它删掉,否则重新打包成一个_CallItem放入「Call Q」这个队列。executor的那些进程会从队列中取_CallItem执行,并把结果封装成_ResultItems放入「Result Q」队列中。
    • [事情 2]:从「Result Q」队列中获取_ResultItems,然后从「Work Items」更新对应的Future对象并删掉入口。
  • [总结]:看起来就是一个「生产者/消费者」模型,不过要注意,整个过程并不是多个进程与任务+结果-两个队列直接通信的,而是通过一个中间的「Local worker thread」完成的。
  • [总结]:设想一下,当某一段程序提交了一个请求,期望得到一个答复。但服务程序对这个请求可能很慢,在传统的单线程环境下,调用函数是同步的,也就是说它必须等到服务程序返回结果后,才能进行其他处理。而在Future模式下,调用方式改为异步,而原先等待返回的时间段,在主调用函数中,则可用于处理其他事物。