分布式锁—5.Redisson的读写锁一

devtools/2025/3/10 17:16:01/

大纲

1.Redisson读写锁RedissonReadWriteLock概述

2.读锁RedissonReadLock的获取读锁逻辑

3.写锁RedissonWriteLock的获取写锁逻辑

4.读锁RedissonReadLock的读读不互斥逻辑

5.RedissonReadLock和RedissonWriteLock的读写互斥逻辑

6.写锁RedissonWriteLock的写写互斥逻辑

7.写锁RedissonWriteLock的可重入逻辑

8.读锁RedissonReadLock的释放读锁逻辑

9.写锁RedissonWriteLock的释放写锁逻辑

1.Redisson读写锁RedissonReadWriteLock概述

(1)RedissonReadWriteLock的简介

(2)RedissonReadWriteLock的使用

(3)RedissonReadWriteLock的初始化

(1)RedissonReadWriteLock的简介

RedissonReadWriteLock提供了两个方法分别获取读锁和写锁。

RedissonReadWriteLock的readLock()方法可以获取读锁RedissonReadLock。

RedissonReadWriteLock的writeLock()方法可以获取写锁RedissonWriteLock。

由于RedissonReadLock和RedissonWriteLock都是RedissonLock的子类,所以只需关注RedissonReadLock和RedissonWriteLock的如下内容即可。

一是获取读锁(写锁)的lua脚本逻辑

二是释放读锁(写锁)的lua脚本逻辑

三是读锁(写锁)的WathDog检查读锁(写锁)和处理锁过期时间的逻辑

(2)RedissonReadWriteLock的使用

//读写锁
RedissonClient redisson = Redisson.create(config);
RReadWriteLock rwlock = redisson.getReadWriteLock("myLock");
rwlock.readLock().lock();//获取读锁
rwlock.readLock().unlock();//释放读锁
rwlock.writeLock().lock();//获取写锁
rwlock.writeLock().unlock();//释放写锁---------------------------------------------------------------//如果没有主动释放锁的话,10秒后将会自动释放锁
rwlock.readLock().lock(10, TimeUnit.SECONDS);
rwlock.writeLock().lock(10, TimeUnit.SECONDS);//加锁等待最多是100秒;加锁成功后如果没有主动释放锁的话,锁会在10秒后自动释放
boolean res = rwlock.readLock().tryLock(100, 10, TimeUnit.SECONDS);
boolean res = rwlock.writeLock().tryLock(100, 10, TimeUnit.SECONDS);

(3)RedissonReadWriteLock的初始化

RedissonReadWriteLock实现了RReadWriteLock接口,RedissonReadLock实现了RLock接口,RedissonWriteLock实现了RLock接口。

public class Redisson implements RedissonClient {//Redis的连接管理器,封装了一个Config实例protected final ConnectionManager connectionManager;//Redis的命令执行器,封装了一个ConnectionManager实例protected final CommandAsyncExecutor commandExecutor;...protected Redisson(Config config) {this.config = config;Config configCopy = new Config(config);//初始化Redis的连接管理器connectionManager = ConfigSupport.createConnectionManager(configCopy);...  //初始化Redis的命令执行器commandExecutor = new CommandSyncService(connectionManager, objectBuilder);...}@Overridepublic RReadWriteLock getReadWriteLock(String name) {return new RedissonReadWriteLock(commandExecutor, name);}...
}public class RedissonReadWriteLock extends RedissonExpirable implements RReadWriteLock {public RedissonReadWriteLock(CommandAsyncExecutor commandExecutor, String name) {super(commandExecutor, name);}@Overridepublic RLock readLock() {return new RedissonReadLock(commandExecutor, getRawName());}@Overridepublic RLock writeLock() {return new RedissonWriteLock(commandExecutor, getRawName());}
}public class RedissonReadLock extends RedissonLock implements RLock {public RedissonReadLock(CommandAsyncExecutor commandExecutor, String name) {super(commandExecutor, name);}...
}public class RedissonWriteLock extends RedissonLock implements RLock {protected RedissonWriteLock(CommandAsyncExecutor commandExecutor, String name) {super(commandExecutor, name);}...
}

2.读锁RedissonReadLock的获取读锁逻辑

(1)加读锁的lua脚本逻辑

(2)WathDog处理读锁过期时间的lua脚本逻辑

(1)加读锁的lua脚本逻辑

假设客户端A的线程(UUID1:ThreadID1)作为第一个线程进来加读锁,执行流程如下:

