目录
- 核心概念回顾:理解 RabbitMQ 的几个关键组件。
- 环境准备:安装 RabbitMQ 和 Java 开发环境。
- Java 客户端库:介绍并选择合适的客户端库。
- 第一个程序:Hello World
- 生产者
- 消费者
- 工作队列:一个任务分发系统。
- 发布/订阅:消息广播给多个消费者。
- 路由:有选择地接收消息。
- 主题:基于模式的路由。
- RPC (Remote Procedure Call):实现远程调用。
- 高级主题与最佳实践
- 连接管理
- 消息确认机制
- 持久化
- 死信队列
核心概念回顾
在写代码之前,先快速回顾一下 RabbitMQ 的核心概念,这有助于理解代码中的各个部分。

- 生产者:发送消息的应用程序。
- 消费者:接收消息的应用程序。
- 队列:消息的容器,存在于 RabbitMQ 服务器中,消息被投递到队列,然后从队列中被取出,队列就像一个 FIFO(先进先出)的缓冲区。
- 交换机:生产者将消息发送到交换机,而不是直接发送到队列,交换机负责根据特定规则将消息路由到一个或多个队列。
- 绑定:绑定是队列和交换机之间的规则,它告诉交换机如何将消息路由到队列。
- 路由键:生产者在发送消息时可以携带一个路由键,交换机使用这个键与绑定的规则进行匹配,从而决定消息的去向。
环境准备
a. 安装 RabbitMQ
如果你还没有安装 RabbitMQ,最简单的方式是使用 Docker:
# 拉取 RabbitMQ 镜像 (包含管理界面) docker pull rabbitmq:3-management # 运行容器 # -d: 后台运行 # -p: 端口映射 (5672是客户端连接端口, 15672是管理界面端口) # -e: 设置用户名和密码 docker run -d --name my-rabbitmq -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:3-management
安装完成后,你可以通过浏览器访问 http://localhost:15672,使用用户名 admin 和密码 admin 登录管理界面。
b. Java 开发环境
确保你已经安装了 JDK (建议 8 或以上) 和 Maven 或 Gradle,这里我们以 Maven 为例。
Java 客户端库
RabbitMQ 官方提供了 Java 客户端库,这是最常用、最稳定的选择。

