消息队列模型角色

image-4.webp

消息队列

  • 本质定义:字面意思是存放消息的队列(Message Queue),英文简称MQ
  • 核心功能:
    • 存储和管理消息
    • 作为消息代理(Message Broker)承担更复杂的职责
  • 类比说明:
    • 快递公司模式:需要揽件、运输保管、投递等完整流程
    • 快递柜简化版:仅承担存储功能,实现收发分离

消费者

  • 解耦优势:
    • 生产者与消费者无需实时交互(如快递员与收件人)
    • 避免相互等待造成的效率低下(快递员等待收件人取件)
  • 异步特性:
    • 生产者投递消息后可立即返回
    • 消费者按自身节奏处理消息(随时取快递)

秒杀业务与消息队列

  • 业务场景:
    • 抢购资格验证通过后写入消息队列
    • 独立消费者线程异步完成数据库写入
  • 技术优势:
    • 业务解耦:抢单与下单逻辑分离
    • 性能提升:前端并发能力增强,后端数据库压力可控
  • 与传统阻塞队列区别:
    • 独立服务:不受JVM内存限制
    • 持久化保障:服务重启后消息不丢失
    • 确认机制:确保消息至少被消费一次

消息队列的分类

  • 主流产品:
    • RocketMQ
    • RabbitMQ
    • Kafka

Redis消息队列

  • 实现方式:
    • List结构:利用链表模拟阻塞队列
    • PubSub:基础发布订阅模型
    • Stream:5.0+版本提供的完善消息队列模型
  • 适用场景:
    • 已有Redis集群的小型企业
    • 避免额外搭建消息队列服务的场景

基于List实现消息队列

  • 实现原理:Redis的list数据结构是双向链表,通过LPUSH/RPOP或RPUSH/LPOP组合实现队列效果(入口和出口不在同侧)
  • 阻塞改进:使用BRPOP/BLPOP替代RPOP/LPOP,当队列为空时会阻塞等待(可设置超时时间),避免立即返回null

基于List的消息队列的优缺点

  • 三大优点:
    • 内存独立:不受JVM内存限制,存储上限取决于Redis配置
    • 数据安全:通过Redis持久化机制保证消息不丢失
    • 有序保证:严格遵循先进先出(FIFO)原则
  • 主要缺点:
    • 消息丢失风险:POP操作立即移除消息,若消费者崩溃则消息永久丢失
    • 单消费者限制:一条消息只能被一个消费者处理,不支持发布/订阅模式
    • 无确认机制:缺乏消息处理成功的确认反馈机制

基于PubSub的消息队列

  • 模型定义: Redis 2.0引入的消息传递模型,消费者可订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。
  • 名称由来: “PubSub”是publish(发布)和subscribe(订阅)的缩写组合。

image-6.webp

优缺点

  • 核心优势:
    • 支持真正的发布/订阅模式
    • 实现多生产者多消费者架构
    • 灵活的消息路由机制(精确/模式匹配)
  • 主要缺陷:
    • 无持久化:消息未被订阅时立即丢失
    • 无重试机制:消费者异常导致消息不可恢复
    • 堆积限制:客户端缓冲区溢出会导致消息丢弃
    • 无状态记录:无法追踪消息投递状态
  • 适用场景:实时性要求高但允许消息丢失的广播场景

基于Stream的消息队列

  • 数据类型特性:Stream是Redis 5.0引入的全新数据类型,与String、List等基础类型同级,支持持久化存储
  • 设计定位:专门为消息队列场景设计,提供最完善的消息队列功能实现
  • 底层结构:采用树形结构存储消息ID,支持高效查询

发送信息

image-7.webp

读取信息

image-8.webp

image-9.webp

总结

  • 核心优势:
    • 完整的持久化支持
    • 多消费者组协同处理
    • 类似Kafka的消费组设计
  • 适用场景:需要高可靠性、消息回溯的队列场景
  • 局限性:单消费者模式下存在消息漏读风险

基于Stream的消息队列—消费者组

消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。具备下列特点:

  1. 消息分流:队列中的消息会分流给组内的不同消费者,而不是重复消费,从而加快消息处理的速度
  2. 消息标识:消费者组会维护一个标示,记录最后一个被处理的消息,哪怕消费者宕机重启,还会从标示之后读取消息。确保每一个消息都会被消费
  3. 消息确认:消费者获取消息后,消息处于pending状态,并存入一个pending-list。当处理完成后需要通过XACK来确认消息,标记消息为已处理,才会从pending-list移除。

