In the previous post we introduced the Pool class of the multiprocessing module. In this post we continue with the Process class, which gives direct control over individual processes.
A process is created by passing a target function and its input arguments to the Process constructor. The process is then started with the start() method, and the join() method blocks until the process has finished. Below is a very simple example that prints the square of a number.
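import multiprocessing as mp

def square(x):
    print(x * x)

if __name__ == "__main__":
    # Guard needed where child processes are spawned (e.g. Windows, macOS)
    p = mp.Process(target=square, args=(5,))
    p.start()
    p.join()  # wait for the process to finish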
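Beyond start() and join(), a Process object exposes a few attributes that help when controlling processes directly. The sketch below is only an illustration: the helper name status is hypothetical, while is_alive() and exitcode belong to the standard multiprocessing.Process interface.

```python
import multiprocessing as mp


def square(x):
    print(x * x)


def status(p):
    """Hypothetical helper: report (is_alive, exitcode) for a Process."""
    return p.is_alive(), p.exitcode


if __name__ == "__main__":
    p = mp.Process(target=square, args=(5,))
    print(status(p))   # before the process is started
    p.start()
    p.join()
    print(status(p))   # after the process has finished
```

An exitcode of None means the process has not terminated yet, and 0 indicates a normal exit.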
Process and Queue
In practice, we often want the process and the function to return the computed result, rather than just printing the result as in the previous example. This can be achieved with help from the Queue class in the multiprocessing module. The Queue class includes the put method for depositing data and the get method for retrieving data. The code in the earlier example can be changed as follows so that it returns the result (rather than printing it). In this example, a Queue object named qout is used to save the result.
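import multiprocessing as mp

def square(x, q):
    q.put(x * x)

if __name__ == "__main__":
    qout = mp.Queue()
    p = mp.Process(target=square, args=(5, qout))
    p.start()
    # Read the result before join(): a child that still has queued data
    # may not exit until that data has been consumed.
    result = qout.get()
    p.join()
    print(result)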
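The put and get methods can also be tried out without starting any process, which makes the first-in, first-out behaviour easy to see. This is a minimal sketch: the function name roundtrip is hypothetical, and the timeout argument to get() is used so that a mistake raises queue.Empty instead of blocking forever.

```python
import multiprocessing as mp


def roundtrip(values, timeout=5.0):
    """Hypothetical helper: put values on a Queue, then read them back."""
    q = mp.Queue()
    for v in values:
        q.put(v)
    # get() blocks until an item is available or the timeout expires
    return [q.get(timeout=timeout) for _ in values]


if __name__ == "__main__":
    print(roundtrip([25, 36, 49]))
```

With a single producer, the items come back in the order in which they were put.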
Now we can parallelize the code by creating multiple processes and running them simultaneously.
import multiprocessing as mp

def square(x, q):
    q.put(x * x)

if __name__ == "__main__":
    qout = mp.Queue()
    processes = [mp.Process(target=square, args=(i, qout))
                 for i in range(2, 10)]
    for p in processes:
        p.start()
    # Collect one result per process before joining
    result = [qout.get() for _ in processes]
    for p in processes:
        p.join()
    print(result)
We would expect the output to be as follows.
[4, 9, 16, 25, 36, 49, 64, 81]
However, the values in the Queue object are stored in the order in which the individual processes deposit them, that is, the order in which they finish. This means that the order of the squares in qout will not necessarily follow the ascending order of the input values. To demonstrate this, we can introduce some randomness in the execution time of the square function. This mimics what happens in realistic calculations where the computational load is not perfectly balanced amongst the processes.
from time import sleep
from random import randint

def square(x, q):
    sleep(0.01 * randint(0, 100))
    q.put(x * x)
Then, for example, we might obtain the following results, instead of the list of square numbers in ascending order.
[36, 9, 16, 81, 25, 64, 4, 49]
Bear in mind that the randomising function is only introduced in this short example to show the effect of asynchronous execution of processes. In realistic code that performs far more calculation, the output order depends on scheduling and on how the workload is balanced, so it should not be assumed to match the order of the corresponding input values.
If we want the results to be output in the same order as the input values (so that in the initial short example the results would be in ascending order), we can pack each input value and the resulting value for its square in a tuple, and save the tuples in the Queue object.
def square(x, q):
    sleep(0.01 * randint(0, 100))
    q.put((x, x * x))
After the processes have been executed, the ascending order is recovered by sorting the results saved in qout.
unsorted_result = [qout.get() for p in processes]
result = [t[1] for t in sorted(unsorted_result)]
print(result)
You may have noticed that the above example is special in that the input values are already in ascending order, so sorting by input value reproduces the input order. More generally, if the input values are not in ascending order, we need to tag each output value with its “process index” or “input index”, rather than the actual input value, in order to obtain the output values in the desired order. This is illustrated in the following code.
import multiprocessing as mp
from random import randint
from time import sleep

def square(i, x, q):
    sleep(0.01 * randint(0, 100))
    # Tag the result with the input index so it can be reordered later
    q.put((i, x * x))

if __name__ == "__main__":
    input_values = [2, 4, 6, 8, 3, 5, 7, 9]
    qout = mp.Queue()
    processes = [mp.Process(target=square, args=(ind, val, qout))
                 for ind, val in enumerate(input_values)]
    for p in processes:
        p.start()
    # Collect one result per process before joining
    unsorted_result = [qout.get() for _ in processes]
    for p in processes:
        p.join()
    # Sorting the (index, value) tuples restores the input order
    result = [t[1] for t in sorted(unsorted_result)]
    print(result)
The output would then be as follows, where the values are in one-to-one correspondence with the input values [2, 4, 6, 8, 3, 5, 7, 9].
[4, 16, 36, 64, 9, 25, 49, 81]
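Sorting is not the only way to restore the input order. As an alternative sketch (not the approach used in this post), the (index, value) tuples can be written straight into a preallocated list. The function name place_by_index is hypothetical, and the sample tuples stand in for what qout.get() might return.

```python
def place_by_index(pairs, n):
    """Hypothetical helper: put each value at the position given by its index."""
    result = [None] * n
    for index, value in pairs:
        result[index] = value
    return result


# Tuples in an arbitrary arrival order, as they might come out of the queue
# for the inputs [2, 4, 6, 8, 3, 5, 7, 9]
arrived = [(4, 9), (0, 4), (7, 81), (2, 36), (1, 16), (6, 49), (3, 64), (5, 25)]
print(place_by_index(arrived, len(arrived)))
# prints [4, 16, 36, 64, 9, 25, 49, 81]
```

This restores the order in a single pass over the results.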
Example: computing π
Now let’s revisit the example of computing π from the previous post. As mentioned before, the computed result from each process should be saved in a Queue object. We therefore need to modify the calc_partial_pi routine.
def calc_partial_pi(rank, nprocs, nsteps, dx, qout):
    partial_pi = 0.0
    # Each process handles steps rank, rank + nprocs, rank + 2 * nprocs, ...
    for i in range(rank, nsteps, nprocs):
        x = (i + 0.5) * dx
        partial_pi += 4.0 / (1.0 + x * x)
    partial_pi *= dx
    qout.put(partial_pi)
Then we can set the number of steps and step size for numerical integration, as well as the desired number of processes. Ideally we would like to have one process per CPU core.
nsteps = 10000000
dx = 1.0 / nsteps
nprocs = mp.cpu_count()
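Note that mp.cpu_count() reports the number of CPUs in the system, which can be larger than the number the current process is allowed to use (for example when CPU affinity has been restricted with taskset or by a job scheduler). A hedged sketch of a fallback is shown below. The helper name usable_cpus is hypothetical, and os.sched_getaffinity is only available on some Unix platforms, hence the hasattr check.

```python
import multiprocessing as mp
import os


def usable_cpus():
    """Hypothetical helper: number of CPUs available to this process."""
    if hasattr(os, "sched_getaffinity"):
        # Set of CPUs the current process is restricted to
        return len(os.sched_getaffinity(0))
    # Fallback: total number of CPUs in the system
    return mp.cpu_count()


nprocs = usable_cpus()
print(nprocs)
```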
Based on the desired number of processes, we prepare a list of input arguments and create the processes.
qout = mp.Queue()
inputs = [(rank, nprocs, nsteps, dx, qout) for rank in range(nprocs)]
processes = [mp.Process(target=calc_partial_pi, args=inp)
             for inp in inputs]
After asynchronous execution of the processes, the computed value of π is obtained.
for p in processes:
    p.start()
# Collect one partial sum per process before joining
result = [qout.get() for _ in processes]
for p in processes:
    p.join()
pi = sum(result)
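To see why summing the partial results gives π, it can help to run the same strided decomposition serially. The sketch below is an illustration with hypothetical names (partial_pi_value, and nprocs fixed at 4) and a smaller nsteps so that it runs quickly. It returns the partial sums instead of putting them on a queue.

```python
import math


def partial_pi_value(rank, nprocs, nsteps, dx):
    """Hypothetical serial variant of calc_partial_pi that returns its result."""
    partial_pi = 0.0
    for i in range(rank, nsteps, nprocs):
        x = (i + 0.5) * dx
        partial_pi += 4.0 / (1.0 + x * x)
    return partial_pi * dx


nsteps = 100000          # smaller than in the post, to keep the check fast
dx = 1.0 / nsteps
nprocs = 4               # assumed number of "processes" for this check

# Each rank covers steps rank, rank + nprocs, ...; together they cover all steps
parts = [partial_pi_value(rank, nprocs, nsteps, dx) for rank in range(nprocs)]
pi_estimate = sum(parts)
print(abs(pi_estimate - math.pi))  # deviation from math.pi
```

Because every step index belongs to exactly one rank, the sum of the partial results equals the full midpoint-rule sum, regardless of the order in which the parts are added.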
For a simple example like computing π, the Process class provides very similar scaling to the Pool class introduced in the previous post.
Summary
- The Process class makes it possible to control the processes directly.
- The Queue class can be used to save results from the processes.
- The processes are executed asynchronously.
- The order of the output is not guaranteed to correspond to that of the input values.