public class RedissonLock extends RedissonBaseLock {...//不带参数的加锁public void lock() {...lock(-1, null, false);...}//带参数的加锁public void lock(long leaseTime, TimeUnit unit) {...lock(leaseTime, unit, false);...}private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {long threadId = Thread.currentThread().getId();Long ttl = tryAcquire(-1, leaseTime, unit, threadId);//加锁成功if (ttl == null) {return;}//加锁失败...}private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));}private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {RFuture<Long> ttlRemainingFuture;if (leaseTime != -1) {ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);} else {//非公平锁,接下来调用的是RedissonLock.tryLockInnerAsync()方法//公平锁,接下来调用的是RedissonFairLock.tryLockInnerAsync()方法//读写锁中的读锁,接下来调用RedissonReadLock.tryLockInnerAsync()方法ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);}//对RFuture<Long>类型的ttlRemainingFuture添加回调监听CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {//tryLockInnerAsync()里的加锁lua脚本异步执行完毕,会回调如下方法逻辑://加锁成功if (ttlRemaining == null) {if (leaseTime != -1) {//如果传入的leaseTime不是-1,也就是指定锁的过期时间,那么就不创建定时调度任务internalLockLeaseTime = unit.toMillis(leaseTime);} else {//创建定时调度任务scheduleExpirationRenewal(threadId);}}return ttlRemaining;});return new CompletableFutureWrapper<>(f);}...
}public class RedissonReadLock extends RedissonLock implements RLock {...@Override<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,//执行命令"hget myLock mode",尝试获取一个Hash值mode"local mode = redis.call('hget', KEYS[1], 'mode'); " +//mode为false则执行加读锁的逻辑"if (mode == false) then " +//hset myLock mode read"redis.call('hset', KEYS[1], 'mode', 'read'); " +//hset myLock UUID1:ThreadID1 1"redis.call('hset', KEYS[1], ARGV[2], 1); " +//set {myLock}:UUID1:ThreadID1:rwlock_timeout:1 1"redis.call('set', KEYS[2] .. ':1', 1); " +//pexpire {myLock}:UUID1:ThreadID1:rwlock_timeout:1 30000"redis.call('pexpire', KEYS[2] .. ':1', ARGV[1]); " +//pexpire myLock 30000"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +//如果已经有线程加了读锁 或者 有线程加了写锁且是自己加的写锁//所以一个线程如果加了写锁,它是可以重入自己的写锁和自己的读锁的"if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then " +//hincrby myLock UUID2:ThreadID2 1//ind表示重入次数,线程可以重入自己的读锁和写锁,线程后加的读锁可以重入线程自己的读锁或写锁"local ind = redis.call('hincrby', KEYS[1], ARGV[2], 1); " + //key = {myLock}:UUID2:ThreadID2:rwlock_timeout:1"local key = KEYS[2] .. ':' .. ind;" +//set {myLock}:UUID2:ThreadID2:rwlock_timeout:1 1"redis.call('set', key, 1); " +//pexpire myLock 30000"redis.call('pexpire', key, ARGV[1]); " +"local remainTime = redis.call('pttl', KEYS[1]); " +//pexpire {myLock}:UUID2:ThreadID2:rwlock_timeout:1 30000"redis.call('pexpire', KEYS[1], math.max(remainTime, ARGV[1])); " +"return nil; " +"end;" +//执行命令"pttl myLock",返回myLock的剩余过期时间"return redis.call('pttl', KEYS[1]);",//KEYS[1] = myLock//KEYS[2] = {myLock}:UUID1:ThreadID1:rwlock_timeout 或 KEYS[2] = {myLock}:UUID2:ThreadID2:rwlock_timeoutArrays.<Object>asList(getRawName(), getReadWriteTimeoutNamePrefix(threadId)),unit.toMillis(leaseTime),//ARGV[1] = 30000getLockName(threadId),//ARGV[2] = UUID1:ThreadID1 或 ARGV[2] = UUID2:ThreadID2getWriteLockName(threadId)//ARGV[3] = UUID1:ThreadID1:write 或 ARGV[3] = UUID2:ThreadID2:write);}...
}

一.参数说明

KEYS[1] = myLock
KEYS[2] = {myLock}:UUID1:ThreadID1:rwlock_timeout
ARGV[1] = 30000
ARGV[2] = UUID1:ThreadID1
ARGV[3] = UUID1:ThreadID1:write

二.执行lua脚本的获取读锁逻辑

首先执行命令"hget myLock mode",尝试获取一个Hash值mode,也就是从key为myLock的Hash值里获取一个field为mode的value值。但是此时一开始都还没有加锁,所以mode肯定是false。于是就执行如下加读锁的逻辑:设置两个Hash值 + 设置一个字符串。

hset myLock mode read
//用来记录当前客户端线程重入锁的次数
hset myLock UUID1:ThreadID1 1
//用来记录当前客户端线程第1个重入锁过期时间
set {myLock}:UUID1:ThreadID1:rwlock_timeout:1 1
pexpire {myLock}:UUID1:ThreadID1:rwlock_timeout:1 30000
pexpire myLock 30000

