In previous posts we have introduced the multiprocessing module, which makes it possible to parallelize Python programs on shared memory systems. The limitation of the multiprocessing module is that it does not support parallelization over multiple compute nodes (i.e. on distributed memory systems). To overcome this limitation and enable cross-node parallelization, we can use MPI for Python, that is, the mpi4py module. This module provides an object-oriented interface that resembles the message passing interface (MPI), and hence allows Python programs to exploit multiple processors on multiple compute nodes. The mpi4py module supports both point-to-point and collective communication for Python objects as well as buffer-like objects. This post briefly introduces the use of the mpi4py module for communicating generic Python objects via the all-lowercase methods send, recv, isend, irecv, bcast, scatter, gather, and reduce.
Basics of mpi4py
The mpi4py module has been installed on Beskow. To use mpi4py on Beskow, we need to load the module first
# on Beskow
module load mpi4py/3.0.2/py37
The mpi4py/3.0.2/py37 module will automatically load the anaconda/2019.03/py37 module, which is an open-source distribution of Python for scientific computing (see Anaconda). We recommend Python 3, since Python 2 is going to be retired soon. After loading the mpi4py module, we can check that the python3 command (which is needed to run mpi4py code) is available in the PATH environment variable, using the which command
$ which python3
/pdc/vol/anaconda/2019.03/py37/bin/python3
Now we can write our first Python program with mpi4py
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
print('Hello from process {} out of {}'.format(rank, size))
To understand the code we need to know several concepts in MPI. In parallel programming with MPI, we need a so-called communicator, which is a group of processes that can talk to each other. To identify the processes within that group, each process is assigned a rank that is unique within the communicator. It is also useful to know the total number of processes, which is often referred to as the size of the communicator.
In the above code, we defined the variable comm as the default communicator MPI.COMM_WORLD. You may recognize that MPI.COMM_WORLD in mpi4py corresponds to MPI_COMM_WORLD in MPI programs written in C. The rank of each process is retrieved via the Get_rank method of the communicator, and the size of the communicator is obtained with the Get_size method. Now that the rank and size are known, we can print a “Hello” message from each process.
The usual way to execute mpi4py code in parallel is to use mpirun and python3; for example, “mpirun -n 4 python3 hello.py” will run the code on 4 processes, assuming that the code is saved in a file named “hello.py”. On Beskow, however, the setup is different since the resources (compute nodes) are managed by the SLURM workload manager. We’ll need to first request one or more compute nodes using salloc, and then execute “python3 hello.py” via srun. For example, the output from running the code on 4 processes is as follows:
$ salloc --nodes 1 -t 00:10:00 -A <your-project-account>
salloc: Granted job allocation <your-job-id>
$ srun -n 4 python3 hello.py
Hello from process 0 out of 4
Hello from process 1 out of 4
Hello from process 2 out of 4
Hello from process 3 out of 4
Point-to-point communication
For point-to-point communication of Python objects between processes, mpi4py provides the send and recv methods, which are similar to their counterparts in MPI. An example of code for passing (which is usually referred to as “communicating”) a Python dictionary object between the master process (which has rank = 0) and the worker processes (which all have rank > 0) is given below.
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
# master process
if rank == 0:
    data = {'x': 1, 'y': 2.0}
    # master process sends data to worker processes by
    # going through the ranks of all worker processes
    for i in range(1, size):
        comm.send(data, dest=i, tag=i)
        print('Process {} sent data:'.format(rank), data)
# worker processes
else:
    # each worker process receives data from master process
    data = comm.recv(source=0, tag=rank)
    print('Process {} received data:'.format(rank), data)
In this example (which we will name “send-recv.py”), the dictionary object data is sent from the master process to each worker process. We check the value of rank to determine whether a process is the master process. The for loop is executed only on the master process, and i (starting from 1) goes through the ranks of the worker processes. We can also use the tag argument of the send and recv methods to distinguish between messages when there are multiple communications between two processes; a small sketch of this is given after the output below. The output from running the example on 4 processes is as follows:
$ srun -n 4 python3 send-recv.py
Process 0 sent data: {'x': 1, 'y': 2.0}
Process 0 sent data: {'x': 1, 'y': 2.0}
Process 0 sent data: {'x': 1, 'y': 2.0}
Process 1 received data: {'x': 1, 'y': 2.0}
Process 2 received data: {'x': 1, 'y': 2.0}
Process 3 received data: {'x': 1, 'y': 2.0}
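As a minimal sketch of such tagged messages (this snippet is not part of send-recv.py, and the tag values 11 and 22 are arbitrary choices for illustration), process 0 below sends two separately tagged dictionaries to process 1; it should be run on at least two processes.
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
if rank == 0:
    # send two different messages to process 1, distinguished by their tags
    comm.send({'step': 1}, dest=1, tag=11)
    comm.send({'step': 2}, dest=1, tag=22)
elif rank == 1:
    # the receiver uses the same tags to pick out each message explicitly
    first = comm.recv(source=0, tag=11)
    second = comm.recv(source=0, tag=22)
    print('Process 1 received:', first, second)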
Note, however, that the examples above use blocking communication methods, which means that the execution of the code will not proceed until the communication is completed. This blocking behaviour is not always desirable in parallel programming; in some cases it is beneficial to use non-blocking communication methods. As shown in the code below, isend and irecv are non-blocking methods that immediately return Request objects, and we can use the wait method to manage the completion of the communication. If necessary, one can do some computations between comm.isend(...) and req.wait() to increase efficiency by overlapping the communications and computations; a sketch of such overlap is given after the example.
if rank == 0:
    data = {'x': 1, 'y': 2.0}
    for i in range(1, size):
        req = comm.isend(data, dest=i, tag=i)
        req.wait()
        print('Process {} sent data:'.format(rank), data)
else:
    req = comm.irecv(source=0, tag=rank)
    data = req.wait()
    print('Process {} received data:'.format(rank), data)
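As a hedged sketch of how such overlap might look (the local_work function below is a made-up placeholder, not part of the original example), the master process can start all sends, do some unrelated computation, and only then wait for the requests to complete.
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
def local_work():
    # placeholder computation that does not depend on the message
    return sum(i * i for i in range(1000000))
if rank == 0:
    data = {'x': 1, 'y': 2.0}
    # start all sends without waiting for them
    requests = [comm.isend(data, dest=i, tag=i) for i in range(1, size)]
    result = local_work()  # this computation overlaps with the outgoing messages
    for req in requests:
        req.wait()         # make sure every send has completed
    print('Process 0 sent data and computed', result)
else:
    req = comm.irecv(source=0, tag=rank)
    result = local_work()  # this computation overlaps with the incoming message
    data = req.wait()
    print('Process {} received data:'.format(rank), data, 'and computed', result)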
Collective communication
In parallel programming, it is often useful to perform what is known as collective communication, for example broadcasting a Python object from the master process to all the worker processes. The example code below broadcasts a NumPy array using the bcast method.
from mpi4py import MPI
import numpy as np
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
if rank == 0:
    data = np.arange(4.0)
else:
    data = None
data = comm.bcast(data, root=0)
if rank == 0:
    print('Process {} broadcast data:'.format(rank), data)
else:
    print('Process {} received data:'.format(rank), data)
The output from running the above code (broadcast.py) on 4 processes is as follows:
$ srun -n 4 python3 broadcast.py
Process 0 broadcast data: [0. 1. 2. 3.]
Process 1 received data: [0. 1. 2. 3.]
Process 2 received data: [0. 1. 2. 3.]
Process 3 received data: [0. 1. 2. 3.]
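The bcast method works in the same way for other generic Python objects; as a small sketch (the dictionary contents below are invented for illustration), one could broadcast a dictionary of parameters from the master process.
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
if rank == 0:
    params = {'nsteps': 1000, 'dx': 0.001}
else:
    params = None
# after the call, every process holds its own copy of the dictionary
params = comm.bcast(params, root=0)
print('Process {} has parameters:'.format(rank), params)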
If one needs to divide a task and distribute the sub-tasks to the processes, the scatter method will be useful. Note, however, that it is not possible to scatter more elements than there are processes. If one has a big list or array, it is therefore necessary to split it into one slice per process before calling the scatter method. The code below is an example of scattering a NumPy array, which is converted into a list of array slices before the scatter method is called.
from mpi4py import MPI
import numpy as np
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
nprocs = comm.Get_size()
if rank == 0:
    data = np.arange(15.0)
    # determine the size of each sub-task
    ave, res = divmod(data.size, nprocs)
    counts = [ave + 1 if p < res else ave for p in range(nprocs)]
    # determine the starting and ending indices of each sub-task
    starts = [sum(counts[:p]) for p in range(nprocs)]
    ends = [sum(counts[:p+1]) for p in range(nprocs)]
    # convert data into a list of arrays
    data = [data[starts[p]:ends[p]] for p in range(nprocs)]
else:
    data = None
data = comm.scatter(data, root=0)
print('Process {} has data:'.format(rank), data)
The output from running the above code (scatter-array.py) on 4 processes is as follows.
$ srun -n 4 python3 scatter-array.py
Process 0 has data: [0. 1. 2. 3.]
Process 1 has data: [4. 5. 6. 7.]
Process 2 has data: [8. 9. 10. 11.]
Process 3 has data: [12. 13. 14.]
Since there are 15 elements and 4 processes, divmod(15, 4) gives ave = 3 and res = 3, so each of the first 3 processes receives ave + 1 = 4 elements, while the last process receives 3 elements.
The gather method does the opposite of scatter: if each process has an element, one can use gather to collect them into a list of elements on the master process. The example code below uses scatter and gather to compute π in parallel, by approximating the integral of 4/(1 + x²) over [0, 1] with a midpoint rule.
from mpi4py import MPI
import time
import math
t0 = time.time()
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
nprocs = comm.Get_size()
# number of integration steps
nsteps = 10000000
# step size
dx = 1.0 / nsteps
if rank == 0:
    # determine the size of each sub-task
    ave, res = divmod(nsteps, nprocs)
    counts = [ave + 1 if p < res else ave for p in range(nprocs)]
    # determine the starting and ending indices of each sub-task
    starts = [sum(counts[:p]) for p in range(nprocs)]
    ends = [sum(counts[:p+1]) for p in range(nprocs)]
    # save the starting and ending indices in data
    data = [(starts[p], ends[p]) for p in range(nprocs)]
else:
    data = None
data = comm.scatter(data, root=0)
# compute partial contribution to pi on each process
partial_pi = 0.0
for i in range(data[0], data[1]):
    x = (i + 0.5) * dx
    partial_pi += 4.0 / (1.0 + x * x)
partial_pi *= dx
partial_pi = comm.gather(partial_pi, root=0)
if rank == 0:
    print('pi computed in {:.3f} sec'.format(time.time() - t0))
    print('error is {}'.format(abs(sum(partial_pi) - math.pi)))
In addition to the gather method, which collects the elements into a list, one can also use the reduce method to collect the results. The last four lines of the above example can be rewritten as follows, where “op=MPI.SUM” specifies that pi is obtained as the sum of partial_pi from all processes.
pi = comm.reduce(partial_pi, op=MPI.SUM, root=0)
if rank == 0:
    print('pi computed in {:.3f} sec'.format(time.time() - t0))
    print('error is {}'.format(abs(pi - math.pi)))
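The op argument is not limited to MPI.SUM; as a small sketch unrelated to the π example, one could use MPI.MAX to find the largest value held by any process.
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
# each process holds one value; here it is just a simple function of the rank
value = (rank * 7) % 5
# only the root process receives the result; the others get None
max_value = comm.reduce(value, op=MPI.MAX, root=0)
if rank == 0:
    print('Maximum value across processes:', max_value)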
Summary
We have briefly introduced the mpi4py module and the ways it enables communication of Python objects via all-lowercase methods.
- Get_rank and Get_size provide the rank of the process and the size of the communicator (the total number of processes), respectively.
- send and recv are blocking point-to-point communication methods, while isend and irecv are their non-blocking counterparts.
- bcast, scatter, gather and reduce are collective communication methods.
You may read the MPI for Python tutorial page for more information about the mpi4py module.