杰瑞科技汇

Java如何用RabbitMQ实现消息队列?

目录

  1. 核心概念回顾:理解 RabbitMQ 的几个关键组件。
  2. 环境准备:安装 RabbitMQ 和 Java 开发环境。
  3. Java 客户端库:介绍并选择合适的客户端库。
  4. 第一个程序:Hello World
    • 生产者
    • 消费者
  5. 工作队列:一个任务分发系统。
  6. 发布/订阅:消息广播给多个消费者。
  7. 路由:有选择地接收消息。
  8. 主题:基于模式的路由。
  9. RPC (Remote Procedure Call):实现远程调用。
  10. 高级主题与最佳实践
    • 连接管理
    • 消息确认机制
    • 持久化
    • 死信队列

核心概念回顾

在写代码之前,先快速回顾一下 RabbitMQ 的核心概念,这有助于理解代码中的各个部分。

Java如何用RabbitMQ实现消息队列?-图1
(图片来源网络,侵删)
  • 生产者:发送消息的应用程序。
  • 消费者:接收消息的应用程序。
  • 队列:消息的容器,存在于 RabbitMQ 服务器中,消息被投递到队列,然后从队列中被取出,队列就像一个 FIFO(先进先出)的缓冲区。
  • 交换机:生产者将消息发送到交换机,而不是直接发送到队列,交换机负责根据特定规则将消息路由到一个或多个队列。
  • 绑定:绑定是队列和交换机之间的规则,它告诉交换机如何将消息路由到队列。
  • 路由键:生产者在发送消息时可以携带一个路由键,交换机使用这个键与绑定的规则进行匹配,从而决定消息的去向。

环境准备

a. 安装 RabbitMQ

如果你还没有安装 RabbitMQ,最简单的方式是使用 Docker:

# 拉取 RabbitMQ 镜像 (包含管理界面)
docker pull rabbitmq:3-management
# 运行容器
# -d: 后台运行
# -p: 端口映射 (5672是客户端连接端口, 15672是管理界面端口)
# -e: 设置用户名和密码
docker run -d --name my-rabbitmq -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:3-management

安装完成后,你可以通过浏览器访问 http://localhost:15672,使用用户名 admin 和密码 admin 登录管理界面。

b. Java 开发环境

确保你已经安装了 JDK (建议 8 或以上) 和 Maven 或 Gradle,这里我们以 Maven 为例。


Java 客户端库

RabbitMQ 官方提供了 Java 客户端库,这是最常用、最稳定的选择。

Java如何用RabbitMQ实现消息队列?-图2
(图片来源网络,侵删)

在你的 pom.xml 文件中添加依赖:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.16.0</version> <!-- 请使用最新版本 -->
</dependency>

第一个程序:Hello World

这是一个最简单的例子,包含一个生产者发送一条消息,一个消费者接收并打印这条消息。

生产者

Sender.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Sender {
    private final static String QUEUE_NAME = "hello";
    public static void main(String[] argv) throws Exception {
        // 1. 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost"); // RabbitMQ 服务器地址
        factory.setUsername("admin");
        factory.setPassword("admin");
        // 2. 创建连接和通道
        // try-with-resources 语句确保资源在使用后被自动关闭
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 3. 声明一个队列
            // 如果队列不存在,则创建;如果存在,则什么都不做。
            // durable=false: 队列在 RabbitMQ 重启后会丢失
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World!";
            // 4. 发送消息到队列
            // basicPublish 的第一个参数是交换机名称,我们使用默认交换机 ""。
            // 默认交换机的特点是:路由键必须与队列名完全匹配,消息才会被路由到该队列。
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

消费者

Receiver.java

import com.rabbitmq.client.*;
import java.io.IOException;
public class Receiver {
    private final static String QUEUE_NAME = "hello";
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("admin");
        factory.setPassword("admin");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 1. 声明队列 (与生产者保持一致)
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
            // 2. 创建消费者
            // DeliverCallback 是一个接口,当收到消息时,其 handle 方法会被调用。
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            };
            // 3. 开始消费消息
            // basicConsume 的第二个参数 autoAck=true 表示自动确认消息。
            // 一旦消息被消费者接收,RabbitMQ 就会立即将其从队列中删除。
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
            // 为了让消费者持续运行,我们让主线程休眠
            // 在实际应用中,消费者通常是作为一个长期运行的服务
            Thread.sleep(100000);
        }
    }
}

