Redisson分布式锁

devtools/2024/11/10 11:38:25/

目录

Redisson%E7%9A%84%E5%9F%BA%E6%9C%AC%E4%BD%BF%E7%94%A8-toc" style="margin-left:0px;">Redisson的基本使用

Redisson%E7%9A%84%E5%9F%BA%E6%9C%AC%E5%8E%9F%E7%90%86-toc" style="margin-left:0px;">Redisson的基本原理

Redis中的使用

简单了解一下Lua脚本

加锁脚本

解锁脚本

看门口续期lua脚本 

源码

tryLock方法

tryAcquireAsync方法

unlock方法

 renewExpiration()方法


在一个进程的各个线程间保持数据的同步可以使用Lock、synchronized、CAS、ReentrantLock等,在进程间保持数据的同步就需要使用分布式锁。Redisson就是一种分布式锁。

Redisson是一个基于Redis的Java驻内存数据网格(In-Memory Data Grid),提供了丰富的分布式Java对象和服务,其中包括分布式锁。Redisson分布式锁实现了java.util.concurrent.locks.Lock接口,可以方便地在分布式环境中实现分布式锁的功能。

Redisson%E7%9A%84%E5%9F%BA%E6%9C%AC%E4%BD%BF%E7%94%A8" style="background-color:transparent;">Redisson的基本使用

导入依赖

        <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.12.0</version></dependency>

编写配置

