RabbitMQ 是一个功能强大且广泛使用的消息代理(Message Broker),它实现了高级消息队列协议(AMQP),它的核心作用是解耦生产者和消费者,实现异步通信、流量削峰、任务分发等。

我们将使用 Python 最流行的 RabbitMQ 客户端库:pika。
目录
- 准备工作
- 安装 RabbitMQ 服务器
- 安装 Python
pika库
- 核心概念
- 生产者
- 消费者
- 队列
- 交换器
- 路由键
- 绑定
- 示例代码
- 示例 1:最简单的 "Hello World" 模型(点对点)
- 示例 2:工作队列模型(任务分发)
- 示例 3:发布/订阅模型(广播)
- 示例 4:路由模型(有选择地接收)
- 进阶主题
- 消息确认
- 持久化
- 虚拟主机
准备工作
a. 安装 RabbitMQ 服务器
如果你还没有 RabbitMQ 服务器,最简单的方式是使用 Docker:
# 拉取并启动 RabbitMQ 镜像(默认启用管理界面) docker run -d --hostname my-rabbit --name some-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management
启动后,你可以通过浏览器访问 http://localhost:15672 来查看管理界面。
- 默认用户名:
guest - 默认密码:
guest
b. 安装 Python pika 库
在你的 Python 项目中,使用 pip 安装 pika:

pip install pika
核心概念
在写代码之前,必须理解 RabbitMQ 的几个核心组件:
- 生产者: 发送消息的程序。
- 消费者: 接收消息的程序。
- 队列: 消息的容器,存在于 RabbitMQ 服务器中,消息只能被存放在队列里,多个消费者可以从一个队列中获取消息。
- 交换器: 生产者将消息发送到交换器,而不是直接发送到队列,交换器负责根据特定规则将消息路由到一个或多个队列。
- 路由键: 生产者在发送消息时指定一个路由键,交换器会根据这个键来决定消息的去向。
- 绑定: 将队列和交换器关联起来的规则,通常也会指定一个绑定键,用于匹配路由键。
示例代码
我们将从最简单的模型开始,逐步深入。
示例 1:最简单的 "Hello World" 模型(点对点)
这个模型中,一个生产者发送一条消息到一个队列,一个消费者从该队列中接收并消费这条消息。
核心思想: 生产者 -> 队列 -> 消费者
代码结构:
我们将创建两个文件:send.py (生产者) 和 receive.py (消费者)。
a. 生产者 (send.py)
import pika
# 1. 连接到 RabbitMQ 服务器
# 默认连接到本地服务器的 5672 端口
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 2. 声明一个队列
# 如果队列不存在,则创建它,如果已存在,则忽略。
# durable=True 表示队列持久化,即使 RabbitMQ 重启,队列也不会丢失。
channel.queue_declare(queue='hello', durable=True)
# 3. 发送消息到 'hello' 队列
# exchange: 空字符串表示使用默认交换器(default exchange)
# routing_key: 指定要发送到的队列名称
# body: 消息内容
# properties=pika.BasicProperties(delivery_mode=2) 表示消息持久化
channel.basic_publish(
exchange='',
routing_key='hello',
body='Hello World!',
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
))
print(" [x] Sent 'Hello World!'")
# 4. 关闭连接
connection.close()
b. 消费者 (receive.py)
import pika
import time
# 1. 连接到 RabbitMQ 服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 2. 声明队列(与生产者保持一致)
# 这一步是必要的,因为消费者可能在生产者之前启动。
# 确保队列存在,否则消费者将无法接收消息。
channel.queue_declare(queue='hello', durable=True)
# 定义一个回调函数,用于处理接收到的消息
def callback(ch, method, properties, body):
""" ch: channel, method: 消息方法, properties: 消息属性, body: 消息体 """
print(f" [x] Received {body.decode()}")
# 模拟一些耗时的工作
time.sleep(2)
print(" [x] Done")
# 手动发送消息确认,告诉 RabbitMQ 这个消息已经被处理完毕
# 只有在收到确认后,RabbitMQ 才会将该消息从队列中移除
ch.basic_ack(delivery_tag=method.delivery_tag)
# 3. 从队列 'hello' 中消费消息
# no_ack=False 表示需要手动发送消息确认
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
# 4. 开始消费,进入阻塞状态,等待消息
channel.start_consuming()
如何运行:
- 先运行消费者:
python receive.py,它会启动并等待消息。 - 在另一个终端窗口,运行生产者:
python send.py,你会看到消费者终端打印出接收到的消息。
示例 2:工作队列模型(任务分发)
这个模型允许多个消费者从同一个队列中获取任务,实现负载均衡。
核心思想: 多个生产者 -> 队列 -> 多个消费者
与示例 1 的关键区别:
- 公平分发: 默认情况下,RabbitMQ 会将消息按顺序发送下一个消费者,而不管消费者是否处理完毕,为了确保一个繁忙的消费者不会积压所有任务,而空闲的消费者没有任务,我们需要设置
prefetch_count。 - 手动确认: 这一点非常重要,可以防止消费者在处理任务崩溃时任务丢失。
修改消费者代码 (worker.py)
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明一个名为 'task_queue' 的持久化队列
channel.queue_declare(queue='task_queue', durable=True)
def callback(ch, method, properties, body):
print(f" [x] Received {body.decode()}")
time.sleep(body.count(b'.')) # 模拟耗时任务,'.'越多耗时越长
print(" [x] Done")
ch.basic_ack(delivery_tag=method.delivery_tag) # 手动确认
# 设置预取计数为 1
# 这告诉 RabbitMQ 不要一次给一个消费者超过一条消息。
# 只有当消费者处理完并确认了上一条消息后,才会发送下一条。
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback, auto_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
生产者代码 (new_task.py)
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明一个名为 'task_queue' 的持久化队列
channel.queue_declare(queue='task_queue', durable=True)
message = ' '.join(sys.argv[1:]) or "Hello World..."
channel.basic_publish(
exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(delivery_mode=2)) # 消息持久化
print(f" [x] Sent {message}")
connection.close()
如何运行:
- 启动两个或多个消费者实例:
python worker.py python worker.py
- 启动一个或多个生产者实例,发送不同耗时的任务:
python new_task.py First message. python new_task.py Second message.. python new_task.py Third message... python new_task.py Fourth message....
你会观察到,任务会公平地分配给不同的消费者,即使它们的处理速度不同。
示例 3:发布/订阅模型(广播)
在这个模型中,生产者发送消息到“交换器”,所有绑定到此交换器的队列都会收到该消息的副本。
核心思想: 生产者 -> 交换器 -> 队列1 -> 消费者1 -> 队列2 -> 消费者2 -> 队列3 -> 消费者3
关键点:
- 交换器类型:
fanout,它会将收到的所有消息广播到所有它知道的队列中。 - 临时队列: 为了演示,我们为每个消费者创建一个唯一的、临时的队列,当消费者断开连接时,队列自动删除。
生产者 (emit_log.py)
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明一个 'fanout' 类型的交换器
channel.exchange_declare(exchange='logs', exchange_type='fanout')
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(f" [x] Sent {message}")
connection.close()
消费者 (receive_logs.py)
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明一个 'fanout' 类型的交换器
channel.exchange_declare(exchange='logs', exchange_type='fanout')
# 创建一个唯一的、临时的队列
# exclusive=True 表示当消费者断开连接时,队列会被自动删除
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
# 将队列绑定到 'logs' 交换器
# 对于 fanout 交换器,routing_key 是无用的,所以设为空字符串
channel.queue_bind(exchange='logs', queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(f" [x] {body.decode()}")
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
如何运行:
- 启动两个或多个消费者,它们都会等待消息:
python receive_logs.py python receive_logs.py
- 启动一个生产者发送消息:
python emit_log.py "This is a warning log."
你会看到所有消费者终端都打印出了这条消息。
示例 4:路由模型(有选择地接收)
这是发布/订阅模型的一个增强版,它允许根据路由键有选择地接收消息。
核心思想: 生产者 -> 交换器 -> 队列1 (绑定 key="info") -> 消费者1 -> 队列2 (绑定 key="error") -> 消费者2
关键点:
- 交换器类型:
direct,它会将消息路由到routing_key与binding_key完全匹配的队列。
生产者 (emit_log_direct.py)
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明一个 'direct' 类型的交换器
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or "Hello World!"
channel.basic_publish(
exchange='direct_logs',
routing_key=severity,
body=message)
print(f" [x] Sent {severity}: '{message}'")
connection.close()
消费者 (receive_logs_direct.py)
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
# 创建临时队列
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
# 从命令行参数获取要监听的 severities (routing keys)
severities = sys.argv[1:]
if not severities:
sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
sys.exit(1)
# 将队列绑定到交换器,可以绑定多个 binding_key
for severity in severities:
channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(f" [x] {method.routing_key}: {body.decode()}")
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
如何运行:
- 启动一个只接收
info和warning日志的消费者:python receive_logs_direct.py info warning
- 启动一个只接收
error日志的消费者:python receive_logs_direct.py error
- 发送不同级别的日志:
python emit_log_direct.py info "This is an info message." python emit_log_direct.py warning "This is a warning message." python emit_log_direct.py error "This is an error message."
你会看到,第一个消费者收到了
info和warning消息,而第二个消费者只收到了error消息。
进阶主题
消息确认
- 自动确认 (
auto_ack=True): 消费者一旦收到消息,RabbitMQ 就会立即将其从队列中移除,如果消费者在处理过程中崩溃,消息就会丢失。 - 手动确认 (
auto_ack=False): 这是推荐的做法,消费者在处理完消息后,需要调用ch.basic_ack()来确认,如果消费者在确认前崩溃,RabbitMQ 会将该消息重新投递给其他消费者,确保消息不丢失。
持久化
- 队列持久化:
channel.queue_declare(queue='my_queue', durable=True) - 消息持久化:
properties=pika.BasicProperties(delivery_mode=2) - 交换器持久化:
channel.exchange_declare(exchange='my_exchange', durable=True)
注意: 持久化并不能保证 100% 不丢失,在消息刚写入 RabbitMQ 内存但还未写入磁盘时,服务器发生宕机,消息仍然会丢失,但对于大多数应用场景,持久化已经足够。
虚拟主机
RabbitMQ 使用“虚拟主机”(Virtual Hosts, vhosts)来隔离不同环境的资源(如用户、队列、交换器),这类似于数据库中的 schema。
- 默认的 vhost 是 。
- 在连接时可以指定:
pika.ConnectionParameters('localhost', virtual_host='my_vhost') - 在 RabbitMQ 管理界面中可以方便地创建和管理 vhost。
| 模型 | 交换器类型 | 路由规则 | 典型场景 |
|---|---|---|---|
| Hello World | 默认 (AMQP default) |
消息直接发送到指定队列 | 简单的点对点通信 |
| 工作队列 | 默认 (AMQP default) |
轮询分发到多个消费者 | 耗时任务分发,实现负载均衡 |
| 发布/订阅 | fanout |
广播到所有绑定队列 | 向多个接收方广播通知(如日志、系统公告) |
| 路由 | direct |
根据 routing_key 精确匹配 |
根据消息类型有选择地接收(如只处理错误日志) |
| 主题 | topic |
根据通配符匹配 routing_key (如 *.stock.#) |
更灵活的路由规则,如按股票代码过滤行情 |
从 pika 开始是学习 RabbitMQ 的好方法,当你熟悉了这些基本模型后,可以进一步探索更高级的功能,如 RPC(远程过程调用)和 topic 交换器,希望这份指南对你有帮助!
