In part 1 of this post, we introduced the mpi4py module (MPI for Python), which provides an object-oriented interface for Python resembling the message passing interface (MPI) and enables Python programs to exploit multiple processors on multiple compute nodes.
The mpi4py module provides methods for communicating various types of Python objects in different ways. In part 1 we showed how to communicate generic Python objects between MPI processes; the methods for doing this have all-lowercase names. It is also possible to directly send buffer-like objects, where the data is exposed in a raw format and can be accessed without copying, between MPI processes. The methods for doing this start with an uppercase letter.
In this post we continue introducing the mpi4py module, with a focus on the direct communication of buffer-like objects using the latter type of methods (that is, those starting with an uppercase letter), including Send, Recv, Isend, Irecv, Bcast, and Reduce, as well as Scatterv and Gatherv, which are vector variants of Scatter and Gather, respectively.
Buffer-like objects
The mpi4py module provides methods for directly sending and receiving buffer-like objects. The advantage of working with buffer-like objects is that the communication is fast (close to the speed of MPI communication in C). The disadvantage is that the user needs to be more explicit when it comes to handling the allocation of memory space. For example, the memory of the receiving buffer needs to be allocated prior to the communication, and the size of the sending buffer should not exceed that of the receiving buffer. One should also be aware that mpi4py expects the buffer-like objects to have contiguous memory. Fortunately, this is usually the case with Numpy arrays, which are probably the most commonly used buffer-like objects in scientific computing in Python.
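As a quick illustration of the contiguity requirement, the short sketch below (using only standard Numpy functionality) checks whether an array is contiguous and, if not, makes a contiguous copy that is safe to pass to the uppercase communication methods:
import numpy as np

a = np.arange(12.0).reshape(3, 4)

# a column view of a 2D array is not contiguous in memory
col = a[:, 1]
print(col.flags['C_CONTIGUOUS'])   # False

# np.ascontiguousarray returns a contiguous copy that can be sent directly
col = np.ascontiguousarray(col)
print(col.flags['C_CONTIGUOUS'])   # True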
In mpi4py, a buffer-like object can be specified using a list or tuple with 2 or 3 elements (or 4 elements for the vector variants, which will be elaborated in later sections). For example, for a Numpy array that is named “data” and consists of double-precision numbers, we can use [data, n, MPI.DOUBLE], where n is a positive integer, to refer to the buffer of the first n elements. It is also possible to use [data, MPI.DOUBLE], or simply data, to refer to the buffer of the whole array. In the following sections, we’ll demonstrate the communication of Numpy arrays using mpi4py.
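To make this concrete, here is a minimal sketch (assuming at least two processes; the variable names are ours, not from the original post) that sends only the first two elements of a four-element array by giving the count explicitly:
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    data = np.arange(4.0)
    # [data, 2, MPI.DOUBLE] refers to the first 2 elements only;
    # [data, MPI.DOUBLE] or simply data would refer to the whole array
    comm.Send([data, 2, MPI.DOUBLE], dest=1, tag=0)
elif rank == 1:
    recv = np.zeros(2)
    comm.Recv([recv, 2, MPI.DOUBLE], source=0, tag=0)
    print('Process 1 received:', recv)   # [0. 1.]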
Point-to-point communication
In the previous post we introduced point-to-point communication using the all-lowercase methods (send and recv) in mpi4py. The use of the methods with a leading uppercase letter (Send and Recv) is quite similar, except that the receiving buffer needs to be initialized before the Recv method is called. An example code for passing a Numpy array from the master process (which has rank 0) to the worker processes (which all have rank > 0) is shown below.
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

# master process
if rank == 0:
    data = np.arange(4.)
    # master process sends data to worker processes by
    # going through the ranks of all worker processes
    for i in range(1, size):
        comm.Send(data, dest=i, tag=i)
        print('Process {} sent data:'.format(rank), data)

# worker processes
else:
    # initialize the receiving buffer
    data = np.zeros(4)
    # receive data from master process
    comm.Recv(data, source=0, tag=rank)
    print('Process {} received data:'.format(rank), data)
Note that the data array was initialized on the worker processes before the Recv method was called, and that the Recv method takes data as the first argument (in contrast to the recv method, which returns the data object). The output from running the above example (send-recv.py) on 4 processes is as follows:
$ module load mpi4py/3.0.2/py37
$ salloc --nodes 1 -t 00:10:00 -A <your-project-account>
salloc: Granted job allocation <your-job-id>
$ srun -n 4 python3 send-recv.py
Process 0 sent data: [0. 1. 2. 3.]
Process 0 sent data: [0. 1. 2. 3.]
Process 0 sent data: [0. 1. 2. 3.]
Process 1 received data: [0. 1. 2. 3.]
Process 2 received data: [0. 1. 2. 3.]
Process 3 received data: [0. 1. 2. 3.]
When using the Send and Recv methods, one thing to keep in mind is that the sending buffer and the receiving buffer should match in size. An error will occur if the size of the sending buffer is greater than that of the receiving buffer. It is, however, possible to have a receiving buffer that is larger than the sending buffer. This is illustrated in the following example, where the sending buffer has 4 elements and the receiving buffer has 6. The communication will complete without error, but only the first 4 elements of the receiving buffer will be overwritten (the last 2 elements will remain zero).
if rank == 0:
    data = np.arange(4.)
    for i in range(1, size):
        comm.Send(data, dest=i, tag=i)
        print('Process {} sent data:'.format(rank), data)
else:
    # note: the size of the receiving buffer is larger than
    # that of the sending buffer
    data = np.zeros(6)
    comm.Recv(data, source=0, tag=rank)
    print('Process {} has data:'.format(rank), data)
The output is as follows. Note that the last two elements of the receiving buffers are zero.
$ srun -n 4 python3 send-recv.py
Process 0 sent data: [0. 1. 2. 3.]
Process 0 sent data: [0. 1. 2. 3.]
Process 0 sent data: [0. 1. 2. 3.]
Process 1 has data: [0. 1. 2. 3. 0. 0.]
Process 2 has data: [0. 1. 2. 3. 0. 0.]
Process 3 has data: [0. 1. 2. 3. 0. 0.]
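As a side note (not part of the original example), if the receiver needs to know how many elements actually arrived, it can pass an MPI.Status object to Recv and query it afterwards. A sketch of the worker-side code might look like this:
# on a worker process, with a receiving buffer of 6 elements
data = np.zeros(6)
status = MPI.Status()
comm.Recv(data, source=0, tag=rank, status=status)
# query the number of MPI.DOUBLE elements that were actually received
nrecv = status.Get_count(MPI.DOUBLE)
print('Process {} actually received {} elements'.format(rank, nrecv))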
In part 1 of this post we also discussed blocking and non-blocking methods for point-to-point communication. In mpi4py, the non-blocking communication methods for buffer-like objects are Isend and Irecv. The use of the non-blocking methods is shown in the example below.
if rank == 0:
    data = np.arange(4.)
    for i in range(1, size):
        req = comm.Isend(data, dest=i, tag=i)
        req.Wait()
        print('Process {} sent data:'.format(rank), data)
else:
    data = np.zeros(4)
    req = comm.Irecv(data, source=0, tag=rank)
    req.Wait()
    print('Process {} received data:'.format(rank), data)
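Calling Wait right after Isend or Irecv, as above, keeps the example simple but gives little benefit in practice; the point of non-blocking communication is to overlap it with other work. A possible variation (the "do other work" comments are placeholders, not part of the original example) is sketched below:
if rank == 0:
    data = np.arange(4.)
    # start all sends without waiting for each one individually
    requests = [comm.Isend(data, dest=i, tag=i) for i in range(1, size)]
    # ... do other work here while the messages are in flight ...
    MPI.Request.Waitall(requests)
    print('Process {} sent data:'.format(rank), data)
else:
    data = np.zeros(4)
    req = comm.Irecv(data, source=0, tag=rank)
    # ... do work that does not depend on the incoming data ...
    req.Wait()
    print('Process {} received data:'.format(rank), data)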
Collective communication
As we mentioned in part 1 of this post, collective communication methods are very useful in parallel programming. In the example below we use the Bcast method to broadcast a buffer-like object data from the master process to all the worker processes. Note that data needs to be initialized on the worker processes before Bcast is called.
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    data = np.arange(4.0)
else:
    data = np.zeros(4)

comm.Bcast(data, root=0)
print('Process {} has data:'.format(rank), data)
The output from running the above code (broadcast.py) on 4 processes is as follows:
$ srun -n 4 python3 broadcast.py
Process 0 has data: [0. 1. 2. 3.]
Process 1 has data: [0. 1. 2. 3.]
Process 2 has data: [0. 1. 2. 3.]
Process 3 has data: [0. 1. 2. 3.]
Another collective communication method is Scatter, which sends slices of a large array to the worker processes. In part 1 of this post, we showed that the all-lowercase scatter method is convenient for sending slices of Python objects. However, Scatter is not as convenient in practice, since it requires the size of the large array to be divisible by the number of processes. For instance, if we know beforehand that the array we are going to distribute has 16 elements, it is straightforward to use Scatter to distribute the array over 4 processes so that each process gets 4 elements. The problem is that, in practice, the size of the array is not known beforehand and is therefore not guaranteed to be divisible by the number of available processes. It is more practical to use Scatterv, the vector version of Scatter, which offers a much more flexible way to distribute the array. The code below distributes 15 numbers over 4 processes. Note that we use “[sendbuf, count, displ, MPI.DOUBLE]” to specify the buffer-like object, where count contains the number of elements to be sent to each process and displ contains the starting indices of the sub-tasks (these indices are often known as displacements).
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
nprocs = comm.Get_size()

if rank == 0:
    sendbuf = np.arange(15.0)
    # count: the size of each sub-task
    ave, res = divmod(sendbuf.size, nprocs)
    count = [ave + 1 if p < res else ave for p in range(nprocs)]
    # use a fixed-width integer type so that the dtype matches on all processes
    count = np.array(count, dtype=np.int64)
    # displacement: the starting index of each sub-task
    displ = [sum(count[:p]) for p in range(nprocs)]
    displ = np.array(displ)
else:
    sendbuf = None
    # initialize count on worker processes
    count = np.zeros(nprocs, dtype=np.int64)
    displ = None

# broadcast count
comm.Bcast(count, root=0)

# initialize recvbuf on all processes
recvbuf = np.zeros(count[rank])
comm.Scatterv([sendbuf, count, displ, MPI.DOUBLE], recvbuf, root=0)
print('After Scatterv, process {} has data:'.format(rank), recvbuf)
The output from running the above code on 4 processes is as follows. Note that, as there are 15 elements in total, each of the first three processes received 4 elements, and the last process received 3 elements.
$ srun -n 4 python3 scatter-array.py
After Scatterv, process 0 has data: [0. 1. 2. 3.]
After Scatterv, process 1 has data: [4. 5. 6. 7.]
After Scatterv, process 2 has data: [8. 9. 10. 11.]
After Scatterv, process 3 has data: [12. 13. 14.]
Gatherv is the reverse operation of Scatterv. When using Gatherv, one needs to specify the receiving buffer as “[recvbuf2, count, displ, MPI.DOUBLE]”, as shown in the following code. The sendbuf2 arrays will be gathered into a large array recvbuf2 on the master process.
sendbuf2 = recvbuf
recvbuf2 = np.zeros(sum(count))

comm.Gatherv(sendbuf2, [recvbuf2, count, displ, MPI.DOUBLE], root=0)

if comm.Get_rank() == 0:
    print('After Gatherv, process 0 has data:', recvbuf2)
The output from running the Gatherv code on 4 processes is as follows.
$ srun -n 4 python3 scatter-gather.py
After Gatherv, process 0 has data: [ 0. 1. 2. 3. 4. 5. 6. 7. 8. 9. 10. 11. 12. 13. 14.]
Reduce, on the other hand, can be used to compute the sum of (or to perform another operation on) the data collected from all processes. For example, after calling Scatterv, we can compute the sum of the numbers in recvbuf on each process, and then call Reduce to add all of those partial contributions and store the result on the master process.
partial_sum = np.zeros(1)
partial_sum[0] = sum(recvbuf)
print('Partial sum on process {} is:'.format(rank), partial_sum[0])
total_sum = np.zeros(1)
comm.Reduce(partial_sum, total_sum, op=MPI.SUM, root=0)
if comm.Get_rank() == 0:
    print('After Reduce, total sum on process 0 is:', total_sum[0])
The output from running the Reduce code on 4 processes is as follows.
$ srun -n 4 python3 scatter-reduce.py
Partial sum on process 0 is: 6.0
Partial sum on process 1 is: 22.0
Partial sum on process 2 is: 38.0
Partial sum on process 3 is: 39.0
After Reduce, total sum on process 0 is: 105.0
Summary
We have shown how to directly communicate buffer-like objects using the mpi4py module and its methods that start with an uppercase letter. The communication of buffer-like objects is faster, but less flexible, than the communication of generic Python objects.
You may read the MPI for Python tutorial page for more information about the mpi4py module.