杰瑞科技汇

Redis如何实现Java消息队列?

Redis 本身不是一个专业的消息队列系统(如 RabbitMQ、Kafka),但它提供了一些数据结构,可以非常方便地快速搭建一个轻量级、功能尚可的消息队列。

Redis如何实现Java消息队列?-图1
(图片来源网络,侵删)

下面我将从原理、具体实现、优缺点、以及生产级方案演进等多个角度进行阐述。


核心原理:Redis 的哪些数据结构适合做 MQ?

消息队列的核心操作是:生产者将消息放入队列,消费者从队列中取出消息,在 Redis 中,以下几个数据结构天然适合这个场景:

a) 列表 - 最经典、最简单的实现

这是实现 MQ 最直接、最常用的方式。

  • 数据结构: List
  • 核心命令:
    • LPUSH key value: 将消息推入列表的头部(左端),生产者常用。
    • RPUSH key value: 将消息推入列表的尾部(右端),生产者常用。
    • RPOP key: 从列表的尾部(右端)弹出一个消息,消费者常用。
    • LPOP key: 从列表的头部(左端)弹出一个消息,消费者常用。
  • 工作模式:
    • 生产者: LPUSH my_queue "message_content"
    • 消费者: RPOP my_queue
  • 特点:
    • 先进先出: 非常符合队列的定义。
    • 简单: 实现逻辑最简单。

b) Streams - 最专业、功能最强大的实现 (Redis 5.0+)

Redis Streams 是专门为消息流场景设计的数据结构,功能非常强大,是目前 Redis 实现生产级 MQ 的首选方案

Redis如何实现Java消息队列?-图2
(图片来源网络,侵删)
  • 数据结构: Stream
  • 核心概念:
    • Stream: 消息的集合,相当于一个队列。
    • Producer: 生产者,向 Stream 添加消息。
    • Consumer: 消费者,从 Stream 读取消息。
    • Consumer Group: 消费者组,允许多个消费者协同消费一个 Stream,每条消息只被组内的一个消费者处理,实现负载均衡。
    • Message ID: 每条消息的唯一 ID,格式为 timestamp-sequence
    • Pending Entries List (PEL): 待处理列表,记录已经被分发给某个消费者但尚未被确认的消息,防止消息丢失。
  • 核心命令:
    • XADD stream_name * field1 value1 field2 value2: 向 Stream 添加一条消息。
    • XREAD COUNT count STREAMS stream_name id: 从一个或多个 Stream 读取消息。
    • XREADGROUP GROUP group_name consumer_name COUNT count STREAMS stream_name >: 从消费者组中读取新消息(> 表示从未读取过的消息开始)。
    • XACK stream_name group_name message_id: 消费者确认消息已处理,从 PEL 中移除。
    • XGROUP CREATE stream_name group_id id: 创建消费者组。
  • 特点:
    • 消费者组: 支持多消费者消费,实现负载均衡和消息不重复消费。
    • 消息持久化: 消息会一直存在于 Stream 中,除非被主动删除。
    • 消息确认: 通过 XACK 机制确保消息被成功处理,防止丢失。
    • 阻塞读取: XREAD 命令支持阻塞,类似 BRPOP,可以实时获取新消息。

c) 发布/订阅 - 特殊场景的 MQ

Pub/Sub 不适合传统的队列模型,而更适合事件通知。

  • 数据结构: 不特定,基于频道。
  • 核心命令:
    • PUBLISH channel message: 发布消息到频道。
    • SUBSCRIBE channel: 订阅频道,接收消息。
  • 特点:
    • 一对多: 一个消息可以被多个订阅者接收。
    • 无状态: 消息一旦发布,如果订阅者不在线,消息就会丢失,它不存储历史消息。
    • 不保证顺序: 消息是并发的,不保证到达顺序。
    • 适用场景: 实时通知、聊天室、日志广播等。

Java 实现示例

我们将使用流行的 Java Redis 客户端 Lettuce(推荐,支持异步和响应式编程)和 Jedis 来演示。

准备工作

