Redis message queue

Redis message queue

消息队列(Message Queue),字面意思就是存放消息的队列。最简单的消息队列模型包括三个角色:

  • 消息队列:存储和管理消息,也被称为消息代理(Message Broker)。
  • 生产者:负责发送消息到消息队列。
  • 消费者:负责从消息队列中获取消息并处理。

与阻塞队列的区别

  • 不使用JVM内存,因此不会出现内存溢出的问题。
  • 持久化:Redis支持将数据持久化到磁盘上,而JVM内存中的数据在程序退出后会丢失。
  • 确认机制:Redis支持消息的确认机制,可以确保消息被成功消费。

Redis消息队列的实现

  • list结构:基于List结构模拟消息队列。
  • PubSub:基本的点对点消息模型。
  • Stream:基于Stream结构实现的消息队列,支持持久化和消费确认机制。

基于List结构的消息队列

使用Redis的List结构实现消息队列,生产者使用LPUSH命令将消息推送到队列的左侧,消费者使用BRPOP命令从队列的右侧获取消息。BRPOP命令是阻塞式的,如果队列为空,消费者会等待直到有新消息到达。

缺点:

  • 消息丢失:如果消费者在处理消息时崩溃,消息会丢失。(BRPOP命令会删除消息)
  • 只支持单个消费者:如果有多个消费者同时消费同一个队列,可能会导致消息被多个消费者消费,造成重复消费。

基于PubSub的消息队列

消费者可以订阅一个或多个频道,生产者将消息发布到这些频道。消费者会接收到发布到它们订阅的频道的消息。

  • SUBSCRIBE channel [channel …]:订阅一个或多个频道。
  • PUBLISH channel message:将消息发布到指定频道。
  • PSUBSCRIBE pattern [pattern …]:订阅与pattern格式匹配的所有频道。

缺点:

  • 消息不持久化:如果消费者在订阅频道之前发布了消息,消费者将无法接收到这些消息。
  • 消息丢失:如果消费者在处理消息时崩溃,消息会丢失。
  • 消息堆积有上限:消息堆积有上限,超出时数据丢失。

基于Stream的消息队列

Stream是Redis 5.0引入的一种新的数据结构,支持持久化和消费确认机制。Stream可以看作是一个有序的消息列表,每个消息都有一个唯一的ID。
Stream支持多个消费者组,每个消费者组可以有多个消费者。每个消费者组可以独立地消费消息,确保消息不会被重复消费。

Stream的XREAD的缺点:

  • 消息可回溯
  • 一个消息可以被多个消费者消费
  • 可以阻塞读取
  • 有消息漏读的风险

消费者组

消费者组是Stream的一个重要特性,允许多个消费者共享同一个消息队列。每个消费者组都有一个唯一的名称,消费者组中的每个消费者都可以独立地消费消息。

  • 消息分流:队列中的消息会分流给组内的不同消费者,而不是重复的消费,从而加快消息处理的速度。
  • 消息标示:消费者组会维护一个标示,记录最后一个被处理的消息,哪怕消费者崩溃,消息也不会丢失。
  • 消息确认:消费者在处理完消息后,需要向Redis发送确认消息,表示该消息已经被成功处理。Redis会将该消息从消费者组的pending-list中删除。

三种Redis消息队列的对比

List PubSub Stream
消息持久化 支持 不支持 支持
阻塞读取 支持 支持 支持
消息堆积处理 受限于内存空间,可以利用多消费者加快处理 受限于消费者缓冲区 受限于队列长度,可以利用消费者组提高消费速度,减少消息堆积
消息确认 不支持 不支持 支持
消息回溯 不支持 不支持 支持

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();

@PostConstruct
private void init() {
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}

private class VoucherOrderHandler implements Runnable {
String queueName = "stream.orders";
@Override
public void run(){
while (true) {
try {
// 1. 获取消息队列中的订单 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 stream.orders >
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"), // 消费组
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)), // 阻塞2秒
StreamOffset.create(queueName, ReadOffset.lastConsumed()) // 从最后一个未读的消息开始读取
);
// 2. 判断消息获取是否成功
if (list == null || list.isEmpty()) {
// 如果获取失败 则继续循环
continue;
}
// 3. 解析消息
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> values = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
// 4. 创建订单
handleVoucherOrder(voucherOrder);
// 5. ack确认 SACK stream.orders g1 id
stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
} catch (Exception e) {
log.error("处理订单异常", e);
handlePendingList();
}
}
}

private void handlePendingList() {
while (true) {
try {
// 1. 获取pending-list中的订单 XREADGROUP GROUP g1 c1 COUNT 1 stream.orders 0
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"), // 消费组
StreamReadOptions.empty().count(1),
StreamOffset.create(queueName, ReadOffset.from("0")) // 从0开始读取
);
// 2. 判断消息获取是否成功
if (list == null || list.isEmpty()) {
// 如果获取失败 说明pending-list中没有异常消息 则结束循环
break;
}
// 3. 解析消息
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> values = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
// 4. 创建订单
handleVoucherOrder(voucherOrder);
// 5. ack确认 SACK stream.orders g1 id
stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
} catch (Exception e) {
log.error("处理pending-list中的订单异常", e);
try {
Thread.sleep(20);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}

}
}
}

创建线程池,并在spring类初始化后立刻执行线程池中的任务。
run 方法中使用XREADGROUP命令从消息队列中获取消息,并解析消息,创建订单。
如果获取消息失败,则继续循环。
如果获取消息成功,则解析消息,创建订单,并使用SACK命令确认消息。
如果处理订单失败,则调用handlePendingList方法处理pending-list中的异常消息。
handlePendingList 基本上和run方法一致,只是从pending-list中获取消息。

总结

Redis消息队列是一个轻量级的消息队列实现,适用于简单的消息传递场景。对于复杂的消息传递场景,可以考虑使用其他成熟的消息队列解决方案,如Kafka、RabbitMQ等。