杰瑞科技汇

python rabbitmq 广播

Python RabbitMQ 广播实现指南

RabbitMQ 广播(Publish/Subscribe)模式是一种消息传递模式,其中消息被发送到所有订阅了该交换机的队列,而不是特定的队列,这在 RabbitMQ 中通过使用 Fanout Exchange 实现。

基本概念

  • Exchange:接收消息并将其路由到一个或多个队列
  • Fanout Exchange:将消息广播到所有绑定的队列,忽略路由键
  • Queue:存储消息直到消费者处理
  • Binding:队列与交换机之间的连接关系

实现步骤

安装必要的库

pip install pika

广播发送者代码 (Publisher)

import pika
import time
# 连接到 RabbitMQ 服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明一个 fanout 类型的交换机
channel.exchange_declare(exchange='broadcast_exchange', exchange_type='fanout')
# 发送消息
message = "Hello, this is a broadcast message!"
channel.basic_publish(exchange='broadcast_exchange', routing_key='', body=message)
print(f" [x] Sent {message}")
# 关闭连接
connection.close()

广播接收者代码 (Consumer)

import pika
import time
# 连接到 RabbitMQ 服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明 fanout 交换机
channel.exchange_declare(exchange='broadcast_exchange', exchange_type='fanout')
# 创建随机队列名
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
# 将队列绑定到交换机
channel.queue_bind(exchange='broadcast_exchange', queue=queue_name)
print(' [*] Waiting for broadcast messages. To exit press CTRL+C')
# 定义回调函数
def callback(ch, method, properties, body):
    print(f" [x] Received {body.decode()}")
# 开始消费消息
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()

运行多个接收者

你可以启动多个接收者实例,每个实例都会接收到广播消息:

# 终端1
python consumer.py
# 终端2
python consumer.py
# 终端3
python producer.py

高级用法

持久化广播

为了确保消息在 RabbitMQ 重启后不丢失,可以:

# 发送者持久化
channel.exchange_declare(exchange='broadcast_exchange', exchange_type='fanout', durable=True)
# 接收者持久化
channel.exchange_declare(exchange='broadcast_exchange', exchange_type='fanout', durable=True)
result = channel.queue_declare(queue='', exclusive=True, durable=True)

使用不同交换机名称

你可以创建多个 fanout 交换机来实现不同的广播频道:

# 发送者
channel.exchange_declare(exchange='news_exchange', exchange_type='fanout')
channel.basic_publish(exchange='news_exchange', routing_key='', body='Breaking news!')
# 接收者1 (订阅新闻)
channel.queue_bind(exchange='news_exchange', queue='news_queue')
# 接收者2 (订阅体育)
channel.exchange_declare(exchange='sports_exchange', exchange_type='fanout')
channel.queue_bind(exchange='sports_exchange', queue='sports_queue')

注意事项

  1. Fanout 交换机会忽略路由键,routing_key 参数可以为空
  2. 每个接收者都会收到完整的消息副本
  3. 广播模式适用于事件通知、日志记录等场景
  4. 如果没有消费者绑定,消息会被丢弃(除非设置了队列持久化)

完整示例

你可以从 RabbitMQ 官方仓库获取完整的广播示例代码: https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/python/send.py https://github.com/rabbitmq/rabbitmq-tutorials/blob/python/receive.py

希望这个指南对你有所帮助!如果有任何问题或需要更详细的解释,请随时提问。

分享:
扫描分享到社交APP
上一篇
下一篇