【redis-04】Redisson实现分布式锁实战和源码剖析

embedded/2024/10/8 23:51:00/

redis系列整体栏目


内容链接地址
【一】redis基本数据类型和使用场景https://zhenghuisheng.blog.csdn.net/article/details/142406325
【二】redis的持久化机制和原理https://zhenghuisheng.blog.csdn.net/article/details/142441756
【三】redis缓存穿透、缓存击穿、缓存雪崩https://zhenghuisheng.blog.csdn.net/article/details/142577507
【四】redisson实现分布式锁实战和源码剖析https://zhenghuisheng.blog.csdn.net/article/details/142646301

如需转载,请输入:https://blog.csdn.net/zhenghuishengq/article/details/142577507

redisson实现分布式锁实战和源码剖析

  • 一,redisson实现分布式锁实战和源码剖析
    • 1,redis原生方式实现分布式
    • 2,Redisson实现分布式
      • 2.1,ReentrantLock 实现锁
      • 2.2,Redission实现分布式锁案例
      • 2.3,Redission底层实现原理和源码剖析
        • 2.3.1,lock加锁逻辑
        • 2.3.2,lock锁续命逻辑
        • 2.3.3,加锁失败阻塞逻辑
        • 2.3.4,unlock锁释放
    • 3,Redisson总结

redisson_12">一,redisson实现分布式锁实战和源码剖析

前面几篇讲解了redis的基本数据类型,接下来在本文中,讲解一下如何通过redis实现一把分布式锁。在分布式环境中,所有的jvm层面的锁将会失去该有的作用,因此在分布式环境中,可以通过redis来实现这种分布式锁,说白了就是在分布式和高并发的环境下,将并行的线程改成串行。

redis_16">1,redis原生方式实现分布式

redis内部,提供了实现分布式锁的方式,可以直接通过 setnx 命令的方式来,加下来直接通过代码的方式来演示一段扣减库存的代码,比如以下这段代码,对id为1001的手机进行扣减库存,此时redis中设置了库存为100

set phoneCount 100

随后自定义一把分布式锁,在设置 key/value 的同时,并设置过期时间,可以保证整条命令的原子性

@RestController
@RequestMapping("/test")
@Slf4j
public class StockController {@Resourceprivate RedisTemplate redisTemplate;//手机idpublic static final String PHONE_ID = "phone:1001";//手机数量public static final String PHONE_COUNT = "phoneCount";@GetMapping("/disStock")public AjaxResult disStock(){//每个线程分配一个唯一标识String flag = UUID.randomUUID().toString();//定义一把分布式锁,设置有效期为30sredisTemplate.opsForValue().setIfAbsent(PHONE_ID, flag, 30, TimeUnit.SECONDS);try {//当前库存Integer stock = Integer.parseInt(redisTemplate.opsForValue().get(PHONE_COUNT) + "");if (stock > 0){stock = stock - 1;redisTemplate.opsForValue().set(PHONE_COUNT,stock);log.info("当前库存值为:" + stock);}else{log.info("当前库存为空,扣减失败");}}finally {redisTemplate.delete(PHONE_ID);}return AjaxResult.success();}
}

如果是在并发量不大,或者说能接受超卖的情况下,上面这种方式实现分布式锁是够用的。

当然上面这种方式实现也有问题,就是有可能锁被误删的问题。假设此时线程1先拿到锁,线程2来时发现线程1已经拿到锁,那么线程2就会等待线程1执行完。但是现在还有问题,锁设置了一个过期时间30s,假设说线程1在执行下面这段代码的时候,可能逻辑特别复杂执行时间超过了30s,假设需要花费40s才能完成,那么在30s的时候,锁就过期了,那么线程2就能去抢锁

if (stock > 0){stock = stock - 1;xxxxx		//业务需要执行40sredisTemplate.opsForValue().set(PHONE_COUNT,stock);log.info("当前库存值为:" + stock);
}

但是线程1还是在执行的,假设此时线程2正拿到锁在执行任务,在40s后线程1执行完的时候,直接把这把锁给删了,导致线程2锁又失效了,线程2又没执行完,后面又会执行扣库存,删锁的命令,这样就会导致后面的线程的锁都会被莫名其妙的删除,库存方面最终也会出现超卖的问题。

在这里插入图片描述

redisTemplate.delete(PHONE_ID);

而且还会导致超卖问题,如线程1还没有set减1的操作到redis中,线程2拿到的还是100,按理来说是线程1减掉的值99,然后还是对100进行操作,如果是在高并发环境下,就会严重的出现超卖的问题。

