【Redis进阶】一文搞懂Redisson的看门狗机制底层实现

news/2024/12/29 20:40:38/

文章目录

  • 1. 看门狗机制概述
  • 2. 源码解读
  • 3. 总结

1. 看门狗机制概述

看门狗机制是Redission提供的一种自动延期机制,这个机制使得Redission提供的分布式锁是可以自动续期的

private long lockWatchdogTimeout = 30 * 1000;

看门狗机制提供的默认超时时间是30*1000毫秒,也就是30秒

如果一个线程获取锁后,运行程序到释放锁所花费的时间大于锁自动释放时间(也就是看门狗机制提供的超时时间30s),那么Redission会自动给redis中的目标锁延长超时时间。

在Redission中想要启动看门狗机制,那么我们就不用获取锁的时候自己定义leaseTime(锁自动释放时间)

如果自己定义了锁自动释放时间的话,无论是通过lock还是tryLock方法,都无法启用看门狗机制。

但是,如果传入的leaseTime为-1,也是会开启看门狗机制的。

分布式锁是不能设置永不过期的,这是为了避免在分布式的情况下,一个节点获取锁之后宕机从而出现死锁的情况,所以需要个分布式锁设置一个过期时间。但是这样会导致一个线程拿到锁后,在锁的过期时间到达的时候程序还没运行完,导致锁超时释放了,那么其他线程就能获取锁进来,从而出现问题。

所以,看门狗机制的自动续期,就很好地解决了这一个问题。


2. 源码解读

进入tryLock方法,这里的tryLock(waitTime, -1, unit)有三个参数

  1. waitTime:获取锁的最大等待时间(没有传默认为-1)
  2. leaseTime:锁自动释放的时间(没有传的话默认-1)
  3. unit:时间的单位(等待时间和锁自动释放的时间单位)
public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {return tryLock(waitTime, -1, unit);
}
    @Overridepublic boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {long time = unit.toMillis(waitTime);long current = System.currentTimeMillis();long threadId = Thread.currentThread().getId();Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);// lock acquiredif (ttl == null) {return true;}time -= System.currentTimeMillis() - current;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}current = System.currentTimeMillis();RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {if (!subscribeFuture.cancel(false)) {subscribeFuture.onComplete((res, e) -> {if (e == null) {unsubscribe(subscribeFuture, threadId);}});}acquireFailed(waitTime, unit, threadId);return false;}try {time -= System.currentTimeMillis() - current;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}while (true) {long currentTime = System.currentTimeMillis();ttl = tryAcquire(waitTime, leaseTime, unit, threadId);// lock acquiredif (ttl == null) {return true;}time -= System.currentTimeMillis() - currentTime;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}// waiting for messagecurrentTime = System.currentTimeMillis();if (ttl >= 0 && ttl < time) {subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);}time -= System.currentTimeMillis() - currentTime;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}}} finally {unsubscribe(subscribeFuture, threadId);}
//        return get(tryLockAsync(waitTime, leaseTime, unit));}

这上面一坨主要是锁重试的代码,感兴趣可以看【Redis】4.万字文章带你深入Redisson与源码解读(建议收藏)——起名方面没有灵感的博客-CSDN博客

而看门狗机制的相关代码主要在tryAcquire方法上,在这个方法里主要看到方法是tryAcquireAsync(waitTime, leaseTime, unit, threadId)

private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}

由于在tryLock方法中没传leaseTime,所以leaseTime为默认值-1

调用tryLockInnerAsync,如果获取锁失败,返回的结果是这个key的剩余有效期,如果获取锁成功,则返回null。

