归档
Unit-Test
说明
@Override
public void lock() { lock(-1, null, false);
}
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) {long threadId = Thread.currentThread().getId();Long ttl = tryAcquire(-1, leaseTime, unit, threadId);if (ttl == null) {return; }CompletableFuture<RedissonLockEntry> future = subscribe(threadId); pubSub.timeout(future);RedissonLockEntry entry;if (interruptibly) {entry = commandExecutor.getInterrupted(future);} else { entry = commandExecutor.get(future);}try {while (true) { ttl = tryAcquire(-1, leaseTime, unit, threadId);if (ttl == null) { break;}...}} finally {unsubscribe(entry, threadId);}
}
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {RFuture<Long> ttlRemainingFuture;if (leaseTime > 0) {ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);} else { ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);}CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {if (ttlRemaining == null) {if (leaseTime > 0) {internalLockLeaseTime = unit.toMillis(leaseTime);} else { scheduleExpirationRenewal(threadId);}}return ttlRemaining;});return new CompletableFutureWrapper<>(f);
}
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,"if ((redis.call('exists', KEYS[1]) == 0) " + "or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " +"return redis.call('pttl', KEYS[1]);", Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}
protected void scheduleExpirationRenewal(long threadId) {...try {renewExpiration(); } finally {if (Thread.currentThread().isInterrupted()) {cancelExpirationRenewal(threadId); }}
}
private void renewExpiration() {...Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {...CompletionStage<Boolean> future = renewExpirationAsync(threadId);future.whenComplete((res, e) -> {if (e != null) { EXPIRATION_RENEWAL_MAP.remove(getEntryName());return;}if (res) {renewExpiration(); } else {cancelExpirationRenewal(null); }});} }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
}
protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('pexpire', KEYS[1], ARGV[1]); " + "return 1; " + "end; " +"return 0;", Collections.singletonList(getRawName()),internalLockLeaseTime, getLockName(threadId));
}
流程说明
- 加锁成功则返回,同时内部开启续约任务(每 10s 一次,续约 30s TTL)
- 加锁失败,则订阅通道,以获知别的线程释放锁的通知
Ref
- https://zhuanlan.zhihu.com/p/135864820