目录
Redisson%E7%9A%84%E5%9F%BA%E6%9C%AC%E4%BD%BF%E7%94%A8-toc" style="margin-left:0px;">Redisson的基本使用
Redisson%E7%9A%84%E5%9F%BA%E6%9C%AC%E5%8E%9F%E7%90%86-toc" style="margin-left:0px;">Redisson的基本原理
Redis中的使用
简单了解一下Lua脚本
加锁脚本
解锁脚本
看门口续期lua脚本
源码
tryLock方法
tryAcquireAsync方法
unlock方法
renewExpiration()方法
在一个进程的各个线程间保持数据的同步可以使用Lock、synchronized、CAS、ReentrantLock等,在进程间保持数据的同步就需要使用分布式锁。Redisson就是一种分布式锁。
Redisson是一个基于Redis的Java驻内存数据网格(In-Memory Data Grid),提供了丰富的分布式Java对象和服务,其中包括分布式锁。Redisson的分布式锁实现了java.util.concurrent.locks.Lock
接口,可以方便地在分布式环境中实现分布式锁的功能。
Redisson%E7%9A%84%E5%9F%BA%E6%9C%AC%E4%BD%BF%E7%94%A8" style="background-color:transparent;">Redisson的基本使用
导入依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.12.0</version></dependency>
编写配置
@Configuration
public class RedissonConfig {@Beanpublic RedissonClient redissonClient(){// 配置Config config = new Config();config.useSingleServer().setAddress("redis://47.115.217.159:6379");//redis所在的服务器以及端口// 创建RedissonClient对象return Redisson.create(config);}}
使用
@Resource
private RedissionClient redissonClient;@Test
void testRedisson() throws Exception{//获取锁(可重入),指定锁的名称RLock lock = redissonClient.getLock("anyLock");//尝试获取锁,参数分别是:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位boolean isLock = lock.tryLock(1,10,TimeUnit.SECONDS);//判断获取锁成功if(isLock){try{System.out.println("执行业务"); }finally{//释放锁lock.unlock();}}}
Redisson%E7%9A%84%E5%9F%BA%E6%9C%AC%E5%8E%9F%E7%90%86">Redisson的基本原理
Redis中的使用
Redisson借助redis的setnx和expire命令实现。
Redisson 使用了 Redis 的 SET key value [PX milliseconds]|[EX seconds] NX
命令来实现分布式锁,该命令的含义是:当且仅当 key 不存在时,将 key 的值设为 value ,并同时设置 key 的过期时间为 milliseconds 毫秒或者seconds秒。如果成功设置了值和过期时间,返回 OK
;如果 key 已经存在,返回 null。
上面这段操作我先get myLock,发现redis中不存在这个key,然后设置myLock的value为hello1,过期时间为100秒,可以看到第一次set的时候返回的是OK,第二次设置的时候返回的是nil(null的意思),然后及时的get,得到设置的值,在100秒之后,再去get,发现已经过期了,返回的是nil。
-
使用
SET key value [PX milliseconds]|[EX seconds] NX
命令尝试获取锁,其中key
是锁的唯一标识,value
可以是任意值(通常用来区分不同的客户端),PX milliseconds
表示设置 key 的过期时间为milliseconds
毫秒,EX seconds
表示设置key的过期时间为seconds秒,NX
表示仅当 key 不存在时才设置成功。 -
如果获取锁成功(
SET
命令返回OK
),则表示当前客户端获取到了锁,可以执行临界区代码;如果获取失败(SET
命令返回nil
),则表示锁已经被其他客户端持有,当前客户端需要等待或放弃获取锁。 -
在执行临界区代码期间,Redisson 会周期性地使用
EXPIRE key seconds
命令来更新锁的过期时间,确保即使临界区代码执行时间较长,锁也不会过期释放。(锁的续期) -
当临界区代码执行完成后,客户端使用
DEL key
命令来释放锁,即使当前客户端不持有锁(例如由于锁的过期时间已到),也可以调用该命令来释放锁。
简单了解一下Lua脚本
在Java中,Redisson使用lua脚本来保证加锁、解锁的原子操作。
想要真正读懂redisson底层的加锁解锁实现,基本的lua脚本还是要了解一下的,这里就简单的介绍一下,本人也了解的不多。
加锁脚本
-
KEYS[1] 锁的名字
-
ARGV[1] 锁自动失效时间(毫秒,默认30s(看门狗续期时长))
-
ARGV[2] value中hash子项的key(uuid+threadId)
--如果锁不存在
if (redis.call('exists', KEYS[1]) == 0) then
--重入次数初始为0后加一
redis.call('hincrby', KEYS[1], ARGV[2], 1);
--设锁的过期时间
redis.call('pexpire', KEYS[1], ARGV[1]);
--返回null-代表加锁成功
return nil;
--结束符
end;
--如果加锁的进程已存在
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
--重入次数加一
redis.call('hincrby', KEYS[1], ARGV[2], 1);
--更新锁的过期时间(毫秒)
redis.call('pexpire', KEYS[1], ARGV[1]);
--返回null-代表重入成功
return nil;
--结束符
end;
--返回锁的剩余时间(毫秒)-代表加锁失败
return redis.call('pttl', KEYS[1]);
结论:当且仅当返回nil,才表示加锁成功;
解锁脚本
-
KEYS[1] 锁的名字
-
KEYS[2] 发布订阅的信道(
channel=redisson_lock__channel:{lock_name})
-
ARGV[1] 发布订阅中解锁消息
-
ARGV[2] 看门狗续期时间
-
ARGV[3] hash子项的(
key=uuid+threadId)、
--如果锁不存在
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
--返回null-代表解锁成功
return nil;
end;
--重入次数减一
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
--如果重入次数不为0,对锁进行续期(使用看门狗的续期时间,默认续期30s)
if (counter > 0) then
redis.call('pexpire', KEYS[1], ARGV[2]);
--返回0-代表锁的重入次数减一,解锁成功
return 0;
--否则重入次数<=0
else
--删除key
redis.call('del', KEYS[1]);
--向channel中发布删除key的消息
redis.call('publish', KEYS[2], ARGV[1]);
--返回1-代表锁被删除,解锁成功
return 1;
end;
return nil;
结论:当且仅当返回1,才表示当前请求真正解锁;
看门口续期lua脚本
如果我们在getLock的时候没有自己设置leaseTime,那么默认的过期时间就是30ms,每隔10毫秒持有Redisson分布式锁的进程会创建一个线程去判断同步代码块是否执行完成,如果没有,就将过期时间设置为30毫秒。如果在创建Redisson分布式锁的时候自己设置了leaseTime,就不会出发看门狗机制。
-
KEYS[1] 锁的名字
-
ARGV[1] 锁自动失效时间
-
ARGV[2] value中hash子项的key(uuid+threadId)
--自己加的锁存在
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
--续期
redis.call('pexpire', KEYS[1], ARGV[1]);
--1代表续期成功
return 1;
end;
--自己加的锁不存在,后续不需要再续期
return 0;
源码
在IDEA中shift+shift搜索RedissonLock,找到如下方法。
tryLock方法
// tryLock 是Redisson加锁的核心代码,在这里,我们基本可以了解加锁的整个逻辑流程
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {// 获取锁能容忍的最大等待时长long time = unit.toMillis(waitTime);// 获取当前系统时间long current = System.currentTimeMillis();// 获取当前线程idlong threadId = Thread.currentThread().getId();// 【核心点1】尝试获取锁,若返回值为null,则表示已获取到锁Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);if (ttl == null) {return true;}// 剩余等待时长 = 最大等待时长-(当前时间)time -= System.currentTimeMillis() - current;if (time <= 0) {//等待时间超时acquireFailed(waitTime, unit, threadId);return false;}current = System.currentTimeMillis();// 【核心点2】获取锁失败之后订阅解锁消息,这是一个异步任务CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);try {// 以阻塞的方式获取订阅结果,最大等待时间time,在time时间之内代码一直停在这里subscribeFuture.toCompletableFuture().get(time, TimeUnit.MILLISECONDS);} catch (ExecutionException | TimeoutException e) {// 判断异步任务是否不存在,比如上面的阻塞等待没有获取到订阅结果if (!subscribeFuture.cancel(false)) {subscribeFuture.whenComplete((res, ex) -> {// 异步任务出现异常,取消订阅if (ex == null) {unsubscribe(res, threadId);}});}acquireFailed(waitTime, unit, threadId);return false;}try {// 剩余等待时长time -= System.currentTimeMillis() - current;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}// 循环获取锁 while (true) {long currentTime = System.currentTimeMillis();// 再次获取锁,成功则返回ttl = tryAcquire(waitTime, leaseTime, unit, threadId);// lock acquiredif (ttl == null) {return true;}time -= System.currentTimeMillis() - currentTime;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}currentTime = System.currentTimeMillis();// 【核心点3】阻塞等待信号量唤醒或者超时,接收到订阅时唤醒// 使用的是Semaphore#tryAcquire()// 判断 锁的占有时间(ttl)是否小于等待时间 if (ttl >= 0 && ttl < time) {commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);}time -= System.currentTimeMillis() - currentTime;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}}} finally {// 因为是同步操作,所以无论加锁成功或失败,都取消订阅unsubscribe(commandExecutor.getNow(subscribeFuture), threadId);}
}
tryAcquireAsync方法
/*** 异步的方式尝试获取锁*/
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {RFuture<Long> ttlRemainingFuture;// 占有时间等于 -1 表示会一直持有锁,直到业务进行完成,主动解锁(这里就显示出了finally的重要性)if (leaseTime != -1) {// 【核心点4】这里就是直接使用lua脚本ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);} else {ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);}CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {// lock acquiredif (ttlRemaining == null) {if (leaseTime != -1) {internalLockLeaseTime = unit.toMillis(leaseTime);} else {scheduleExpirationRenewal(threadId);}}return ttlRemaining;});return new CompletableFutureWrapper<>(f);}
核心点4对应的lua脚本
/*** redisson最底层就是lua脚本的直接调用* 这里是使用lua脚本进行加锁*/
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"return redis.call('pttl', KEYS[1]);",Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));}
unlock方法
/**
* 解锁逻辑
*/
@Override
public void unlock() {try {// 以线程阻塞的方式获取结果get(unlockAsync(Thread.currentThread().getId()));} catch (RedisException e) {if (e.getCause() instanceof IllegalMonitorStateException) {throw (IllegalMonitorStateException) e.getCause();} else {throw e;}}
}
异步解锁
// 异步解锁
@Override
public RFuture<Void> unlockAsync(long threadId) {// 【核心点5】 调用异步解锁方法--使用lua脚本 RFuture<Boolean> future = unlockInnerAsync(threadId);CompletionStage<Void> f = future.handle((opStatus, e) -> {cancelExpirationRenewal(threadId);if (e != null) {throw new CompletionException(e);}if (opStatus == null) {IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "+ id + " thread-id: " + threadId);throw new CompletionException(cause);}return null;});return new CompletableFutureWrapper<>(f);
}
核心点5所对应的解锁的lua脚本
protected RFuture<Boolean> unlockInnerAsync(long threadId) {return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
//Redisson解锁lua脚本
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
return nil;
end;
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
if (counter > 0) then
redis.call('pexpire', KEYS[1], ARGV[2]);
return 0;
else redis.call('del', KEYS[1]);
redis.call('publish', KEYS[2], ARGV[1]);
return 1;
end;
return nil;", Arrays.asList(this.getName(), this.getChannelName()), new Object[]{LockPubSub.UNLOCK_MESSAGE, this.internalLockLeaseTime, this.getLockName(threadId)});}
renewExpiration()方法
private void renewExpiration() {ExpirationEntry ee = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());if (ee != null) {Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {public void run(Timeout timeout) throws Exception {ExpirationEntry ent = (ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName());if (ent != null) {Long threadId = ent.getFirstThreadId();if (threadId != null) {//【核心点6】锁续约的核心代码RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId);future.onComplete((res, e) -> {if (e != null) {RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", e);} else {if (res) {RedissonLock.this.renewExpiration();}}});}}}}, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);ee.setTimeout(task);}}
核心店6对应的锁续期的lua脚本代码
protected RFuture<Boolean> renewExpirationAsync(long threadId) {return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
//lua脚本
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('pexpire', KEYS[1], ARGV[1]);
return 1;
end;
return 0;", Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});}