因此需要在删锁时做一个进一步的优化,判断一下加锁的唯一标识是不是当前线程的唯一标识,是的话才能删

if(flag.equals(redisTemplate.opsForValue().get(PHONE_ID))){redisTemplate.delete(PHONE_ID);
}

上面这种情况在系统稳定的时候进行释放锁时没有问题,但是也可能遇到极端的情况,比如在释放锁之前遇到系统卡顿的情况,导致还没执行释放锁的命令锁又过期了,这样别的线程又能抢锁,然后当前线程在执行到删除锁命令的时候,有把别的抢到锁的线程的锁给删除了,又出现了上面的这个 超卖 的问题

if(flag.equals(redisTemplate.opsForValue().get(PHONE_ID))){xxxx 		//系统卡顿redisTemplate.delete(PHONE_ID);
}

总而言之如果用redis的自定义分布式锁时,会由于锁的超时时间、锁过期和锁删除机制,会导致出现 超卖问题。其主要原因是锁会过期,这样导致未执行完的线程还会对锁进行删除的操作,导致其他线程锁失效。

Redisson_106">2,Redisson实现分布式

虽然说redis确实可以通过自定义的方式实现一把分布式锁,但是其内部确实还存在一些问题,如经典的超卖问题,其主要原因还是,不能控制每个线程的过期时间,导致如果某个线程超时的话,就会出现锁提前释放,后续也可能出现将其他线程的锁删除的行为

因此出现了redisson分布式锁的实现,其官网链接地址如下:https://github.com/redisson/redisson/wiki/%E7%9B%AE%E5%BD%95

在这里插入图片描述

2.1,ReentrantLock 实现锁

看到这些可重入锁,公平锁等等,可以联想到JDK内部的JUC的实现,如可以查看本人写的JUC系列的 ReentrantLock的实现: https://blog.csdn.net/zhenghuishengq/article/details/132857564

实现一把单JVM进程锁的方式如下,在定义完一把 ReentrantLock 锁之后呢,直接调用内部两个简单的api就能实现加锁和解锁,底层通过aqs实现

ReentrantLock lock = new ReentrantLock();
lock.lock();
lock.unlock()

底层通过clh同步等待队列实现,同时还支持公平锁和非公平锁,由于本文主角是redission,因此详细可以去看上面给的文章的链接

在这里插入图片描述

2.2,Redission实现分布式锁案例

接下来针对上面这段自定义实现的分布式锁,通过redisson进行优化

public class StockController {@Resourceprivate RedisTemplate redisTemplate;@Resourceprivate Redisson redisson;//手机idpublic static final String PHONE_ID = "phone:1001";//手机数量public static final String PHONE_COUNT = "phoneCount";@GetMapping("/disStock")public AjaxResult disStock(){String flag = UUID.randomUUID().toString();//定义一把分布式锁,设置有效期为30sRLock lock = redisson.getLock(PHONE_ID);lock.lock();try {//当前库存Integer stock = Integer.parseInt(redisTemplate.opsForValue().get(PHONE_COUNT) + "");if (stock > 0){stock = stock - 1;redisTemplate.opsForValue().set(PHONE_COUNT,stock);log.info("当前库存值为:" + stock);}else{log.info("当前库存为空,扣减失败");}}finally {lock.unlock();}return AjaxResult.success();}
}

通过上面这段代码可以发现,其内部的实现方式和ReentrantLock的实现是很像的,都是在获取到相对于的实例对象之后,通过lock方式加锁和通过unlock的方式进行解锁

RLock lock = redisson.getLock(PHONE_ID);
lock.lock();
lock.unlock();

2.3,Redission底层实现原理和源码剖析

2.3.1,lock加锁逻辑

在看源码之前,来先对加锁内部做一个预想,无非就是抢锁、没抢到的阻塞,阻塞的线程轮询抢锁。

接下里进入内部源码查看,先进入这个lock方法,然后进入这个 lockInterruptibly 方法,首先会获取到当前线程id,需要给后面使用

在这里插入图片描述

接下来就是进入重要的 tryAcquire 方法,这个就是主要的获取锁的逻辑代码

Long ttl = this.tryAcquire(leaseTime, unit, threadId);

最后进入这个最重要的 tryLockInnerAsync 方法,内部其实是通过一个lua脚本来实现原子性,由于redis中执行任务的线程还是单线程,因此下面这一大段都可以保证操作的原子性

在这里插入图片描述