pom.xml 中添加依赖:

<!-- 使用 Lettuce 作为客户端 -->
<dependency>
    <groupId>io.lettuce</groupId>
    <artifactId>lettuce-core</artifactId>
    <version>6.2.6.RELEASE</version> <!-- 使用最新稳定版 -->
</dependency>
<!-- 或者使用 Jedis -->
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>4.3.1</version> <!-- 使用最新稳定版 -->
</dependency>

基于 List 的简单队列

生产者

import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
public class SimpleMQProducer {
    private Jedis jedis;
    public SimpleMQProducer() {
        this.jedis = new Jedis("localhost", 6379);
    }
    public void produce(String queueName, String message) {
        // 使用 Pipeline 可以提高批量操作的性能
        Pipeline pipeline = jedis.pipelined();
        // LPUSH 将消息放入队列头部
        pipeline.lpush(queueName, message);
        pipeline.sync(); // 提交执行
        System.out.println("Produced message: " + message);
    }
    public static void main(String[] args) throws InterruptedException {
        SimpleMQProducer producer = new SimpleMQProducer();
        for (int i = 0; i < 10; i++) {
            producer.produce("my_simple_queue", "Hello Redis MQ " + i);
            Thread.sleep(1000); // 每秒生产一条消息
        }
        producer.jedis.close();
    }
}

消费者