创建消费者组

  • 基本语法: XGROUP CREATE key groupName ID [MKSTREAM]
  • 参数说明:
    • 队列名称: 指定要创建消费者组的队列名称
    • 组名称: 自定义消费者组名称
    • 起始ID:
      • $代表从队列最后一个消息开始监听
      • 0代表从队列第一个消息开始监听
    • 自动创建选项: MKSTREAM参数可在队列不存在时自动创建队列
  • 使用建议:
    • 队列无消息或不需要消费历史消息时建议使用$
    • 需要重新消费已有消息时建议使用0
  • 相关命令:
    • 删除消费者组: XGROUP DESTORY key groupName
    • 添加消费者: XGROUP CREATECONSUMER key groupname consumername
    • 删除消费者: XGROUP DELCONSUMER key groupname consumername

从消费者组读取信息

  • 基本语法: XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key...] ID [ID...]
  • 参数详解:
    • 组与消费者: 必须指定组名和消费者名,不存在的消费者会自动创建
    • 数量控制: COUNT参数限制单次查询消息数量
    • 阻塞模式: BLOCK设置最长等待时间(毫秒),0表示非阻塞
    • 自动确认: NOACK参数使消息投递后自动确认(不推荐使用)
    • 特殊ID:
      • ”>”: 从下一个未消费的消息开始(推荐正常使用)
      • 其它: 根据指定id从pending-List中获取已消费但未确认的消息,例如0,是从pending-List中的第一个消息开始
  • 注意事项:
    • 处理完消息必须执行XACK确认
    • 未确认消息会进入pending-list
    • 避免使用NOACK参数,可能导致消息丢失

消费者监听消息基本思路

    • 核心逻辑:
      • 外层循环持续监听新消息(>模式)
      • 内层循环处理异常消息(0模式)
  • 实现要点:
    • 正常流程:
      • 阻塞式读取未消费消息
      • 成功处理后立即确认
    • 异常处理:
      • 捕获处理异常后进入pending-list处理循环
      • 持续尝试处理直到成功确认
      • 多次失败可记录日志人工介入
  • 保障机制:
    • 确保消息至少被消费一次
    • 防止消息因异常丢失
    • 支持异常消息重试机制
while(true){
    // 尝试监听队列,使用阻塞模式,最长等待 2000 毫秒
    Object msg = redis.call("XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >");
    if(msg == null){ // null说明没有消息,继续下一次
        continue;
    }
    try {
        // 处理消息,完成后一定要ACK
        handleMessage(msg);
    } catch(Exception e){
        while(true){
            Object msg = redis.call("XREADGROUP GROUP g1 c1 COUNT 1 STREAMS s1 0");
            if(msg == null){ // null说明没有异常消息,所有消息都已确认,结束循环
                break;
            }
            try {
                // 说明有异常消息,再次处理
                handleMessage(msg);
            } catch(Exception e){
                // 再次出现异常,记录日志,继续循环
                continue;
            }
        }
    }
}

特点

    • 消息回溯: 消息消费后不会从队列删除,不同消费者组可重复消费
  • 并发消费: 同一消费者组内多个消费者形成争抢关系,加快消费速度
  • 阻塞读取: 支持循环阻塞读取,避免while盲等
  • 防漏读机制: 通过消费标记记录位置,确保下次从正确位置继续读取
  • 消息确认: 提供ACK机制保证消息至少被消费一次
  • 持久化优势: 独立于JVM内存限制,支持Redis持久化机制
  • 堆积处理: 多消费者并发消费可有效避免消息堆积

总结

ListPubSubStream
消息持久化支持不支持支持
阻塞读取支持支持支持
消息堆积处理受限于内存空间,可以利用多消费者加快处理受限于消费者缓冲区受限于队列长度,可以利用消费者组提高消费速度,减少堆积
消息确认机制不支持不支持支持
消息回溯不支持不支持支持
  • List模式特点
    • 持久化支持: 基于Redis List数据结构原生支持持久化
    • 阻塞操作: 提供BLPOP/BRPOP等阻塞式读取命令
    • 堆积限制: 受限于List内存空间,但可通过多消费者加速处理
    • 机制缺失: 无消息确认机制,消费后立即移除;不支持消息回溯
  • PubSub模式特点
    • 持久化缺陷: 不支持持久化,服务重启会导致消息丢失
    • 通道模式: 本质是发布订阅通道,无存储能力
    • 缓冲限制: 消息仅暂存于消费者缓冲区,空间非常有限
    • 可靠性低: 无确认机制,消息无人订阅则直接丢弃
  • Stream模式综合优势
    • 功能完备性: 同时具备持久化、阻塞读取、确认机制、回溯等核心功能
    • 消费效率: 消费者组模式显著提升消费速度,有效防止堆积
    • 可靠性保障: ACK机制+PENDING LIST确保至少消费一次
    • 适用场景: 特别适合中小型企业对消息队列的基础需求
  • 专业队列对比
    • 持久化局限: 依赖Redis自身持久化,仍有数据丢失风险
    • 生产者确认: 缺乏生产者发送确认机制
    • 高级特性缺失: 不支持多消费者下的严格有序性等高级特性
    • 专业建议: 对可靠性要求高的场景建议使用RabbitMQ等专业队列