在你的 pom.xml 文件中添加依赖:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.16.0</version> <!-- 请使用最新版本 -->
</dependency>
第一个程序:Hello World
这是一个最简单的例子,包含一个生产者发送一条消息,一个消费者接收并打印这条消息。
生产者
Sender.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Sender {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
// 1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost"); // RabbitMQ 服务器地址
factory.setUsername("admin");
factory.setPassword("admin");
// 2. 创建连接和通道
// try-with-resources 语句确保资源在使用后被自动关闭
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 3. 声明一个队列
// 如果队列不存在,则创建;如果存在,则什么都不做。
// durable=false: 队列在 RabbitMQ 重启后会丢失
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
// 4. 发送消息到队列
// basicPublish 的第一个参数是交换机名称,我们使用默认交换机 ""。
// 默认交换机的特点是:路由键必须与队列名完全匹配,消息才会被路由到该队列。
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
}
}
}
消费者
Receiver.java
import com.rabbitmq.client.*;
import java.io.IOException;
public class Receiver {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("admin");
factory.setPassword("admin");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 1. 声明队列 (与生产者保持一致)
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// 2. 创建消费者
// DeliverCallback 是一个接口,当收到消息时,其 handle 方法会被调用。
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
// 3. 开始消费消息
// basicConsume 的第二个参数 autoAck=true 表示自动确认消息。
// 一旦消息被消费者接收,RabbitMQ 就会立即将其从队列中删除。
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
// 为了让消费者持续运行,我们让主线程休眠
// 在实际应用中,消费者通常是作为一个长期运行的服务
Thread.sleep(100000);
}
}
}
运行步骤:
- 先运行
Receiver,它会启动并等待消息。 - 然后运行
Sender,它会发送一条消息。 - 在
Receiver的控制台,你会看到打印出的消息。
工作队列
工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而是将其排队,由一个或多个工作进程稍后处理。
核心特性:公平分发
为了避免某个工作进程一直忙碌而其他进程空闲,我们需要使用 Qos 设置来限制通道上未确认消息的数量。
生产者 (NewTask.java)
// ... Sender.java 中的 factory, connection, channel 设置 ...
String message = String.join(" ", argv); // 从命令行参数获取消息
// 发布消息
channel.basicPublish("", "task_queue", null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
消费者 (Worker.java)
// ... Receiver.java 中的 factory, connection, channel 设置 ...
// 设置 QoS: 一次只接收一条消息,处理完前不再接收新的
// prefetchCount=1 表示 RabbitMQ 不会在同一时间给一个消费者超过一条消息
channel.basicQos(1);
// 消费消息,这次我们使用手动消息确认
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
// 模拟耗时任务
doWork(message);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(" [x] Done");
// 手动发送确认,告诉 RabbitMQ 这个消息我已经处理完了
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
// autoAck=false 关闭自动确认,改为手动确认
channel.basicConsume("task_queue", false, deliverCallback, consumerTag -> { });
// ... doWork 方法 ...
private static void doWork(String task) throws InterruptedException {
for (char ch : task.toCharArray()) {
if (ch == '.') {
Thread.sleep(1000);
}
}
}
发布/订阅
在这种模式下,一个消息可以被多个消费者同时接收,这需要使用 交换机 和 临时队列。
生产者 (EmitLog.java)
// ... factory, connection, channel 设置 ...
// 声明一个 "fanout" 类型的交换机
// fanout 交换机会将消息广播到所有它知道的队列,忽略路由键
channel.exchangeDeclare("logs", "BuiltinExchangeType.FANOUT");
String message = "Info: This is a log message.";
// 发布消息到交换机,路由键设为空,因为 fanout 交换机不关心它
channel.basicPublish("logs", "", null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
消费者 (ReceiveLog.java)
// ... factory, connection, channel 设置 ...
// 1. 声明 fanout 交换机
channel.exchangeDeclare("logs", "BuiltinExchangeType.FANOUT");
// 2. 声明一个临时队列
// 队列名由服务器随机生成,并且是唯一的
// exclusive=true: 当消费者断开连接时,队列自动删除
String queueName = channel.queueDeclare().getQueue();
// 3. 将队列绑定到交换机
// 对于 fanout 交换机,路由键是无关紧要的,通常为空字符串
channel.queueBind(queueName, "logs", "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
运行:
启动多个 ReceiveLog 实例,然后运行 EmitLog,你会看到每个消费者都收到了相同的消息。
路由
发布/订阅会将消息广播给所有消费者,而路由允许你根据消息的严重性等标准来有选择地接收消息,这需要使用 direct 交换机。
生产者 (EmitLogDirect.java)
// ... factory, connection, channel 设置 ...
// 声明一个 "direct" 类型的交换机
channel.exchangeDeclare("direct_logs", "BuiltinExchangeType.DIRECT");
String severity = getSeverity(argv); // 从命令行获取路由键,如 "info", "warning", "error"
String message = getMessage(argv);
// 发布消息,并指定路由键
channel.basicPublish("direct_logs", severity, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
消费者 (ReceiveLogDirect.java)
// ... factory, connection, channel 设置 ...
channel.exchangeDeclare("direct_logs", "BuiltinExchangeType.DIRECT");
String queueName = channel.queueDeclare().getQueue();
// 从命令行获取一个或多个绑定键
for (String severity : argv) {
// 将队列绑定到交换机,并指定绑定键
channel.queueBind(queueName, "direct_logs", severity);
}
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
运行:
- 消费者1:
java ReceiveLogDirect info warning(接收 info 和 warning 消息) - 消费者2:
java ReceiveLogDirect error(只接收 error 消息) - 生产者:
java EmitLogDirect error "This is an error message."(只有消费者2会收到)
主题
direct 交换机精确匹配路由键,而 topic 交换机则支持模式匹配,更灵活。
- (星号) 代表一个单词。
- (井号) 代表零个或多个单词。
生产者 (EmitLogTopic.java)
// ... factory, connection, channel 设置 ...
// 声明一个 "topic" 类型的交换机
channel.exchangeDeclare("topic_logs", "BuiltinExchangeType.TOPIC");
String routingKey = getRoutingKey(argv); // 从命令行获取路由键,如 "quick.orange.rabbit"
String message = getMessage(argv);
channel.basicPublish("topic_logs", routingKey, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
消费者 (ReceiveLogTopic.java)
// ... factory, connection, channel 设置 ...
channel.exchangeDeclare("topic_logs", "BuiltinExchangeType.TOPIC");
String queueName = channel.queueDeclare().getQueue();
// 从命令行获取一个或多个绑定模式
for (String bindingKey : argv) {
channel.queueBind(queueName, "topic_logs", bindingKey);
}
// ... 消费逻辑 ...
运行示例:
- 消费者:
java ReceiveLogTopic "*.orange.*"(匹配所有中间是 "orange" 的路由键,如 "quick.orange.rabbit") - 消费者:
java ReceiveLogTopic "*.*.rabbit"(匹配所有结尾是 "rabbit" 的路由键) - 生产者:
java EmitLogTopic "lazy.brown.fox" "A lazy brown fox."(没有消费者匹配) - 生产者:
java EmitLogTopic "quick.orange.rabbit" "A quick orange rabbit."(第一个消费者会收到)
RPC (Remote Procedure Call)
使用 RabbitMQ 实现 RPC 客户端/服务器模式,基本思路是:
- 客户端发送一个请求消息,并附带一个唯一的 回调队列 的名字。
- 服务器处理请求,并将结果发送到这个回调队列。
- 客户端在回调队列中等待并接收响应。
客户端 (RpcClient.java)
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
public class RpcClient {
private final Connection connection;
private final Channel channel;
private final String callbackQueueName;
public RpcClient() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
this.connection = factory.newConnection();
this.channel = connection.createChannel();
// 声明一个临时的、独占的、自动删除的回调队列
callbackQueueName = channel.queueDeclare().getQueue();
}
public int call(int n) throws IOException, InterruptedException, ExecutionException, TimeoutException {
// 使用 CompletableFuture 异步等待结果
final CompletableFuture<Integer> response = new CompletableFuture<>();
String corrId = UUID.randomUUID().toString();
AMQP.BasicProperties props = new AMQP.BasicProperties
.Builder()
.correlationId(corrId)
.replyTo(callbackQueueName)
.build();
// 发送请求
channel.basicPublish("", "rpc_queue", props, String.valueOf(n).getBytes("UTF-8"));
// 设置消费者来接收响应
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
if (delivery.getProperties().getCorrelationId().equals(corrId)) {
response.complete(Integer.parseInt(new String(delivery.getBody(), "UTF-8")));
}
};
channel.basicConsume(callbackQueueName, true, deliverCallback, consumerTag -> {});
// 等待结果,设置超时
return response.get();
}
public void close() throws IOException {
connection.close();
}
}
// 客户端主程序
class RpcMain {
private static final int FIBONACCI_RPC = 30;
public static void main(String[] argv) {
RpcClient rpcClient = null;
try {
rpcClient = new RpcClient();
System.out.println(" [x] Requesting fib(" + FIBONACCI_RPC + ")");
int response = rpcClient.call(FIBONACCI_RPC);
System.out.println(" [.] Got '" + response + "'");
} catch (Exception e) {
e.printStackTrace();
} finally {
if (rpcClient != null) {
try {
rpcClient.close();
} catch (IOException _ignore) {}
}
}
}
}
服务器 (RpcServer.java)
import com.rabbitmq.client.*;
public class RpcServer {
private static final int FIBONACCI_RPC = 30;
private static int fib(int n) {
if (n == 0) return 0;
if (n == 1) return 1;
return fib(n - 1) + fib(n - 2);
}
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare("rpc_queue", false, false, false, null);
channel.basicQos(1); // 公平分发
System.out.println(" [x] Awaiting RPC requests");
Object monitor = new Object();
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
AMQP.BasicProperties replyProps = new AMQP.BasicProperties
.Builder()
.correlationId(delivery.getProperties().getCorrelationId())
.build();
String response = "";
try {
String message = new String(delivery.getBody(), "UTF-8");
int n = Integer.parseInt(message);
System.out.println(" [.] fib(" + n + ")");
response += fib(n);
} catch (NumberFormatException e) {
System.out.println(" [.] " + e);
} finally {
// 将结果发送到客户端指定的回调队列
channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
// 通知主线程可以继续接收下一个请求
synchronized (monitor) {
monitor.notify();
}
}
};
channel.basicConsume("rpc_queue", false, deliverCallback, consumerTag -> {});
// 等待直到有请求到达
while (true) {
synchronized (monitor) {
monitor.wait();
}
}
}
}
}
高级主题与最佳实践
a. 连接管理
不要为每个消息创建一个新连接,连接和通道都是昂贵的资源,最佳实践是:
- 一个连接:为整个应用程序创建一个长连接。
- 一个通道:为每个线程创建一个通道,如果应用是多线程的,可以为每个线程创建一个通道,并重用它们。
b. 消息确认机制
- 自动确认 (
autoAck=true):RabbitMQ 一旦将消息投递给消费者,就认为消息已被成功处理,如果消费者在处理过程中崩溃,消息会丢失,简单,但不安全。 - 手动确认 (
autoAck=false):消费者在处理完消息后,必须调用channel.basicAck()来显式确认,这是强烈推荐的方式,如果消费者在确认前崩溃,RabbitMQ 会将消息重新投递给其他消费者。
c. 持久化
为了确保 RabbitMQ 重启后消息和队列不丢失,需要做以下设置:
- 队列持久化:
channel.queueDeclare(QUEUE_NAME, true, false, false, null);(将durable设为true) - 消息持久化:发布消息时,设置
MessageProperties.PERSISTENT_TEXT_PLAIN:channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); - 交换机持久化:
channel.exchangeDeclare("my_exchange", "direct", true);
注意:即使设置了持久化,也不能保证消息100%不丢失,RabbitMQ 在接收到消息并将其写入磁盘之前,可能发生崩溃,对于“不丢失”要求极高的场景,需要使用 Publisher Confirms 机制。
d. 死信队列
当消息成为“死信”时,它可以被重新路由到另一个交换机,这个交换机绑定的队列就是死信队列。 以下情况会导致消息成为死信:
- 消息被消费者拒绝 (
basicReject或basicNack)requeue=false。 - 消息过期。
- 队列达到其最大长度。
配置示例:
// 声明主队列,并设置其死信交换机
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx.exchange");
channel.queueDeclare("main.queue", false, false, false, args);
// 声明死信交换机和队列
channel.exchangeDeclare("dlx.exchange", "direct");
channel.queueDeclare("dlq.queue", false, false, false, null);
channel.queueBind("dlq.queue", "dlx.exchange", "dlx.routing.key");
希望这份详细的指南能帮助你全面掌握在 Java 中使用 RabbitMQ!
