Of course! Using Python with Redis for queuing is a very common and powerful pattern. It's excellent for background task processing, decoupling application components, and improving application responsiveness.

Here’s a comprehensive guide covering the basics, best practices, and advanced patterns.
Why Use Redis for Queues?
- Speed: Redis is an in-memory data store, making it incredibly fast for adding and removing items from a queue.
- Simplicity: The data structures are simple (Lists, Sorted Sets), and the concepts are easy to grasp.
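- Persistence: You can configure Redis to save data to disk (RDB snapshots or the append-only file), so queued jobs can survive a server restart. How much recent data survives depends on the persistence settings you choose.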
- Maturity: It's a battle-tested solution used by massive companies like GitHub, Twitter, and Instagram.
- Beyond Simple Queues: Redis can handle more complex scenarios like delayed tasks, priority queues, and rate limiting with its advanced data structures.
Core Concepts: Simple Queue (FIFO)
The foundation of a Redis queue is a List.
- LPUSH/RPUSH: Add an item to the left (head) or right (tail) of the list. We'll use LPUSH to add new jobs.
- RPOP: Remove and return an item from the right (tail) of the list. Combined with LPUSH, this gives us a First-In, First-Out (FIFO) queue.
Visual:
Head (Left) [Job 3] [Job 2] [Job 1] Tail (Right)
- Producer does LPUSH my_queue "{job_data}".
- Consumer does RPOP my_queue.
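To make the ordering concrete without needing a Redis server, the sketch below models the list with Python's `collections.deque`. The helpers `lpush` and `rpop` are illustrative stand-ins that mirror the Redis commands; they are not part of redis-py.

```python
from collections import deque

# In-memory stand-in for a Redis list: index 0 is the head (left),
# the last index is the tail (right).
queue = deque()

def lpush(q, item):
    """Mimic LPUSH: insert at the head (left)."""
    q.appendleft(item)

def rpop(q):
    """Mimic RPOP: remove from the tail (right); None if the list is empty."""
    return q.pop() if q else None

# The producer pushes three jobs in order.
for job in ["Job 1", "Job 2", "Job 3"]:
    lpush(queue, job)

print(list(queue))  # listed head -> tail, so the newest job comes first

# The consumer pops from the tail, so the oldest job comes out first.
processed = [rpop(queue) for _ in range(3)]
print(processed)
```

The first job pushed is the first job popped, which is the FIFO behaviour the producer and consumer rely on.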
Getting Started: Installation and Setup
First, you need to have Redis installed and running. You can install it easily with a package manager or use a Docker container.

# Using Docker (recommended for quick setup)
docker run -d -p 6379:6379 redis
Next, install the Python Redis client. redis-py is the standard.
pip install redis
Example 1: A Simple Producer/Consumer
Let's create a basic producer that adds tasks to a queue and a consumer that processes them.
The Producer (producer.py)
This script will add a few simple "sleep" jobs to the queue.
import redis
import time
import json
# Connect to Redis
r = redis.Redis(host='localhost', port=6379, db=0)
# Define a queue name
QUEUE_NAME = 'my_simple_queue'
# Create some "jobs" to do
jobs = [
    {"task": "send_email", "to": "user@example.com"},
    {"task": "process_image", "image_id": "12345"},
    {"task": "generate_report", "type": "monthly"},
]
print("Producer: Adding jobs to the queue...")
for job in jobs:
    # Use LPUSH to add the job to the head (left end) of the list
    # We serialize the job dict to a JSON string for storage
    r.lpush(QUEUE_NAME, json.dumps(job))
    print(f" -> Added job: {job['task']}")
    time.sleep(1)  # Simulate some work between adding jobs
print("Producer: Finished adding jobs.")
The Consumer (consumer.py)
This script will continuously run, check for jobs, and process them.

import redis
import time
import json

# Connect to Redis
r = redis.Redis(host='localhost', port=6379, db=0)
QUEUE_NAME = 'my_simple_queue'

def process_job(job_data):
    """Simulates doing work."""
    job = json.loads(job_data)
    print(f"Consumer: Processing job -> {job['task']}...")
    # Simulate a long-running task
    time.sleep(2)
    print(f"Consumer: Finished job -> {job['task']}")

def listen_for_jobs():
    """Continuously listens for jobs on the queue."""
    print("Consumer: Listening for jobs...")
    while True:
        # Plain RPOP is non-blocking: it returns None immediately if the
        # queue is empty, which would force this loop to poll.
        # BRPOP (blocking right pop) waits for a new item instead.
        # It returns a tuple (b'queue_name', b'job_data'), or None on timeout.
        # We set a timeout of 5 seconds to avoid blocking indefinitely.
        result = r.brpop(QUEUE_NAME, timeout=5)
        if result:
            # We only care about the job data, which is the second element
            job_data = result[1]
            process_job(job_data)
        else:
            # No job was received in the last 5 seconds
            print("Consumer: No jobs in the last 5 seconds. Still listening...")

if __name__ == "__main__":
    listen_for_jobs()
How to Run:
- Start the consumer in one terminal: python consumer.py
- Run the producer in another terminal: python producer.py
You'll see the consumer pick up and process the jobs one by one as the producer adds them.
The Problem with RPOP: Lost Jobs
The simple RPOP approach has a major flaw: a job can be lost if its consumer fails. RPOP itself is atomic, so two consumers never receive the same job, but once a job is popped nothing records that it is still in progress.
Imagine a scenario:
- A consumer pops a job from the queue.
- The consumer crashes before it finishes processing the job.
- The job is lost forever! It's not in the queue anymore, and it was never completed.
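The failure mode can be reproduced without Redis. In this sketch a `deque` stands in for the queue, and a hypothetical `crashing_worker` raises before finishing, which is roughly what a killed process looks like from the queue's point of view.

```python
from collections import deque

queue = deque(["job-A", "job-B"])  # the tail (right) is consumed first
completed = []                     # jobs that finished successfully

def crashing_worker(q):
    """Pop a job, then fail before the work is recorded as done."""
    job = q.pop()  # equivalent of RPOP: the job leaves the queue here
    raise RuntimeError(f"worker died while handling {job}")

try:
    crashing_worker(queue)
except RuntimeError as exc:
    print(exc)

# The popped job is neither waiting in the queue nor marked completed.
print(list(queue))
print(completed)
```

After the simulated crash, the popped job appears in neither collection, so nothing is left to retry.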
The Solution: Reliable Queues with LPUSH and BRPOPLPUSH
The usual solution to this problem is the "reliable queue" pattern. It uses two lists:
- The Main Queue (my_queue): Where new jobs are added.
- The "Processing" List (my_queue:processing): A temporary holding area for a job while it's being worked on.
The workflow is:
- Consumer: BRPOPLPUSH my_queue my_queue:processing 30
  - This is an atomic operation.
  - It blocks, then pops a job from the right (tail) of my_queue.
  - It pushes that same job to the left (head) of my_queue:processing.
  - If no job is available, it waits for up to 30 seconds.
- Consumer: Now it has the job. It processes it.
- Consumer: If processing is successful, it removes the job from the processing list: LREM my_queue:processing 1 <job_data>.
- Consumer: If the consumer crashes, the job stays in the processing list instead of disappearing. A separate "reaper" process can move jobs from processing back to the main queue if they've been there for too long.
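Redis 6.2 deprecated BRPOPLPUSH in favour of BLMOVE, which performs the same atomic move with explicit directions. A minimal sketch of the claim and acknowledge steps using redis-py's `blmove` and `lrem` might look like this. The function names `claim_job` and `ack_job` are illustrative, and `client` is assumed to be a `redis.Redis` instance connected to a server running Redis 6.2 or later.

```python
def claim_job(client, main_queue, processing_queue, timeout=30):
    """Atomically move one job from the tail of main_queue to the head
    of processing_queue. Returns the raw payload, or None on timeout."""
    return client.blmove(
        main_queue, processing_queue, timeout, src="RIGHT", dest="LEFT"
    )

def ack_job(client, processing_queue, payload):
    """Remove one copy of a finished job from the processing list.
    Returns True if an entry was actually removed."""
    return client.lrem(processing_queue, 1, payload) == 1
```

With a real server, a consumer would call `claim_job(redis.Redis(), "my_queue", "my_queue:processing")`, process the returned payload, and then call `ack_job` with the same payload.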
Updated Reliable Consumer (reliable_consumer.py)
import redis
import time
import json

# --- Connection ---
r = redis.Redis(host='localhost', port=6379, db=0)
MAIN_QUEUE = 'my_reliable_queue'
PROCESSING_QUEUE = f'{MAIN_QUEUE}:processing'

def process_job(job_data):
    """Simulates doing work."""
    job = json.loads(job_data)
    print(f"Consumer: Processing job -> {job['task']}...")
    time.sleep(2)
    # Simulate a potential failure
    if "fail" in job:
        raise Exception(f"Simulated failure for job: {job['task']}")
    print(f"Consumer: Finished job -> {job['task']}")

def listen_for_jobs():
    """Reliably listens for jobs."""
    print("Reliable Consumer: Listening for jobs...")
    while True:
        # 1. Atomically move a job from the main queue to the processing queue.
        # Because the move is atomic, the job is never absent from both lists.
        # Returns the job payload as bytes, or None if the timeout expires.
        result = r.brpoplpush(MAIN_QUEUE, PROCESSING_QUEUE, timeout=5)
        if result:
            job_data = result
            print(f"Consumer: Received job: {job_data.decode('utf-8')}")
            try:
                # 2. Process the job
                process_job(job_data)
                # 3. If successful, remove it from the processing queue
                r.lrem(PROCESSING_QUEUE, 1, job_data)
                print("Consumer: Job successfully completed and removed from processing queue.")
            except Exception as e:
                print(f"Consumer: Error processing job: {e}. Job will remain in processing queue.")
        else:
            print("Reliable Consumer: No jobs in the last 5 seconds. Still listening...")

if __name__ == "__main__":
    listen_for_jobs()
This pattern is much more robust and is the basis for many production-ready queueing systems built on Redis.
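The reaper mentioned earlier is not shown above. One possible sketch follows, under an assumption the consumer above does not implement: right after claiming a job, each consumer records a start timestamp in a hash (the hypothetical key `my_reliable_queue:started`, with the job payload as the field). The function name `requeue_stale_jobs` and its parameters are illustrative, and `client` is any redis-py client.

```python
import time

def requeue_stale_jobs(client, main_queue, processing_queue, started_key,
                       max_age_seconds, now=None):
    """Move jobs that have sat in processing_queue longer than
    max_age_seconds back to main_queue. Returns the requeued payloads."""
    now = time.time() if now is None else now
    requeued = []
    for payload in client.lrange(processing_queue, 0, -1):
        started = client.hget(started_key, payload)
        if started is None:
            continue  # no timestamp recorded (assumed convention); leave it alone
        if now - float(started) < max_age_seconds:
            continue  # still within its time budget
        # LREM returns how many entries it removed. Only the reaper that
        # actually removed the entry requeues it, so two reapers running
        # at once do not duplicate the job.
        if client.lrem(processing_queue, 1, payload) == 1:
            client.rpush(main_queue, payload)  # tail = next job to be consumed
            client.hdel(started_key, payload)
            requeued.append(payload)
    return requeued
```

This is a simplification: the remove and the push are separate commands, so a reaper crash between them could still drop a job, and identical payloads share one hash field. Wrapping the three writes in a MULTI/EXEC transaction or a Lua script, and giving each job a unique ID, would be the natural hardening steps.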
Advanced Queues
a) Priority Queues
To implement a priority queue, you use a Sorted Set.
- ZADD: Add an element with a "score" (the priority). Lower scores are higher priority.
- BZPOPMIN: Block and pop the element with the lowest score.
Example:
import json
import redis

r = redis.Redis(host='localhost', port=6379, db=0)
# Producer adds a high priority job (score=1), serialized as JSON
r.zadd("priority_queue", {json.dumps({"task": "critical_alert"}): 1})
# Producer adds a low priority job (score=10)
r.zadd("priority_queue", {json.dumps({"task": "log_cleanup"}): 10})
# Consumer gets the highest priority job first (None if the timeout expires)
job_data = r.bzpopmin("priority_queue", timeout=5)
# job_data will be: (b'priority_queue', b'{"task": "critical_alert"}', 1.0)
b) Delayed Queues
To schedule a job to run in the future, use a Sorted Set with the timestamp as the score.
- Producer: ZADD delayed_queue <timestamp_in_future> "{job_data}"
- Consumer (Worker):
  - Get the current time: now = time.time().
  - Find all jobs ready to run: r.zrangebyscore("delayed_queue", 0, now).
  - For each job found, remove it from the delayed queue and add it to the main processing queue: r.zrem("delayed_queue", job_data) and r.lpush("main_queue", job_data). If several workers poll at once, push only when zrem returns 1, so the same job is not enqueued twice.
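The steps above can be sketched as two small functions. This is a minimal illustration rather than a production scheduler: the names `schedule_job` and `move_due_jobs` are hypothetical, `client` is assumed to be a redis-py client, and the optional `now` parameter exists only to make the logic easy to test.

```python
import json
import time

def schedule_job(client, delayed_key, job, delay_seconds, now=None):
    """Store a job in the sorted set, scored by the time it becomes due."""
    now = time.time() if now is None else now
    run_at = now + delay_seconds
    client.zadd(delayed_key, {json.dumps(job): run_at})
    return run_at

def move_due_jobs(client, delayed_key, main_queue, now=None):
    """Move every job whose score is <= now onto the main queue.
    Returns the number of jobs this caller moved."""
    now = time.time() if now is None else now
    moved = 0
    for payload in client.zrangebyscore(delayed_key, 0, now):
        # ZREM returns 1 only for the caller that actually removed the
        # member, so concurrent pollers do not enqueue a job twice.
        if client.zrem(delayed_key, payload) == 1:
            client.lpush(main_queue, payload)
            moved += 1
    return moved
```

A worker would typically call `move_due_jobs` in a loop with a short sleep between iterations, while regular consumers keep reading from the main queue as before.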
Popular Libraries (Don't Reinvent the Wheel)
While it's great to know how it works under the hood, for production systems, consider using a well-tested library that handles these complexities for you.
- RQ (Redis Queue): A simple, lightweight library. It's fantastic for getting started and for many common use cases. It keeps track of in-progress jobs for you (in its StartedJobRegistry), in the same spirit as the processing list discussed above.
  - Pros: Easy to use, great for simple background tasks.
  - Cons: Less feature-rich than Celery.
- Celery: The powerhouse of distributed task queues. It supports Redis, RabbitMQ, and other brokers. It has a massive ecosystem of features:
  - Periodic tasks (like a cron job).
  - Retries and error handling.
  - Result backends (to store the result of a task).
  - Monitoring and management tools (Flower).
  - Groups and chains of tasks.
  - Pros: Extremely powerful, scalable, and widely used for complex systems.
  - Cons: More complex to set up and configure.
Summary
| Feature | Simple LPUSH/RPOP | Reliable BRPOPLPUSH | Library (RQ/Celery) |
|---|---|---|---|
| Reliability | Low (Jobs can be lost on crash) | High (Jobs are safe until completion) | High |
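| In-flight job tracking | No (a popped job exists only in the consumer's memory) | Yes (atomic move to a processing list) | Yes (handled by the library) |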
| Complexity | Very Low | Medium | Low to High |
| Best For | Quick scripts, learning, non-critical tasks | Most production use cases | Simple to complex, robust systems |
For any serious application, start with the BRPOPLPUSH pattern or use a library like RQ or Celery that handles job tracking and retries for you.