运行步骤:

  1. 先运行 Receiver,它会启动并等待消息。
  2. 然后运行 Sender,它会发送一条消息。
  3. Receiver 的控制台,你会看到打印出的消息。

工作队列

工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而是将其排队,由一个或多个工作进程稍后处理。

核心特性:公平分发 为了避免某个工作进程一直忙碌而其他进程空闲,我们需要使用 Qos 设置来限制通道上未确认消息的数量。

生产者 (NewTask.java)

// ... Sender.java 中的 factory, connection, channel 设置 ...
String message = String.join(" ", argv); // 从命令行参数获取消息
// 发布消息
channel.basicPublish("", "task_queue", null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");

消费者 (Worker.java)

// ... Receiver.java 中的 factory, connection, channel 设置 ...
// 设置 QoS: 一次只接收一条消息,处理完前不再接收新的
// prefetchCount=1 表示 RabbitMQ 不会在同一时间给一个消费者超过一条消息
channel.basicQos(1);
// 消费消息,这次我们使用手动消息确认
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");
    System.out.println(" [x] Received '" + message + "'");
    try {
        // 模拟耗时任务
        doWork(message);
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        System.out.println(" [x] Done");
        // 手动发送确认,告诉 RabbitMQ 这个消息我已经处理完了
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
};
// autoAck=false 关闭自动确认,改为手动确认
channel.basicConsume("task_queue", false, deliverCallback, consumerTag -> { });
// ... doWork 方法 ...
private static void doWork(String task) throws InterruptedException {
    for (char ch : task.toCharArray()) {
        if (ch == '.') {
            Thread.sleep(1000);
        }
    }
}

发布/订阅

在这种模式下,一个消息可以被多个消费者同时接收,这需要使用 交换机临时队列

生产者 (EmitLog.java)

// ... factory, connection, channel 设置 ...
// 声明一个 "fanout" 类型的交换机
// fanout 交换机会将消息广播到所有它知道的队列,忽略路由键
channel.exchangeDeclare("logs", "BuiltinExchangeType.FANOUT");
String message = "Info: This is a log message.";
// 发布消息到交换机,路由键设为空,因为 fanout 交换机不关心它
channel.basicPublish("logs", "", null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");

消费者 (ReceiveLog.java)

// ... factory, connection, channel 设置 ...
// 1. 声明 fanout 交换机
channel.exchangeDeclare("logs", "BuiltinExchangeType.FANOUT");
// 2. 声明一个临时队列
// 队列名由服务器随机生成,并且是唯一的
// exclusive=true: 当消费者断开连接时,队列自动删除
String queueName = channel.queueDeclare().getQueue();
// 3. 将队列绑定到交换机
// 对于 fanout 交换机,路由键是无关紧要的,通常为空字符串
channel.queueBind(queueName, "logs", "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");
    System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });

运行: 启动多个 ReceiveLog 实例,然后运行 EmitLog,你会看到每个消费者都收到了相同的消息。


路由

发布/订阅会将消息广播给所有消费者,而路由允许你根据消息的严重性等标准来有选择地接收消息,这需要使用 direct 交换机。

生产者 (EmitLogDirect.java)

// ... factory, connection, channel 设置 ...
// 声明一个 "direct" 类型的交换机
channel.exchangeDeclare("direct_logs", "BuiltinExchangeType.DIRECT");
String severity = getSeverity(argv); // 从命令行获取路由键,如 "info", "warning", "error"
String message = getMessage(argv);
// 发布消息,并指定路由键
channel.basicPublish("direct_logs", severity, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + severity + "':'" + message + "'");

消费者 (ReceiveLogDirect.java)