获取锁成功后,如果检测不存在异常并且获取锁成功`(ttlRemaining == null)。

那么则执行this.scheduleExpirationRenewal(threadId);来启动看门狗机制。

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {if (leaseTime != -1L) {return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);} else {//如果获取锁失败,返回的结果是这个key的剩余有效期RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);//上面获取锁回调成功之后,执行这代码块的内容ttlRemainingFuture.onComplete((ttlRemaining, e) -> {//不存在异常if (e == null) {//剩余有效期为nullif (ttlRemaining == null) {//这个函数是解决最长等待有效期的问题this.scheduleExpirationRenewal(threadId);}}});return ttlRemainingFuture;}
}
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {internalLockLeaseTime = unit.toMillis(leaseTime);return evalWriteAsync(getName(), LongCodec.INSTANCE, command,// 锁不存在,则往redis中设置锁信息"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +// 锁存在"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"return redis.call('pttl', KEYS[1]);",Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

一个锁就对应自己的一个ExpirationEntry类,

EXPIRATION_RENEWAL_MAP存放的是所有的所信息。

根据锁的名称从EXPIRATION_RENEWAL_MAP里面获取锁,如果存在这把锁则冲入,如果不存在,则将这个新锁放置进EXPIRATION_RENEWAL_MAP,并且开启看门狗机制。

private static final ConcurrentMap<String, ExpirationEntry> EXPIRATION_RENEWAL_MAP = new ConcurrentHashMap<>();
private void scheduleExpirationRenewal(long threadId) {ExpirationEntry entry = new ExpirationEntry();//这里EntryName是指锁的名称ExpirationEntry oldEntry = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);if (oldEntry != null) {//重入//将线程ID加入oldEntry.addThreadId(threadId);} else {//将线程ID加入entry.addThreadId(threadId);//续约this.renewExpiration();}
}

首先,从EXPIRATION_RENEWAL_MAP中获取这个锁,接下来定义一个延迟任务task,这个任务的步骤如下

  1. 新创建了一个子线程去反复调用
  2. EXPIRATION_RENEWAL_MAP中获取这把锁,如果这把锁不存在了,说明被删除了,不在需要续期了。
  3. 从锁中获取获得这把锁的线程IDthreadId
  4. 调用renewExpirationAsync方法刷新最长等待时间
  5. 如果刷新成功,则进来递归调用这个函数renewExpiration()

这个任务task设置为 this.internalLockLeaseTime / 3L,也是锁自动释放时间,因为没传,也就是10s。

也就是说,这个延迟任务延迟十秒执行一次。

最后,为这把锁ee设置延迟任务task即可

private void renewExpiration() {//先从map里得到这个ExpirationEntryExpirationEntry ee = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());if (ee != null) {//这个是一个延迟任务Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {//延迟任务内容public void run(Timeout timeout) throws Exception {//拿出ExpirationEntryExpirationEntry ent = (ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName());if (ent != null) {//从ExpirationEntry拿出线程IDLong threadId = ent.getFirstThreadId();if (threadId != null) {//调用renewExpirationAsync方法刷新最长等待时间RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId);future.onComplete((res, e) -> {if (e != null) {RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", e);} else {if (res) {//renewExpirationAsync方法执行成功之后,进行递归调用,调用自己本身函数//那么就可以实现这样的效果//首先第一次进行这个函数,设置了一个延迟任务,在10s后执行//10s后,执行延迟任务的内容,刷新有效期成功,那么就会再新建一个延迟任务,刷新最长等待有效期//这样这个最长等待时间就会一直续费RedissonLock.this.renewExpiration();}}});}}}}, //这是锁自动释放时间,因为没传,所以是看门狗时间=30*1000//也就是10sthis.internalLockLeaseTime / 3L, //时间单位TimeUnit.MILLISECONDS);//给当前ExpirationEntry设置延迟任务ee.setTimeout(task);}
}// 刷新等待时间
protected RFuture<Boolean> renewExpirationAsync(long threadId) {return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return 1; " +"end; " +"return 0;",Collections.singletonList(getName()),internalLockLeaseTime, getLockName(threadId));
}

最后,在释放锁的时候,就会关闭所有的延迟任务,核心代码如下

public RFuture<Void> unlockAsync(long threadId) {RPromise<Void> result = new RedissonPromise();RFuture<Boolean> future = this.unlockInnerAsync(threadId);future.onComplete((opStatus, e) -> {//取消锁更新任务this.cancelExpirationRenewal(threadId);if (e != null) {result.tryFailure(e);} else if (opStatus == null) {IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + this.id + " thread-id: " + threadId);result.tryFailure(cause);} else {result.trySuccess((Object)null);}});return result;
}void cancelExpirationRenewal(Long threadId) {//获得当前这把锁的任务ExpirationEntry task = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());if (task != null) {//当前锁的延迟任务不为空,且线程id不为空if (threadId != null) {//先把线程ID去掉task.removeThreadId(threadId);}if (threadId == null || task.hasNoThreads()) {//然后取出延迟任务Timeout timeout = task.getTimeout();if (timeout != null) {//把延迟任务取消掉timeout.cancel();}//再把ExpirationEntry移除出mapEXPIRATION_RENEWAL_MAP.remove(this.getEntryName());}}
}

3. 总结

在使用Redis实现分布式锁的时候,会存在很多问题。

比如说业务逻辑处理时间>自己设置的锁自动释放时间的话,Redis就会按超时情况把锁释放掉,而其他线程就会趁虚而入抢夺锁从而出现问题,因此需要有一个续期的操作。

并且,如果释放锁的操作在finally完成,需要判断一下当前锁是否是属于自己的锁,防止释放掉其他线程的锁,这样释放锁的操作就不是原子性了,而这个问题很好解决,使用lua脚本即可。

Redisson的出现,其中的看门狗机制很好解决续期的问题,它的主要步骤如下:

  1. 在获取锁的时候,不能指定leaseTime或者只能将leaseTime设置为-1,这样才能开启看门狗机制。
  2. tryLockInnerAsync方法里尝试获取锁,如果获取锁成功调用scheduleExpirationRenewal执行看门狗机制
  3. scheduleExpirationRenewal中比较重要的方法就是renewExpiration,当线程第一次获取到锁(也就是不是重入的情况),那么就会调用renewExpiration方法开启看门狗机制。
  4. renewExpiration会为当前锁添加一个延迟任务task,这个延迟任务会在10s后执行,执行的任务就是将锁的有效期刷新为30s(这是看门狗机制的默认锁释放时间)
  5. 并且在任务最后还会继续递归调用renewExpiration

也就是总的流程就是,首先获取到锁(这个锁30s后自动释放),然后对锁设置一个延迟任务(10s后执行),延迟任务给锁的释放时间刷新为30s,并且还为锁再设置一个相同的延迟任务(10s后执行),这样就达到了如果一直不释放锁(程序没有执行完)的话,看门狗机制会每10s将锁的自动释放时间刷新为30s。

而当程序出现异常,那么看门狗机制就不会继续递归调用renewExpiration,这样锁会在30s后自动释放。

或者,在程序主动释放锁后,流程如下:

  1. 将锁对应的线程ID移除
  2. 接着从锁中获取出延迟任务,将延迟任务取消
  3. 在将这把锁从EXPIRATION_RENEWAL_MAP中移除。


http://www.ppmy.cn/news/32725.html

相关文章

AD域安全攻防实践(附攻防矩阵图)

以域控为基础架构&#xff0c;通过域控实现对用户和计算机资源的统一管理&#xff0c;带来便利的同时也成为了最受攻击者重点攻击的集权系统。 01、攻击篇 针对域控的攻击技术&#xff0c;在Windows通用攻击技术的基础上自成一套技术体系&#xff0c;将AD域攻防分为信息收集、权…

Keil MDK6要来了,将嵌入式软件开发水平带到新高度,支持跨平台(2023-03-11)

注&#xff1a;这个是MDK6&#xff0c;不是MDK5 AC6&#xff0c;属于下一代MDK视频版&#xff1a; https://www.bilibili.com/video/BV16s4y157WF Keil MDK6要来了&#xff0c;将嵌入式软件开发水平带到新高度&#xff0c;支持跨平台一年一度的全球顶级嵌入式会展Embedded Wor…

Category In Objective-C

Category In Objective-C 来源 Objective-C 2.0中新增的语言特性 可以用来做什么 ? 扩展已有的类 (仅限于为已有类增加方法) ; 分割类实现 ; 官方原文: Distribute the implementation of your own classes into separate source files—for example, you could group the…

关于类型转换

隐式转换先看个例子int a {500}; unsigned b {1000}; std::cout<<a-b;这里的输出结果并不为-500。因为最后输出结果的类型自动转换成了unsigned&#xff0c;unsigned是正整数型类型转换顺序表(由高到低)long doubledoublefloatunsigned long long long longunsigned long…

Java的jar包打包成exe应用

将springboot项目使用maven打出的jar包&#xff0c;打成windows平台下exe应用程序包&#xff08;自带jre环境&#xff09;。 工具&#xff1a;1、exe4j 2、Inno Setup 工具放到网盘&#xff0c;链接&#xff1a;https://pan.baidu.com/s/1ZHX8P7u-7GBxaC6uaIC8Ag 提取码&#x…

如何成为一名优秀的网络安全工程师?

前言 这是我的建议如何成为网络安全工程师&#xff0c;你应该按照下面顺序学习。 简要说明 第一件事你应该学习如何编程&#xff0c;我建议首先学python&#xff0c;然后是java。 &#xff08;非必须&#xff09;接下来学习一些算法和数据结构是很有帮助的&#xff0c;它将…

【面试题】闭包是什么?this 到底指向谁?

一通百通&#xff0c;其实函数执行上下文、作用域链、闭包、this、箭头函数是相互关联的&#xff0c;他们的特性并不是孤立的&#xff0c;而是相通的。因为内部函数可以访问外层函数的变量&#xff0c;所以才有了闭包的现象。箭头函数内没有 this 和 arguments&#xff0c;所以…

2023年网络安全最应该看的书籍,弯道超车,拒绝看烂书

学习的方法有很多种&#xff0c;看书就是一种不错的方法&#xff0c;但为什么总有人说&#xff1a;“看书是学不会技术的”。 其实就是书籍没选对&#xff0c;看的书不好&#xff0c;你学不下去是很正常的。 一本好书其实不亚于一套好的视频教程&#xff0c;尤其是经典的好书…