2022黑马Redis跟学笔记.实战篇(四)

news/2024/11/8 15:14:30/

2022黑马Redis跟学笔记.实战篇 四

    • 4.3.秒杀优惠券功能
      • 4.3.1.秒杀优惠券的基本实现
        • 一、优惠卷秒杀
          • 1.1 全局唯一ID
          • 1.2 Redis实现全局唯一Id
          • 1.3 添加优惠卷
          • 1.4 实现秒杀下单
      • 4.3.2.超卖问题
      • 4.3.3.基于乐观锁解决超卖问题
        • 1. 悲观锁
        • 2. 乐观锁
        • 3. 乐观锁解决超卖问题
    • 4.4 秒杀的一人一单限制功能
      • 4.4.1 实现秒杀的一人一单限制
        • 优惠券秒杀一人一单
      • 4.4.2.单机模式下的线程安全问题
      • 4.4.3 集群模式下的线程安全问题
      • 4.4.4 分布式锁
        • 4.4.4.1 分布式锁原理
        • 4.4.4.2 Redis的String结构实现分布式锁
          • 4.4.4.2.1 实现分布式锁版本一
        • 4.4.4.3 锁误删问题
          • 1.Redis分布式锁误删情况说明
          • 2. 解决Redis分布式锁误删问题
        • 4.4.4.4 分布式锁的原子性操作问题
        • 4.4.4.5 Lua脚本解决原子性问题
          • 1. 利用Java代码调用Lua脚本改造分布式锁
        • 4.4.4.6 Redission分布式锁
          • 1.分布式锁 - redission功能介绍
          • 2. 分布式锁-Redission快速入门
        • 4.4.4.7 Hash结构解决锁的可重入问题
          • 分布式锁-redission可重入锁原理
        • 4.4.4.8 watchDog解决锁超时释放问题
          • 1.分布式锁-redission锁重试和WatchDog机制
          • 2. 分布式锁-redission锁的MutiLock原理

在这里插入图片描述

4.3.秒杀优惠券功能

4.3.1.秒杀优惠券的基本实现

一、优惠卷秒杀

1.1 全局唯一ID

每个店铺都可以发布优惠券:
在这里插入图片描述

当用户抢购时,就会生成订单并保存到tb_voucher_order这张表中,而订单表如果使用数据库自增ID就存在一些问题:

  • id的规律性太明显
  • 受单表数据量的限制

在这里插入图片描述

场景分析:如果我们的id具有太明显的规则,用户或者说商业对手很容易猜测出来我们的一些敏感信息,比如商城在一天时间内,卖出了多少单,这明显不合适。

场景分析二:随着我们商城规模越来越大,mysql的单表的容量不宜超过500W,数据量过大之后,我们要进行拆库拆表,但拆分表了之后,他们从逻辑上讲他们是同一张表,所以他们的id是不能一样的, 于是乎我们需要保证id的唯一性。

全局ID生成器,是一种在分布式系统下用来生成全局唯一ID的工具,一般要满足下列特性:
在这里插入图片描述
Redis都能实现以上5点
唯一性:想到了关键字incrby
高可用:集群方案、主从方案、哨兵方案
高性能:内存存储性能好
递增:采用递增
安全性:自增然后再拼接一些其它信息,让规律不要那么明显

为了增加ID的安全性,我们可以不直接使用Redis自增的数值,而是拼接一些其它信息:
在这里插入图片描述

ID的组成部分:符号位:1bit,永远为0

时间戳:31bit,以秒为单位,可以使用69年

序列号:32bit,秒内的计数器,支持每秒产生232个不同ID

1.2 Redis实现全局唯一Id

新建类RedisWorker.java
在这里插入图片描述
编写测试的主方法,查看设置的时间2023年1月1日0点0分,距离现在的时间偏移量