// ... factory, connection, channel 设置 ...
channel.exchangeDeclare("direct_logs", "BuiltinExchangeType.DIRECT");
String queueName = channel.queueDeclare().getQueue();
// 从命令行获取一个或多个绑定键
for (String severity : argv) {
    // 将队列绑定到交换机,并指定绑定键
    channel.queueBind(queueName, "direct_logs", severity);
}
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");
    System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });

运行:

  • 消费者1: java ReceiveLogDirect info warning (接收 info 和 warning 消息)
  • 消费者2: java ReceiveLogDirect error (只接收 error 消息)
  • 生产者: java EmitLogDirect error "This is an error message." (只有消费者2会收到)

主题

direct 交换机精确匹配路由键,而 topic 交换机则支持模式匹配,更灵活。

  • (星号) 代表一个单词。
  • (井号) 代表零个或多个单词。

生产者 (EmitLogTopic.java)

// ... factory, connection, channel 设置 ...
// 声明一个 "topic" 类型的交换机
channel.exchangeDeclare("topic_logs", "BuiltinExchangeType.TOPIC");
String routingKey = getRoutingKey(argv); // 从命令行获取路由键,如 "quick.orange.rabbit"
String message = getMessage(argv);
channel.basicPublish("topic_logs", routingKey, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");

消费者 (ReceiveLogTopic.java)

// ... factory, connection, channel 设置 ...
channel.exchangeDeclare("topic_logs", "BuiltinExchangeType.TOPIC");
String queueName = channel.queueDeclare().getQueue();
// 从命令行获取一个或多个绑定模式
for (String bindingKey : argv) {
    channel.queueBind(queueName, "topic_logs", bindingKey);
}
// ... 消费逻辑 ...

运行示例:

  • 消费者: java ReceiveLogTopic "*.orange.*" (匹配所有中间是 "orange" 的路由键,如 "quick.orange.rabbit")
  • 消费者: java ReceiveLogTopic "*.*.rabbit" (匹配所有结尾是 "rabbit" 的路由键)
  • 生产者: java EmitLogTopic "lazy.brown.fox" "A lazy brown fox." (没有消费者匹配)
  • 生产者: java EmitLogTopic "quick.orange.rabbit" "A quick orange rabbit." (第一个消费者会收到)

RPC (Remote Procedure Call)

使用 RabbitMQ 实现 RPC 客户端/服务器模式,基本思路是:

  1. 客户端发送一个请求消息,并附带一个唯一的 回调队列 的名字。
  2. 服务器处理请求,并将结果发送到这个回调队列。
  3. 客户端在回调队列中等待并接收响应。

客户端 (RpcClient.java)

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
public class RpcClient {
    private final Connection connection;
    private final Channel channel;
    private final String callbackQueueName;
    public RpcClient() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        this.connection = factory.newConnection();
        this.channel = connection.createChannel();
        // 声明一个临时的、独占的、自动删除的回调队列
        callbackQueueName = channel.queueDeclare().getQueue();
    }
    public int call(int n) throws IOException, InterruptedException, ExecutionException, TimeoutException {
        // 使用 CompletableFuture 异步等待结果
        final CompletableFuture<Integer> response = new CompletableFuture<>();
        String corrId = UUID.randomUUID().toString();
        AMQP.BasicProperties props = new AMQP.BasicProperties
                .Builder()
                .correlationId(corrId)
                .replyTo(callbackQueueName)
                .build();
        // 发送请求
        channel.basicPublish("", "rpc_queue", props, String.valueOf(n).getBytes("UTF-8"));
        // 设置消费者来接收响应
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            if (delivery.getProperties().getCorrelationId().equals(corrId)) {
                response.complete(Integer.parseInt(new String(delivery.getBody(), "UTF-8")));
            }
        };
        channel.basicConsume(callbackQueueName, true, deliverCallback, consumerTag -> {});
        // 等待结果,设置超时
        return response.get();
    }
    public void close() throws IOException {
        connection.close();
    }
}
// 客户端主程序
class RpcMain {
    private static final int FIBONACCI_RPC = 30;
    public static void main(String[] argv) {
        RpcClient rpcClient = null;
        try {
            rpcClient = new RpcClient();
            System.out.println(" [x] Requesting fib(" + FIBONACCI_RPC + ")");
            int response = rpcClient.call(FIBONACCI_RPC);
            System.out.println(" [.] Got '" + response + "'");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (rpcClient != null) {
                try {
                    rpcClient.close();
                } catch (IOException _ignore) {}
            }
        }
    }
}

