Redis消息队列 | 黑马点评

news/2024/11/17 1:59:39/

目录

一、认识消息队列

二、List模拟消息队列

三、PubSub的消息队列

四、Stream的消息队列(重点)

        1、单消费模式

        2、消费者组

五、redis三种消息队列对比 

 六、优化秒杀实战

1、创建消息队列

2、修改下单脚本

 3、接收消息处理


一、认识消息队列

消息队列,字面意思就存放消息的队列。最简单的消息队列模型包括3个角色:

消息队列:存储和管理消息,也被称为消息代理

生产者:发送消息到消息队列

消费者:从消息队列获取消息并处理消息

解决了jvm堵塞队列内存不足的问题,而且消息队列是可以持久化的,宕机了依然能够保存。

redis提供三种不同方式实现消息队列:

  • list结构:基于list结构模拟消息队列
  • PubSub:基于的点对点消息队列
  • Stream:比较完善的消息队列模型(推荐)

二、List模拟消息队列

redis的list结构是一个双向链表,很容易模拟出队列效果

队列是入口和出口不在一边,因此可以用LPUSH结合RPOP、或者RPUSH结合LPOP实现

但是,当队列没有消息时pop就会返回null,并不会jvm堵塞队列那样堵塞并等待消息,因此这里应该使用BRPOP或者BLPOP来实现堵塞队列。

缺点:

无法避免消息丢失。从消息队列取到消息,还没来得及处理就挂掉了,这个消息就消失了。

只支持单消费者。一个人拿走就从队列里面弹出了。

三、PubSub的消息队列

PubSub(发布订阅)是redis2.0版本引入的消息传递模型,消费者可以订阅一个或多个channel(频道),生产者向对应channel发送消息后,所有订阅者都能收到相关消息。

支持多生产、多消费

缺点:

不支持数据持久化(刚刚的list本质是做存储的我们拿来当队列所以可以持久化)

无法避免消息丢失。

消息堆积有上限,超出时数据丢失。(缓存空间是有上限的)

四、Stream的消息队列(重点)

Stream是redis5.0引入的一种新数据类型,可以实现一个功能非常完善的消息队列。

1、单消费模式

特点:

  • 消息可回溯。不消失永久保存在队列里。
  • 一个消息可以被多个消费者读取。读完不消失的,可以多个读
  • 可以堵塞读取
  • 有消息漏读的风险

2、消费者组

消费者组(Consumer Group):将多个消费者划分到一个组,监听同一个队列。

 消费者监听消息的基本思路

stream类型消息队列的消费者组特点:

  • 消息可回溯
  • 可以多消费者争抢消息,加快消费速度
  • 可以阻塞读取
  • 没有消息漏镀的风险
  • 有消息确认机制,保证消息至少被消费一次

五、redis三种消息队列对比 

 六、优化秒杀实战

1、创建消息队列

创建一个stream类型的消息队列,名为stream.orders

2、修改下单脚本

修改之前秒杀下单lua脚本,认定有抢购资格后,直接向steam.orders中添加消息,内容包含voucher、userId、orderId

-- 优惠券id
local voucherId = ARGV[1]
-- 用户id
local userId = ARGV[2]
-- 订单id
local orderId = ARGV[3]-- 库存key
local stockKey = "seckill:stock:"..voucherId
-- 订单key
local orderKey = "seckill:order:"..voucherId-- 判断库存是否充足
if(tonumber(redis.call('get', stockKey)) <= 0) thenreturn 1
end-- 判断用户是否已经下过单
if(redis.call('sismember', orderKey, userId) == 1) thenreturn 2
end-- 扣减库存
redis.call('incrby', stockKey, -1)-- 将 userId 存入当前优惠券的 set 集合
redis.call('sadd', orderKey, userId)-- 将订单信息存入到消息队列中 xadd stream.orders * k1 v1 k2 v2
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
return 0

 3、接收消息处理