import redis.clients.jedis.Jedis;
public class SimpleMQConsumer {
    private Jedis jedis;
    public SimpleMQConsumer() {
        this.jedis = new Jedis("localhost", 6379);
    }
    public void consume(String queueName) {
        // BRPOP 是阻塞版本的 RPOP,当列表为空时,会阻塞直到有新消息或超时
        // 0 表示无限期阻塞
        while (true) {
            // RPOP 阻塞获取列表尾部元素
            String message = jedis.brpop(0, queueName).get(1); // 返回的是一个列表,第二个元素是消息内容
            if (message != null) {
                System.out.println("Consumed message: " + message);
                // 模拟消息处理
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    public static void main(String[] args) {
        SimpleMQConsumer consumer = new SimpleMQConsumer();
        consumer.consume("my_simple_queue");
    }

问题: 这个简单的实现有严重缺陷:如果消费者在处理消息时崩溃了,消息就会丢失,因为没有消息确认机制。


基于 Streams 的专业队列 (推荐)

生产者

import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisStreamCommands;
public class StreamMQProducer {
    private RedisStreamCommands<String, String> syncCommands;
    public StreamMQProducer() {
        RedisClient redisClient = RedisClient.create("redis://localhost:6379");
        StatefulRedisConnection<String, String> connection = redisClient.connect();
        this.syncCommands = connection.sync();
    }
    public void produce(String streamName, String message) {
        // XADD 添加消息,* 表示让 Redis 自动生成 ID
        String messageId = syncCommands.xadd(streamName, Map.of("message", message));
        System.out.println("Produced message with ID: " + messageId);
    }
    public static void main(String[] args) throws InterruptedException {
        StreamMQProducer producer = new StreamMQProducer();
        for (int i = 0; i < 10; i++) {
            producer.produce("my_stream", "Hello Redis Stream MQ " + i);
            Thread.sleep(1000);
        }
    }
}

消费者

import io.lettuce.core.RedisClient;
import io.lettuce.core.StreamMessage;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisStreamCommands;
import java.util.Map;
public class StreamMQConsumer {
    private RedisStreamCommands<String, String> syncCommands;
    private String groupName = "my_consumer_group";
    private String consumerName = "consumer-1";
    public StreamMQConsumer() {
        RedisClient redisClient = RedisClient.create("redis://localhost:6379");
        StatefulRedisConnection<String, String> connection = redisClient.connect();
        this.syncCommands = connection.sync();
        // 创建消费者组,如果已存在则忽略
        // '0' 表示从 Stream 的第一条消息开始读
        try {
            syncCommands.xgroupCreate(StreamMQKey.Stream, groupName, "0");
        } catch (Exception e) {
            System.out.println("Consumer group already exists.");
        }
    }
    public void consume() {
        while (true) {
            // XREADGROUP 从消费者组中读取新消息
            // '>' 表示从未处理过的消息开始
            // COUNT 1 表示一次只读取一条
            // BLOCK 0 表示无限期阻塞
            Map<String, StreamMessage<String, String>> messages = syncCommands.xreadgroup(
                    Map.entry(groupName, consumerName),
                    Map.entry(StreamMQKey.Stream, ">"),
                    1, 0);
            if (!messages.isEmpty()) {
                StreamMessage<String, String> message = messages.get(StreamMQKey.Stream);
                System.out.println("Consumer " + consumerName + " received message: " + message.getBody());
                // 模拟处理消息
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 处理完成后,发送 ACK 确认
                syncCommands.xack(StreamMQKey.Stream, groupName, message.getId());
                System.out.println("Acknowledged message ID: " + message.getId());
            }
        }
    }
    public static void main(String[] args) {
        StreamMQConsumer consumer = new StreamMQConsumer();
        consumer.consume();
    }
}

注意:在实际应用中,消费者组名和消费者名应该是动态生成的,例如使用 UUID。


优缺点分析

特性 基于列表 基于 Streams 基于发布/订阅
实现复杂度
消息持久化 是(数据在内存)
消息可靠性 (无ACK) (有ACK机制)
多消费者 可以,但需自行实现负载均衡 (消费者组,自动负载均衡) (一对多)
消息不重复消费 难以保证 可以保证 (通过消费者组) 不适用
消息顺序 保证 FIFO 保证单个消费者顺序 不保证
阻塞消费 BRPOP XREAD with BLOCK SUBSCRIBE (天生阻塞)
适用场景 简单任务、日志收集、非核心业务 绝大多数 MQ 场景、可靠的消息传递 实时通知、事件驱动架构

生产级演进与方案对比

当你的业务对消息队列的可靠性、高可用性有较高要求时,Redis MQ 可能会遇到瓶颈,这时需要考虑更专业的方案。

特性 Redis (List/Streams) RabbitMQ Apache Kafka
可靠性 中 (Streams 较好) 非常高 (ACK, 持久化, 事务) 非常高 (副本, 分区)
高可用 需要依赖 Redis Cluster 原生支持 (镜像队列) 原生支持 (多副本)
性能 高 (内存操作) 高 (单机/小集群) 极高 (分布式, 顺序写盘)
功能丰富度 基础 非常丰富 (路由, 死信, 延迟队列等) 非常丰富 (流处理, 连接器)
运维复杂度 低 (与 Redis 一同运维)
适用场景 缓存/会话旁路、轻量级MQ 企业应用、复杂路由、任务调度 大数据、日志收集、事件溯源

如何选择?

  1. 简单、快速、非核心: 使用 Redis List,比如一个后台的日志收集任务,丢了无所谓,实现起来最快。
  2. 需要基本可靠性、想用 Redis: 使用 Redis Streams,它提供了生产 MQ 所需的核心功能(持久化、ACK、消费者组),足以应对大部分中等规模的应用场景。
  3. 金融、电商等对可靠性要求极高: 选择 RabbitMQ,它的成熟度和功能完整性是 Redis 无法比拟的。
  4. 海量数据、高吞吐、日志/流处理: 选择 Kafka,它是分布式消息系统的王者,专为大数据场景设计。

在 Java 中使用 Redis 实现消息队列是一个灵活且高效的方案,尤其是在已经使用 Redis 的项目中。

  • 对于学习和快速原型: 从 List + LPUSH/RPOP 开始,理解其基本原理。
  • 对于生产环境: 强烈推荐使用 Redis Streams,它提供了接近专业 MQ 的核心功能,同时保持了 Redis 的简洁和高效。
  • 对于核心业务系统: 在评估 Redis Streams 是否能满足需求后,如果对可靠性、功能有更高要求,请务必考虑 RabbitMQKafka 这样的专业消息队列中间件。
分享:
扫描分享到社交APP
上一篇
下一篇