服务器 (RpcServer.java)

import com.rabbitmq.client.*;
public class RpcServer {
    private static final int FIBONACCI_RPC = 30;
    private static int fib(int n) {
        if (n == 0) return 0;
        if (n == 1) return 1;
        return fib(n - 1) + fib(n - 2);
    }
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare("rpc_queue", false, false, false, null);
            channel.basicQos(1); // 公平分发
            System.out.println(" [x] Awaiting RPC requests");
            Object monitor = new Object();
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                AMQP.BasicProperties replyProps = new AMQP.BasicProperties
                        .Builder()
                        .correlationId(delivery.getProperties().getCorrelationId())
                        .build();
                String response = "";
                try {
                    String message = new String(delivery.getBody(), "UTF-8");
                    int n = Integer.parseInt(message);
                    System.out.println(" [.] fib(" + n + ")");
                    response += fib(n);
                } catch (NumberFormatException e) {
                    System.out.println(" [.] " + e);
                } finally {
                    // 将结果发送到客户端指定的回调队列
                    channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                    // 通知主线程可以继续接收下一个请求
                    synchronized (monitor) {
                        monitor.notify();
                    }
                }
            };
            channel.basicConsume("rpc_queue", false, deliverCallback, consumerTag -> {});
            // 等待直到有请求到达
            while (true) {
                synchronized (monitor) {
                    monitor.wait();
                }
            }
        }
    }
}

高级主题与最佳实践

a. 连接管理

不要为每个消息创建一个新连接,连接和通道都是昂贵的资源,最佳实践是:

  • 一个连接:为整个应用程序创建一个长连接。
  • 一个通道:为每个线程创建一个通道,如果应用是多线程的,可以为每个线程创建一个通道,并重用它们。

b. 消息确认机制

  • 自动确认 (autoAck=true):RabbitMQ 一旦将消息投递给消费者,就认为消息已被成功处理,如果消费者在处理过程中崩溃,消息会丢失,简单,但不安全。
  • 手动确认 (autoAck=false):消费者在处理完消息后,必须调用 channel.basicAck() 来显式确认,这是强烈推荐的方式,如果消费者在确认前崩溃,RabbitMQ 会将消息重新投递给其他消费者。

c. 持久化

为了确保 RabbitMQ 重启后消息和队列不丢失,需要做以下设置:

  • 队列持久化channel.queueDeclare(QUEUE_NAME, true, false, false, null); (将 durable 设为 true)
  • 消息持久化:发布消息时,设置 MessageProperties.PERSISTENT_TEXT_PLAIN
    channel.basicPublish("", "task_queue",
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());
  • 交换机持久化channel.exchangeDeclare("my_exchange", "direct", true);

注意:即使设置了持久化,也不能保证消息100%不丢失,RabbitMQ 在接收到消息并将其写入磁盘之前,可能发生崩溃,对于“不丢失”要求极高的场景,需要使用 Publisher Confirms 机制。

d. 死信队列

当消息成为“死信”时,它可以被重新路由到另一个交换机,这个交换机绑定的队列就是死信队列。 以下情况会导致消息成为死信:

  1. 消息被消费者拒绝 (basicRejectbasicNack) requeue=false
  2. 消息过期。
  3. 队列达到其最大长度。

配置示例:

// 声明主队列,并设置其死信交换机
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx.exchange");
channel.queueDeclare("main.queue", false, false, false, args);
// 声明死信交换机和队列
channel.exchangeDeclare("dlx.exchange", "direct");
channel.queueDeclare("dlq.queue", false, false, false, null);
channel.queueBind("dlq.queue", "dlx.exchange", "dlx.routing.key");

希望这份详细的指南能帮助你全面掌握在 Java 中使用 RabbitMQ!

分享:
扫描分享到社交APP
上一篇
下一篇