【黑马点评Redis——003优惠券秒杀4——消息队列Stream】

embedded/2024/9/24 21:10:56/

1. 目前还存在的问题

  • 设置的阻塞队列可能会超出最大长度
  • 系统重启会导致阻塞队列中的信息消失,可能会出现问题

2. 消息队列

  • 消息队列 (Message Queue)。
    • 字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色消息队列:存储和管理消息,也被称为消息代理 (Message Broker)
  • 生产者
    • 发送消息到消息队列
  • 消费者
    • 从消息队列获取消息并处理消息

在这里插入图片描述

  • Redis提供了三种不同的方式来实现消息队列
  • list结构:基于List结构模拟消息队列
  • PubSub:基本的点对点消息队列
  • Stream:比较完善的消息队列模型

2.1 基于List结构模拟消息队列

在这里插入图片描述
优点

  • 利用Redis存储,不受限于JVM内存上限
  • 基于Redis的持久化机制,数据安全性有保证
  • 可以满足消息有序性

缺点

  • 无法避免消息丢失(消息拿出来后没来得及处理就挂了,导致这条消息没有处理)
  • 只支持单消费者(一个人获取了这条消息,另一个人就不能获取了)

2.2 PubSub基本的点对点消息队列

在这里插入图片描述
优点

  • 采用发布订阅模型,支持多生产,多消费

缺点

  • 不支持数据持久化(服务一旦关闭就消失了)
  • 无法避免消息丢失(没人接收这条消息就丢了)
  • 消息堆积有上限,超出时消息丢失(缓存在客户端,有上限)

2.3 基于Stream的消息队列(重点)

Redis 中的 Stream 是一种在 Redis 5.0 版本引入的新数据类型,它专为实现高性能、高可靠性的消息队列和流式数据处理而设计。Stream 结构提供了一种有序、可持久化、可重复消费且支持多路写入与多消费者并行消费的消息存储模型。

写入命令XADD

XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|ID field value [field value ...]
  • key:消息队列的名称
  • NOMKSTREAM:如果队列不存在,是否自动创建队列。默认是自动创建
  • MAXLEN:最大长度,默认不设置上限
  • *|ID:消息的唯一id,*代表由Redis自动生成。
  • field value [field value …]:称为一个Entry,格式是多个key-value键值对
    在这里插入图片描述
    读取命令XREAD
    在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

特点

  • 消息可回溯
  • 一个消息可以被多个消费者读取
  • 可以阻塞读取
  • 有消息漏读的风险

2.4 基于Stream的消息队列-消费者组

消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。具备以下特点:

  • 消息分流
    • 队列中的消息会分流给组内的不同消费者,而不是重复消费,从而加快消息处理的速度
  • 消息标识
    • 消费者者会维护一个标识,记录最后一个被处理的消息,哪怕消费者宕机重启,还会从标识之后读取消息,确保每一个消息都会被消费。
  • 消息确认
    • 消费者获取消息后没消息处于pending状态,并存入一个pending-list。当处理完成后需要通过XACK来确认消息,标记消息为已处理,才会从pending-list移除

2.4.1 创建消费者组

在这里插入图片描述

2.4.2 从消费者组读取消息

在这里插入图片描述

2.4.3 确认消息命令

将pending-list中的某个消息,标记为已处理。

XACK KEY GROUP ID

小结

  • 消息可回溯
  • 可以多消费者争抢消息,加快消息速度
  • 可以阻塞读取
  • 没有消息漏读的风险
  • 有消息确认机制,保证消息至少被消息一次

2.4.4 XPENDING

XPENDING 是 Redis 5.0 引入的一项新命令,用于管理 Redis Streams 中的待处理消息。Redis Streams 是一个用于处理实时数据流的数据结构,而 XPENDING 则允许你查看、管理待处理消息的信息。
XPENDING 命令的一般语法如下:

XPENDING stream_name group_name [start end count] [consumer]

其中:
stream_name 是待处理消息所在的流的名称。
group_name 是消费者组的名称。
start 和 end 是两个可选参数,用于指定待处理消息的范围。
count 是一个可选参数,用于指定要返回的待处理消息的数量。
consumer 是一个可选参数,用于指定特定的消费者。

2.5 Java的伪代码

消费者监听消息的基本思路

while(true){//尝试监听队列,使用阻塞模式,最长等待2000msObejct msg = redis.call("XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAM s1 >");if(msg == null){// null说明没有消息,继续下一次continue;}try{//处理消息,完成后需要ACKhandleMessage(msg)}catch(Exception e){while(true){Object msg = redis.call("XREADGROUP GROUP g1 c1 COUNT 1 STREAM s1 0");if(msg == null){//null说明没有异常消息,所有消息都已确认,结束循环break;}try{//处理消息,完成后需要ACKhandleMessage(msg)}catch(Exception e){//再次出现异常,记录日志,继续循环continue;}}}
}

2.6 对比

在这里插入图片描述

3.代码优化目标

在这里插入图片描述

3.1 修改Lua脚本

-- 1.参数列表
-- 1.1. 优惠券id
local voucherId = ARGV[1]
-- 1.2. 用户id
local userId = ARGV[2]
-- 1.3. 订单id
local orderId = ARGV[3]
-- 2.数据key
-- 2.1.库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2.订单key
local orderKey = 'seckill:order:' .. voucherId-- 3.脚本业务
-- 3.1.判断库存是否充足
if(tonumber(redis.call('get',stockKey))<=0)then-- 3.2.库存不足,返回1return 1
end-- 3.3.判断用户是否下单 SISMEMBER orderKey userId
if(redis.call("sismember",orderKey,userId) == 1) then-- 3.4.存在,说明是重复下单return 2
end-- 3.5.扣库存 incrby stockKey -1
redis.call('incrby',stockKey,-1)
-- 3.6.下单(保存用户)sadd orderKey userId
redis.call('sadd',orderKey,userId)
-- 3.7.发送消息到队列中, XADD stream.order * K1 v1 K2 v2 ...
redis.call('xadd','stream.orders','*',"userId",userId,"voucherId",voucherId,"id",orderId)
return 0

