Ruan Bekker's Blog

From a Curious mind to Posts on Github

Parallel Processing With Python and Multiprocessing Using Queue

Today I needed to run a task with parallel processing in order to save time.

The task to be achieved

For this demonstration, I have a list of people, and each task needs to look up one person's pet name and print it to stdout. I want to spawn a task for each person's pet name lookup and run the tasks in parallel, so that all the results come back together instead of one after another.

This is a basic task, but the same approach pays off far more with a CPU-intensive job, where each process can run on its own core.

Multiprocessing Queues

When using multiple processes, one generally uses message passing for communication between processes and avoids having to use any synchronization primitives like locks.

The Queue type is a multi-producer, multi-consumer FIFO queue modelled on the queue.Queue class in the standard library. You can read more about it in the Python multiprocessing documentation.
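To make the message-passing idea concrete, here is a minimal sketch in which one child process puts items on a queue and the parent reads them back. The names produce, consume and SENTINEL are my own for illustration and are not part of the multiprocessing API; the sentinel is just one common way to signal that no more items are coming.

```python
import multiprocessing as mp

SENTINEL = None  # hypothetical end-of-stream marker, not part of the Queue API


def produce(queue, count):
    """Put the integers 0..count-1 on the queue, followed by the sentinel."""
    for i in range(count):
        queue.put(i)
    queue.put(SENTINEL)


def consume(queue):
    """Read items until the sentinel arrives and return them in arrival order."""
    items = []
    while True:
        item = queue.get()  # blocks until an item is available
        if item is SENTINEL:
            break
        items.append(item)
    return items


if __name__ == '__main__':
    queue = mp.Queue()
    producer = mp.Process(target=produce, args=(queue, 5))
    producer.start()
    # Items from a single producer come out in the order they were put.
    print(consume(queue))
    producer.join()
```

No lock is needed anywhere: the queue handles the synchronization between the two processes. With several producers, each producer's items still keep their relative order, but items from different producers can interleave.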

Our Workflow

Our multiprocessing workflow will look like this:

  • We will define our data, which will be a dictionary of people and their pet names
  • We will define an output queue
  • We will create an example function that puts the result of each task on the queue
  • Then we will set up a list of processes that we want to run
  • We will start each process in that list, then wait for every process to complete
  • We will then consume from the queue, once for each process in our processes list

Note that I also added a delay of 2 seconds to each task, so that you can see that the tasks run in parallel: the whole run takes roughly 2 seconds, rather than the 20 seconds that ten sequential lookups would need.

Our code:

import multiprocessing as mp
import time

# Map each owner to their pet's details.
pet_maps = {
    "adam": {"pet_name": "max"},
    "steve": {"pet_name": "sylvester"},
    "michelle": {"pet_name": "fuzzy"},
    "frank": {"pet_name": "pete"},
    "will": {"pet_name": "cat"},
    "natasha": {"pet_name": "tweety"},
    "samantha": {"pet_name": "bob"},
    "peter": {"pet_name": "garfield"},
    "susan": {"pet_name": "zazu"},
    "josh": {"pet_name": "tom"},
}


def get_pet_name(data, output):
    # Simulate a slow lookup, then put the result on the shared queue.
    time.sleep(2)
    print('adding to queue')
    response = 'pet name: {}'.format(data)
    output.put(response)


# The guard keeps child processes from re-running this block on platforms
# that start processes with "spawn" (Windows, macOS).
if __name__ == '__main__':
    output = mp.Queue()

    # One process per owner, each looking up a single pet name.
    processes = [
        mp.Process(target=get_pet_name, args=(pet_maps[name]['pet_name'], output))
        for name in pet_maps
    ]

    for p in processes:
        p.start()

    # Joining before reading is fine for these tiny strings. With large items,
    # read from the queue first, or join() can block on unflushed data.
    for p in processes:
        p.join()

    print('consuming from queue:')
    results = [output.get() for p in processes]
    print(results)

Running the example:

$ python3 mp.py
adding to queue
adding to queue
adding to queue
adding to queue
adding to queue
adding to queue
adding to queue
adding to queue
adding to queue
adding to queue

consuming from queue:
['pet name: max', 'pet name: sylvester', 'pet name: fuzzy', 'pet name: pete', 'pet name: cat', 'pet name: tweety', 'pet name: garfield', 'pet name: bob', 'pet name: zazu', 'pet name: tom']
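Notice that the results are not in the same order as the dictionary: each process puts its result on the queue whenever it finishes, so the order can change from run to run. As a rough sketch of how you could check the timing and keep track of which result belongs to whom, the variation below puts (owner, pet) pairs on the queue and measures the elapsed time. The PETS dictionary, the run_parallel helper and the delay parameter are names I made up for this sketch; it also reads from the queue before joining, which is the safer order when the queued items are large.

```python
import multiprocessing as mp
import time

# Hypothetical subset of the pet data, kept small for the sketch.
PETS = {"adam": "max", "steve": "sylvester", "michelle": "fuzzy"}


def get_pet_name(owner, output, delay):
    """Simulate a slow lookup and put an (owner, pet) pair on the queue."""
    time.sleep(delay)
    output.put((owner, PETS[owner]))


def run_parallel(owners, delay):
    """Start one process per owner and return (results, elapsed seconds)."""
    output = mp.Queue()
    processes = [
        mp.Process(target=get_pet_name, args=(owner, output, delay))
        for owner in owners
    ]
    started = time.perf_counter()
    for p in processes:
        p.start()
    # Read one result per process before joining, so that no child is
    # left waiting to flush its queued data.
    results = dict(output.get() for _ in processes)
    for p in processes:
        p.join()
    return results, time.perf_counter() - started


if __name__ == '__main__':
    results, elapsed = run_parallel(list(PETS), delay=2)
    print(results)
    print('elapsed: {:.1f}s'.format(elapsed))
```

On my understanding, the elapsed time should land a little above the 2 second delay (process startup adds some overhead) rather than near 6 seconds, which is what three sequential lookups would take.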

Thank You

Please feel free to show support by sharing this post, making a donation or subscribing, or reach out to me if you want me to demo and write up any specific tech topic.