执行完加读锁逻辑后,Redis存在如下结构的数据。其实加读锁的核心在于构造一个递增序列,记录不同线程的读锁和同一个线程不同的重入锁。

field为类似于UUID1:ThreadID1的value值,是用来记录当前客户端线程重入锁次数的。key为类似于{myLock}:UUID1:ThreadID1:rwlock_timeout:1的String,是用来记录当前客户端线程第n个重入锁过期时间的。

假设将key为myLock称为父读锁,key为UUID1:ThreadID1称为子读锁。那么记录每一个子读锁的过期时间,是因为需要根据多个子读锁的过期时间更新父读锁的过期时间。

//1.线程1第一次加读锁
//Hash结构
myLock: {"mode": "read","UUID1:ThreadID1": 1
}
//String结构
{myLock}:UUID1:ThreadID1:rwlock_timeout:1 ==> 1//2.线程1第二次加读锁
//Hash结构
myLock: {"mode": "read","UUID1:ThreadID1": 2
}
//String结构
{myLock}:UUID1:ThreadID1:rwlock_timeout:1 ==> 1
{myLock}:UUID1:ThreadID1:rwlock_timeout:2 ==> 1//3.线程1第三次加读锁
//Hash结构
myLock: {"mode": "read","UUID1:ThreadID1": 3
}
//String结构
{myLock}:UUID1:ThreadID1:rwlock_timeout:1 ==> 1
{myLock}:UUID1:ThreadID1:rwlock_timeout:2 ==> 1
{myLock}:UUID1:ThreadID1:rwlock_timeout:3 ==> 1//4.线程2第一次加读锁
//Hash结构
myLock: {"mode": "read","UUID1:ThreadID1": 3,"UUID2:ThreadID2": 1
}
//String结构
{myLock}:UUID1:ThreadID1:rwlock_timeout:1 ==> 1
{myLock}:UUID1:ThreadID1:rwlock_timeout:2 ==> 1
{myLock}:UUID1:ThreadID1:rwlock_timeout:3 ==> 1
{myLock}:UUID2:ThreadID2:rwlock_timeout:1 ==> 1//5.线程2第二次加读锁
//Hash结构
myLock: {"mode": "read","UUID1:ThreadID1": 3,"UUID2:ThreadID2": 2
}
//String结构
{myLock}:UUID1:ThreadID1:rwlock_timeout:1 ==> 1
{myLock}:UUID1:ThreadID1:rwlock_timeout:2 ==> 1
{myLock}:UUID1:ThreadID1:rwlock_timeout:3 ==> 1
{myLock}:UUID2:ThreadID2:rwlock_timeout:1 ==> 1
{myLock}:UUID2:ThreadID2:rwlock_timeout:2 ==> 1

(2)WathDog处理读锁过期时间的lua脚本逻辑

假设客户端A的线程(UUID1:ThreadID1)已经成功获取到一个读锁,此时会创建一个WatchDog定时调度任务,10秒后检查该读锁。执行流程如下:

public abstract class RedissonBaseLock extends RedissonExpirable implements RLock {...protected void scheduleExpirationRenewal(long threadId) {ExpirationEntry entry = new ExpirationEntry();ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);if (oldEntry != null) {oldEntry.addThreadId(threadId);} else {entry.addThreadId(threadId);try {//创建一个更新过期时间的定时调度任务renewExpiration();} finally {if (Thread.currentThread().isInterrupted()) {cancelExpirationRenewal(threadId);}}}}//更新过期时间private void renewExpiration() {ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ee == null) {return;}//使用了Netty的定时任务机制:HashedWheelTimer + TimerTask + Timeout//创建一个更新过期时间的定时调度任务,下面会调用MasterSlaveConnectionManager.newTimeout()方法//即创建一个定时调度任务TimerTask交给HashedWheelTimer,10秒后执行Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ent == null) {return;}Long threadId = ent.getFirstThreadId();if (threadId == null) {return;}//异步执行lua脚本去更新锁的过期时间//对于读写锁,接下来会执行RedissonReadLock.renewExpirationAsync()方法RFuture<Boolean> future = renewExpirationAsync(threadId);future.whenComplete((res, e) -> {if (e != null) {log.error("Can't update lock " + getRawName() + " expiration", e);EXPIRATION_RENEWAL_MAP.remove(getEntryName());return;}//res就是执行renewExpirationAsync()里的lua脚本的返回值if (res) {//重新调度自己renewExpiration();} else {//执行清理工作cancelExpirationRenewal(null);}});}}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);ee.setTimeout(task);}protected void cancelExpirationRenewal(Long threadId) {ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (task == null) {return;}if (threadId != null) {task.removeThreadId(threadId);}if (threadId == null || task.hasNoThreads()) {Timeout timeout = task.getTimeout();if (timeout != null) {timeout.cancel();}EXPIRATION_RENEWAL_MAP.remove(getEntryName());}}...
}public class RedissonReadLock extends RedissonLock implements RLock {...@Overrideprotected RFuture<Boolean> renewExpirationAsync(long threadId) {String timeoutPrefix = getReadWriteTimeoutNamePrefix(threadId);String keyPrefix = getKeyPrefix(threadId, timeoutPrefix);return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,//执行命令"hget myLock UUID1:ThreadID1",获取当前这个线程是否还持有这个读锁"local counter = redis.call('hget', KEYS[1], ARGV[2]); " +"if (counter ~= false) then " +//指定的线程还在持有锁,那么就执行"pexpire myLock 30000"刷新锁的过期时间"redis.call('pexpire', KEYS[1], ARGV[1]); " +"if (redis.call('hlen', KEYS[1]) > 1) then " +//获取key为myLock的Hash值的所有key"local keys = redis.call('hkeys', KEYS[1]); " + //遍历已被线程获取的所有重入和非重入的读锁"for n, key in ipairs(keys) do " + "counter = tonumber(redis.call('hget', KEYS[1], key)); " + //排除掉key为mode的Hash值"if type(counter) == 'number' then " + //递减拼接重入锁的key,刷新同一个线程的所有重入锁的过期时间"for i=counter, 1, -1 do " + "redis.call('pexpire', KEYS[2] .. ':' .. key .. ':rwlock_timeout:' .. i, ARGV[1]); " + "end; " + "end; " + "end; " +"end; " +"return 1; " +"end; " +"return 0;",//KEYS[1] = myLock//KEYS[2] = {myLock}Arrays.<Object>asList(getRawName(), keyPrefix),internalLockLeaseTime,//ARGV[1] = 30000毫秒getLockName(threadId)//ARGV[2] = UUID1:ThreadID1);}...
}

一.参数说明

KEYS[1] = myLock
KEYS[2] = {myLock}
ARGV[1] = 30000
ARGV[2] = UUID1:ThreadID1

二.执行lua脚本的处理逻辑

执行命令"hget myLock UUID1:ThreadID1",尝试获取一个Hash值,也就是获取指定的这个线程是否还持有这个读锁。如果指定的这个线程还在持有这个锁,那么这里返回的是1,于是就会执行"pexpire myLock 30000"刷新锁的过期时间。

接着执行命令"hlen myLock",判断key为锁名的Hash元素个数是否大于1。如果指定的这个线程还在持有这个锁,那么key为myLock的Hash值就至少有两个kv对。其中一个key是mode,一个key是UUID1:ThreadID1。所以这里的判断是成立的,于是遍历处理key为锁名的Hash值。

在遍历处理key为锁名的Hash值时,需要排除掉key为mode的Hash值。然后根据key为UUID + 线程ID的Hash值,通过递减拼接,进行循环遍历,把每一个不同线程的读锁或同一个线程不同的重入锁,都刷新过期时间。

三.总结

WatchDog在处理读锁时,如果指定的线程还持有读锁,那么就会:刷新读锁key的过期时间为30秒,根据重入读锁的次数进行遍历,对重入读锁对应的key的过期时间也刷新为30秒。

//KEYS[1] = myLock
//KEYS[2] = {myLock}
"if (redis.call('hlen', KEYS[1]) > 1) then " +"local keys = redis.call('hkeys', KEYS[1]); " + //遍历处理key为锁名的Hash值"for n, key in ipairs(keys) do " + "counter = tonumber(redis.call('hget', KEYS[1], key)); " + //排除掉key为mode的Hash值"if type(counter) == 'number' then " + "for i=counter, 1, -1 do " + //递减拼接,把不同线程的读锁或同一个线程不同的重入锁,都刷新过期时间"redis.call('pexpire', KEYS[2] .. ':' .. key .. ':rwlock_timeout:' .. i, ARGV[1]); " + "end; " + "end; " + "end; " +
"end; " +//Hash结构
myLock: {"mode": "read","UUID1:ThreadID1": 3,"UUID2:ThreadID2": 2
}
//String结构
{myLock}:UUID1:ThreadID1:rwlock_timeout:1 ==> 1
{myLock}:UUID1:ThreadID1:rwlock_timeout:2 ==> 1
{myLock}:UUID1:ThreadID1:rwlock_timeout:3 ==> 1
{myLock}:UUID2:ThreadID2:rwlock_timeout:1 ==> 1
{myLock}:UUID2:ThreadID2:rwlock_timeout:2 ==> 1

3.写锁RedissonWriteLock的获取写锁逻辑

(1)获取写锁的执行流程

(2)获取写锁的lua脚本逻辑

(1)获取写锁的执行流程

假设客户端A的线程(UUID1:ThreadID1)作为第一个线程进来加写锁,执行流程如下:

public class RedissonLock extends RedissonBaseLock {...//不带参数的加锁public void lock() {...lock(-1, null, false);...}//带参数的加锁public void lock(long leaseTime, TimeUnit unit) {...lock(leaseTime, unit, false);...}private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {long threadId = Thread.currentThread().getId();Long ttl = tryAcquire(-1, leaseTime, unit, threadId);//加锁成功if (ttl == null) {return;}//加锁失败...}private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));}private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {RFuture<Long> ttlRemainingFuture;if (leaseTime != -1) {ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);} else {//非公平锁,接下来调用的是RedissonLock.tryLockInnerAsync()方法//公平锁,接下来调用的是RedissonFairLock.tryLockInnerAsync()方法//读写锁中的读锁,接下来调用RedissonReadLock.tryLockInnerAsync()方法//读写锁中的写锁,接下来调用RedissonWriteLock.tryLockInnerAsync()方法ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);}//对RFuture<Long>类型的ttlRemainingFuture添加回调监听CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {//tryLockInnerAsync()里的加锁lua脚本异步执行完毕,会回调如下方法逻辑://加锁成功if (ttlRemaining == null) {if (leaseTime != -1) {//如果传入的leaseTime不是-1,也就是指定锁的过期时间,那么就不创建定时调度任务internalLockLeaseTime = unit.toMillis(leaseTime);} else {//创建定时调度任务scheduleExpirationRenewal(threadId);}}return ttlRemaining;});return new CompletableFutureWrapper<>(f);}...
}public class RedissonWriteLock extends RedissonLock implements RLock {...@Override<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,//执行命令"hget myLock mode",尝试获取一个Hash值mode"local mode = redis.call('hget', KEYS[1], 'mode'); " +//获取不到,说明没有加读锁或者写锁"if (mode == false) then " +"redis.call('hset', KEYS[1], 'mode', 'write'); " +"redis.call('hset', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +//如果加过锁,那么就要看是不是写锁 + 写锁是不是自己加过的(即重入写锁)"if (mode == 'write') then " +"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +//重入写锁"redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "local currentExpire = redis.call('pttl', KEYS[1]); " +"redis.call('pexpire', KEYS[1], currentExpire + ARGV[1]); " +"return nil; " +"end; " +"end;" +//执行命令"pttl myLock",返回myLock的剩余过期时间"return redis.call('pttl', KEYS[1]);",Arrays.<Object>asList(getRawName()),//KEYS[1] = myLockunit.toMillis(leaseTime),//ARGV[1] = 30000getLockName(threadId)//ARGV[2] = UUID1:ThreadID1:write);}...
}

(2)获取写锁的lua脚本逻辑

一.参数说明

KEYS[1] = myLock
ARGV[1] = 30000
ARGV[2] = UUID1:ThreadID1:write

二.执行分析

首先执行命令"hget myLock mode",尝试获取一个Hash值mode,也就是从key为myLock的Hash值里获取一个field为mode的value值。但是此时一开始都还没有加锁,所以mode肯定是false。于是就执行如下加读锁的逻辑:设置两个Hash值。

hset myLock mode write
hset myLock UUID1:ThreadID1:write 1
pexpire myLock 30000

完成加锁操作后,Redis中存在如下数据:

//Hash结构
myLock: {"mode": "write","UUID1:ThreadID1:write": 1
}

4.读锁RedissonReadLock的读读不互斥逻辑

(1)不同客户端线程读锁与读锁不互斥说明

(2)客户端A先加读锁的Redis命令执行过程和结果

(3)客户端B后加读锁的Redis命令执行过程和结果

(1)不同客户端线程读锁与读锁不互斥说明

假设客户端A(UUID1:ThreadID1)对myLock这个锁先加了一个读锁,客户端B(UUID2:ThreadID2)也要对myLock这个锁加一个读锁,那么此时这两个读锁是不会互斥的,客户端B可以加锁成功。

public class RedissonReadLock extends RedissonLock implements RLock {...@Override<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,//执行命令"hget myLock mode",尝试获取一个Hash值mode"local mode = redis.call('hget', KEYS[1], 'mode'); " +//mode为false则执行加读锁的逻辑"if (mode == false) then " +//hset myLock mode read"redis.call('hset', KEYS[1], 'mode', 'read'); " +//hset myLock UUID1:ThreadID1 1"redis.call('hset', KEYS[1], ARGV[2], 1); " +//set {myLock}:UUID1:ThreadID1:rwlock_timeout:1 1"redis.call('set', KEYS[2] .. ':1', 1); " +//pexpire {myLock}:UUID1:ThreadID1:rwlock_timeout:1 30000"redis.call('pexpire', KEYS[2] .. ':1', ARGV[1]); " +//pexpire myLock 30000"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +//如果已经有线程加了读锁 或者 有线程加了写锁且是自己加的写锁//所以一个线程如果加了写锁,它是可以重入自己的写锁和自己的读锁的"if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then " +//hincrby myLock UUID2:ThreadID2 1//ind表示重入次数,线程可以重入自己的读锁和写锁,线程后加的读锁可以重入线程自己的读锁或写锁"local ind = redis.call('hincrby', KEYS[1], ARGV[2], 1); " + //key = {myLock}:UUID2:ThreadID2:rwlock_timeout:1"local key = KEYS[2] .. ':' .. ind;" +//set {myLock}:UUID2:ThreadID2:rwlock_timeout:1 1"redis.call('set', key, 1); " +//pexpire myLock 30000"redis.call('pexpire', key, ARGV[1]); " +"local remainTime = redis.call('pttl', KEYS[1]); " +//pexpire {myLock}:UUID2:ThreadID2:rwlock_timeout:1 30000"redis.call('pexpire', KEYS[1], math.max(remainTime, ARGV[1])); " +"return nil; " +"end;" +//执行命令"pttl myLock",返回myLock的剩余过期时间"return redis.call('pttl', KEYS[1]);",//KEYS[1] = myLock//KEYS[2] = {myLock}:UUID1:ThreadID1:rwlock_timeout 或 KEYS[2] = {myLock}:UUID2:ThreadID2:rwlock_timeoutArrays.<Object>asList(getRawName(), getReadWriteTimeoutNamePrefix(threadId)),unit.toMillis(leaseTime),//ARGV[1] = 30000getLockName(threadId),//ARGV[2] = UUID1:ThreadID1 或 ARGV[2] = UUID2:ThreadID2getWriteLockName(threadId)//ARGV[3] = UUID1:ThreadID1:write 或 ARGV[3] = UUID2:ThreadID2:write);}...
}

(2)客户端A先加读锁的Redis命令执行过程和结果

参数说明:

KEYS[1] = myLock
KEYS[2] = {myLock}:UUID1:ThreadID1:rwlock_timeout
ARGV[1] = 30000
ARGV[2] = UUID1:ThreadID1
ARGV[3] = UUID1:ThreadID1:write

Redis命令的执行过程:

hset myLock mode read
hset myLock UUID1:ThreadID1 1
set {myLock}:UUID1:ThreadID1:rwlock_timeout:1 1
pexpire {myLock}:UUID1:ThreadID1:rwlock_timeout:1 30000
pexpire myLock 30000

Redis执行结果:

//Hash结构
myLock: {"mode": "read","UUID1:ThreadID1": 1
}
//String结构
{myLock}:UUID1:ThreadID1:rwlock_timeout:1 ==> 1

(3)客户端B后加读锁的Redis命令执行过程和结果

参数说明:

KEYS[1] = myLock
KEYS[2] = {myLock}:UUID2:ThreadID2:rwlock_timeout 
ARGV[1] = 30000
ARGV[2] = UUID2:ThreadID2
ARGV[3] = UUID2:ThreadID2:write

Redis命令的执行过程:

hget myLock mode ===> 获取到mode=read,表示此时已经有线程加了读锁
hincrby myLock UUID2:ThreadID2 1
set {myLock}:UUID2:ThreadID2:rwlock_timeout:1 1
pexpire myLock 30000
pexpire {myLock}:UUID2:ThreadID2:rwlock_timeout:1 30000

Redis执行结果:

//Hash结构
myLock: {"mode": "read","UUID1:ThreadID1": 1,"UUID2:ThreadID2": 1
}//String结构
{myLock}:UUID1:ThreadID1:rwlock_timeout:1 ==> 1
{myLock}:UUID2:ThreadID2:rwlock_timeout:1 ==> 1

需要注意的是:多个客户端同时加读锁,读锁与读锁不互斥。会不断在key为锁名的Hash里,自增field为客户端UUID + 线程ID的value值。每个客户端成功加的一次读锁或写锁,都会维持一个WatchDog,不断刷新myLock的生存时间 + 刷新该客户端这次加的锁的过期时间。

加读锁的lua脚本中,ind表示重入次数。线程可重入自己的读锁和写锁。也就是说,线程后加的读锁可以重入线程自己先加的读锁或写锁。

5.RedissonReadLock和RedissonWriteLock的读写互斥逻辑

(1)不同客户端线程先读锁后写锁如何互斥

(2)不同客户端线程先写锁后读锁如何互斥

(1)不同客户端线程先读锁后写锁如何互斥

首先,客户端A(UUID1:ThreadID1)和客户端B(UUID2:ThreadID2)先加读锁,此时Redis中存在如下的数据:

//Hash结构
myLock: {"mode": "read","UUID1:ThreadID1": 1,"UUID2:ThreadID2": 1
}//String结构
{myLock}:UUID1:ThreadID1:rwlock_timeout:1 ==> 1
{myLock}:UUID2:ThreadID2:rwlock_timeout:1 ==> 1

接着,客户端C(UUID3:ThreadID3)来加写锁。

public class RedissonWriteLock extends RedissonLock implements RLock {...@Override<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,//执行命令"hget myLock mode",尝试获取一个Hash值mode"local mode = redis.call('hget', KEYS[1], 'mode'); " +//此时发现mode=read,说明已有线程加了锁了"if (mode == false) then " +"redis.call('hset', KEYS[1], 'mode', 'write'); " +"redis.call('hset', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +//如果加过锁,那么就要看是不是写锁 + 写锁是不是自己加过的(即重入写锁)"if (mode == 'write') then " +"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +//重入写锁"redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "local currentExpire = redis.call('pttl', KEYS[1]); " +"redis.call('pexpire', KEYS[1], currentExpire + ARGV[1]); " +"return nil; " +"end; " +"end;" +//执行命令"pttl myLock",返回myLock的剩余过期时间"return redis.call('pttl', KEYS[1]);",Arrays.<Object>asList(getRawName()),//KEYS[1] = myLockunit.toMillis(leaseTime),//ARGV[1] = 30000getLockName(threadId)//ARGV[2] = UUID3:ThreadID3:write);}...
}

客户端C(UUID3:ThreadID3)加写锁时的参数:

KEYS[1] = myLock
ARGV[1] = 30000
ARGV[2] = UUID3:ThreadID3:write

客户端C(UUID3:ThreadID3)加写锁时:首先执行命令"hget myLock mode"发现mode = read,说明已有线程加了锁了。由于已加的锁不是当前线程加的写锁,而是其他线程加的读锁。所以此时会执行命令"pttl myLock",返回myLock的剩余过期时间。这会导致客户端C加锁失败,会在while循环中阻塞和重试,从而实现先读锁后写锁的互斥。

(2)不同客户端线程先写锁后读锁如何互斥

假设客户端A(UUID1:ThreadID1)先加了一个写锁,此时Redis中存在如下的数据:

//Hash结构
myLock: {"mode": "write","UUID1:ThreadID1:write": 1
}

然后客户端B(UUID2:ThreadID2)再来加读锁。

public class RedissonReadLock extends RedissonLock implements RLock {...@Override<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,//执行命令"hget myLock mode",尝试获取一个Hash值mode"local mode = redis.call('hget', KEYS[1], 'mode'); " +//发现mode=write,说明已有线程加了锁了"if (mode == false) then " +"redis.call('hset', KEYS[1], 'mode', 'read'); " +"redis.call('hset', KEYS[1], ARGV[2], 1); " +"redis.call('set', KEYS[2] .. ':1', 1); " +"redis.call('pexpire', KEYS[2] .. ':1', ARGV[1]); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +//如果已经有线程加了读锁 或者 有线程加了写锁且是自己加的写锁//所以一个线程如果加了写锁,它是可以重入自己的写锁和自己的读锁的"if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then " +//hincrby myLock UUID2:ThreadID2 1//ind表示重入次数,线程可以重入自己的读锁和写锁,线程后加的读锁可以重入线程自己的读锁或写锁"local ind = redis.call('hincrby', KEYS[1], ARGV[2], 1); " + //key = {myLock}:UUID2:ThreadID2:rwlock_timeout:1"local key = KEYS[2] .. ':' .. ind;" +//set {myLock}:UUID2:ThreadID2:rwlock_timeout:1 1"redis.call('set', key, 1); " +//pexpire myLock 30000"redis.call('pexpire', key, ARGV[1]); " +"local remainTime = redis.call('pttl', KEYS[1]); " +//pexpire {myLock}:UUID2:ThreadID2:rwlock_timeout:1 30000"redis.call('pexpire', KEYS[1], math.max(remainTime, ARGV[1])); " +"return nil; " +"end;" +//执行命令"pttl myLock",返回myLock的剩余过期时间"return redis.call('pttl', KEYS[1]);",//KEYS[1] = myLock//KEYS[2] = {myLock}:UUID2:ThreadID2:rwlock_timeoutArrays.<Object>asList(getRawName(), getReadWriteTimeoutNamePrefix(threadId)),unit.toMillis(leaseTime),//ARGV[1] = 30000getLockName(threadId),//ARGV[2] = UUID2:ThreadID2getWriteLockName(threadId)//ARGV[3] = UUID2:ThreadID2:write);}...
}

客户端B(UUID2:ThreadID2)加读锁时的参数:

KEYS[1] = myLock
KEYS[2] = {myLock}:UUID2:ThreadID2:rwlock_timeout 
ARGV[1] = 30000
ARGV[2] = UUID2:ThreadID2
ARGV[3] = UUID2:ThreadID2:write

客户端B(UUID2:ThreadID2)加读锁时:首先执行命令"hget myLock mode"发现mode = write,说明已有线程加了锁了。接下来执行命令"hexists myLock UUID2:ThreadID2:write",发现不存在。也就是说,如果客户端B之前加过写锁,此时B加读锁才能通过判断。但是,之前加写锁的是客户端A,所以这里的判断条件不会通过。于是返回"pttl myLock",导致加读锁失败,会在while循环中阻塞和重试,从而实现先写锁后读锁的互斥。

(3)总结

如果客户端线程A之前先加了写锁,此时该线程再加读锁,可以成功。

如果客户端线程A之前先加了写锁,此时该线程再加写锁,可以成功。

如果客户端线程A之前先加了读锁,此时该线程再加读锁,可以成功。

如果客户端线程A之前先加了读锁,此时该线程再加写锁,不可以成功。

所以写锁可以被自己的写锁重入,也可以被自己的读锁重入。但是读锁可以被任意的读锁重入,不可以被任意的写锁重入。


http://www.ppmy.cn/devtools/166053.html

相关文章

DeepSeek进阶应用(一):结合Mermaid绘图(流程图、时序图、类图、状态图、甘特图、饼图)

&#x1f31f;前言: 在软件开发、项目管理和系统设计等领域&#xff0c;图表是表达复杂信息的有效工具。随着AI助手如DeepSeek的普及&#xff0c;我们现在可以更轻松地创建各种专业图表。 名人说&#xff1a;博观而约取&#xff0c;厚积而薄发。——苏轼《稼说送张琥》 创作者&…

【技术干货】三大常见网络攻击类型详解:DDoS/XSS/中间人攻击,原理、危害及防御方案

1. DDoS攻击 1.1 什么是DDoS攻击&#xff1f; DDoS&#xff08;Distributed Denial of Service&#xff0c;分布式拒绝服务攻击&#xff09;通过操控大量“僵尸设备”&#xff08;Botnet&#xff09;向目标服务器发送海量请求&#xff0c;耗尽服务器资源&#xff08;带宽、CPU…

Spring Boot 项目中 Redis 常见问题及解决方案

目录 缓存穿透缓存雪崩缓存击穿Redis 连接池耗尽Redis 序列化问题总结 1. 缓存穿透 问题描述 缓存穿透是指查询一个不存在的数据&#xff0c;由于缓存中没有该数据&#xff0c;请求会直接打到数据库上&#xff0c;导致数据库压力过大。 解决方案 缓存空值&#xff1a;即使…

计算机视觉算法实战——老虎个体识别(主页有源码)

✨个人主页欢迎您的访问 ✨期待您的三连 ✨ ✨个人主页欢迎您的访问 ✨期待您的三连 ✨ ✨个人主页欢迎您的访问 ✨期待您的三连✨ ​ ​​​ 1. 领域介绍 老虎个体识别是计算机视觉中的一个重要应用领域&#xff0c;旨在通过分析老虎的独特条纹图案&#xff0c;自动识别和区…

基于Asp.net的零食购物商城网站

作者&#xff1a;计算机学姐 开发技术&#xff1a;SpringBoot、SSM、Vue、MySQL、JSP、ElementUI、Python、小程序等&#xff0c;“文末源码”。 专栏推荐&#xff1a;前后端分离项目源码、SpringBoot项目源码、Vue项目源码、SSM项目源码、微信小程序源码 精品专栏&#xff1a;…

Linux系统编程--线程同步

目录 一、前言 二、线程饥饿 三、线程同步 四、条件变量 1、cond 2、条件变量的使用 五、条件变量与互斥锁 一、前言 上篇文章我们讲解了线程互斥的概念&#xff0c;为了防止多个线程同时访问一份临界资源而出问题&#xff0c;我们引入了线程互斥&#xff0c;线程互斥其实…

16 HarmonyOS NEXT UVList组件开发指南(三)

温馨提示&#xff1a;本篇博客的详细代码已发布到 git : https://gitcode.com/nutpi/HarmonyosNext 可以下载运行哦&#xff01; HarmonyOS NEXT UVList组件开发指南(三) 第三篇&#xff1a;UVList组件使用方法与实际应用 1. 基础使用方法 1.1 引入组件 使用UVList组件前&a…

代码随想录第五十天| 图论理论基础

图论理论基础 这篇我们将正式开始学习图论&#xff01; 在代码随想录中&#xff0c;图论相关的算法题目将统一使用ACM模式。为什么要使用ACM模式呢&#xff1f; 图的基本概念 在二维坐标中&#xff0c;两点可以连成线&#xff0c;多个点连成的线就构成了图。 当然&#xff0…