3.2 修改Java端代码

    private class VoucherOrderHandler implements Runnable{String queueName = "stream.orders";@Overridepublic void run(){while (true){try {// 1.获取消息队列中的订单信息List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create(queueName, ReadOffset.lastConsumed()));// 2.判断消息获取是否成功if ( list==null || list.isEmpty()){// 2.1.如果获取失败,说明没有消息,继续下一次循环continue;}// 3.解析消息中的订单信息MapRecord<String, Object, Object> record = list.get(0);VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(record.getValue(), new VoucherOrder(), true);// 4.如果获取成功,可以下单handleVoucherOrder(voucherOrder);// 5.ACK确认stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());}catch (Exception e){handlePendingList();log.error("处理订单异常:",e);}}}private void handlePendingList() {while (true){try {// 1.获取pending-list队列中的订单信息,XREADGROUP GROUP g1 c1 COUNT 1 STREAMS streams.order 0List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1),StreamOffset.create(queueName, ReadOffset.from("0")));// 2.判断消息获取是否成功if ( list==null || list.isEmpty()){// 2.1.如果获取失败,说明没有消息,结束循环break;}// 3.解析消息中的订单信息MapRecord<String, Object, Object> record = list.get(0);VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(record.getValue(), new VoucherOrder(), true);// 4.如果获取成功,可以下单handleVoucherOrder(voucherOrder);// 5.ACK确认stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());}catch (Exception e){log.error("处理订单异常:",e);try {Thread.sleep(100);} catch (InterruptedException ex) {throw new RuntimeException(ex);}}}}}

4. 总结

目前我们已经能够使用Stream消息队列来实现一个功能较为完全的秒杀功能。


http://www.ppmy.cn/embedded/12923.html

相关文章

基于快照行情的股票/基金 1分钟 K 线合成指南

1. 概述 由于不同交易所不同资产的交易规则是有差异的&#xff0c;导致不同交易所基于快照行情或逐笔成交合成不同资产1分钟 K 线的计算方法是不同的。 本教程旨在提高 DolphinDB 在具体业务场景下的落地效率&#xff0c;降低 DolphinDB 在实际业务使用中的开发难度。 本教程…

【vue2+antvx6】节点大小不一致,点击按钮流程图自动布局

需求&#xff1a; 1、点击优化布局的按钮&#xff0c;自动布局&#xff08;从左到右&#xff09;&#xff0c;按钮变成撤销布局按钮 2、点击撤销布局的按钮&#xff0c;返回之前的布局 3、在点击优化布局的按钮后&#xff0c;如果移动了节点&#xff0c;则自动将撤销布局的按…

VulnHub靶机 DC-5 打靶 渗透测试详情过程

VulnHub靶机 DC-5 打靶 详细渗透测试过程 目录 VulnHub靶机 DC-5 打靶 详细渗透测试过程一、将靶机导入到虚拟机当中二、渗透流程主机发现端口扫描目录爆破文件包含getshell反弹shell提权 一、将靶机导入到虚拟机当中 靶机地址&#xff1a; https://download.vulnhub.com/dc/…

异步线程与RabbitMQ应该如何选择?

异步线程&#xff08;Asynchronous Threading&#xff09; 定义与特点&#xff1a; 异步线程是一种编程技术&#xff0c;它允许程序在执行长时间操作&#xff08;如I/O操作、网络请求等&#xff09;时&#xff0c;不阻塞主执行线程。这可以通过多线程或使用语言特性&#xff0…

运行Java或Python的时候,Git是必要的吗?

在运行Java或Python代码时&#xff0c;Git并不是必需的&#xff0c;但它可以成为一个非常有用的工具&#xff0c;特别是在团队协作、版本控制和代码管理方面。 Git的作用和优势 版本控制&#xff1a; Git是一个分布式版本控制系统&#xff0c;可以跟踪文件的更改历史&#xff…

跨境电商自建外贸独立站的优势和必要性

跨境电商自建外贸独立站的优势和必要性主要体现在以下几个方面&#xff1a; 1. 品牌塑造&#xff1a;独立站允许商家自主设计网站&#xff0c;展示品牌形象和产品特色&#xff0c;提高品牌认知度和忠诚度。 2. 自主权和控制权&#xff1a;商家可以自主制定并执行网站运营规则…

2024五一杯数学建模B题思路分析

文章目录 1 赛题思路2 比赛日期和时间3 组织机构4 建模常见问题类型4.1 分类问题4.2 优化问题4.3 预测问题4.4 评价问题 5 建模资料 1 赛题思路 (赛题出来以后第一时间在CSDN分享) https://blog.csdn.net/dc_sinor?typeblog 2 比赛日期和时间 报名截止时间&#xff1a;2024…

Prompt Engineering,提示工程

什么是提示工程&#xff1f; 提示工程也叫【指令工程】。 Prompt发送给大模型的指令。比如[讲个笑话]、[用Python编个贪吃蛇游戏]、[给男/女朋友写情书]等看起来简单&#xff0c;但上手简单精通难 [Propmpt]是AGI时代的[编程语言][Propmpt]是AGI时代的[软件工程][提示工程]是…