Python多进程编程
纸上得来终觉浅,绝知此事要躬行。
1. 多进程编程
由于全局解释锁(GIL)的问题,多线程并不能充分利用多核处理器,如果是一个CPU计算型的任务,应该使用多进程(multiprocessing)模块。虽然两者的工作方式并不相同,但是接口却非常相似。使用多进程模块,给每个进程赋予了单独的Python解释器,这样就规避了全局解释锁带来的问题。
1.1 多进程的使用方式
我们这里只介绍一下基本的使用方式。
- 目标函数不需要传递参数
from multiprocessing import Process def worker(): print('Worker') if __name__ == '__main__': jobs = [] for i in range(5): p = Process(target=worker) jobs.append(p) p.start()
$ python multiprocessing.py Worker Worker Worker Worker Worker
- 目标函数可传入参数
from multiprocessing import Process def worker(num): print(f'Worker: {num}') if __name__ == '__main__': jobs = [] for i in range(5): p = Process(target=worker, args=(i,)) jobs.append(p) p.start()
$ python multiprocessing.py Worker: 0 Worker: 1 Worker: 2 Worker: 3 Worker: 4
1.2 守护和非守护线程
类似于多线程一样也是可以设置守护进程的,这个守护进程也是可以一直运行且不影响主程序的结束。如果主程序结束了,会随着主程序一起结束。
- 可以看到如下的守护进程,主程序结束了还没有运行完成,随着主程序一起结束了。
from time import sleep from multiprocessing import Process, current_process def daemon(): p = current_process() print(f'Starting: {p.name} {p.pid}') sleep(2) print('Exiting :', p.name, p.pid) def non_daemon(): p = current_process() print(f'Starting: {p.name} {p.pid}') print('Exiting :', p.name, p.pid) if __name__ == '__main__': d =Process(name='daemon', target=daemon, daemon=True) n =Process(name='non-daemon', target=non_daemon) d.start() sleep(1) n.start()
$ python multiprocessing.py Starting: daemon 41295 Starting: non-daemon 41296 Exiting : non-daemon 41296
- 当然也是可以设置join方法设置超时参数的,让主程序等待守护进程执行完毕再结束程序。
from time import sleep from multiprocessing import Process, current_process def daemon(): p = current_process() print(f'Starting: {p.name} {p.pid}') sleep(2) print('Exiting :', p.name, p.pid) def non_daemon(): p = current_process() print(f'Starting: {p.name} {p.pid}') print('Exiting :', p.name, p.pid) if __name__ == '__main__': d = Process(name='daemon', target=daemon, daemon=True) n = Process(name='non-daemon', target=non_daemon) d.start() sleep(1) n.start() d.join() n.join()
$ python multiprocessing.py Starting: daemon 39312 Starting: non-daemon 39318 Exiting : non-daemon 39318 Exiting : daemon 39312
- join方法不带参数的情况下,默认为None,表示一直会阻塞下去。
from time import sleep from multiprocessing import Process, current_process def daemon(): p = current_process() print(f'Starting: {p.name} {p.pid}') sleep(2) print('Exiting :', p.name, p.pid) def non_daemon(): p = current_process() print(f'Starting: {p.name} {p.pid}') print('Exiting :', p.name, p.pid) if __name__ == '__main__': d = Process(name='daemon', target=daemon, daemon=True) n = Process(name='non-daemon', target=non_daemon) d.start() sleep(1) n.start() d.join(1) # 告诉我们进程是否当前还是存活的 print('d.is_alive()', d.is_alive()) n.join()
$ python multiprocessing.py Starting: daemon 41297 Starting: non-daemon 41298 Exiting : non-daemon 41298 d.is_alive() True
2. 同步机制
multiprocessing的Lock、Condition、Event、RLock、Semaphore等同步原语和threading模块的API风格是一样的,用法也类似,就不展开了。
>>> dir(multiprocessing) ['Array', 'Value',, 'context', 'cpu_count', 'current_process' 'Condition', 'Event', 'Lock', 'RLock', 'Semaphore', 'Queue', 'SimpleQueue', 'JoinableQueue', 'Manager', 'managers', 'Pipe', 'Process', 'Pool']
2.1 信号量 - Semaphore
- 信号量同步基于内部的计数器,每调用一次acquire计数器就会减1,表示获取了一个锁。每调用一次release计数器就会加1,表示释放了这个锁。当计数器为0的时候,acquire的调用就会被阻塞。
import time import random import multiprocessing sema = multiprocessing.Semaphore(3) def limit_run(seam, num): t = multiprocessing.current_process() with seam: print(f'[{t.pid}]: {num} is acquire sema.') sleep_time = random.random() * 2 time.sleep(sleep_time) print(f'[{t.pid}]: {num} is release sema.') process = [] for num in range(1, 6): p = multiprocessing.Process(name="limit_run", target=limit_run, args=(sema, num)) process.append(p) p.start() for p in process: p.join()
$ python multiprocessing.py [82222]: 1 is acquire sema. [82223]: 2 is acquire sema. [82224]: 3 is acquire sema. [82223]: 2 is release sema. [82225]: 4 is acquire sema. [82224]: 3 is release sema. [82226]: 5 is acquire sema. [82222]: 1 is release sema. [82225]: 4 is release sema. [82226]: 5 is release sema.
2.2 锁 - Lock
- 互斥锁,即相对于信号量值为1的Semaphore,表示同一时刻只能有一个线程来访问这个资源。但是使用了锁会损失一定的性能,因为需要其他线程等待锁的释放。
不加锁的示例
import time import multiprocessing as mp from multiprocessing.sharedctypes import Value value = Value('i', 0) def getlock(): global value new = value.value + 1 time.sleep(0.001) value.value = new process = [] for i in range(100): p = mp.Process(name="getlock", target=getlock) p.start() process.append(p) for p in process: p.join() print(value.value)
$ python multiprocessing.py 91
加了锁的示例
import time import multiprocessing as mp from multiprocessing.sharedctypes import Value lock = mp.Lock() value = Value('i', 0) def getlock(): global value with lock: new = value.value + 1 time.sleep(0.001) value.value = new process = [] for i in range(100): p = mp.Process(name="getlock", target=getlock) p.start() process.append(p) for p in process: p.join() print(value.value)
$ python multiprocessing.py 100
2.3 可重入锁 - RLock
- 可重入锁就是acquire方法能够不被阻塞的被一个线程重复执行多次,但是需要注意的是release需要调用和acquire相同的次数才能够释放锁。
import multiprocessing lock = multiprocessing.RLock() print('First try: ', lock.acquire()) print('Second try: ', lock.acquire(0))
$ python multiprocessing.py First try: True Second try: True
2.4 条件 - Condition
- 接收条件,一个线程等待某种特定的条件,而另一个线程会发出满足这个特定条件的信号。这个同步机制最好的示例说明就是「生产者-消费者」模型。
import time import multiprocessing as mp cond = mp.Condition() def cusumer(cond): t = mp.current_process() with cond: cond.wait() print('{t.name}: consumer is start...'.format(t=t)) def product(cond): t = mp.current_process() with cond: print('{t.name}: producer is start...'.format(t=t)) cond.notify_all() process = [] for num in range(2): c = mp.Process(name='cusumer', target=cusumer, args=(cond,)) c.start() process.append(c) time.sleep(2) for num in range(1): p = mp.Process(name='product', target=product, args=(cond,)) p.start() process.append(p) for t in process: t.join()
$ python multiprocessing.py product: producer is start... cusumer: consumer is start... cusumer: consumer is start...
2.5 事件 - Event
- 事件模型,一个线程等待某种特定的条件,而另一个线程会发出满足这个特定条件的信号,最好的示例说明也是「生产者-消费者」模型。事件和条件是不同,在Condition条件中一个条件发出之后,所有接受这个条件的子线程都会处理,但是在Event事件中则是谁接收到谁来处理。
import time import random import multiprocessing as mp event = mp.Event() manager = mp.Manager() def consumer(event, q): t = mp.current_process() while True: event_is_set = event.wait(10) if event_is_set: try: integer = q.pop() print(f'{integer} popped from list by {t.name}') event.clear() except IndexError: pass def producer(event, q): t = mp.current_process() while True: integer = random.randint(10, 100) q.append(integer) print(f'{integer} appended to list by {t.name}') event.set() time.sleep(1) threads = [] q = manager.list() for name in ('consumer1', 'consumer2'): c = mp.Process(name=name, target=consumer, args=(event, q)) print(f'{name} is starting...') c.start() threads.append(c) for name in ('producer',): p = mp.Process(name=name, target=producer, args=(event, q)) print(f'{name} is starting...') p.start() threads.append(p) for t in threads: t.join()
$ python multiprocessing.py consumer1 is starting... consumer2 is starting... producer is starting... 70 appended to list by producer 70 popped from list by consumer2 59 appended to list by producer 59 popped from list by consumer1 40 appended to list by producer 40 popped from list by consumer2 88 appended to list by producer 88 popped from list by consumer2 ......
3. 进程间共享状态
多进程模块中提供了进程间共享状态的方案,有三种方案,分别是Queue、Array和Manager。而共享的意思就是可以在多个进程之前共享数据,如我在一个进程中修改了对应的值,在另一个进程中立马就可以看到修改之后的结果了。在使用多进程的过程中,最好不要使用共享资源,因为普通的全局变量是不能被子进程所共享的,只有通过Multiprocessing组件构造的数据结构可以被共享。
共享方式 | 对应的函数 | 适用范围 |
---|---|---|
Queue | 使用 Multiprocessing.Queue 类 | 只适用于 Process 类 |
sharedctypes | 使用 Multiprocessing.sharedctypes 类 | 只适用于 Process 类 |
Manager | 使用 Multiprocessing.Manager 类 | 可以适用于 Pool 类 |
3.1 内存共享 - sharedctypes
内存共享主要是靠多进程模块中sharedctypes的Value和Array实现的。
- 常见的共享类型
- 可以使用缩写也可以使用全称
In [1]: from multiprocessing.sharedctypes import typecode_to_type In [2]: typecode_to_type Out[2]: {'B': ctypes.c_ubyte, 'H': ctypes.c_ushort, 'I': ctypes.c_uint, 'L': ctypes.c_ulong, 'b': ctypes.c_byte, 'c': ctypes.c_char, 'd': ctypes.c_double, 'f': ctypes.c_float, 'h': ctypes.c_short, 'i': ctypes.c_int, 'l': ctypes.c_long, 'u': ctypes.c_wchar}
- 共享内存实例说明
- 可以看到我们通过modify这个函数对传入的值进行了修改,并且还可以给Value和Array传递lock参数来决定是否带锁,默认为不带锁。这样就实现了进程之间共享状态了。
- 需要注意的是并不是能够用于typecode_to_type列出来的这些类型,只要是ctypes里面定义的都是可以使用。
from multiprocessing import Process, Lock from multiprocessing.sharedctypes import Value, Array from ctypes import Structure, c_bool, c_double lock = Lock() class Point(Structure): _fields_ = [('x', c_double), ('y', c_double)] def modify(n, b, s, arr, A): n.value **= 2 b.value = True s.value = s.value.upper() arr[0] = 10 for a in A: a.x **= 2 a.y **= 2 n = Value('i', 7) b = Value(c_bool, False, lock=False) s = Array('c', b'hello world', lock=lock) arr = Array('i', range(5), lock=True) A = Array(Point, [(1.875, -6.25), (-5.75, 2.0)], lock=lock) p = Process(target=modify, args=(n, b, s, arr, A)) p.start() p.join() print n.value print b.value print s.value print arr[:] print [(a.x, a.y) for a in A]
$ python multiprocessing.py 49 Trueb 'HELLO WORLD' [10, 1, 2, 3, 4] [(3.515625, 39.0625), (33.0625, 4.0)]
3.2 服务器进程 - Manager
一个multiprocessing的Manager对象会控制了服务器进程,其他进程可以通过代理的方式来访问这个服务的进程。
- [1] 常见的共享方式
共享方式 | |
---|---|
Namespace | 创建一个可分享的命名空间 |
Value/Array | 和上面共享 ctypes 对象的方式一样 |
dict/list | 创建一个可分享的 dict/list 并支持对应方法 |
Condition/Event/Lock/Queue/Semaphore | 创建一个可分享的对应同步原语的对象 |
>>> dir(multiprocessing.Manager()) ['Namespace', 'Array', 'Value', 'dict', 'list', 'Condition', 'Event', 'Lock', 'RLock', 'Semaphore', 'Queue', 'JoinableQueue', 'Pool', 'address', 'connect', 'get_server', 'register', 'start', 'join', 'shutdown']
- 共享方式实例说明
- 创建了一个叫做ns的可分享的命名空间,里面有一个元素是a
- 创建了一个叫做lproxy的可分享的列表,里面有一个元素是a
- 创建了一个叫做dproxy的可分享的列表,里面有一个元素是b
- 其中,p.pid是可以拿到这个进程的对应系统进程的名称的,之后再通过modify这个函数对原有的值进行改变,输出结果。
from multiprocessing import Manager, Process def modify(ns, lproxy, dproxy): ns.a **= 2 lproxy.extend(['b', 'c']) dproxy['b'] = 0 manager = Manager() ns = manager.Namespace() ns.a = 1 lproxy = manager.list() lproxy.append('a') dproxy = manager.dict() dproxy['b'] = 2 p = Process(target=modify, args=(ns, lproxy, dproxy))p.start() print(f'PID: {p.pid}') p.join() print(ns.a) print(lproxy) print(dproxy)
$ python multiprocessing.py PID: 45121 1 ['a', 'b', 'c'] {'b': 0}
- [2] 分布式的进程间通信
- 即我们这里使用C/S架构,服务端有一个分享的列表和一个get_list的一个方法来获取分享的列表的值。之后将服务跑起来,客户端就可以使用服务端的get_list来查看和修改分享的列表的值了。
- 注意在客户端注册的时候,并没有使用这个callable的参数,请多多留意一下哈。
- 可以通过多进程模块的这个功能做一个分布式的作业调度系统或者服务之间的心跳监测等等,要多多发掘其中的潜能。
# 服务端代码 from multiprocessing.managers import BaseManager host = '127.0.0.1' port = 9030 authkey = b'secret' shared_list = [] class RemoteManager(BaseManager): pass RemoteManager.register('get_list', callable=lambda: shared_list) mgr = RemoteManager(address=(host, port), authkey=authkey) server = mgr.get_server() server.serve_forever()
$ python remote_server.py ...
# 客户端代码 from multiprocessing.managers import BaseManager host = '127.0.0.1' port = 9030 authkey = b'secret' class RemoteManager(BaseManager): pass RemoteManager.register('get_list') mgr = RemoteManager(address=(host, port), authkey=authkey) mgr.connect() l = mgr.get_list() print(l) l.append(1) print(mgr.get_list())
$ python local-client.py [] [1]
4. 进程池
任务的执行周期决定了CPU核数和任务的分配算法,使用多进程(Pool)是非常灵活且保障效率的方案。
4.1 队列 - Queue
多线程中有Queue模块来实现队列,而多进程中也包含了Queue这个类,它是
- 使用两个队列,一个用于存储完成的任务,另一个存储任务完成之后的结果。其中,JoinableQueue里面有join和task_done方法,而Queue里面什么它们的。
- 在存储完成的任务队列中使用JoinableQueue是以为提供了task_done方法能够标识该任务已经完成,通知其不要一直join阻塞着。
# 这个示例很重要,它更够帮助我们理解队列 import time from random import random from multiprocessing import Process, JoinableQueue, Queue tasks_queue = JoinableQueue() results_queue = Queue() def double(n): return n * 2 def producer(in_queue): while 1: wt = random() time.sleep(wt) in_queue.put((double, wt)) if wt > 0.9: in_queue.put(None) print('stop producer') break def consumer(in_queue, out_queue): while 1: task = in_queue.get() if task is None: break func, arg = task result = func(arg) in_queue.task_done() out_queue.put(result) processes = [] p = Process(target=producer, args=(tasks_queue,)) p.start() processes.append(p) p = Process(target=consumer, args=(tasks_queue, results_queue)) p.start() processes.append(p) tasks_queue.join() for p in processes: p.join() while 1: if results_queue.empty(): break result = results_queue.get() print(f'Result: {result}')
$ python multiprocessing.py stop producer Result: 1.5603713848691385 Result: 0.9995048352324905 Result: 0.5281936405729699 Result: 1.9964631043908454
4.2 进程池 - Pool
多进程模块中已经为我们封装好了进程池,方便我们进行多进程编程。
- 装饰器 @lru_cache 非常适合把耗时的函数执行结果保存起来,避免传入相同参数时重复计算。
- 这里 map 方法和平时我们使用的基本一样,只不过是这里提供了多进程的支持,提高执行效率。
from functools import lru_cache from multiprocessing import Pool @lru_cache(maxsize=None) def fib(n): if n < 2: return n return fib(n-1) + fib(n-2) pool = Pool(2) pool.map(fib, [35] * 2)
4.3 dummy
在开源项目代码中,很多人都使用了 dummy 这个模块。虽然定义在多进程模块里面,但是以相同 API 实现的多线程模块。它提供一种兼容的方式,这样在多线程/多进程之间切换非常方便。
- 使用技巧
- 有时候,我们拿不准这个项目到底是使用多进程(CPU)还是多线程(I/O)来处理任务,且没有其它不能选择多进程方式的因素,都统一直接上多进程模式。
- 处理这种情况还有一个简单暴力的方法,就是使用 dummy 模块实现兼容模式,看测试结果到底哪个更好就采用哪个。
进程类型 | 使用方式 | 注意事项 |
---|---|---|
多进程池 | from multiprocessing import Pool | 绑定一个 cpu 核心 |
多线程进程池 | from multiprocessing.dummy import Pool | 运行于多个 cpu 核心 |
from multiprocessing import Pool from multiprocessing.dummy import Pool
5. 参考文档
介绍一下常用的多进程模块的使用和对应的属性和方法。
5.1 Process
- 构造方法
- Process(group, target, name, args, kwargs, daemon)
- group: 线程组,目前还没有实现
- target: 要执行的方法
- name: 指定进程名称
- args/kwargs: 要传入方法的参数
- daemon: 是否为守护进程
- 属性方法
- start(): 启动 Process 进程,每个进程对象最多必须调用一次
- run(): 启动 Process 进程,可以在子类中覆盖此方法
- join(): 阻塞当前上下文环境,直到调用此方法的进程终止或到达指定的超时时间
- is_alive(): 返回进程是否在运行
- terminate(): 不管任务是否完成,立即停止工作进程
- 实例方法
- name: 进程名字
- pid: 进程号
- daemon: 和线程的 setDeamon 功能一样
- exitcode: 进程在运行时为 None、如果为–N,表示被信号 N 结束
# 继承Process类且重构run函数 import time from multiprocessing import Process class MyProcess(Process): def __init__(self, arg): super(MyProcess, self).__init__() self.arg = arg def run(self): print 'nMask', self.arg time.sleep(1) process = [] for i in range(10): p = MyProcess(i) process.append(p) p.start() for p in process: p.join()
5.2 Pool
- 构造方法
- Pool(processes, initializer, initargs, maxtasksperchild, context)
- processes: 工作进程的数量,不指定则使用 os.cpu_count()返回的数量
- maxtasksperchild: 子进程的最大任务数,用于新老交替不断更新
- context: 可用于指定用于启动工作进程的上下文
- 实例方法
- map(): 阻塞;内置函数 map 的并行处理
- map_async(): 非阻塞;可指定 callback 函数
- apply: 阻塞;使用参数 args 和关键字参数 kwds 调用 func 方法
- apply_async: 非阻塞;可指定 callback 函数
- join(): 阻塞主进程等待子进程的退出,在 close 或 terminate 之后使用
- close(): 关闭 pool 使其不在接受新的任务
- terminate(): 关闭 pool 结束工作进程不在处理未完成的任务
# 同步进程池 => 阻塞 # 会一个接着一个的执行,输出结果为 num: 2 sleep: 2 import time from multiprocessing import Pool def worker(num): print(f'num: {num}', end=' ') time.sleep(num) print(f'sleep: {num}') pool = Pool(processes=10) for num in range(100): pool.apply(worker, args=(num,)) pool.join() pool.close()
# 异步进程池 => 非阻塞 # 不等待,直接并发执行100子进程 from multiprocessing import Pool def worker(num): print(f'num: {num}') pool = Pool(processes=10) for num in range(100): pool.apply_async(worker, args=(num,)) pool.close() pool.join()
5.3 Queue
- 多进程编程中常常需进程间进行消息传递,在不使用同步原语的情况下,可以使用管道和队列。
- 队列Queue实现queue.Queue的所有方法,除了task_done()和join()方法。
- 队列SimpleQueue是一种简化的Queue类型,非常接近锁定的Pipe。
- 队列JoinableQueue是另外具有task_done()和join()方法的队列。
消息传递 | 作用 | 多进程模型 |
---|---|---|
管道 | 用于两个进程之间的连接 | Pipe |
队列 | 允许多个生产者和消费者 | Queue(先进先出)、JoinableQueue(先进后出) |
[1] Queue
- 构造方法
- multiprocessing.Queue(maxsize)
- maxsize: 设置队列数量
- 实例方法
- qsize(): 返回队列大致大小;不可靠
- empty(): 如果队列为空,返回True值;不可靠
- full(): 如果队列已满,返回True值;不可靠
- put(): 将 obj 放入队列
- put_nowait(): 相当于 put(obj, False)
- get(): 从队列中删除并返回项目
- get_nowait(): 相当于 get(False)
- close(): 指示当前进程不会在此队列上放置更多数据
[2] SimpleQueue
- 构造方法
- multiprocessing.SimpleQueue
- 没有任何参数可以使用
- 实例方法
- put(): 将item放入队列
- get(): 从队列中删除并返回项目
- empty(): 如果队列为空,返回True值
[3] JoinableQueue
- 构造方法
- multiprocessing.JoinableQueue(maxsize)
- maxsize: 设置队列数量
- 实例方法
- join(): 阻塞,直到队列中的所有项目都被获取和处理
- task_done(): 指示以前入队的任务已完成
5.4 常用函数
- [1] cpu_count
- 构造方法: multiprocessing.cpu_count()
- 函数含义: 返回系统中的 CPU 数
- [2] current_process
- 构造方法: multiprocessing.current_process()
- 函数含义: 返回与当前进程对应的 Process 对象
- [3] get_start_method
- 构造方法: multiprocessing.get_start_method()
- 函数含义: 获取当前系统的启动进程的方法名称
- [4] set_start_method
- 构造方法: multiprocessing.set_start_method()
- 函数含义: 设置当前系统的启动进程的方法名称
- [5] get_all_start_methods
- 构造方法: multiprocessing.get_all_start_methods()
- 函数含义: 返回可用的启动进程的方法名称
- [6] set_executable
- 构造方法: multiprocessing.set_executable()
- 函数含义: 设置解释器在启动子进程时要使用的路径