并行For循环:轻松优化让Python性能翻倍!

2023年 8月 18日 52.3k 0

大家有没有这样的感觉,Python在处理庞大数据集时速度总是那么感人。一旦代码开始运行,我们就只能带着愧疚的心情消磨时间,刷着手机。

并行For循环:轻松优化让Python性能翻倍!

MPI(Message Passing Interface)是并行计算领域中,用于在不同进程间传递信息的标准解决方案。而mpi4py则是其Python版本。

网上有丰富的教程讲解如何通过mpi4py实现同步运行相对独立的Python代码,这在服务器上运行代码时尤为实用。

在正式开始之前,我们需要先了解两个基本概念:

node,通常翻译为服务器节点。在这里,我们可以将一个节点理解为一台个人电脑。每个node(每台电脑)可以拥有多个core(核)。例如,你可能听说过一个程序在12个nodes上运行,每个nodes执行128个任务。这说明该程序在12*128个cores上同步运行。

例如,在以下简单示例中,共有12个并行任务。我们让它在2个nodes上运行,因此每nodes需要执行6个任务。同时,我们指定每个node只调用4个cores(因为所有cores平分内存,如果一次性调用所有cores,每个core可用的内存可能不足以满足单个任务需求)。

这样,6个任务分配到4个核心,有些core需要运行两次,例如[2,2,1,1],可参考下图。并行For循环:轻松优化让Python性能翻倍!

要让Python代码通过mpi4py实现并行,实际上需要修改的部分并不多。

基本逻辑是从系统中获取所有node和core的索引index,从而获得可以进行同步计算的所有“通道”的index。

然后根据“通道”的总数量,将需要运行的任务分成多个组。

最后将不同的组分配到不同的“通道”上单独运行即可。

1. 修改python代码以支持mpi4py

假设想要并行运算的Python代码名为python_mpi4py.py,这个代码是可以在一个节点上独立执行的。

下面的解释仅为帮助理解(可选择跳过),实际上代码读懂了改的地方不多。

  • • 参数num用于指定主体代码运行在哪个节点上。实际上,它仅用于输出信息。
  • • 参数t1t2用于指定在所有任务中,当前节点(节点索引为num)运行第t1t2步。我们总共有12个任务(代码中的periods=12),并指定两个节点运行这12个任务,因此当前节点只负责运行所有任务中的一部分(第t1t2步)。由于调用了2个节点,python_mpi4py.py将运行两次,每次接收不同的t1t2,两次加起来就完成了所有任务。
  • • ranksizempi4py中的重要概念。现在我们将注意力放在单个节点上,这里的rank可以看作是该节点中所有核心的索引。例如,我们指定调用4个核心,那么rank的值就是一个列表rank=[0,1,2,3]size(代码中写作npro)是获取的核心总数,这里size=4。这里的解释肯定过于简化,但大致如此。
  • • 接下来是前面提到的分组。尽管该节点已经得到了一个sub-group(只包含steps_global[t1:t2]),但这个sub-group仍需要进一步分配给不同的cores(代码中的list_all_pros)。
  • • 然后,各个core将同时进行。但是,在每个core上有不止一个任务([2,2,1,1]),因此需要进行唯一的循环。
#%%
import sys
import numpy as np
import mpi4py
import time as pytime
import pandas as pd

# get the number of the node, and the range of the steps [t1:t2] that runs on this node 
num = int(sys.argv[1])
t1 = int(sys.argv[2])
t2 = int(sys.argv[3])

# example of all the steps that need to be run on all the nodes
time = pd.date_range('2020-01-01', periods=12, freq='H')

# the steps that need to be run on this node
steps_global = np.arange(time.size)
steps = steps_global[t1:t2] # sub-group for this node

# === mpi4py ===
try:
  from mpi4py import MPI
  comm = MPI.COMM_WORLD
  rank = comm.Get_rank()
  npro = comm.Get_size()
except:
  print('::: Warning: Proceeding without mpi4py! :::')
  rank = 0
  npro = 1


list_all_pros = [0]*npro # sub-sub-groups for all the cores
for nn in range(npro):
  list_all_pros[nn] = steps[nn::npro]
steps = list_all_pros[rank]

pytime.sleep(0.1*rank) # to make sure the print statements are in order

# use mpi4py here
for kk, step in enumerate(steps):
  print(f'node: {num}: kk = {kk+1}/{steps.size}, step = {step}')
  print(f'{time[step]}')

上面的代码,我们把原本要进行的12步循环,最后压缩到了最大2步循环。当然,这个想象空间还是很大的。

2. 在单个node上运行python代码

要运行上面的包含mpi4py的代码,最简单的可以一句bash命令就可以:

mpirun -np 4 python -u python_mpi4py.py $1 $2 $3

上面命令-np 4指定4个核同时运行。然后$1指定node的index,$2$3分别指定在这个node上面运行的步骤的index。

当然服务器上,一般要先allocate资源,然后写一个脚本(命名为submit_python_mpi4py.sh)提交后台运行代码:

#!/bin/bash
#SBATCH --job-name=parallel
#SBATCH --time=00:01:00
#SBATCH --partition=compute
#SBATCH --nodes=1
#SBATCH --ntasks=4
#SBATCH --account=*****

mpirun -np 4 python -u python_mpi4py.py $1 $2 $3

3. 在多个node上运行python代码

为了便于理解,我们可以通过一个Python代码多次提交上述的bash代码,从而申请多个node。这样做可以更直接地控制哪些任务运行在哪个节点上。例如,让不同的模型在不同的节点上运行。

我们将这个Python文件命名为master_submitter.py

#!/usr/bin/env python
#%%
import os
import numpy as np

#%%
nsteps = 12
npar   = 6
njobs  = int(nsteps/npar) # 2 nodes


#%%
for kk in range(njobs): #0,1 node-index
  k1 = kk*npar #0,6 the starting task-index for node1 and node2
  k2 = (kk+1)*npar #6,12 the ending task-index for node1 and node2
  print("-----node line -----")
  os.system(f"sbatch ./submit_python_mpi4py.sh {kk+1} {k1} {k2}") # 
# %%

上面的例子简要展示了如何利用mpi4py在多个node和多个core上进行并行计算的方法。

在上面的例子中,各个任务之间是完全没有依赖关系的。然而,在for循环结束后,通常我们需要执行类似concat操作,将各个核心运行的结果汇总起来。mpi4py还支持在不同任务之间传输数据。

相关文章

JavaScript2024新功能:Object.groupBy、正则表达式v标志
PHP trim 函数对多字节字符的使用和限制
新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
为React 19做准备:WordPress 6.6用户指南
如何删除WordPress中的所有评论

发布评论