@Configuration
public class RedissonConfig {@Beanpublic RedissonClient redissonClient(){// 配置Config config = new Config();config.useSingleServer().setAddress("redis://47.115.217.159:6379");//redis所在的服务器以及端口// 创建RedissonClient对象return Redisson.create(config);}}

使用

@Resource
private RedissionClient redissonClient;@Test
void testRedisson() throws Exception{//获取锁(可重入),指定锁的名称RLock lock = redissonClient.getLock("anyLock");//尝试获取锁,参数分别是:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位boolean isLock = lock.tryLock(1,10,TimeUnit.SECONDS);//判断获取锁成功if(isLock){try{System.out.println("执行业务");          }finally{//释放锁lock.unlock();}}}

Redisson%E7%9A%84%E5%9F%BA%E6%9C%AC%E5%8E%9F%E7%90%86">Redisson的基本原理

Redis中的使用

Redisson借助redis的setnx和expire命令实现。

Redisson 使用了 Redis 的 SET key value [PX milliseconds]|[EX seconds] NX 命令来实现分布式锁,该命令的含义是:当且仅当 key 不存在时,将 key 的值设为 value ,并同时设置 key 的过期时间为 milliseconds 毫秒或者seconds秒。如果成功设置了值和过期时间,返回 OK;如果 key 已经存在,返回 null。

上面这段操作我先get myLock,发现redis中不存在这个key,然后设置myLock的value为hello1,过期时间为100秒,可以看到第一次set的时候返回的是OK,第二次设置的时候返回的是nil(null的意思),然后及时的get,得到设置的值,在100秒之后,再去get,发现已经过期了,返回的是nil。

具体来说,Redisson 实现分布式锁的过程如下:

  1. 使用 SET key value [PX milliseconds]|[EX seconds] NX 命令尝试获取锁,其中 key 是锁的唯一标识,value 可以是任意值(通常用来区分不同的客户端),PX milliseconds 表示设置 key 的过期时间为 milliseconds 毫秒,EX seconds 表示设置key的过期时间为seconds秒, NX 表示仅当 key 不存在时才设置成功。

  2. 如果获取锁成功(SET 命令返回 OK),则表示当前客户端获取到了锁,可以执行临界区代码;如果获取失败(SET 命令返回 nil),则表示锁已经被其他客户端持有,当前客户端需要等待或放弃获取锁。

  3. 在执行临界区代码期间,Redisson 会周期性地使用 EXPIRE key seconds命令来更新锁的过期时间,确保即使临界区代码执行时间较长,锁也不会过期释放。(锁的续期)

  4. 当临界区代码执行完成后,客户端使用 DEL key 命令来释放锁,即使当前客户端不持有锁(例如由于锁的过期时间已到),也可以调用该命令来释放锁。

简单了解一下Lua脚本

在Java中,Redisson使用lua脚本来保证加锁、解锁的原子操作。

想要真正读懂redisson底层的加锁解锁实现,基本的lua脚本还是要了解一下的,这里就简单的介绍一下,本人也了解的不多。

加锁脚本

  • KEYS[1] 锁的名字

  • ARGV[1] 锁自动失效时间(毫秒,默认30s(看门狗续期时长))

  • ARGV[2] value中hash子项的key(uuid+threadId)

--如果锁不存在
if (redis.call('exists', KEYS[1]) == 0) then
--重入次数初始为0后加一
redis.call('hincrby', KEYS[1], ARGV[2], 1);
--设锁的过期时间
redis.call('pexpire', KEYS[1], ARGV[1]);
--返回null-代表加锁成功
return nil;
--结束符
end;
--如果加锁的进程已存在
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
--重入次数加一
redis.call('hincrby', KEYS[1], ARGV[2], 1);
--更新锁的过期时间(毫秒)
redis.call('pexpire', KEYS[1], ARGV[1]);
--返回null-代表重入成功
return nil;
--结束符
end;
--返回锁的剩余时间(毫秒)-代表加锁失败
return redis.call('pttl', KEYS[1]);

结论:当且仅当返回nil,才表示加锁成功;

解锁脚本

  • KEYS[1] 锁的名字

  • KEYS[2] 发布订阅的信道(channel=redisson_lock__channel:{lock_name})

  • ARGV[1] 发布订阅中解锁消息

  • ARGV[2] 看门狗续期时间

  • ARGV[3] hash子项的(key=uuid+threadId)、

--如果锁不存在
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
--返回null-代表解锁成功
return nil;
end;
--重入次数减一
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
--如果重入次数不为0,对锁进行续期(使用看门狗的续期时间,默认续期30s)
if (counter > 0) then
redis.call('pexpire', KEYS[1], ARGV[2]);
--返回0-代表锁的重入次数减一,解锁成功
return 0;
--否则重入次数<=0
else
--删除key
redis.call('del', KEYS[1]);
--向channel中发布删除key的消息
redis.call('publish', KEYS[2], ARGV[1]);
--返回1-代表锁被删除,解锁成功
return 1;
end;
return nil;

结论:当且仅当返回1,才表示当前请求真正解锁;

看门口续期lua脚本 

如果我们在getLock的时候没有自己设置leaseTime,那么默认的过期时间就是30ms,每隔10毫秒持有Redisson分布式锁的进程会创建一个线程去判断同步代码块是否执行完成,如果没有,就将过期时间设置为30毫秒。如果在创建Redisson分布式锁的时候自己设置了leaseTime,就不会出发看门狗机制。

  • KEYS[1] 锁的名字

  • ARGV[1] 锁自动失效时间

  • ARGV[2] value中hash子项的key(uuid+threadId)

--自己加的锁存在
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
--续期
redis.call('pexpire', KEYS[1], ARGV[1]);
--1代表续期成功
return 1;
end;
--自己加的锁不存在,后续不需要再续期
return 0;

源码

在IDEA中shift+shift搜索RedissonLock,找到如下方法。

tryLock方法

// tryLock 是Redisson加锁的核心代码,在这里,我们基本可以了解加锁的整个逻辑流程
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {// 获取锁能容忍的最大等待时长long time = unit.toMillis(waitTime);// 获取当前系统时间long current = System.currentTimeMillis();// 获取当前线程idlong threadId = Thread.currentThread().getId();// 【核心点1】尝试获取锁,若返回值为null,则表示已获取到锁Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);if (ttl == null) {return true;}// 剩余等待时长 =   最大等待时长-(当前时间)time -= System.currentTimeMillis() - current;if (time <= 0) {//等待时间超时acquireFailed(waitTime, unit, threadId);return false;}current = System.currentTimeMillis();// 【核心点2】获取锁失败之后订阅解锁消息,这是一个异步任务CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);try {// 以阻塞的方式获取订阅结果,最大等待时间time,在time时间之内代码一直停在这里subscribeFuture.toCompletableFuture().get(time, TimeUnit.MILLISECONDS);} catch (ExecutionException | TimeoutException e) {// 判断异步任务是否不存在,比如上面的阻塞等待没有获取到订阅结果if (!subscribeFuture.cancel(false)) {subscribeFuture.whenComplete((res, ex) -> {// 异步任务出现异常,取消订阅if (ex == null) {unsubscribe(res, threadId);}});}acquireFailed(waitTime, unit, threadId);return false;}try {// 剩余等待时长time -= System.currentTimeMillis() - current;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}// 循环获取锁      while (true) {long currentTime = System.currentTimeMillis();// 再次获取锁,成功则返回ttl = tryAcquire(waitTime, leaseTime, unit, threadId);// lock acquiredif (ttl == null) {return true;}time -= System.currentTimeMillis() - currentTime;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}currentTime = System.currentTimeMillis();// 【核心点3】阻塞等待信号量唤醒或者超时,接收到订阅时唤醒// 使用的是Semaphore#tryAcquire()// 判断 锁的占有时间(ttl)是否小于等待时间  if (ttl >= 0 && ttl < time) {commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);}time -= System.currentTimeMillis() - currentTime;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}}} finally {// 因为是同步操作,所以无论加锁成功或失败,都取消订阅unsubscribe(commandExecutor.getNow(subscribeFuture), threadId);}
}

tryAcquireAsync方法

/*** 异步的方式尝试获取锁*/
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {RFuture<Long> ttlRemainingFuture;// 占有时间等于 -1 表示会一直持有锁,直到业务进行完成,主动解锁(这里就显示出了finally的重要性)if (leaseTime != -1) {// 【核心点4】这里就是直接使用lua脚本ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);} else {ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);}CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {// lock acquiredif (ttlRemaining == null) {if (leaseTime != -1) {internalLockLeaseTime = unit.toMillis(leaseTime);} else {scheduleExpirationRenewal(threadId);}}return ttlRemaining;});return new CompletableFutureWrapper<>(f);}

核心点4对应的lua脚本

/*** redisson最底层就是lua脚本的直接调用* 这里是使用lua脚本进行加锁*/
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"return redis.call('pttl', KEYS[1]);",Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));}

unlock方法

/**
* 解锁逻辑
*/
@Override
public void unlock() {try {// 以线程阻塞的方式获取结果get(unlockAsync(Thread.currentThread().getId()));} catch (RedisException e) {if (e.getCause() instanceof IllegalMonitorStateException) {throw (IllegalMonitorStateException) e.getCause();} else {throw e;}}
}

异步解锁

// 异步解锁
@Override
public RFuture<Void> unlockAsync(long threadId) {// 【核心点5】 调用异步解锁方法--使用lua脚本     RFuture<Boolean> future = unlockInnerAsync(threadId);CompletionStage<Void> f = future.handle((opStatus, e) -> {cancelExpirationRenewal(threadId);if (e != null) {throw new CompletionException(e);}if (opStatus == null) {IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "+ id + " thread-id: " + threadId);throw new CompletionException(cause);}return null;});return new CompletableFutureWrapper<>(f);
}

核心点5所对应的解锁的lua脚本

protected RFuture<Boolean> unlockInnerAsync(long threadId) {return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, 
//Redisson解锁lua脚本
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then 
return nil;
end; 
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); 
if (counter > 0) then 
redis.call('pexpire', KEYS[1], ARGV[2]); 
return 0; 
else redis.call('del', KEYS[1]); 
redis.call('publish', KEYS[2], ARGV[1]); 
return 1; 
end; 
return nil;", Arrays.asList(this.getName(), this.getChannelName()), new Object[]{LockPubSub.UNLOCK_MESSAGE, this.internalLockLeaseTime, this.getLockName(threadId)});}

 renewExpiration()方法

private void renewExpiration() {ExpirationEntry ee = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());if (ee != null) {Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {public void run(Timeout timeout) throws Exception {ExpirationEntry ent = (ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName());if (ent != null) {Long threadId = ent.getFirstThreadId();if (threadId != null) {//【核心点6】锁续约的核心代码RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId);future.onComplete((res, e) -> {if (e != null) {RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", e);} else {if (res) {RedissonLock.this.renewExpiration();}}});}}}}, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);ee.setTimeout(task);}}

核心店6对应的锁续期的lua脚本代码

protected RFuture<Boolean> renewExpirationAsync(long threadId) {return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, 
//lua脚本
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then 
redis.call('pexpire', KEYS[1], ARGV[1]); 
return 1; 
end; 
return 0;", Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});}


http://www.ppmy.cn/devtools/23848.html

相关文章

navicat连接postgresql报错解决方案

navicat连接postgresql报错解决方案 问题描述原因分析&#xff1a;解决方案&#xff1a;1、将navicat升级到16.2以上版本2、降级pgsql3、修改dll配置文件 问题描述 使用Navicat连接postgresql时&#xff0c;出现如下错误。 原因分析&#xff1a; 由于pgsql 15版本以后&#…

一文掌握python面向对象魔术方法(一)

目录 Python 中的魔术方法(Magic Methods)是一系列以双下划线开头和结尾的方法,它们在特定场景下会被 Python 解释器自动调用。这些方法让开发者可以定制类的行为,模拟类似内置类型的特性。 一、初始化和清理: 1、构造方法 __init__(self, ...): 它是类中定义的一个构造…

线上申报开放时间!2024年阜阳市大数据企业培育认定申报条件、流程和材料

2024年阜阳市大数据企业培育认定申报条件、流程和材料&#xff0c;线上申报开放时间整理如下 一、2024年阜阳市大数据企业培育认定申报要求 &#xff08;一&#xff09;经营范围 申请认定的企业应当从事以下生产经营活动&#xff1a; 1.从事数据收集、存储、使用、加工、传输、…

Rust检查一个Vec<String>是否包含一个特定的子字符串

在Rust中&#xff0c;你可以使用contains方法来检查一个Vec<&str>是否包含特定的字符串。但是&#xff0c;如果你想检查一个Vec是否包含一个特定的子字符串&#xff0c;你需要先将子字符串转换为String。 以下是一个示例代码&#xff0c;展示了如何检查一个Vec是否包…

谷歌TPU(Tensor Processing Unit)

谷歌TPU&#xff08;Tensor Processing Unit&#xff09; https://cloud.google.com/tpu/docs/intro-to-tpu?hlzh-cn CPU的工作模式和GPU工作模式的区别 CPU 最大的优点是它们的灵活性。您可以在 CPU 上为许多不同类型的应用加载任何类型的软件。对于每次计算&#xff0c;CPU…

ansible安装教程

一、启动系统前&#xff0c;加一块光驱&#xff0c;把安装镜像文件放到光驱中 二、配置Yum文件 [rootlocalhost ~]# cd /etc/yum.repos.d/ [rootlocalhost yum.repos.d]# rm -f * [rootlocalhost yum.repos.d]# vi cdrom.repo [BaseOS] nameBaseOS baseurlfile:///media/BaseO…

在idea中连接mysql

IDE&#xff08;集成开发环境&#xff09;是一种软件应用程序&#xff0c;它为开发者提供编程语言的开发环境&#xff0c;通常集成了编码、编译、调试和运行程序的多种功能。一个好的IDE可以大幅提高开发效率&#xff0c;尤其是在进行大型项目开发时。IDE通常包括以下几个核心组…

如何实现直播声卡反向给手机充电功能呢?

在数字化时代的浪潮中&#xff0c;声卡作为多媒体系统的核心组件&#xff0c;扮演着声波与数字信号相互转换的关键角色。它不仅能够将来自各类音源的原始声音信号转换为数字信号&#xff0c;进而输出到各类声响设备&#xff0c;更能够通过音乐设备数字接口(MIDI)发出合成乐器的…