基于Stream消息队列实现异步秒杀

需求

  • 创建一个Stream类型的消息队列,名为stream.orders
  • 修改之前的秒杀下单Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消息,内容包含voucherId、userId、orderId
  • 项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单

创建消息队列

XGROUP CREATE stream.orders g1 0 MKSTREAM

修改lua

-- 1.参数列表
-- 1.1. 优惠券id
local voucherId = ARGV[1]
-- 1.2. 用户id
local userId = ARGV[2]
-- 1.3. 订单id
local orderId = ARGV[3]
 
-- 2.定义key
-- 2.1. 库存key
local stockKey = "seckill:stock:" .. voucherId
-- 2.2. 订单key
local orderKey = "seckill:order:" .. voucherId
 
-- 3.业务逻辑
-- 3.1. 判断库存是否充足
if (tonumber(redis.call("get", stockKey)) <= 0) then
    -- 库存不足
    return 1
end
-- 3.2. 判断用户是否重复下单
if (redis.call("sismember", orderKey, userId) == 1) then
    -- 用户重复下单
    return 2
end
-- 3.3. 扣减库存
redis.call("incrby", stockKey, -1)
-- 3.4. 下单
redis.call("sadd", orderKey, userId)
-- 3.5. 发送消息到队列中
redis.call("xadd", "stream.orders", "*", "userId", userId, "voucherId", voucherId, "Id", orderId)
return 0
 

业务代码实现

@Service  
@Slf4j  
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {  
  
    @Resource  
    private ISeckillVoucherService seckillVoucherService;  
    @Resource  
    private RedisIdWorker redisIdWorker;  
    @Resource  
    private StringRedisTemplate stringRedisTemplate;  
    @Resource  
    private RedissonClient redissonClient;  
    private static final DefaultRedisScript<Long> SECKILL_SCRIPT;  
  
    static {  
        SECKILL_SCRIPT = new DefaultRedisScript<>();  
        SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));  
        SECKILL_SCRIPT.setResultType(Long.class);  
    }  
  
    private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();  
  
    private IVoucherOrderService proxy;  
    private static final String QUENE_NAME = "stream.orders";  
  
    @PostConstruct  
    private void init() {  
        SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());  
    }  
  
    private class VoucherOrderHandler implements Runnable {  
  
  
        @Override  
        public void run() {  
            while (true) {  
                try {  
                    // 获取队列中的订单信息  
                    List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(  
                            Consumer.from("g1", "c1"),  
                            StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),  
                            StreamOffset.create(QUENE_NAME, ReadOffset.lastConsumed())  
                    );  
                    // 判断消息是否获取成功  
                    if (list == null || list.isEmpty()) {  
                        // 获取锁失败,说明没有消息,继续下一次循环  
                        continue;  
                    }  
                    // 获取成功,解析消息中的订单信息  
                    MapRecord<String, Object, Object> record = list.get(0);  
                    Map<Object, Object> values = record.getValue();  
                    VoucherOrder vouncherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);  
                    // 下单  
                    handleVoucherOrder(vouncherOrder);  
                    // ACK确认  
                    stringRedisTemplate.opsForStream().acknowledge(QUENE_NAME, "g1", record.getId());  
                } catch (Exception e) {  
                    log.error("处理订单异常", e);  
                    handlePendingList();  
                }  
            }  
        }  
    }  
  
    private void handlePendingList() {  
        while (true) {  
            try {  
                // 获取队列中的订单信息  
                List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(  
                        Consumer.from("g1", "c1"),  
                        StreamReadOptions.empty().count(1),  
                        StreamOffset.create(QUENE_NAME, ReadOffset.from("0"))  
                );  
                // 判断消息是否获取成功  
                if (list == null || list.isEmpty()) {  
                    // 获取锁失败,说明没有pending-list消息,结束循环  
                    break;  
                }  
                // 获取成功,解析消息中的订单信息  
                MapRecord<String, Object, Object> record = list.get(0);  
                Map<Object, Object> values = record.getValue();  
                VoucherOrder vouncherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);  
                // 下单  
                handleVoucherOrder(vouncherOrder);  
                // ACK确认  
                stringRedisTemplate.opsForStream().acknowledge(QUENE_NAME, "g1", record.getId());  
            } catch (Exception e) {  
                log.error("处理pending-list订单异常", e);  
                try {  
                    Thread.sleep(50);  
                } catch (InterruptedException ex) {  
                    ex.printStackTrace();  
                }  
            }  
        }  
    }  
  
    private void handleVoucherOrder(VoucherOrder voucherOrder) {  
        // 获取用户  
        Long userId = voucherOrder.getUserId();  
        // 创建锁对象  
        RLock lock = redissonClient.getLock("lock:order:" + userId);  
        // 获取锁  
        boolean isLock = lock.tryLock();  
        // 判断获取锁成功与否  
        if (!isLock) {  
            // 获取锁失败,返回错误或者重试  
            log.error("不允许重复下单");  
            return;  
        }  
        try {  
            proxy.createVoucherOrder(voucherOrder);  
        } catch (Exception e) {  
            throw new RuntimeException(e);  
        }  
    }  
  
  
    /**  
     * 秒杀优惠券  
     *  
     * @param voucherId  
     * @return  
     */    public Result seckillVoucher(Long voucherId) {  
        Long userId = UserHolder.getUser().getId();  
        long orderId = redisIdWorker.nextId("order");  
        // 执行lua脚本  
        Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,  
                Collections.emptyList(),  
                voucherId.toString(),  
                userId.toString(), String.valueOf(orderId));  
        // 判断结果是否为0  
        int r = result.intValue();  
        if (r != 0) {  
            // 不为0,代表没有购买资格  
            return Result.fail(r == 1 ? "库存不足" : "不能重复下单");  
        }  
        // 获取代理对象  
        proxy = (IVoucherOrderService) AopContext.currentProxy();  
        return Result.ok(orderId);  
    }  
  
    @Transactional  
    public void createVoucherOrder(VoucherOrder voucherOrder) {  
        // 一人一单  
        // 根据用户id和秒杀券id查询订单  
        Long userId = voucherOrder.getUserId();  
        int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();  
        if (count > 0) {  
            log.error("用户已经购买过一次了");  
            return;  
        }  
  
        // 扣减库存  
        boolean success = seckillVoucherService.update()  
                .setSql("stock=stock-1")  
                .eq("voucher_id", voucherOrder.getVoucherId())  
                .gt("stock", 0) // 乐观锁  
                .update();  
        if (!success) {  
            log.error("库存不足");  
            return;  
        }  
        // 保存订单  
        save(voucherOrder);  
    }  
}