public static void main(String[] args) {// 设置初始时间2023年1月1日 0点0分0秒LocalDateTime time = LocalDateTime.of(2023, 1, 1, 0, 0, 0);// 该方法将此本地时间与作为参数传递的指定日期和偏移量相结合,以计算epoch-second值long second = time.toEpochSecond(ZoneOffset.UTC);System.out.println("second:" + second);}

运行主方法
在这里插入图片描述
得到这个second之后,就作为常量,当作初始时间戳。
修改RedisWorker.java

@Component
public class RedisWorker {/*** 把距离当前时间的偏移量作为时间戳*/private static final long BEGIN_TIMESTAMP = 1672531200L;/*** 序列号的长度(位数)*/private static final int COUNT_BITS = 32;@Resourceprivate StringRedisTemplate stringRedisTemplate;public long nextID(String keyPrefix) {// 1.生成时间戳LocalDateTime now = LocalDateTime.now();long nowSecond = now.toEpochSecond(ZoneOffset.UTC);long timestamp = nowSecond - BEGIN_TIMESTAMP;// 2.生成序列号// 2.1 获取当前日期,精确到天,保证一天生成一个key// 2.2 自增长String date = now.format(DateTimeFormatter.ofPattern("yyyyMMdd"));Long sequence = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);//3.拼接返回long id = (timestamp << COUNT_BITS) | sequence;return id;}}

测试类

知识小贴士:关于countdownlatch

countdownlatch名为信号枪:主要的作用是同步协调在多线程的等待于唤醒问题

我们如果没有CountDownLatch ,那么由于程序是异步的,当异步程序没有执行完时,主线程就已经执行完了,然后我们期望的是分线程全部走完之后,主线程再走,所以我们此时需要使用到CountDownLatch。

CountDownLatch 中有两个最重要的方法:

  • 1、countDown
  • 2、await

await 方法 是阻塞方法,我们担心分线程没有执行完时,main线程就先执行,所以使用await可以让main线程阻塞,那么什么时候main线程不再阻塞呢?当CountDownLatch 内部维护的 变量变为0时,就不再阻塞,直接放行,那么什么时候CountDownLatch 维护的变量变为0 呢,我们只需要调用一次countDown ,内部变量就减少1,我们让分线程和变量绑定, 执行完一个分线程就减少一个变量,当分线程全部走完,CountDownLatch 维护的变量就是0,此时await就不再阻塞,统计出来的时间也就是所有分线程执行完后的时间。

修改CommentApplicationTests.java

@Testpublic void getId() throws InterruptedException {ExecutorService pools = CacheClient.newFixedThreadPool(500);// 程序计数器 设置的数量和循环数量一致CountDownLatch latch = new CountDownLatch(300);Runnable runnable = new Runnable() {@Overridepublic void run() {for (int i = 0; i < 100; i++) {long id = redisWorker.nextID("order");System.out.println("id:" + id);}// 每一个线程跑完,就剪掉一次计数(倒计时)latch.countDown();}};runnable.run();long begin = System.currentTimeMillis();for (int i = 0; i < 300; i++) {pools.submit(runnable);}latch.await();long end = System.currentTimeMillis();System.out.println("总时间是:" + (end - begin));}

运行测试类查看结果,没有重复的ID
在这里插入图片描述
这些是10进制的,我们粘贴到科学计算器中用二进制看一下是64位的
在这里插入图片描述
看一下Redis,自增到30100
在这里插入图片描述

1.3 添加优惠卷

每个店铺都可以发布优惠券,分为平价券和特价券。平价券可以任意购买,而特价券需要秒杀抢购:

在这里插入图片描述
tb_voucher:优惠券的基本信息,优惠金额、使用规则等
tb_seckill_voucher:优惠券的库存、开始抢购时间,结束抢购时间。特价优惠券才需要填写这些信息

平价卷由于优惠力度并不是很大,所以是可以任意领取

而代金券由于优惠力度大,所以像第二种卷,就得限制数量,从表结构上也能看出,特价卷除了具有优惠卷的基本信息以外,还具有库存,抢购时间,结束时间等等字段

新增普通卷代码:
在PostMan中加入如下语句

{"shopId":2,"title":"50元代金券","subTitle":"周一至周日均可使用","rules":"每日特惠\\n无需预约\\n可以无限叠加\\不兑现、不找零\\n仅限堂食","payValue":4700,"actualValue":5000,"type":0,"stock":200,"beginTime":"2023-01-01T09:00:00","endTime":"2023-03-01T12:00:00"
}

添加成功
在这里插入图片描述
看数据库
在这里插入图片描述

新增秒杀卷代码:
在这里插入图片描述
用PostMan添加数据
在这里插入图片描述
json如下:

{"shopId":2,"title":"100元代金券","subTitle":"每天都可以使用","rules":"兔年特惠\\n无需预约\\n可以无限叠加\\不兑现、不找零\\n仅限堂食","payValue":8000,"actualValue":10000,"type":1,"stock":100,"beginTime":"2023-01-01T09:00:00","endTime":"2023-03-01T12:00:00"
}

点击send后
在这里插入图片描述
数据库中
在这里插入图片描述
登录后点击抢购优惠券
在这里插入图片描述
打开开发者工具,可以发现优惠券ID拼接在最后,POST请求
在这里插入图片描述

地址

http://localhost:8080/api/voucher-order/seckill/2
1.4 实现秒杀下单

下单核心思路:当我们点击抢购时,会触发右侧的请求,我们只需要编写对应的controller即可。
在这里插入图片描述
秒杀下单应该思考的内容:

下单时需要判断两点:

  • 秒杀是否开始或结束,如果尚未开始或已经结束则无法下单
  • 库存是否充足,不足则无法下单

下单核心逻辑分析:

当用户开始进行下单,我们应当去查询优惠卷信息,查询到优惠卷信息,判断是否满足秒杀条件。

比如时间是否充足,如果时间充足,则进一步判断库存是否足够,如果两者都满足,则扣减库存,创建订单,然后返回订单id,如果有一个条件不满足则直接结束。
在这里插入图片描述

修改VoucherOrderController.java

@RestController
@RequestMapping("/voucher-order")
public class VoucherOrderController {@Autowiredprivate VoucherOrderServiceImpl voucherOrderService;@PostMapping("seckill/{id}")public Result seckillVoucher(@PathVariable("id") Long voucherId) {return voucherOrderService.seckillVoucher(voucherId);}
}

修改接口IVoucherOrderService.java

public interface IVoucherOrderService extends IService<VoucherOrder> {Result seckillVoucher(Long voucherId);
}

修改VoucherOrderServiceImpl.java

@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {@Autowiredprivate ISeckillVoucherService iSeckillVoucherService;@Autowiredprivate RedisWorker redisWorker;@Override@Transactionalpublic Result seckillVoucher(Long voucherId) {// 1.查询秒杀优惠券信息// select * from tb_seckill_voucher where voucher_id = ?SeckillVoucher seckillVoucher = iSeckillVoucherService.getById(voucherId);// 2.判断秒杀是否开始LocalDateTime beginTime = seckillVoucher.getBeginTime();LocalDateTime endTime = seckillVoucher.getEndTime();LocalDateTime now = LocalDateTime.now();if (now.isBefore(beginTime)) {// 当前时间早于秒杀开始时间,说明秒杀没有开始return Result.fail("秒杀尚未开始,请耐心等待!秒杀开始时间:" + beginTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));}// 3.判断秒杀是否已经结束if (now.isAfter(endTime)) {// 当前时间晚于秒杀结束时间,说明秒杀结束了return Result.fail("秒杀已经结束,感谢支持!");}// 4.判断库存是否充足Integer stock = seckillVoucher.getStock();if (stock <= 0) {return Result.fail("商品已经售罄!");}// 5.扣减库存boolean result = iSeckillVoucherService.update().setSql("stock = stock -1").eq("voucher_id", voucherId).update();if (!result) {return Result.fail("商品已经售罄!");}// 6.创建订单/*** 获取订单id*/long orderId = redisWorker.nextID("order");VoucherOrder voucherOrder = new VoucherOrder();voucherOrder.setId(orderId);voucherOrder.setVoucherId(voucherId);voucherOrder.setUserId(UserHolder.getUser().getId());// 将订单信息保存到数据库// insert into tb_voucher_order values ()save(voucherOrder);// 7.返回订单idreturn Result.ok(orderId);}
}

重启应用,为了防止后面一直登录用户费时间,我们把token设置成永久
在这里插入图片描述
设置TTL
在这里插入图片描述
点击限时抢购
在这里插入图片描述
显示抢购成功,然后我们去数据库看一下,库存扣减成功
在这里插入图片描述
订单增加了1条
在这里插入图片描述

4.3.2.超卖问题

为了方便观察,更改数据库的库存

UPDATE tb_seckill_voucher SET stock = 100 WHERE voucher_id = 2;

清空订单数据

DELETE FROM tb_voucher_order;

打开JMeter,配置相关操作模拟多线程操作
在这里插入图片描述
配置Http请求
在这里插入图片描述
配置HTTP信息头管理器
在这里插入图片描述
配置断言
在这里插入图片描述

配置完成后点击启动
在这里插入图片描述
查看结果树
在这里插入图片描述
看聚合报告
在这里插入图片描述

看数据库,发现产生了109个订单
在这里插入图片描述
看秒杀优惠券库存出现超卖问题
在这里插入图片描述

有关超卖问题分析:在我们原有代码中是这么写的
在这里插入图片描述

假设线程1过来查询库存,判断出来库存大于0,正准备去扣减库存,但是还没有来得及去扣减,此时线程2过来,线程2也去查询库存,发现这个数量一定也大于0,那么这两个线程都会去扣减库存,最终多个线程相当于一起去扣减库存,此时就会出现库存的超卖问题。

在这里插入图片描述

超卖问题是典型的多线程安全问题,针对这一问题的常见解决方案就是加锁:而对于加锁,我们通常有两种解决方案:见下图:

在这里插入图片描述

4.3.3.基于乐观锁解决超卖问题

1. 悲观锁

悲观锁可以实现对于数据的串行化执行,比如syn,和lock都是悲观锁的代表,同时,悲观锁中又可以再细分为公平锁,非公平锁,可重入锁,等等。

2. 乐观锁

乐观锁:会有一个版本号,每次操作数据会对版本号+1,再提交回数据时,会去校验是否比之前的版本大1 ,如果大1 ,则进行操作成功,这套机制的核心逻辑在于,如果在操作过程中,版本号只比原来大1 ,那么就意味着操作过程中没有人对他进行过修改,他的操作就是安全的,如果不大1,则数据被修改过,当然乐观锁还有一些变种的处理方式比如cas:

乐观锁的典型代表:就是cas,利用cas进行无锁化机制加锁,var5 是操作前读取的内存值,while中的var1+var2 是预估值,如果预估值 == 内存值,则代表中间没有被人修改过,此时就将新值去替换内存值。

其中do while 是为了在操作失败时,再次进行自旋操作,即把之前的逻辑再操作一次。
在这里插入图片描述
Unsafe.class

public final long getAndAddLong(Object var1, long var2, long var4) {long var5;do {var6 = this.getLongVolatile(var1, var2);} while(!this.compareAndSwapLong(var1, var2, var5, var5+ var4));return var5;}

课程中的使用方式:

课程中的使用方式是没有像cas一样带自旋的操作,也没有对version的版本号+1 ,他的操作逻辑是在操作时,对版本号进行+1 操作,然后要求version 如果是1 的情况下,才能操作,那么第一个线程在操作后,数据库中的version变成了2,但是他自己满足version=1 ,所以没有问题,此时线程2执行,线程2 最后也需要加上条件version =1 ,但是现在由于线程1已经操作过了,所以线程2,操作时就不满足version=1 的条件了,所以线程2执行失败。
在这里插入图片描述
这里分析其实版本号和stock是异曲同工之妙,看stock库存剩余量即可,无需添加version的版本信息,简化表的修改。

3. 乐观锁解决超卖问题

修改代码方案一、

VoucherOrderServiceImpl 在扣减库存时,改为:
在这里插入图片描述
代码如下:VoucherOrderServiceImpl.java

        // 5.2扣减库存(针对超卖问题用乐观锁CAS解决)// update tb_seckill_voucher set stock = stock -1 where voucher_id = ? and stock = ?
boolean result = iSeckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).eq("stock", stock).update();

然后重置数据库数据

UPDATE tb_seckill_voucher SET stock = 100 WHERE voucher_id = 2;DELETE FROM tb_voucher_order;

重庆程序,打开JMeter再测试一下
查看结果树,很多不成功
在这里插入图片描述
查看报告,异常比例也很高
在这里插入图片描述
看一下数据库,只有21个订单
在这里插入图片描述
看一下库存,并没有超卖
在这里插入图片描述

以上逻辑的核心含义是:只要我扣减库存时的库存和之前我查询到的库存是一样的,就意味着没有人在中间修改过库存,那么此时就是安全的,但是以上这种方式通过测试发现会有很多失败的情况,失败的原因在于:在使用乐观锁过程中假设100个线程同时都拿到了100的库存,然后大家一起去进行扣减,但是100个人中只有1个人能扣减成功,其他的人在处理时,他们在扣减时,库存已经被修改过了,所以此时其他线程都会失败。

修改代码方案二、

之前的方式要修改前后都保持一致,但是这样我们分析过,成功的概率太低,所以我们的乐观锁需要变一下,改成stock大于0 即可。
在这里插入图片描述

修改VoucherOrderServiceImpl.java

       // 5.3扣减库存(针对使用乐观锁CAS,没卖完解决)// update tb_seckill_voucher set stock = stock -1 where voucher_id = ? and stock > 0boolean result = iSeckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).gt("stock", 0).update();

恢复数据库的库存和订单

UPDATE tb_seckill_voucher SET stock = 100 WHERE voucher_id = 2;DELETE FROM tb_voucher_order;

再启动JMeter,查看吞吐量如下:
在这里插入图片描述
查看请求
在这里插入图片描述
再看下数据库
订单100单,无误
在这里插入图片描述
库存0无误
在这里插入图片描述
知识小扩展:

针对cas中的自旋压力过大,我们可以使用Longaddr这个类去解决
Java8 提供的一个对AtomicLong改进后的一个类,LongAdder
大量线程并发更新一个原子性的时候,天然的问题就是自旋,会导致并发性问题,当然这也比我们直接使用syn来的好
所以利用这么一个类,LongAdder来进行优化
如果获取某个值,则会对cell和base的值进行递增,最后返回一个完整的值。

在这里插入图片描述

在这里插入图片描述

4.4 秒杀的一人一单限制功能

4.4.1 实现秒杀的一人一单限制

优惠券秒杀一人一单

目前的模式是1个用户可以买多单,这样不利于店家的推广。
在这里插入图片描述

需求:修改秒杀业务,要求同一个优惠券,一个用户只能下一单。

现在的问题在于:

优惠卷是为了引流,但是目前的情况是,一个人可以无限制的抢这个优惠卷,所以我们应当增加一层逻辑,让一个用户只能下一个单,而不是让一个用户下多个单

具体操作逻辑如下:比如时间是否充足,如果时间充足,则进一步判断库存是否足够,然后再根据优惠卷id和用户id查询是否已经下过这个订单,如果下过这个订单,则不再下单,否则进行下单。
在这里插入图片描述

初步代码:增加一人一单逻辑
在这里插入图片描述

修改VoucherOrderServiceImpl 添加逻辑

 Long userID = UserHolder.getUser().getId();// 5.实现1人1单加入逻辑:根据优惠券id和用户id查询订单// 5.1查询订单,并不用查询出具体的值,而是查询出数量即可Integer count = query().eq("user_id", userID).eq("voucher_id", voucherId).count();// 5.2判断订单是否存在if (count > 0) {// 5.2.1 存在就返回异常结果return Result.fail("秒杀优惠券每人限购1张,感谢配合,本优惠券最终解释权归ty公司所有!");}

清理数据库

 UPDATE tb_seckill_voucher SET stock = 100 WHERE voucher_id = 2;DELETE FROM tb_voucher_order;

重启应用测试一下
配置一下JMeter,token只配置了1个,按理来说可以控制1个用户只下1单
在这里插入图片描述
查看结果
在这里插入图片描述
结果如下:发现同1人还是可以下8单
在这里插入图片描述

4.4.2.单机模式下的线程安全问题

存在问题:现在的问题还是和之前一样,并发过来,查询数据库,都不存在订单,所以我们还是需要加锁,但是乐观锁比较适合更新数据,而现在是插入数据,所以我们需要使用悲观锁操作。

**注意:**在这里提到了非常多的问题,我们需要慢慢的来思考,首先我们的初始方案是封装了一个createVoucherOrder方法,同时为了确保他线程安全,在方法上添加了一把synchronized 锁。
在这里插入图片描述

修改VoucherOrderServiceImpl.java

 @Transactionalpublic synchronized Result createVoucherOrder(Long voucherId) {Long userID = UserHolder.getUser().getId();// 5.实现1人1单加入逻辑:根据优惠券id和用户id查询订单// 5.1查询订单,并不用查询出具体的值,而是查询出数量即可Integer count = query().eq("user_id", userID).eq("voucher_id", voucherId).count();// 5.2判断订单是否存在if (count > 0) {// 5.2.1 存在就返回异常结果return Result.fail("秒杀优惠券每人限购1张,感谢配合,本优惠券最终解释权归ty公司所有!");}// 5.2.2 不存在再减少库存// 6.1扣减库存(会出现超卖问题)// update tb_seckill_voucher set stock = stock -1 where voucher_id = ?/*boolean result = iSeckillVoucherService.update().setSql("stock = stock -1").eq("voucher_id", voucherId).update();*/// 6.2扣减库存(针对超卖问题用乐观锁CAS解决)// update tb_seckill_voucher set stock = stock -1 where voucher_id = ? and stock = ?/*boolean result = iSeckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).eq("stock", stock).update();*/// 6.3扣减库存(针对使用乐观锁CAS,没卖完解决)// update tb_seckill_voucher set stock = stock -1 where voucher_id = ? and stock > 0boolean result = iSeckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).gt("stock", 0).update();if (!result) {return Result.fail("商品已经售罄!");}// 7.创建订单/*** 获取订单id*/long orderId = redisWorker.nextID("order");VoucherOrder voucherOrder = new VoucherOrder();voucherOrder.setId(orderId);voucherOrder.setVoucherId(voucherId);voucherOrder.setUserId(userID);// 将订单信息保存到数据库// insert into tb_voucher_order values ()save(voucherOrder);//8.返回订单idreturn Result.ok(orderId);}

但是像这样在方法上添加锁,相当于是this锁,任何对象进来都会获取到锁,锁的粒度太粗了,在使用锁过程中,控制锁粒度 是一个非常重要的事情,因为如果锁的粒度太大,会导致每个线程进来都会锁住,所以我们需要去控制锁的粒度,以下这段代码需要修改为:
intern() 这个方法是从常量池中拿到数据,如果我们直接使用userId.toString() 他拿到的对象实际上是不同的对象,new出来的对象,我们使用锁必须保证锁必须是同一把,所以我们需要使用intern()方法。
安装Translation插件看中文解释
在这里插入图片描述
配置鼠标悬浮
在这里插入图片描述

修改VoucherOrderServiceImpl.java

 @Transactionalpublic Result createVoucherOrder(Long voucherId) {Long userID = UserHolder.getUser().getId();// 通过悲观锁,锁住用户,实现一人一单synchronized (userID.toString().intern()) {// 5.实现1人1单加入逻辑:根据优惠券id和用户id查询订单// 5.1查询订单,并不用查询出具体的值,而是查询出数量即可Integer count = query().eq("user_id", userID).eq("voucher_id", voucherId).count();// 5.2判断订单是否存在if (count > 0) {// 5.2.1 存在就返回异常结果return Result.fail("秒杀优惠券每人限购1张,感谢配合,本优惠券最终解释权归ty公司所有!");}// 5.2.2 不存在再减少库存// 6.1扣减库存(会出现超卖问题)// update tb_seckill_voucher set stock = stock -1 where voucher_id = ?/*boolean result = iSeckillVoucherService.update().setSql("stock = stock -1").eq("voucher_id", voucherId).update();*/// 6.2扣减库存(针对超卖问题用乐观锁CAS解决)// update tb_seckill_voucher set stock = stock -1 where voucher_id = ? and stock = ?/*boolean result = iSeckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).eq("stock", stock).update();*/// 6.3扣减库存(针对使用乐观锁CAS,没卖完解决)// update tb_seckill_voucher set stock = stock -1 where voucher_id = ? and stock > 0boolean result = iSeckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).gt("stock", 0).update();if (!result) {return Result.fail("商品已经售罄!");}// 7.创建订单/*** 获取订单id*/long orderId = redisWorker.nextID("order");VoucherOrder voucherOrder = new VoucherOrder();voucherOrder.setId(orderId);voucherOrder.setVoucherId(voucherId);voucherOrder.setUserId(userID);// 将订单信息保存到数据库// insert into tb_voucher_order values ()save(voucherOrder);//8.返回订单idreturn Result.ok(orderId);}}

但是以上代码还是存在问题,问题的原因在于当前方法被spring的事务控制,如果你在方法内部加锁,可能会导致当前方法事务还没有提交,但是锁已经释放也会导致问题,所以我们选择将当前方法整体包裹起来,确保事务不会出现问题:如下:

在seckillVoucher 方法中,添加以下逻辑,这样就能保证事务的特性,同时也控制了锁的粒度。让对象锁,锁住整个方法,因为@Transactional是方法结束后才提交的。防止事务还没提交前有线程进入方法体内查询。不把synchronized放在方法上是因为,放在方法上相当于是this锁,任何对象都可以来获取。

在这里插入图片描述

        // 实现一人一单,锁住对象Long userID = UserHolder.getUser().getId();synchronized (userID.toString().intern()) {return createVoucherOrder(voucherId);}

但是以上做法依然有问题,因为你调用的方法,其实是this.的方式调用的,事务想要生效,还得利用代理来生效,所以这个地方,我们需要获得原始的事务对象, 来操作事务。
在这里插入图片描述
修改VoucherOrderServiceImpl.java

        // 实现一人一单,获取user对象锁Long userID = UserHolder.getUser().getId();synchronized (userID.toString().intern()) {// 调用本类方法的时候,Spring事务是失效的,解决方案二:调用AopContext APIObject o = AopContext.currentProxy();IVoucherOrderService proxy = (IVoucherOrderService) o;return proxy.createVoucherOrder(voucherId);}

修改IVoucherOrderService.java

public interface IVoucherOrderService extends IService<VoucherOrder> {Result seckillVoucher(Long voucherId);Result createVoucherOrder(Long voucherId);
}

pom.xml加入依赖

 <!-- Spring事务失效,采用AopContext API来处理 --><dependency><groupId>org.aspectj</groupId><artifactId>aspectjweaver</artifactId></dependency>

修改HmDianPingApplication.java添加注解
在这里插入图片描述
HmDianPingApplication.java

@EnableAspectJAutoProxy(exposeProxy = true)

清理数据库

 UPDATE tb_seckill_voucher set stock = 100 where voucher_id = 2;delete from tb_voucher_order;

重启应用,运行JMeter
在这里插入图片描述

查看数据库
在这里插入图片描述
在这里插入图片描述

4.4.3 集群模式下的线程安全问题

通过加锁可以解决在单机情况下的一人一单安全问题,但是在集群模式下就不行了。

1、我们将服务启动两份,端口分别为8081和8082:
在这里插入图片描述
给新服务重新命名,重新指定一个端口

-Dserver.port=8082

在这里插入图片描述

点选2个应用,点击启动,注意用dubug启动
在这里插入图片描述

2、然后修改nginx的conf目录下的nginx.conf文件,配置反向代理和负载均衡:
在这里插入图片描述

3.配置完后启动nginx

nginx.exe -s reload

在这里插入图片描述

4.访问

http://localhost:8080/api/voucher/list/1

在这里插入图片描述
刷新2-3次,8082有查询sql日志输出
在这里插入图片描述
8081有查询sql日志输出
在这里插入图片描述
这样就模拟多集群负载均衡完毕。

打开数据库,恢复数据

 UPDATE tb_seckill_voucher SET stock = 100 WHERE voucher_id = 2;delete FROM tb_voucher_order;

打开IDEA,打上断点
在这里插入图片描述
打开PostMan配置,配置2个Http请求,2个配置完全一样,访问路径和header的authorization都一样

路径http://ocalhost:8080/api/voucher-order/seckill/2
post请求
header:authorization

在这里插入图片描述
在这里插入图片描述

之后点击send测试,发现2个端口的服务都进入了断点,这明显是有问题的,因为二者配置的是同一个用户,不应该都获取到锁。与我们的设想有出入
在这里插入图片描述
在这里插入图片描述

继续跑断点,发现2个端口计算的count都是0
在这里插入图片描述
在这里插入图片描述
全部跑完后发现,订单出现了2个,库存少了2个,又一次出现了1人多卖现象。
在这里插入图片描述
在这里插入图片描述

具体操作(略)

有关锁失效原因分析

由于现在我们部署了多个tomcat,每个tomcat都有一个属于自己的jvm,那么假设在服务器A的tomcat内部,有两个线程,这两个线程由于使用的是同一份代码,那么他们的锁对象是同一个,是可以实现互斥的,但是如果现在是服务器B的tomcat内部,又有两个线程,但是他们的锁对象写的虽然和服务器A一样,但是锁对象却不是同一个,所以线程3和线程4可以实现互斥,但是却无法和线程1和线程2实现互斥,这就是 集群环境下,syn锁失效的原因,在这种情况下,我们就需要使用分布式锁来解决这个问题。
在这里插入图片描述

4.4.4 分布式锁

4.4.4.1 分布式锁原理

基本原理和实现方式对比

分布式锁:满足分布式系统集群模式下多进程可见并且互斥的锁。

分布式锁的核心思想就是让大家都使用同一把锁,只要大家使用的是同一把锁,那么我们就能锁住线程,不让线程进行,让程序串行执行,这就是分布式锁的核心思路。
在这里插入图片描述
那么分布式锁他应该满足一些什么样的条件呢?

可见性:多个线程都能看到相同的结果,注意:这个地方说的可见性并不是并发编程中指的内存可见性,只是说多个进程之间都能感知到变化的意思

互斥:互斥是分布式锁的最基本的条件,使得程序串行执行

高可用:程序不易崩溃,时时刻刻都保证较高的可用性

高性能:由于加锁本身就让性能降低,所有对于分布式锁本身需要他就较高的加锁性能和释放锁性能

安全性:安全也是程序中必不可少的一环
在这里插入图片描述
常见的分布式锁有三种:

Mysql:mysql本身就带有锁机制,但是由于mysql性能本身一般,所以采用分布式锁的情况下,其实使用mysql作为分布式锁比较少见

Redis:redis作为分布式锁是非常常见的一种使用方式,现在企业级开发中基本都使用redis或者zookeeper作为分布式锁,利用setnx这个方法,如果插入key成功,则表示获得到了锁,如果有人插入成功,其他人插入失败则表示无法获得到锁,利用这套逻辑来实现分布式锁

Zookeeper:zookeeper也是企业级开发中较好的一个实现分布式锁的方案,由于本套视频并不讲解zookeeper的原理和分布式锁的实现,所以不过多阐述。
在这里插入图片描述

4.4.4.2 Redis的String结构实现分布式锁

实现分布式锁时需要实现的两个基本方法:

  • 获取锁:

    • 互斥:确保只能有一个线程获取锁

在这里插入图片描述

  • 非阻塞:尝试一次,成功返回true,失败返回false

  • 释放锁:

    • 手动释放
    • 超时释放:获取锁时添加一个超时时间

在这里插入图片描述
我们发现互斥和设置有效期是2个指令

setnx lock thread1
expire thread1 5

2个指令,有可能只执行了第一条指令,第二条执行还没执行的时候redis宕机了。这就需要找一条指令把2件事都干了。

set lock thread2 ex 5 nx 

在这里插入图片描述

核心思路:

我们利用redis 的setNx 方法,当有多个线程进入时,我们就利用该方法,第一个线程进入时,redis 中就有这个key 了,返回了1,如果结果是1,则表示他抢到了锁,那么他去执行业务,然后再删除锁,退出锁逻辑,没有抢到锁的哥们,等待一定时间后重试即可。
在这里插入图片描述

4.4.4.2.1 实现分布式锁版本一
  • 加锁逻辑

锁的基本接口
新增接口ILock.java
在这里插入图片描述
ILock.java

package com.hmdp.lock;/*** @InterfaceName: ILock* @Description:* @Author: wty* @Date: 2023/2/16*/public interface ILock {/*** @param* @return boolean true获取锁成功,false获取锁失败* @description //获取尝试锁* @param: expireTime 锁持有的超时时间,过期后自动释放* @date 2023/2/16 12:07* @author wty**/boolean tryLock(long expireTime);/*** @param* @return void* @description //释放锁* @date 2023/2/16 12:08* @author wty**/void unLock();
}

SimpleRedisLock

利用setnx方法进行加锁,同时增加过期时间,防止死锁,此方法可以保证加锁和增加过期时间具有原子性。新建类SimpleRedisLock
在这里插入图片描述
SimpleRedisLock.java

public class SimpleRedisLock implements ILock {/*** 锁前缀*/private static final String KEY_PREFIX = "lock:";/*** 定义锁的名称*/private String lockName;StringRedisTemplate stringRedisTemplate;public SimpleRedisLock(String lockName, StringRedisTemplate stringRedisTemplate) {this.lockName = lockName;this.stringRedisTemplate = stringRedisTemplate;}@Overridepublic boolean tryLock(long expireTime) {// 获取当前线程的标识long threadId = Thread.currentThread().getId();// set key "1" ex expireTime nxBoolean flag = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + lockName, threadId + "", expireTime, TimeUnit.SECONDS);return BooleanUtil.isTrue(flag);}@Overridepublic void unLock() {stringRedisTemplate.delete(KEY_PREFIX + lockName);}
}
  • 释放锁逻辑

SimpleRedisLock

释放锁,防止删除别人的锁

    @Overridepublic void unLock() {stringRedisTemplate.delete(KEY_PREFIX + lockName);}
  • 修改业务代码VoucherOrderServiceImpl.java
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {@Autowiredprivate ISeckillVoucherService iSeckillVoucherService;@Autowiredprivate RedisWorker redisWorker;@ResourceStringRedisTemplate stringRedisTemplate;/*** @param* @return com.hmdp.dto.Result* @description //秒杀优惠券* @param: voucherId* @date 2023/2/15 22:09* @author wty**/@Overridepublic Result seckillVoucher(Long voucherId) {// 1.查询秒杀优惠券信息// select * from tb_seckill_voucher where voucher_id = ?SeckillVoucher seckillVoucher = iSeckillVoucherService.getById(voucherId);// 2.判断秒杀是否开始LocalDateTime beginTime = seckillVoucher.getBeginTime();LocalDateTime endTime = seckillVoucher.getEndTime();LocalDateTime now = LocalDateTime.now();if (now.isBefore(beginTime)) {// 当前时间早于秒杀开始时间,说明秒杀没有开始return Result.fail("秒杀尚未开始,请耐心等待!秒杀开始时间:" + beginTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));}// 3.判断秒杀是否已经结束if (now.isAfter(endTime)) {// 当前时间晚于秒杀结束时间,说明秒杀结束了return Result.fail("秒杀已经结束,感谢支持!");}// 4.判断库存是否充足Integer stock = seckillVoucher.getStock();if (stock <= 0) {return Result.fail("商品已经售罄!");}// 实现一人一单,获取user对象锁Long userID = UserHolder.getUser().getId();/*// 使用JDK提供的锁监视器synchronized来实现synchronized (userID.toString().intern()) {// 调用本类方法的时候,Spring事务是失效的,解决方案二:调用AopContext APIObject o = AopContext.currentProxy();IVoucherOrderService proxy = (IVoucherOrderService) o;return proxy.createVoucherOrder(voucherId);}*/// 尝试自定义锁监视器SimpleRedisLock simpleRedisLock = new SimpleRedisLock("order:" + userID.toString().intern(), stringRedisTemplate);boolean flag = simpleRedisLock.tryLock(RedisConstants.LOCK_VOUVHER_ORDER_TTL);if (!flag) {// 获取锁失败,就直接返回错误信息即可return Result.fail("[秒杀优惠券]不允许重复下单!本秒杀业务一切解释器归ty公司所有");}Result result;try {Object o = AopContext.currentProxy();IVoucherOrderService proxy = (IVoucherOrderService) o;return proxy.createVoucherOrder(voucherId);} finally {simpleRedisLock.unLock();}}/*** @param* @return com.hmdp.dto.Result* @description //根据优惠券id和用户id查询订单 减少库存生成订单* @param: voucherId* @date 2023/2/15 22:12* @author wty**/@Override@Transactionalpublic Result createVoucherOrder(Long voucherId) {Long userID = UserHolder.getUser().getId();// 5.实现1人1单加入逻辑:根据优惠券id和用户id查询订单// 5.1查询订单,并不用查询出具体的值,而是查询出数量即可Integer count = query().eq("user_id", userID).eq("voucher_id", voucherId).count();// 5.2判断订单是否存在if (count > 0) {// 5.2.1 存在就返回异常结果return Result.fail("秒杀优惠券每人限购1张,感谢配合,本优惠券最终解释权归ty公司所有!");}// 5.2.2 不存在再减少库存// 6.1扣减库存(会出现超卖问题)// update tb_seckill_voucher set stock = stock -1 where voucher_id = ?/*boolean result = iSeckillVoucherService.update().setSql("stock = stock -1").eq("voucher_id", voucherId).update();*/// 6.2扣减库存(针对超卖问题用乐观锁CAS解决)// update tb_seckill_voucher set stock = stock -1 where voucher_id = ? and stock = ?/*boolean result = iSeckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).eq("stock", stock).update();*/// 6.3扣减库存(针对使用乐观锁CAS,没卖完解决)// update tb_seckill_voucher set stock = stock -1 where voucher_id = ? and stock > 0boolean result = iSeckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).gt("stock", 0).update();if (!result) {return Result.fail("商品已经售罄!");}// 7.创建订单/*** 获取订单id*/long orderId = redisWorker.nextID("order");VoucherOrder voucherOrder = new VoucherOrder();voucherOrder.setId(orderId);voucherOrder.setVoucherId(voucherId);voucherOrder.setUserId(userID);// 将订单信息保存到数据库// insert into tb_voucher_order values ()save(voucherOrder);//8.返回订单idreturn Result.ok(orderId);}
}

修改RedisConstants.java

public static final Long LOCK_VOUVHER_ORDER_TTL = 1200L; // TODO 后续改成5L

恢复数据库数据

 UPDATE tb_seckill_voucher SET stock = 100 WHERE voucher_id = 2;delete FROM tb_voucher_order;

Debug模式重启2个应用
发送PostMan请求
在这里插入图片描述
在这里插入图片描述
看一下IDEA中的断点,发现一个是true获取到锁,另一个是false未获取到锁。
在这里插入图片描述
在这里插入图片描述
查看数据库,发现库存减少1,符合
在这里插入图片描述
发现订单产生1条记录,符合
在这里插入图片描述

4.4.4.3 锁误删问题

1.Redis分布式锁误删情况说明

逻辑说明:

持有锁的线程在锁的内部出现了阻塞,导致他的锁自动释放,这时其他线程,线程2来尝试获得锁,就拿到了这把锁,然后线程2在持有锁执行过程中,线程1反应过来,继续执行,而线程1执行过程中,走到了删除锁逻辑,此时就会把本应该属于线程2的锁进行删除,这就是误删别人锁的情况说明。

解决方案:解决方案就是在每个线程释放锁的时候,去判断一下当前这把锁是否属于自己,如果属于自己,则不进行锁的删除,假设还是上边的情况,线程1卡顿,锁自动释放,线程2进入到锁的内部执行逻辑,此时线程1反应过来,然后删除锁,但是线程1,一看当前这把锁不是属于自己,于是不进行删除锁逻辑,当线程2走到删除锁逻辑时,如果没有卡过自动释放锁的时间点,则判断当前这把锁是属于自己的,于是删除这把锁。
在这里插入图片描述

2. 解决Redis分布式锁误删问题

需求:修改之前的分布式锁实现,满足:

  1. 在获取锁时存入线程标示(可以用UUID表示)
  2. 在释放锁时先获取锁中的线程标示,判断是否与当前线程标示一致
  • 如果一致则释放锁
  • 如果不一致则不释放锁

核心逻辑:在存入锁时,放入自己线程的标识,在删除锁时,判断当前这把锁的标识是不是自己存入的,如果是,则进行删除,如果不是,则不进行删除。
在这里插入图片描述

具体代码如下:加锁和释放锁
通过UUID和线程id区分每个线程。
修改SimpleRedisLock.java

public class SimpleRedisLock implements ILock {/*** 锁前缀*/private static final String KEY_PREFIX = "lock:";/*** 线程ID前缀* true是可以把UUID中的横线去掉*/private static final String THREAD_ID_PREFIX = UUID.randomUUID().toString(true) + "-";/*** 定义锁的名称*/private String lockName;StringRedisTemplate stringRedisTemplate;public SimpleRedisLock(String lockName, StringRedisTemplate stringRedisTemplate) {this.lockName = lockName;this.stringRedisTemplate = stringRedisTemplate;}/*** @param* @return boolean* @description // 获取锁* @param: expireTime* @date 2023/2/16 12:24* @author wty**/@Overridepublic boolean tryLock(long expireTime) {// 获取当前线程的标识//long threadId = Thread.currentThread().getId();String threadId = THREAD_ID_PREFIX + Thread.currentThread().getId();// set key "1" ex expireTime nxBoolean flag = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + lockName, threadId + "", expireTime, TimeUnit.SECONDS);return BooleanUtil.isTrue(flag);}/*** @param* @return void* @description //释放锁* @date 2023/2/16 12:24* @author wty**/@Overridepublic void unLock() {// 获取当前线程的IDString threadId = THREAD_ID_PREFIX + Thread.currentThread().getId();String threadId_Redis = stringRedisTemplate.opsForValue().get(KEY_PREFIX + lockName);// 只有redis中的线程和当前的线程是同一个才允许释放锁if (String.valueOf(threadId).equals(threadId_Redis)) {stringRedisTemplate.delete(KEY_PREFIX + lockName);}}
}

修改数据库sql

 UPDATE tb_seckill_voucher SET stock = 100 WHERE voucher_id = 2;DELETE FROM tb_voucher_order;

重启应用,用PostMan发送请求
在这里插入图片描述
在这里插入图片描述
之后允许断点通过,让第一个端口能成功获取锁
在这里插入图片描述
此时模拟业务阻塞,数据有效期过期,但是业务还是没有办理完。我们可以人为手动删除掉redis中的锁。
可以看到value的形式满足UUID-线程id的形式
在这里插入图片描述
删除后,紧接着我们让端口2来获取锁
在这里插入图片描述
然后我们跟端口1,看看能否释放锁,发现是不能的
在这里插入图片描述
我们跟端口2,发现它可以释放锁
在这里插入图片描述
查看数据库
在这里插入图片描述
在这里插入图片描述

解决了误删的操作。

有关代码实操说明:

在我们修改完此处代码后,我们重启工程,然后启动两个线程,第一个线程持有锁后,手动释放锁,第二个线程 此时进入到锁内部,再放行第一个线程,此时第一个线程由于锁的value值并非是自己,所以不能释放锁,也就无法删除别人的锁,此时第二个线程能够正确释放锁,通过这个案例初步说明我们解决了锁误删的问题。

4.4.4.4 分布式锁的原子性操作问题

更为极端的误删逻辑说明:

线程1现在持有锁之后,在执行业务逻辑过程中,他正准备删除锁,而且已经走到了条件判断的过程中,比如他已经拿到了当前这把锁确实是属于他自己的,正准备删除锁,但是此时他的锁到期了,那么此时线程2进来,但是线程1他会接着往后执行,当他卡顿结束后,他直接就会执行删除锁那行代码,相当于条件判断并没有起到作用,这就是删锁时的原子性问题,之所以有这个问题,是因为线程1的拿锁,比锁,删锁,实际上并不是原子性的,我们要防止刚才的情况发生。
在这里插入图片描述

4.4.4.5 Lua脚本解决原子性问题

Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。Lua是一种编程语言,它的基本语法大家可以参考网站:Lua官网。
这里重点介绍Redis提供的调用函数,我们可以使用lua去操作redis,又能保证他的原子性,这样就可以实现拿锁比锁删锁是一个原子性动作了,作为Java程序员这一块并不作一个简单要求,并不需要大家过于精通,只需要知道他有什么作用即可。

这里重点介绍Redis提供的调用函数,语法如下:

redis.call('命令名称', 'key', '其它参数', ...)

例如,我们要执行set name jack,则脚本是这样:

# 执行 set name jack
redis.call('set', 'name', 'jack')

例如,我们要先执行set name Rose,再执行get name,则脚本如下:

# 先执行 set name jack
redis.call('set', 'name', 'Rose')
# 再执行 get name
local name = redis.call('get', 'name')
# 返回
return name

写好脚本以后,需要用Redis命令来调用脚本,调用脚本的常见命令如下:
在这里插入图片描述
例如,我们要执行 redis.call(‘set’, ‘name’, ‘jack’) 这个脚本,语法如下:
这里最后的0,是指key类型的参数,比如这个脚本里没有参数,都是常量设置好的,就是0

EVAL "return redis.call('set','name','jack')" 0

在这里插入图片描述
自己试一下
在这里插入图片描述

如果脚本中的key、value不想写死,可以作为参数传递。key类型参数会放入KEYS数组,其它参数会放入ARGV数组,在脚本中可以从KEYS和ARGV数组获取这些参数:

在这里插入图片描述
Lua脚本中数组是从1开始的!
我们自己玩一下指令
在这里插入图片描述

接下来我们来回顾一下我们释放锁的逻辑:

释放锁的业务流程是这样的

​ 1、获取锁中的线程标示

​ 2、判断是否与指定的标示(当前线程标示)一致

​ 3、如果一致则释放锁(删除)

​ 4、如果不一致则什么都不做

如果用Lua脚本来表示则是这样的:

最终我们操作redis的拿锁比锁删锁的lua脚本就会变成这样
Lua脚本如下:

-- 锁的key
local key = KEYS[1]-- 当前线程的标识
local threadID = ARGV[1]-- 获取锁中的线程标识
local threadID_Redis = redis.call('get',KEYS[1])-- 比较线程的标识与锁中的标识是否一致
if(ARGV[1] == threadID_Redis) then-- 一致就释放锁redis.call('del',KEYS[1])
end
return 0
1. 利用Java代码调用Lua脚本改造分布式锁

lua脚本本身并不需要大家花费太多时间去研究,只需要知道如何调用,大致是什么意思即可,所以在笔记中并不会详细的去解释这些lua表达式的含义。

我们的RedisTemplate中,可以利用execute方法去执行lua脚本,参数对应关系就如下图股

在这里插入图片描述
下载插件EmmyLua
在这里插入图片描述
下载完插件之后,新建Lua脚本
在这里插入图片描述
把上面的脚本拷贝进去即可
在这里插入图片描述

Java代码
在这里插入图片描述

修改SimpleRedisLock.java增加

public class SimpleRedisLock implements ILock {/*** 锁前缀*/private static final String KEY_PREFIX = "lock:";/*** 线程ID前缀* true是可以把UUID中的横线去掉*/private static final String THREAD_ID_PREFIX = UUID.randomUUID().toString(true) + "-";/*** 定义锁的名称*/private String lockName;StringRedisTemplate stringRedisTemplate;private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;static {UNLOCK_SCRIPT = new DefaultRedisScript<>();UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));UNLOCK_SCRIPT.setResultType(Long.class);}public SimpleRedisLock(String lockName, StringRedisTemplate stringRedisTemplate) {this.lockName = lockName;this.stringRedisTemplate = stringRedisTemplate;}/*** @param* @return void* @description //释放锁(基于Lua脚本)* @date 2023/2/16 15:37* @author wty**/@Overridepublic void unLock() {// 调用Lua脚本stringRedisTemplate.execute(UNLOCK_SCRIPT,Collections.singletonList(KEY_PREFIX + lockName),THREAD_ID_PREFIX + Thread.currentThread().getId());}
}

经过以上代码改造后,我们就能够实现 拿锁、锁删锁的原子性动作了~
下面测试一下。
恢复数据库

 UPDATE tb_seckill_voucher SET stock = 100 WHERE voucher_id = 2;DELETE FROM tb_voucher_order;

重启应用Debug模式
打开PostMan,2个Http请求分别点击send
在这里插入图片描述
在这里插入图片描述
第一个端口跑到获取锁的断点
在这里插入图片描述
模拟业务阻塞有效期超时,去redis中删除lock
在这里插入图片描述
让端口2跑完获取锁逻辑,重新生成新的锁
在这里插入图片描述

此时,切换,让端口1跑到释放锁的位置,模拟减少库存,生成订单,但是还没有释放锁。此时看一下Redis数据库,说明还没释放锁
在这里插入图片描述
再让端口2跑完所有断点,redis中lock的那一条删除了,这样保证了释放锁的原子性。

小总结:

基于Redis的分布式锁实现思路:

  • 利用set nx ex获取锁,并设置过期时间,保存线程标示
  • 释放锁时先判断线程标示是否与自己一致,一致则删除锁
    • 特性:
      • 利用set nx满足互斥性
      • 利用set ex保证故障时锁依然能释放,避免死锁,提高安全性
      • 利用Redis集群保证高可用和高并发特性

笔者总结:我们一路走来,利用添加过期时间,防止死锁问题的发生,但是有了过期时间之后,可能出现误删别人锁的问题,这个问题我们开始是利用删之前 通过拿锁,比锁,删锁这个逻辑来解决的,也就是删之前判断一下当前这把锁是否是属于自己的,但是现在还有原子性问题,也就是我们没法保证拿锁比锁删锁是一个原子性的动作,最后通过lua表达式来解决这个问题

但是目前还剩下一个问题锁不住,什么是锁不住呢,你想一想,如果当过期时间到了之后,我们可以给他续期一下,比如续个30s,就好像是网吧上网, 网费到了之后,然后说,来,网管,再给我来10块的,是不是后边的问题都不会发生了,那么续期问题怎么解决呢,可以依赖于我们接下来要学习redission啦

测试逻辑:

第一个线程进来,得到了锁,手动删除锁,模拟锁超时了,其他线程会执行lua来抢锁,当第一天线程利用lua删除锁时,lua能保证他不能删除他的锁,第二个线程删除锁时,利用lua同样可以保证不会删除别人的锁,同时还能保证原子性。

4.4.4.6 Redission分布式锁

1.分布式锁 - redission功能介绍

基于setnx实现的分布式锁存在下面的问题:

重入问题:重入问题是指 获得锁的线程可以再次进入到相同的锁的代码块中,可重入锁的意义在于防止死锁,比如HashTable这样的代码中,他的方法都是使用synchronized修饰的,假如他在一个方法内,调用另一个方法,那么此时如果是不可重入的,不就死锁了吗?所以可重入锁他的主要意义是防止死锁,我们的synchronized和Lock锁都是可重入的。

不可重试:是指目前的分布式只能尝试一次,我们认为合理的情况是:当线程在获得锁失败后,他应该能再次尝试获得锁。

**超时释放:**我们在加锁时增加了过期时间,这样的我们可以防止死锁,但是如果卡顿的时间超长,虽然我们采用了lua表达式防止删锁的时候,误删别人的锁,但是毕竟没有锁住,有安全隐患

主从一致性: 如果Redis提供了主从集群,当我们向集群写数据时,主机需要异步的将数据同步给从机,而万一在同步过去之前,主机宕机了,就会出现死锁问题。
在这里插入图片描述
那么什么是Redission呢?

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。

Redission提供了分布式锁的多种多样的功能
在这里插入图片描述
官网网站:Redisson官网
github地址:

2. 分布式锁-Redission快速入门
  1. pom.xml中引入依赖:
<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.13.6</version>
</dependency>
  1. 配置Redisson客户端:
    在这里插入图片描述
    RedissonConfig代码如下:
@Configuration
public class RedissonConfig {@Beanpublic RedissonClient redissonClient(){// 配置类Config config = new Config();// 添加redis地址,这里添加了单点的地址(虚拟机地址),也可以使用config.useClusterServers()添加集群地址config.useSingleServer().setAddress("redis://192.168.150.101:6379").setPassword("123321");// 创建RedissonClient对象return Redisson.create(config);}
}
  1. 如何使用Redission的分布式锁
@Resource
private RedissionClient redissonClient;@Test
void testRedisson() throws Exception{//获取锁(可重入),指定锁的名称RLock lock = redissonClient.getLock("anyLock");//尝试获取锁,参数分别是:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位boolean isLock = lock.tryLock(1,10,TimeUnit.SECONDS);//判断获取锁成功if(isLock){try{System.out.println("执行业务");          }finally{//释放锁lock.unlock();}}}

在 VoucherOrderServiceImpl

注入RedissonClient
修改VoucherOrderServiceImpl.java

@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {@Autowiredprivate ISeckillVoucherService iSeckillVoucherService;@Autowiredprivate RedisWorker redisWorker;@ResourceStringRedisTemplate stringRedisTemplate;@Resourceprivate RedissonClient redissonClient;@Overridepublic Result seckillVoucher(Long voucherId) {// 1.查询秒杀优惠券信息// select * from tb_seckill_voucher where voucher_id = ?SeckillVoucher seckillVoucher = iSeckillVoucherService.getById(voucherId);// 2.判断秒杀是否开始LocalDateTime beginTime = seckillVoucher.getBeginTime();LocalDateTime endTime = seckillVoucher.getEndTime();LocalDateTime now = LocalDateTime.now();if (now.isBefore(beginTime)) {// 当前时间早于秒杀开始时间,说明秒杀没有开始return Result.fail("秒杀尚未开始,请耐心等待!秒杀开始时间:" + beginTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));}// 3.判断秒杀是否已经结束if (now.isAfter(endTime)) {// 当前时间晚于秒杀结束时间,说明秒杀结束了return Result.fail("秒杀已经结束,感谢支持!");}// 4.判断库存是否充足Integer stock = seckillVoucher.getStock();if (stock <= 0) {return Result.fail("商品已经售罄!");}// 实现一人一单,获取user对象锁Long userID = UserHolder.getUser().getId();// 用Redisson提供的可重入锁RLock lock = redissonClient.getLock("lock:order:" + userID.toString().intern());boolean flag = lock.tryLock();if (!flag) {// 获取锁失败,就直接返回错误信息即可return Result.fail("[秒杀优惠券]不允许重复下单!本秒杀业务一切解释器归ty公司所有");}try {Object o = AopContext.currentProxy();IVoucherOrderService proxy = (IVoucherOrderService) o;return proxy.createVoucherOrder(voucherId);} finally {lock.unlock();}}}

恢复数据库

 UPDATE tb_seckill_voucher SET stock = 100 WHERE voucher_id = 2;DELETE FROM tb_voucher_order;

依然重启2个端口,用其中1个PostMan发送请求即可。
在这里插入图片描述
查看数据库
在这里插入图片描述
在这里插入图片描述
再恢复数据库测试并发。

 UPDATE tb_seckill_voucher SET stock = 100 WHERE voucher_id = 2;DELETE FROM tb_voucher_order;

用JMeter测试一下发现,只允许成功1个
在这里插入图片描述
数据库结果
在这里插入图片描述
在这里插入图片描述

4.4.4.7 Hash结构解决锁的可重入问题

分布式锁-redission可重入锁原理

在Lock锁中,他是借助于底层的一个voaltile的一个state变量来记录重入的状态的,比如当前没有人持有这把锁,那么state=0,假如有人持有这把锁,那么state=1,如果持有这把锁的人再次持有这把锁,那么state就会+1 ,如果是对于synchronized而言,他在c语言代码中会有一个count,原理和state类似,也是重入一次就加一,释放一次就-1 ,直到减少成0 时,表示当前这把锁没有被人持有。

在redission中,我们的也支持支持可重入锁。

我们来模拟一下这个过程
新建测试类RedissonTest.java

@Slf4j
@SpringBootTest
public class RedissonTest {@Resourceprivate RedissonClient redissonClient;RLock lock;@BeforeEachvoid setUp() {lock = redissonClient.getLock("lock");}@Testvoid method1() {boolean isLock = lock.tryLock();if (!isLock) {log.error("获取锁失败1");return;}try {log.info("获取锁成功,1");method2();} finally {log.info("释放锁,1");lock.unlock();}}@Testvoid method2() {boolean isLock = lock.tryLock();if (!isLock) {log.error("获取锁失败2");return;}try {log.info("获取锁成功,2");} finally {log.info("释放锁,2");lock.unlock();}}
}

看Redis的图形界面,method2获取锁的时候
在这里插入图片描述
method2释放锁的时候
在这里插入图片描述
method1释放锁后lock移除。

在分布式锁中,他采用hash结构用来存储锁,其中大key表示表示这把锁是否存在,用小key表示当前这把锁被哪个线程持有,所以接下来我们一起分析一下当前的这个lua表达式

这个地方一共有3个参数

KEYS[1] : 锁名称

ARGV[1]: 锁失效时间

ARGV[2]: id + “:” + threadId; 锁的小key

exists: 判断数据是否存在 name:是lock是否存在,如果==0,就表示当前这把锁不存在

redis.call(‘hset’, KEYS[1], ARGV[2], 1);此时他就开始往redis里边去写数据 ,写成一个hash结构

Lock{​    id + **":"** + threadId :  1}

如果当前这把锁存在,则第一个条件不满足,再判断

redis.call(‘hexists’, KEYS[1], ARGV[2]) == 1

此时需要通过大key+小key判断当前这把锁是否是属于自己的,如果是自己的,则进行

redis.call(‘hincrby’, KEYS[1], ARGV[2], 1)

将当前这个锁的value进行+1 ,redis.call(‘pexpire’, KEYS[1], ARGV[1]); 然后再对其设置过期时间,如果以上两个条件都不满足,则表示当前这把锁抢锁失败,最后返回pttl,即为当前这把锁的失效时间

如果小伙帮们看了前边的源码, 你会发现他会去判断当前这个方法的返回值是否为null,如果是null,则对应则前两个if对应的条件,退出抢锁逻辑,如果返回的不是null,即走了第三个分支,在源码处会进行while(true)的自旋抢锁。

"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('hset', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"return redis.call('pttl', KEYS[1]);"

在这里插入图片描述
我们跟一下源码看一下:
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述
以上是tryLock的源码。接下来看unLock的
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

4.4.4.8 watchDog解决锁超时释放问题

1.分布式锁-redission锁重试和WatchDog机制

说明:由于课程中已经说明了有关tryLock的源码解析以及其看门狗原理,所以笔者在这里给大家分析lock()方法的源码解析,希望大家在学习过程中,能够掌握更多的知识

抢锁过程中,获得当前线程,通过tryAcquire进行抢锁,该抢锁逻辑和之前逻辑相同

1、先判断当前这把锁是否存在,如果不存在,插入一把锁,返回null

2、判断当前这把锁是否是属于当前线程,如果是,则返回null

所以如果返回是null,则代表着当前这哥们已经抢锁完毕,或者可重入完毕,但是如果以上两个条件都不满足,则进入到第三个条件,返回的是锁的失效时间,同学们可以自行往下翻一点点,你能发现有个while( true) 再次进行tryAcquire进行抢锁

long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {return;
}

接下来会有一个条件分支,因为lock方法有重载方法,一个是带参数,一个是不带参数,如果带带参数传入的值是-1,如果传入参数,则leaseTime是他本身,所以如果传入了参数,此时leaseTime != -1 则会进去抢锁,抢锁的逻辑就是之前说的那三个逻辑

if (leaseTime != -1) {return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}

如果是没有传入时间,则此时也会进行抢锁, 而且抢锁时间是默认看门狗时间 commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout()

ttlRemainingFuture.onComplete((ttlRemaining, e) 这句话相当于对以上抢锁进行了监听,也就是说当上边抢锁完毕后,此方法会被调用,具体调用的逻辑就是去后台开启一个线程,进行续约逻辑,也就是看门狗线程

RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime,commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {if (e != null) {return;}// lock acquiredif (ttlRemaining == null) {scheduleExpirationRenewal(threadId);}
});
return ttlRemainingFuture;

此逻辑就是续约逻辑,注意看commandExecutor.getConnectionManager().newTimeout() 此方法

Method( new TimerTask() {},参数2 ,参数3 )

指的是:通过参数2,参数3 去描述什么时候去做参数1的事情,现在的情况是:10s之后去做参数一的事情

因为锁的失效时间是30s,当10s之后,此时这个timeTask 就触发了,他就去进行续约,把当前这把锁续约成30s,如果操作成功,那么此时就会递归调用自己,再重新设置一个timeTask(),于是再过10s后又再设置一个timerTask,完成不停的续约

那么大家可以想一想,假设我们的线程出现了宕机他还会续约吗?当然不会,因为没有人再去调用renewExpiration这个方法,所以等到时间之后自然就释放了。

private void renewExpiration() {ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ee == null) {return;}Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ent == null) {return;}Long threadId = ent.getFirstThreadId();if (threadId == null) {return;}RFuture<Boolean> future = renewExpirationAsync(threadId);future.onComplete((res, e) -> {if (e != null) {log.error("Can't update lock " + getName() + " expiration", e);return;}if (res) {// reschedule itselfrenewExpiration();}});}}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);ee.setTimeout(task);
}

在这里插入图片描述
在这里插入图片描述

2. 分布式锁-redission锁的MutiLock原理
  • Redisson分布式锁主从一致性问题
    为了提高redis的可用性,我们会搭建集群或者主从,现在以主从为例
    此时我们去写命令,写在主机上, 主机会将数据同步给从机,但是假设在主机还没有来得及把数据写入到从机去的时候,此时主机宕机,哨兵会发现主机宕机,并且选举一个slave变成master,而此时新的master中实际上并没有锁信息,此时锁信息就已经丢掉了。
    在这里插入图片描述
    为了解决这个问题,redission提出来了MutiLock锁,使用这把锁咱们就不使用主从了,每个节点的地位都是一样的, 这把锁加锁的逻辑需要写入到每一个主丛节点上,只有所有的服务器都写入成功,此时才是加锁成功,假设现在某个节点挂了,那么他去获得锁的时候,只要有一个节点拿不到,都不能算是加锁成功,就保证了加锁的可靠性。
    在这里插入图片描述

那么MutiLock 加锁原理是什么呢?笔者画了一幅图来说明

当我们去设置了多个锁时,redission会将多个锁添加到一个集合中,然后用while循环去不停去尝试拿锁,但是会有一个总共的加锁时间,这个时间是用需要加锁的个数 * 1500ms ,假设有3个锁,那么时间就是4500ms,假设在这4500ms内,所有的锁都加锁成功, 那么此时才算是加锁成功,如果在4500ms有线程加锁失败,则会再次去进行重试.
在这里插入图片描述
实现连锁机制
修改RedissonConfig.java

@Configuration
public class RedissonConfig {@Beanpublic RedissonClient redissonClient() {// 配置类Config config = new Config();// 添加redis地址,这里添加了单点的地址,也可以使用config.useClusterServers()添加集群地址config.useSingleServer().setAddress("redis://192.168.183.145:6379").setPassword("112453");// 创建RedissonClient对象return Redisson.create(config);}@Beanpublic RedissonClient redissonClient2() {// 配置类Config config = new Config();// 添加redis地址,这里添加了单点的地址,也可以使用config.useClusterServers()添加集群地址config.useSingleServer().setAddress("redis://192.168.193.175:6380").setPassword("557724");// 创建RedissonClient对象return Redisson.create(config);}@Beanpublic RedissonClient redissonClient3() {// 配置类Config config = new Config();// 添加redis地址,这里添加了单点的地址,也可以使用config.useClusterServers()添加集群地址config.useSingleServer().setAddress("redis://192.168.177.145:6381").setPassword("5896");// 创建RedissonClient对象return Redisson.create(config);}
}

修改RedissonTest.java

@Slf4j
@SpringBootTest
public class RedissonTest {@Resourceprivate RedissonClient redissonClient;@Resourceprivate RedissonClient redissonClient2;@Resourceprivate RedissonClient redissonClient3;RLock lock;@BeforeEachvoid setUp() {RLock lock1 = redissonClient.getLock("lock");RLock lock2 = redissonClient.getLock("lock");RLock lock3 = redissonClient.getLock("lock");// 创建连锁lock = redissonClient.getMultiLock(lock1, lock2, lock3);}@Testvoid method1() {boolean isLock = lock.tryLock();if (!isLock) {log.error("获取锁失败1");return;}try {log.info("获取锁成功,1");method2();} finally {log.info("释放锁,1");lock.unlock();}}@Testvoid method2() {boolean isLock = lock.tryLock();if (!isLock) {log.error("获取锁失败2");return;}try {log.info("获取锁成功,2");} finally {log.info("释放锁,2");lock.unlock();}}}

在这里插入图片描述


http://www.ppmy.cn/news/25711.html

相关文章

【MySQL】数据库基础

目录 1、什么是数据库 2、 数据库基本操作 2.1 查看当前数据库 2.2 创建一个数据库 2.3 选中数据库 2.4 删除数据库 3、常见的数据类型 3.1 数值类型 3.2 字符串类型 3.3 日期类型 4、表的操作 4.1 创建表 4.2 查看指定数据库下的所有表 4.3 查看表的结构 4.…

【MT7628】固件开发-SDK4320添加MT7612E WiFi驱动操作说明

解压5G WiFi MT7612E驱动1.1解压指令 tar -xvf MT76x2E_MT7620_LinuxAP_V3.0.4.0_P2_DPA_20160308.tar.bz2 1.2解压之后会出现以下两个目录 rlt_wifi rlt_wifi_ap 1.3将解压后的文件拷贝到系统下 拷贝路径 RT288x_SDK/source/linux-2.6.36.x/drivers/net/wireless 内核中打开驱…

极验3代 加密分析

目标链接 aHR0cHM6Ly93d3cuZ2VldGVzdC5jb20vZGVtby9zbGlkZS1mbG9hdC5odG1s接口分析 极验参数重要信息 gt和challenge&#xff1b;gt是固定的&#xff0c;但是challenge每次请求会产生不同的&#xff0c;这里的请求的并没有什么加密参数。 下一个请求 gettype.php&#xff0c…

element-ui中el-table点击其他自定义按钮展开table中某一行

element-ui中el-table点击其他自定义按钮展开table中某一行 在日常开发中&#xff0c;我们遇见了会有点击某些按钮&#xff0c;使得表格行展开的需求&#xff0c;这时候去查看文档 element-ui&#xff08;table&#xff09; 这里官方提供了示例为在行最左侧有一个展开合并ico…

深入理解MySQLⅢ -- 锁与InnoDB引擎

文章目录锁概述全局锁表级锁表锁元数据锁意向锁行级锁行锁间隙锁&临键锁InnoDB引擎逻辑存储结构架构内存结构磁盘结构后台线程事务原理redo logundo logMVCC锁 概述 锁是计算机协调多个进程或线程并发访问某一资源的机制。在数据库中&#xff0c;除传统的计算资源&#x…

MySQL 派生表产生关联索引auto_key0导致SQL非常的慢

相同的SQL在maridb运行0.5秒&#xff0c;在MySQL8.0.26中运行要19秒 官方MySQL在处理子查时&#xff0c;优化器有个优化参数derived_merge&#xff0c;MySQL7开启添加&#xff0c;默认on.很多情况可以自动优化派生表&#xff0c;避免创建临时索引auto_key0和生成临时表数据做…

图文详解Ansible中的变量及加密

文章目录一、变量命名二、变量级别三、.变量设定和使用方式1.在playbook中直接定义变量2.在文件中定义变量3.使用变量4.设定主机变量和清单变量5.目录设定变量6.用命令覆盖变量7.使用数组设定变量8.注册变量9.事实变量10.魔法变量四、JINJA2模板五、 Ansible的加密控制练习1.用…

PPS文件如何转换成PPT?附两种方法

在工作中&#xff0c;PPS文件的使用还是很广泛的&#xff0c;因为作为幻灯片放映文件&#xff0c;点击后就能直接播放&#xff0c;十分方便。但如果想要修改PPS里的内容&#xff0c;PPS是无法编辑的&#xff0c;我们需要把文件转换成PPT&#xff0c;再进行修改。 那PPS文件如何…