感兴趣的可以了解一下lua脚本,上面也通过箭头表明了lua脚本的参数,正常的对象都是通过 key/value 的方式表示,在lua脚本中,前面的这个集合标识key,后面的几个参数表示value,就是前面的 getName 是key,后面的 internalLockLeaseTime, getLockName(threadId) 是value,通过ARGV表示。lua脚本官方更加推荐使用一个key对应一个value,当然也允许key有1个或者多个,value有1个或者多个

Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));

接下来分析这段代码,其本质就是一个通过hset设置值,

  • key是KEYS[1],代表的是getName(),这个name由外部提供,在实例化的时候传了一个key进来redisson.getLock(PHONE_ID),那么这个name此时表示的就是外部设置的手机id。
  • ARGV[2]表示第二个参数,对应的是 getLockName(threadId) ,就是线程id
  • 最后通过pexpire设置一个过期时间,此时的 ARGV[1]表示的是 internalLockLeaseTime,在内部定义了这个时间 private long lockWatchdogTimeout = 30 * 1000; 默认就是30s,看名字就知道是一个看门狗的超时机制
  • 如果设置成功,那么就回返回一个nil值,对应java里面的null值
"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('hset', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +
"end; "

redis通过这种管道的方式实现lua脚本,减少网络开销,同时保证操作执行的原子性,在redis官方也有介绍,可以直接通过lua脚本代替redis的事务。

2.3.2,lock锁续命逻辑

上面的讲解的就是 tryLockInnerAsync 方法,通过异步的方式去拿到锁,通过Future阻塞拿到执行任务的结果,拿到执行结果之后,再回调一下这个 addListener 方法

在这里插入图片描述

接下来主要是看这个 addListener 中的核心方法 scheduleExpirationRenewal ,第一眼可以看到里面就是一个定时任务的线程类,看默认就是会在 internalLockLeaseTime / 3 时间内执行一次,也就是10s后执行一次

在这里插入图片描述

这一块的内部实现延时通过lua脚本,实现锁续命机制。其续命逻辑也很简单,如果10s后线程还没执行完成,内部会通过递归的方式循环调用,继续调用这个 scheduleExpirationRenewal 方法,很多中间件实现这种续命的方式都是采用内部递归调用的方式

//判断上一次续命是否续命成功
if (future.getNow()) {	// reschedule itselfscheduleExpirationRenewal(threadId);
}
2.3.3,加锁失败阻塞逻辑

当某个线程加锁失败时,那么该线程就会设置成阻塞状态,从而让出cpu的使用权。依旧得看这个抢锁逻辑的lua脚本,看到最后一句,如果抢锁失败的话,那么就会返回一个pttl的状态,其实就是一个拿到锁的过期时间。比如拿到锁的线程1已经执行了10s,那么来拿锁的线程2就会获取到剩余20s的过期时间

在这里插入图片描述

再回到进入这个最初的抢锁方法中,可以发现每一个线程都会返回一个ttl的过期时间,首先会对这个ttl超时时间进行判断,上面抢锁的逻辑可以看到,如果拿到锁的线程会返回一个nil,就是对应java中的null,如果不为null,就会继续往下执行

在这里插入图片描述

在进行阻塞的时候,作者使用了发布订阅的模式进行了优化 ,在线程进行阻塞时,把这些队列都订阅一个主题,当有锁释放的时候,那么就唤醒订阅了这个主题的线程。

RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);

随后进入一个自旋获取锁的阶段,在JDK的 ReentrantLock 中,就是采用的自旋的方式获取锁。但是在redis分布式情况下,一般都是适用于高流量高并发的场景,因此是不能完全一直空转自旋的,而且想想默认设置30s一个线程,刚运行就让大量的线程在那空转,肯定是不合适的

接下来看内部的这段自旋的代码,接下来主要分析这段ttl大于0的情况,getLatch表示一个JDK中的Semaphore信号量,这里用来做阻塞操作,比如说获取到的ttl为15s,那么这个线程就阻塞15s在这里,再进行一次自旋抢锁,而不是像 ReentrantLock 一样一直空转自旋在那抢锁,从而降低cpu的使用率,同时通过阻塞让出cpu的使用权

try {while (true) {ttl = tryAcquire(leaseTime, unit, threadId);// lock acquiredif (ttl == null) {	break;}if (ttl >= 0) {		//如果ttl大于0//getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {getEntry(threadId).getLatch().acquire();}}
} finally {unsubscribe(future, threadId);	//唤醒订阅的线程
}

通过这种信号量阻塞的方式,达到间歇性加锁的目的。并且在抢锁时,内部没有公平锁的概念,默认就是非公平锁。

2.3.4,unlock锁释放

在上面的加锁失败阻塞中,讲了有这段加锁失败阻塞的方法,内部提供了一个同步订阅的方法,就是每个加锁失败的线程会订阅一个topic主题,当锁被释放或者过期之后就能通知订阅的线程来抢锁,不然每次自旋抢锁就太低效了,比如5s中执行完了,设置的30s,剩余的ttl还有25s,还要等25s去抢锁,显然不合适

RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);