项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单

    /**** 创建线程池*/private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();/**** 容器启动时,便开始创建独立线程,从队列中读取数据,创建订单*/@PostConstructprivate void init(){SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());}private class VoucherOrderHandler implements Runnable {@Overridepublic void run() {while(true){try {// 获取消息队列中的订单信息 xreadgroup group g1 c1 count 1 block 2000 streams s1 0List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2000)),StreamOffset.create("stream.orders", ReadOffset.lastConsumed()));// 判断订单信息是否为空if(list == null || list.isEmpty()){// 如果为 null,说明没有消息,继续下一次循环continue;}// 解析消息MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);// 创建订单createVoucherOrder(voucherOrder);// 确认消息 xack s1 g1 idstringRedisTemplate.opsForStream().acknowledge("stream.orders", "g1", record.getId());} catch (Exception e) {log.error("处理订单异常!", e);handlePendingList();}}}private void handlePendingList() {while(true){try {// 获取 pending-list 中的订单信息 xreadgroup group g1 c1 count 1 block 2000 streams s1 0List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create("stream.orders", ReadOffset.lastConsumed()));// 判断订单信息是否为空if(list == null || list.isEmpty()){break;}// 解析消息MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);// 创建订单createVoucherOrder(voucherOrder);// 确认消息 xack s1 g1 idstringRedisTemplate.opsForStream().acknowledge("stream.orders", "g1", record.getId());} catch (Exception e) {log.error("处理订单异常!", e);try {Thread.sleep(100);} catch (InterruptedException interruptedException) {interruptedException.printStackTrace();}}}}}

http://www.ppmy.cn/news/19628.html

相关文章

APT之木马静态免杀

前言 这篇文章主要是记录手动编写代码进行木马免杀&#xff0c;使用工具也可以免杀&#xff0c;只不过太脚本小子了&#xff0c;而且工具的特征也容易被杀软抓到&#xff0c;指不定哪天就用不了了&#xff0c;所以要学一下手动去免杀木马&#xff0c;也方便以后开发一个只属于…

游戏启动器:LaunchBox Premium with Big Box v13.1

LaunchBox知道您会喜欢的功能&#xff0c;具有风格的游戏启动器&#xff0c;我们最初将 Launchbox 构建为 DOSBox 的一个有吸引力的前端&#xff0c;但它现在拥有对现代游戏和复古游戏模拟的支持。我们让您的所有游戏看起来都很漂亮。 整理您的游戏收藏 我们不仅漂亮&#xff…

第五十六章 历史监视器 - 基本指标

文章目录第五十六章 历史监视器基本指标收集数据第五十六章 历史监视器 History Monitor 维护性能和系统使用指标的历史数据库。其主要目的是&#xff1a; 提供性能基准并帮助分析性能问题。帮助分析一段时间内的系统使用情况以进行容量规划。 该数据库在 SYS.History 类包中…

提权漏洞和域渗透历史漏洞整理

Windows提权在线辅助工具 https://i.hacking8.com/tiquan/&#x1f334;Kernel privilege escalation vulnerability collection, with compilation environment, demo GIF map, vulnerability details, executable file (提权漏洞合集) https://github.com/Ascotbe/Kernelhu…

Java---微服务---elasticsearch安装部署

elasticsearch安装部署1.部署单点es1.1.创建网络1.2.加载镜像1.3.运行2.部署kibana2.1.部署2.2.DevTools3.安装IK分词器3.1.在线安装ik插件&#xff08;较慢&#xff09;3.2.离线安装ik插件&#xff08;推荐&#xff09;1&#xff09;查看数据卷目录2&#xff09;下载并解压缩分…

机器学习(七):Azure机器学习模型搭建实验

文章目录 Azure机器学习模型搭建实验 前言 Azure平台简介 Azure机器学习实验 Azure机器学习模型搭建实验 前言 了解Azure机器学习平台&#xff0c;知道机器学习流程。 Azure平台简介 Azure Machine Learning&#xff08;简称“AML”&#xff09;是微软在其公有云Azure上推…

内网安全——代理技术Socks5网络通讯控制上线

目录 (一)前置知识 0x01 单机——防火墙之限制出入站 常见主机配置不出网的方式

leetcode647 回文子串

题目 给你一个字符串 s &#xff0c;请你统计并返回这个字符串中 回文子串 的数目。 回文字符串 是正着读和倒过来读一样的字符串。 子字符串 是字符串中的由连续字符组成的一个序列。 具有不同开始位置或结束位置的子串&#xff0c;即使是由相同的字符组成&#xff0c;也会…