测试抢优惠券后,可在redis中看到对应的消息

image-12.webp

主要代码调整

1. 技术架构调整
  • 从阻塞队列改为Redis Stream消息队列
    • 移除了原来的BlockingQueue<VoucherOrder> orderTasks阻塞队列实现
    • 引入了Redis Stream作为消息队列,使用stream.orders作为队列名
    • 增加了相关的Redis Stream操作依赖和类导入
2. 处理逻辑调整
  • 异步处理方式变更
    • 之前:从阻塞队列中取出订单任务直接处理
    • 现在:从Redis Stream中读取消息,解析后处理订单
    • 增加了消费者组和消费者标识(“g1”, “c1”)来确保消息的可靠处理
3. 消息可靠性增强
  • 引入Pending List机制
    • 新增handlePendingList()方法处理未确认的消息
    • 在主处理循环出现异常时调用该方法,确保消息不丢失
    • 使用ACK机制确认消息处理完成
4. Lua脚本调整
  • 增加订单ID参数
    • Lua脚本中新增orderId参数
    • 脚本执行时直接将订单信息写入Redis Stream队列

调整后的业务逻辑

1. 秒杀请求处理流程
  1. 用户发起秒杀请求
  2. 执行Lua脚本检查库存和重复下单
  3. Lua脚本中直接将订单信息发送到Redis Stream队列
  4. 立即返回订单ID给用户
2. 异步订单处理流程
  1. 后台线程持续监听Redis Stream队列
  2. 读取订单消息并解析为VoucherOrder对象
  3. 加入分布式锁防止重复处理
  4. 调用createVoucherOrder方法创建订单
  5. 成功处理后发送ACK确认消息
3. 异常处理机制
  1. 当处理订单出现异常时,会调用handlePendingList方法
  2. handlePendingList方法会重新处理未确认的消息
  3. 通过重试机制确保消息最终被处理

优势分析

  1. 更高的可靠性:Redis Stream提供持久化存储和ACK确认机制,避免消息丢失
  2. 更好的扩展性:支持消费者组,便于横向扩展处理能力
  3. 更强的一致性:通过Lua脚本保证检查和入队的原子性
  4. 更清晰的架构:将订单生成和订单处理完全解耦

这种调整使得系统在高并发场景下更加稳定可靠,同时具备更好的容错能力。