Redis message queue
消息队列(Message Queue),字面意思就是存放消息的队列。最简单的消息队列模型包括三个角色:
- 消息队列:存储和管理消息,也被称为消息代理(Message Broker)。
- 生产者:负责发送消息到消息队列。
- 消费者:负责从消息队列中获取消息并处理。
与阻塞队列的区别
- 不使用JVM内存,因此不会出现内存溢出的问题。
- 持久化:Redis支持将数据持久化到磁盘上,而JVM内存中的数据在程序退出后会丢失。
- 确认机制:Redis支持消息的确认机制,可以确保消息被成功消费。
Redis消息队列的实现
- list结构:基于List结构模拟消息队列。
- PubSub:基本的点对点消息模型。
- Stream:基于Stream结构实现的消息队列,支持持久化和消费确认机制。
基于List结构的消息队列
使用Redis的List结构实现消息队列,生产者使用LPUSH
命令将消息推送到队列的左侧,消费者使用BRPOP
命令从队列的右侧获取消息。BRPOP
命令是阻塞式的,如果队列为空,消费者会等待直到有新消息到达。
缺点:
- 消息丢失:如果消费者在处理消息时崩溃,消息会丢失。(
BRPOP
命令会删除消息) - 只支持单个消费者:如果有多个消费者同时消费同一个队列,可能会导致消息被多个消费者消费,造成重复消费。
基于PubSub的消息队列
消费者可以订阅一个或多个频道,生产者将消息发布到这些频道。消费者会接收到发布到它们订阅的频道的消息。
- SUBSCRIBE channel [channel …]:订阅一个或多个频道。
- PUBLISH channel message:将消息发布到指定频道。
- PSUBSCRIBE pattern [pattern …]:订阅与pattern格式匹配的所有频道。
缺点:
- 消息不持久化:如果消费者在订阅频道之前发布了消息,消费者将无法接收到这些消息。
- 消息丢失:如果消费者在处理消息时崩溃,消息会丢失。
- 消息堆积有上限:消息堆积有上限,超出时数据丢失。
基于Stream的消息队列
Stream是Redis 5.0引入的一种新的数据结构,支持持久化和消费确认机制。Stream可以看作是一个有序的消息列表,每个消息都有一个唯一的ID。
Stream支持多个消费者组,每个消费者组可以有多个消费者。每个消费者组可以独立地消费消息,确保消息不会被重复消费。
Stream的XREAD的缺点:
- 消息可回溯
- 一个消息可以被多个消费者消费
- 可以阻塞读取
- 有消息漏读的风险
消费者组
消费者组是Stream的一个重要特性,允许多个消费者共享同一个消息队列。每个消费者组都有一个唯一的名称,消费者组中的每个消费者都可以独立地消费消息。
- 消息分流:队列中的消息会分流给组内的不同消费者,而不是重复的消费,从而加快消息处理的速度。
- 消息标示:消费者组会维护一个标示,记录最后一个被处理的消息,哪怕消费者崩溃,消息也不会丢失。
- 消息确认:消费者在处理完消息后,需要向Redis发送确认消息,表示该消息已经被成功处理。Redis会将该消息从消费者组的pending-list中删除。
三种Redis消息队列的对比
List | PubSub | Stream | |
---|---|---|---|
消息持久化 | 支持 | 不支持 | 支持 |
阻塞读取 | 支持 | 支持 | 支持 |
消息堆积处理 | 受限于内存空间,可以利用多消费者加快处理 | 受限于消费者缓冲区 | 受限于队列长度,可以利用消费者组提高消费速度,减少消息堆积 |
消息确认 | 不支持 | 不支持 | 支持 |
消息回溯 | 不支持 | 不支持 | 支持 |
代码实现
1 | private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor(); |
创建线程池,并在spring类初始化后立刻执行线程池中的任务。run
方法中使用XREADGROUP
命令从消息队列中获取消息,并解析消息,创建订单。
如果获取消息失败,则继续循环。
如果获取消息成功,则解析消息,创建订单,并使用SACK
命令确认消息。
如果处理订单失败,则调用handlePendingList
方法处理pending-list中的异常消息。handlePendingList
基本上和run
方法一致,只是从pending-list中获取消息。
总结
Redis消息队列是一个轻量级的消息队列实现,适用于简单的消息传递场景。对于复杂的消息传递场景,可以考虑使用其他成熟的消息队列解决方案,如Kafka、RabbitMQ等。