Redisson分布式锁整体流程图
Redisson分布式锁源码流程图
Redisson分布式锁源码解析
获取分布式锁lock
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {//获取当前线程IDlong threadId = Thread.currentThread().getId();/*** 尝试获取分布式锁* a.如果获取到锁:返回null* b.如果没有获取到锁:返回当前分布式锁的剩余的过期时间*/Long ttl = tryAcquire(-1, leaseTime, unit, threadId);// lock acquiredif (ttl == null) {return;}//ttl不为null说明锁被其他线程占用,没有获取到锁。订阅解锁消息CompletableFuture<RedissonLockEntry> future = subscribe(threadId);pubSub.timeout(future);RedissonLockEntry entry;//订阅解锁消息:如果分布式锁未进行解锁(pub解锁消息),当前线程进入阻塞状态。当接收到分布式锁的pub消息,当前线程被唤醒继续执行if (interruptibly) {entry = commandExecutor.getInterrupted(future);} else {entry = commandExecutor.get(future);}try {while (true) {//当接收到分布式锁的pub消息,当前线程被唤醒继续执行,继续尝试获取锁ttl = tryAcquire(-1, leaseTime, unit, threadId);// lock acquired//获取成功跳出循环if (ttl == null) {break;}// waiting for messageif (ttl >= 0) {try {//阻塞ttl毫秒后继续执行entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {//线程被打断,直接跳出循环if (interruptibly) {throw e;}//线程被打断,但是没有设置打断跳出循环,则继续阻塞ttl毫秒后继续执行entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);}} else {if (interruptibly) {entry.getLatch().acquire();} else {entry.getLatch().acquireUninterruptibly();}}}} finally {unsubscribe(entry, threadId);}// get(lockAsync(leaseTime, unit));
}
详细步骤如下:
- 获取当前获取锁的线程ID。
- 调用
tryAcquire
方法尝试获取锁。
tryAcquire
方法返回null
,说明获取到了锁。tryAcquire
方法返回不是null
值,说明没有获取到了锁,返回的Long
值指的是其他线程占用该分布式锁的过期时间,单位为毫秒。- 未获取到锁的线程订阅(
sub
)解锁(pub
)消息,如果分布式锁未进行解锁(pub解锁消息),当前线程进入阻塞状态。当接收到分布式锁的pub消息,当前线程被唤醒继续执行,进入while
循环调用tryAcquire
方法继续争抢分布式锁。while
中:
- 抢到了分布式锁,则跳出循环,并执行finally语句块的取消订阅解锁消息
- 如果没有抢到分布式锁,则执行
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
方法阻塞分布式锁剩余时间ttl
毫秒后,继续while
循环争取分布式锁,直到抢到分布式锁。
尝试获取分布式锁tryAcquire
尝试获取分布式锁,获取到分布式锁后,开启一个开门狗,为分布式锁续期。为获取到分布式锁,返回分布式锁剩余的过期时间,毫秒为单位。
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {RFuture<Long> ttlRemainingFuture;//获取分布式锁//返回null则说明获取到了分布式锁//返回不为null说明没有获取到分布式锁,返回的是分布式锁的剩余失效时间if (leaseTime > 0) {ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);} else {ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);}//解锁的CompletionStage<Long> s = handleNoSync(threadId, ttlRemainingFuture);ttlRemainingFuture = new CompletableFutureWrapper<>(s);//如果获取到分布式锁,则使用看门狗进行锁的续期操作。默认过期时间是30秒,看门狗续期的时间间隔是过期时间的三分之一。CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {// lock acquiredif (ttlRemaining == null) {if (leaseTime > 0) {internalLockLeaseTime = unit.toMillis(leaseTime);} else {scheduleExpirationRenewal(threadId);}}return ttlRemaining;});return new CompletableFutureWrapper<>(f);
}
详细步骤如下:
- 调用
tryLockInnerAsync
方法执行Lua
脚本获取锁,获取失败返回锁的过期时间。获取成功返回null
。- 获取分布式锁成功开启一个开门狗,进行锁的续期操作。默认过期时间是30秒,看门狗续期的时间间隔是过期时间的三分之一。
- 未获取到锁则返回锁的过期时间,单位毫秒。
获取分布式锁的lua
脚本
/*** 执行Lua脚本获取锁* 1.key是否存在* 2.重入锁+1* 3.重置锁的过期时间(默认30秒)*/
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,"if ((redis.call('exists', KEYS[1]) == 0) " +"or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"return redis.call('pttl', KEYS[1]);",Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}
详细步骤如下
- key是否存在
- 重入锁+1
- 重置锁的过期时间(默认30秒)
分布式锁续期
private void renewExpiration() {ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ee == null) {return;}Timeout task = commandExecutor.getServiceManager().newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ent == null) {return;}Long threadId = ent.getFirstThreadId();if (threadId == null) {return;}//执行分布式锁续期的lua脚本CompletionStage<Boolean> future = renewExpirationAsync(threadId);future.whenComplete((res, e) -> {if (e != null) {log.error("Can't update lock {} expiration", getRawName(), e);EXPIRATION_RENEWAL_MAP.remove(getEntryName());return;}if (res) {// reschedule itself//自动续期renewExpiration();} else {//取消分布式锁续期cancelExpirationRenewal(null);}});}}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);ee.setTimeout(task);
}
分布式锁续期lua
脚本
protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return 1; " +"end; " +"return 0;",Collections.singletonList(getRawName()),internalLockLeaseTime, getLockName(threadId));
}