This is a getting-started tutorial on python-rq, in which I will demonstrate how to work with asynchronous tasks using Python Redis Queue (python-rq).
What we will be doing
We want a client to submit thousands of jobs in a non-blocking, asynchronous fashion. Workers will then consume these jobs from our redis queue and process the tasks at whatever rate our consumer can handle.
The nice thing about this is that if our consumer is unavailable, the tasks will remain in the queue and will be executed once the consumer is ready to consume them. It's also asynchronous, so the client doesn't have to wait until a task has finished.
We will run a redis server using docker, which will be used to queue all our jobs, then we will go through the basics in python and python-rq such as:
Writing a Task
Enqueueing a Job
Getting information from our queue, listing jobs, job statuses
Running our workers to consume from the queue and action our tasks
A basic application which queues jobs to the queue, consumes and actions them, and monitors the queue
Redis Server
You will require docker for this next step, to start the redis server:
$ docker run --rm -itd --name redis -p 6379:6379 redis:alpine
Python RQ
Install python-rq:
$ pip install rq
Create the task which will be actioned by our workers. In our case it will be a simple function that collects all the digits from a given string into a list, then adds them up and returns the total.
This is a very basic task, but it's just for demonstration.
Our tasks.py:
def sum_numbers_from_string(string):
    numbers = []
    for each_character in string:
        if each_character.isdigit():
            numbers.append(int(each_character))
    total = 0
    for each_number in numbers:
        total = total + each_number
    return total
To test this locally:
>>> from tasks import sum_numbers_from_string
>>> sum_numbers_from_string('adje-fje5-sjfdu1s-gdj9-asd1fg')
16
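As a side note, the same logic can be written more compactly using a generator expression with the built-in sum(); this is just an equivalent sketch, not the version used in the tutorial's tasks.py:

```python
def sum_numbers_from_string(string):
    # Sum every digit character found in the string.
    return sum(int(ch) for ch in string if ch.isdigit())

print(sum_numbers_from_string('adje-fje5-sjfdu1s-gdj9-asd1fg'))  # 16
```

Both versions behave identically; the loop-based version just makes each step explicit, which is nicer for a tutorial.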
Now, let's import redis and rq, along with our task, and instantiate a queue object:
>>> from redis import Redis
>>> from rq import Connection, Queue, Worker
>>> from tasks import sum_numbers_from_string
>>> redis_connection = Redis(host='localhost', port=6379, db=0)
>>> q = Queue(connection=redis_connection)
Submit a Task to the Queue
Let’s submit a task to the queue:
>>> result = q.enqueue(sum_numbers_from_string, 'hbj2-plg5-2xf4r1s-f2lf-9sx4ff')
The returned job object has a couple of properties we can inspect, such as result.id, the unique id we got back when we submitted our task to the queue, and result.get_status(), which at this point reports the job as queued.
Now that our task is queued, let's fire off our worker to consume the job from the queue and action the task:
>>> w = Worker([q], connection=redis_connection)
>>> w.work()
14:05:35 Worker rq:worker:49658973741d4085961e34e9641227dd: started, version 1.4.1
14:05:35 Listening on default...
14:05:35 Cleaning registries for queue: default
14:05:35 default: tasks.sum_numbers_from_string('hbj2-plg5-2xf4r1s-f2lf-9sx4ff') (5a607474-cf1b-4fa5-9adb-f8437555a7e7)
14:05:40 default: Job OK (5a607474-cf1b-4fa5-9adb-f8437555a7e7)
14:05:40 Result is kept for 500 seconds
14:05:59 Warm shut down requested
True
Now, when we get the status of our job, we will see that it finished:
>>> result.get_status()
'finished'
And to get the result from our worker:
>>> result.result
29
And as before, if you no longer have the job object at hand but you know the job id, you can fetch the job by its id and then read the result:
>>> result = q.fetch_job('5a607474-cf1b-4fa5-9adb-f8437555a7e7')
>>> result.result
29
Naming Queues
We can namespace our tasks into specific queues, for example if we want to create queue1: