消息队列模型角色

消息队列
- 本质定义:字面意思是存放消息的队列(Message Queue),英文简称MQ
- 核心功能:
- 存储和管理消息
- 作为消息代理(Message Broker)承担更复杂的职责
- 类比说明:
- 快递公司模式:需要揽件、运输保管、投递等完整流程
- 快递柜简化版:仅承担存储功能,实现收发分离
消费者
- 解耦优势:
- 生产者与消费者无需实时交互(如快递员与收件人)
- 避免相互等待造成的效率低下(快递员等待收件人取件)
- 异步特性:
- 生产者投递消息后可立即返回
- 消费者按自身节奏处理消息(随时取快递)
秒杀业务与消息队列
- 业务场景:
- 抢购资格验证通过后写入消息队列
- 独立消费者线程异步完成数据库写入
- 技术优势:
- 业务解耦:抢单与下单逻辑分离
- 性能提升:前端并发能力增强,后端数据库压力可控
- 与传统阻塞队列区别:
- 独立服务:不受JVM内存限制
- 持久化保障:服务重启后消息不丢失
- 确认机制:确保消息至少被消费一次
消息队列的分类
- 主流产品:
- RocketMQ
- RabbitMQ
- Kafka
Redis消息队列
- 实现方式:
- List结构:利用链表模拟阻塞队列
- PubSub:基础发布订阅模型
- Stream:5.0+版本提供的完善消息队列模型
- 适用场景:
- 已有Redis集群的小型企业
- 避免额外搭建消息队列服务的场景
基于List实现消息队列
- 实现原理:Redis的list数据结构是双向链表,通过LPUSH/RPOP或RPUSH/LPOP组合实现队列效果(入口和出口不在同侧)
- 阻塞改进:使用BRPOP/BLPOP替代RPOP/LPOP,当队列为空时会阻塞等待(可设置超时时间),避免立即返回null
基于List的消息队列的优缺点
- 三大优点:
- 内存独立:不受JVM内存限制,存储上限取决于Redis配置
- 数据安全:通过Redis持久化机制保证消息不丢失
- 有序保证:严格遵循先进先出(FIFO)原则
- 主要缺点:
- 消息丢失风险:POP操作立即移除消息,若消费者崩溃则消息永久丢失
- 单消费者限制:一条消息只能被一个消费者处理,不支持发布/订阅模式
- 无确认机制:缺乏消息处理成功的确认反馈机制
基于PubSub的消息队列
- 模型定义: Redis 2.0引入的消息传递模型,消费者可订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。
- 名称由来: “PubSub”是publish(发布)和subscribe(订阅)的缩写组合。

优缺点
- 核心优势:
- 支持真正的发布/订阅模式
- 实现多生产者多消费者架构
- 灵活的消息路由机制(精确/模式匹配)
- 主要缺陷:
- 无持久化:消息未被订阅时立即丢失
- 无重试机制:消费者异常导致消息不可恢复
- 堆积限制:客户端缓冲区溢出会导致消息丢弃
- 无状态记录:无法追踪消息投递状态
- 适用场景:实时性要求高但允许消息丢失的广播场景
基于Stream的消息队列
- 数据类型特性:Stream是Redis 5.0引入的全新数据类型,与String、List等基础类型同级,支持持久化存储
- 设计定位:专门为消息队列场景设计,提供最完善的消息队列功能实现
- 底层结构:采用树形结构存储消息ID,支持高效查询
发送信息

读取信息


总结
- 核心优势:
- 完整的持久化支持
- 多消费者组协同处理
- 类似Kafka的消费组设计
- 适用场景:需要高可靠性、消息回溯的队列场景
- 局限性:单消费者模式下存在消息漏读风险
基于Stream的消息队列—消费者组
消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。具备下列特点:
- 消息分流:队列中的消息会分流给组内的不同消费者,而不是重复消费,从而加快消息处理的速度
- 消息标识:消费者组会维护一个标示,记录最后一个被处理的消息,哪怕消费者宕机重启,还会从标示之后读取消息。确保每一个消息都会被消费
- 消息确认:消费者获取消息后,消息处于pending状态,并存入一个pending-list。当处理完成后需要通过XACK来确认消息,标记消息为已处理,才会从pending-list移除。
创建消费者组
- 基本语法:
XGROUP CREATE key groupName ID [MKSTREAM] - 参数说明:
- 队列名称: 指定要创建消费者组的队列名称
- 组名称: 自定义消费者组名称
- 起始ID:
- $代表从队列最后一个消息开始监听
- 0代表从队列第一个消息开始监听
- 自动创建选项: MKSTREAM参数可在队列不存在时自动创建队列
- 使用建议:
- 队列无消息或不需要消费历史消息时建议使用$
- 需要重新消费已有消息时建议使用0
- 相关命令:
- 删除消费者组:
XGROUP DESTORY key groupName - 添加消费者:
XGROUP CREATECONSUMER key groupname consumername - 删除消费者:
XGROUP DELCONSUMER key groupname consumername
- 删除消费者组:
从消费者组读取信息
- 基本语法:
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key...] ID [ID...] - 参数详解:
- 组与消费者: 必须指定组名和消费者名,不存在的消费者会自动创建
- 数量控制: COUNT参数限制单次查询消息数量
- 阻塞模式: BLOCK设置最长等待时间(毫秒),0表示非阻塞
- 自动确认: NOACK参数使消息投递后自动确认(不推荐使用)
- 特殊ID:
- ”>”: 从下一个未消费的消息开始(推荐正常使用)
- 其它: 根据指定id从pending-List中获取已消费但未确认的消息,例如0,是从pending-List中的第一个消息开始
- 注意事项:
- 处理完消息必须执行XACK确认
- 未确认消息会进入pending-list
- 避免使用NOACK参数,可能导致消息丢失
消费者监听消息基本思路
-
- 核心逻辑:
- 外层循环持续监听新消息(>模式)
- 内层循环处理异常消息(0模式)
- 核心逻辑:
- 实现要点:
- 正常流程:
- 阻塞式读取未消费消息
- 成功处理后立即确认
- 异常处理:
- 捕获处理异常后进入pending-list处理循环
- 持续尝试处理直到成功确认
- 多次失败可记录日志人工介入
- 正常流程:
- 保障机制:
- 确保消息至少被消费一次
- 防止消息因异常丢失
- 支持异常消息重试机制
while(true){
// 尝试监听队列,使用阻塞模式,最长等待 2000 毫秒
Object msg = redis.call("XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >");
if(msg == null){ // null说明没有消息,继续下一次
continue;
}
try {
// 处理消息,完成后一定要ACK
handleMessage(msg);
} catch(Exception e){
while(true){
Object msg = redis.call("XREADGROUP GROUP g1 c1 COUNT 1 STREAMS s1 0");
if(msg == null){ // null说明没有异常消息,所有消息都已确认,结束循环
break;
}
try {
// 说明有异常消息,再次处理
handleMessage(msg);
} catch(Exception e){
// 再次出现异常,记录日志,继续循环
continue;
}
}
}
}特点
-
- 消息回溯: 消息消费后不会从队列删除,不同消费者组可重复消费
- 并发消费: 同一消费者组内多个消费者形成争抢关系,加快消费速度
- 阻塞读取: 支持循环阻塞读取,避免while盲等
- 防漏读机制: 通过消费标记记录位置,确保下次从正确位置继续读取
- 消息确认: 提供ACK机制保证消息至少被消费一次
- 持久化优势: 独立于JVM内存限制,支持Redis持久化机制
- 堆积处理: 多消费者并发消费可有效避免消息堆积
总结
| List | PubSub | Stream | |
|---|---|---|---|
| 消息持久化 | 支持 | 不支持 | 支持 |
| 阻塞读取 | 支持 | 支持 | 支持 |
| 消息堆积处理 | 受限于内存空间,可以利用多消费者加快处理 | 受限于消费者缓冲区 | 受限于队列长度,可以利用消费者组提高消费速度,减少堆积 |
| 消息确认机制 | 不支持 | 不支持 | 支持 |
| 消息回溯 | 不支持 | 不支持 | 支持 |
- List模式特点
- 持久化支持: 基于Redis List数据结构原生支持持久化
- 阻塞操作: 提供BLPOP/BRPOP等阻塞式读取命令
- 堆积限制: 受限于List内存空间,但可通过多消费者加速处理
- 机制缺失: 无消息确认机制,消费后立即移除;不支持消息回溯
- PubSub模式特点
- 持久化缺陷: 不支持持久化,服务重启会导致消息丢失
- 通道模式: 本质是发布订阅通道,无存储能力
- 缓冲限制: 消息仅暂存于消费者缓冲区,空间非常有限
- 可靠性低: 无确认机制,消息无人订阅则直接丢弃
- Stream模式综合优势
- 功能完备性: 同时具备持久化、阻塞读取、确认机制、回溯等核心功能
- 消费效率: 消费者组模式显著提升消费速度,有效防止堆积
- 可靠性保障: ACK机制+PENDING LIST确保至少消费一次
- 适用场景: 特别适合中小型企业对消息队列的基础需求
- 专业队列对比
- 持久化局限: 依赖Redis自身持久化,仍有数据丢失风险
- 生产者确认: 缺乏生产者发送确认机制
- 高级特性缺失: 不支持多消费者下的严格有序性等高级特性
- 专业建议: 对可靠性要求高的场景建议使用RabbitMQ等专业队列
基于Stream消息队列实现异步秒杀
需求
- 创建一个Stream类型的消息队列,名为stream.orders
- 修改之前的秒杀下单Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消息,内容包含voucherId、userId、orderId
- 项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单
创建消息队列
XGROUP CREATE stream.orders g1 0 MKSTREAM修改lua
-- 1.参数列表
-- 1.1. 优惠券id
local voucherId = ARGV[1]
-- 1.2. 用户id
local userId = ARGV[2]
-- 1.3. 订单id
local orderId = ARGV[3]
-- 2.定义key
-- 2.1. 库存key
local stockKey = "seckill:stock:" .. voucherId
-- 2.2. 订单key
local orderKey = "seckill:order:" .. voucherId
-- 3.业务逻辑
-- 3.1. 判断库存是否充足
if (tonumber(redis.call("get", stockKey)) <= 0) then
-- 库存不足
return 1
end
-- 3.2. 判断用户是否重复下单
if (redis.call("sismember", orderKey, userId) == 1) then
-- 用户重复下单
return 2
end
-- 3.3. 扣减库存
redis.call("incrby", stockKey, -1)
-- 3.4. 下单
redis.call("sadd", orderKey, userId)
-- 3.5. 发送消息到队列中
redis.call("xadd", "stream.orders", "*", "userId", userId, "voucherId", voucherId, "Id", orderId)
return 0
业务代码实现
@Service
@Slf4j
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private RedisIdWorker redisIdWorker;
@Resource
private StringRedisTemplate stringRedisTemplate;
@Resource
private RedissonClient redissonClient;
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
private IVoucherOrderService proxy;
private static final String QUENE_NAME = "stream.orders";
@PostConstruct
private void init() {
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
private class VoucherOrderHandler implements Runnable {
@Override
public void run() {
while (true) {
try {
// 获取队列中的订单信息
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
StreamOffset.create(QUENE_NAME, ReadOffset.lastConsumed())
);
// 判断消息是否获取成功
if (list == null || list.isEmpty()) {
// 获取锁失败,说明没有消息,继续下一次循环
continue;
}
// 获取成功,解析消息中的订单信息
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> values = record.getValue();
VoucherOrder vouncherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
// 下单
handleVoucherOrder(vouncherOrder);
// ACK确认
stringRedisTemplate.opsForStream().acknowledge(QUENE_NAME, "g1", record.getId());
} catch (Exception e) {
log.error("处理订单异常", e);
handlePendingList();
}
}
}
}
private void handlePendingList() {
while (true) {
try {
// 获取队列中的订单信息
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1),
StreamOffset.create(QUENE_NAME, ReadOffset.from("0"))
);
// 判断消息是否获取成功
if (list == null || list.isEmpty()) {
// 获取锁失败,说明没有pending-list消息,结束循环
break;
}
// 获取成功,解析消息中的订单信息
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> values = record.getValue();
VoucherOrder vouncherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
// 下单
handleVoucherOrder(vouncherOrder);
// ACK确认
stringRedisTemplate.opsForStream().acknowledge(QUENE_NAME, "g1", record.getId());
} catch (Exception e) {
log.error("处理pending-list订单异常", e);
try {
Thread.sleep(50);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}
}
private void handleVoucherOrder(VoucherOrder voucherOrder) {
// 获取用户
Long userId = voucherOrder.getUserId();
// 创建锁对象
RLock lock = redissonClient.getLock("lock:order:" + userId);
// 获取锁
boolean isLock = lock.tryLock();
// 判断获取锁成功与否
if (!isLock) {
// 获取锁失败,返回错误或者重试
log.error("不允许重复下单");
return;
}
try {
proxy.createVoucherOrder(voucherOrder);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 秒杀优惠券
*
* @param voucherId
* @return
*/ public Result seckillVoucher(Long voucherId) {
Long userId = UserHolder.getUser().getId();
long orderId = redisIdWorker.nextId("order");
// 执行lua脚本
Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(),
userId.toString(), String.valueOf(orderId));
// 判断结果是否为0
int r = result.intValue();
if (r != 0) {
// 不为0,代表没有购买资格
return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
}
// 获取代理对象
proxy = (IVoucherOrderService) AopContext.currentProxy();
return Result.ok(orderId);
}
@Transactional
public void createVoucherOrder(VoucherOrder voucherOrder) {
// 一人一单
// 根据用户id和秒杀券id查询订单
Long userId = voucherOrder.getUserId();
int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();
if (count > 0) {
log.error("用户已经购买过一次了");
return;
}
// 扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock=stock-1")
.eq("voucher_id", voucherOrder.getVoucherId())
.gt("stock", 0) // 乐观锁
.update();
if (!success) {
log.error("库存不足");
return;
}
// 保存订单
save(voucherOrder);
}
}测试抢优惠券后,可在redis中看到对应的消息

主要代码调整
1. 技术架构调整
- 从阻塞队列改为Redis Stream消息队列:
- 移除了原来的
BlockingQueue<VoucherOrder> orderTasks阻塞队列实现 - 引入了Redis Stream作为消息队列,使用
stream.orders作为队列名 - 增加了相关的Redis Stream操作依赖和类导入
- 移除了原来的
2. 处理逻辑调整
- 异步处理方式变更:
- 之前:从阻塞队列中取出订单任务直接处理
- 现在:从Redis Stream中读取消息,解析后处理订单
- 增加了消费者组和消费者标识(“g1”, “c1”)来确保消息的可靠处理
3. 消息可靠性增强
- 引入Pending List机制:
- 新增handlePendingList()方法处理未确认的消息
- 在主处理循环出现异常时调用该方法,确保消息不丢失
- 使用ACK机制确认消息处理完成
4. Lua脚本调整
- 增加订单ID参数:
- Lua脚本中新增
orderId参数 - 脚本执行时直接将订单信息写入Redis Stream队列
- Lua脚本中新增
调整后的业务逻辑
1. 秒杀请求处理流程
- 用户发起秒杀请求
- 执行Lua脚本检查库存和重复下单
- Lua脚本中直接将订单信息发送到Redis Stream队列
- 立即返回订单ID给用户
2. 异步订单处理流程
- 后台线程持续监听Redis Stream队列
- 读取订单消息并解析为VoucherOrder对象
- 加入分布式锁防止重复处理
- 调用createVoucherOrder方法创建订单
- 成功处理后发送ACK确认消息
3. 异常处理机制
- 当处理订单出现异常时,会调用handlePendingList方法
- handlePendingList方法会重新处理未确认的消息
- 通过重试机制确保消息最终被处理
优势分析
- 更高的可靠性:Redis Stream提供持久化存储和ACK确认机制,避免消息丢失
- 更好的扩展性:支持消费者组,便于横向扩展处理能力
- 更强的一致性:通过Lua脚本保证检查和入队的原子性
- 更清晰的架构:将订单生成和订单处理完全解耦
这种调整使得系统在高并发场景下更加稳定可靠,同时具备更好的容错能力。