接下来查看这段释放锁的逻辑,通过 unlockInnerAsync 进行释放锁

在这里插入图片描述

详细查看这个 unlockInnerAsync 方法之后,内部又是一个lua脚本,主要判断锁是否存在,如果存在则进行解锁的操作,然后通过 publish 发送一条消息给订阅了这个主题的所有线程可以来抢锁,内部还包含一些可重入锁等

在这里插入图片描述

Redisson_312">3,Redisson总结

redission主要通过lua脚本来实现加锁和解锁的操作,从而保证相关操作的原子性,其主要操作有以下步骤。

  • 线程进来先执行抢锁的操作,抢锁成功则继续往下执行业务,并且通过内部递归的方式给当前线程一个watch dog 看门狗的一个续命方式。
  • 抢锁失败线程则阻塞,并将线程关注一个发布订阅的模型,供锁释放时被唤醒。并且内部通过间接性的方式轮旋抢锁,时间间隔为当前线程结束时间的ttl
  • unlock释放锁时会往一个发布订阅的模型里面发送消息,关注了这个模型的线程接收到消息之后,则会被唤醒,从而的去进行加锁的操作

http://www.ppmy.cn/embedded/123273.html

相关文章

记录一次gRpc流式操作

使用背景: 从redis队列中发送和消费消息.(使用gRpc的流式实现的消费消息) gRpc协议类定义 message AdMsgProto{ optional string msg1; optional string tag2; optional string topic3; } 2. service方法定义 service MQDataService{ rpc sendRedissonMsg(AdMsgProto)returns…

ubuntu图形界面右上角网络图标找回解决办法

问题现象&#xff1a; ubuntu图形界面右上角网络图标消失了&#xff0c;不方便联网&#xff1a; 正常应该是下图&#xff1a; 网络寻找解决方案&#xff0c;问题未解决&#xff0c;对于某些场景可能有用&#xff0c;引用过来&#xff1a; 参考方案 方法一 修改虚拟机的网络管…

JAVA智慧社区系统跑腿家政本地生活商城系统小程序源码

智慧社区系统集成跑腿家政与本地生活商城 —— 打造便捷高效的社区生活圈 &#x1f3e0; 智慧社区新时代&#xff1a;一站式服务新体验 在快节奏的都市生活中&#xff0c;智慧社区系统正悄然改变着我们的生活方式。它不再只是一个居住的空间&#xff0c;而是集成了跑腿家政、本…

【算法题】——数组、双指针

1、 Leecode题目&#xff1a;两个数组的交集&#xff08;查找 set&#xff09; 思路&#xff1a; 一个数组元素都放入到哈希表中 &#xff0c;然后看另一个数组中的元素在不在&#xff0c;最后结果存到unordered_set中转化成vector输出 set是insert vector是push_back class S…

小红书制作视频如何去原视频音乐,视频如何去原声保留背景音乐?

在视频编辑、音乐制作或个人娱乐中&#xff0c;有时我们希望去掉视频中的原声&#xff08;如对话、解说等&#xff09;&#xff0c;仅保留背景音乐。这种处理能让观众更加聚焦于视频的氛围或节奏&#xff0c;同时也为创作者提供了更多创意空间。选择恰当的背景音乐&#xff0c;…

MyBatisPlus——学习笔记

MyBatisPlus 一、导入依赖 <!-- MyBatisPlus --><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.5.2</version></dependency><!-- MySql --><de…

了解华为计算产品线,昇腾的业务都有哪些?

&#x1f349; CSDN 叶庭云&#xff1a;https://yetingyun.blog.csdn.net/ 随着 ChatGPT 的现象级爆红&#xff0c;它引领了 AI 大模型时代的深刻变革&#xff0c;进而造成 AI 算力资源日益紧缺。与此同时&#xff0c;中美贸易战的持续也使得 AI 算力国产化适配成为必然趋势。 …

ChatGPT推出Canvas功能

"Canvas" 是 OpenAI 推出的全新界面&#xff0c;专为增强写作和编程协作而设计。它让用户能够在聊天之外更高效地进行编辑、审阅和反馈&#xff0c;提供了内联编辑、代码调试和文档版本控制等功能。目前&#xff0c;"Canvas" 已面向 ChatGPT Plus 和 Team …