Redisson分布式锁和同步器完整篇
在分布式系统中,如何确保多个服务实例之间的数据一致性和资源协调是一个关键挑战。Redisson作为基于Redis的Java客户端,提供了一套完整的分布式锁和同步器解决方案,帮助开发者轻松应对这些挑战。
本文将深入探讨Redisson的核心组件,包括:
- 分布式锁:可重入锁、公平锁、读写锁、红锁、联锁
- 同步工具:信号量、可过期信号量、闭锁
我们将从以下维度展开详细分析:
✅ 使用场景:结合实际案例,解析每种工具的适用场景
✅ 实现原理:深入源码,揭示底层工作机制
✅ 最佳实践:提供代码示例和配置建议
通过丰富的代码示例、清晰的流程图和详细的对比表格,本文旨在为开发者提供一份实用的Redisson使用指南,助力构建高可靠、高性能的分布式系统。
🔒 分布式锁
Redisson的分布式锁共有可重入锁、公平锁、读写锁、红锁、联锁五种,我们将逐一介绍
🔄1.可重入锁
所谓可重入锁就是同一个线程可以多次加锁,不会出现死锁问题
💡为什么要可重入
普通的分布式锁如果想要再次获取锁,就必须等待锁的释放后才能获取锁,如果在一个线程中出现了需要多次获取同一把锁的情况,就必须等待第一次获取到锁的线程释放锁,但是这两次获取锁的动作处于一个线程中,无法再次获取锁,线程就不会结束,因此第一次获取的锁也不会被释放,从而造成死锁
比如一个线程中,可能存在递归调用或者嵌套调用,如果多次尝试获取同一把锁,但锁不能重入,则会导致死锁。以下代码为业务场景中用到的两个方法,他们都各自做了获取锁的操作
java">public class ReentrantExample {private final MyLock lock = new MyLock();// 方法Apublic void methodA() {lock.lock(); // 获取锁try {methodB(); // 在方法A中调用方法B// 执行其他业务} finally {lock.unlock(); // 释放锁}} // 方法Bpublic void methodB() {lock.lock(); // 同样获取锁try {// 执行其他业务} finally {lock.unlock(); // 释放锁}}
}
此时在某处调用了A方法,A方法会先获取锁,接着再调用B的方法,B方法同样也会先获取锁,但是此时的锁未释放,是无法获取的
java">ReentrantExample example = new ReentrantExample();
example.methodA(); // 如果锁不可重入,这里会死锁
因此我们需要一个可以在同一线程下多次获取的锁,而Redisson就提供了这样的可重入锁,调用Redisson客户端redissonClient
的getLock
方法获取一个可重入锁RLock,并且传入一个字符串,这个字符串通常用于标识锁的唯一性。而Redisson是依靠Redis实现的,所以传入的这个字符串也会被用作在 Redis 中存储的键,通过这种方式,分布式系统的不同实例可以通过 Redis 共享锁的状态,从而实现分布式锁。
java">RLock lock = redissonClient.getLock("myLock");
// 可调用lock方法或者tryLock方法获取锁
public void method() {lock.tryLock(); // 获取锁try {// 执行其他业务} finally {lock.unlock(); // 释放锁}
}
🔎可重入的实现原理
可重入锁实现了RLock接口,其实现类为RedissonLock
之所以可重复获取锁,是因为底层在Redis中的数据结构采用了Hash而不是String,除了锁对象和线程标识符之外,还维护了一个重入次数的字段来记录锁被获取的次数,获取锁时会让获取次数加一,当释放锁时,会先使得获取次数减一,随后判断获取次数是否为0,如果是0则证明已经是最后一次释放锁,即该锁正在被第一次获取锁的地方释放,此时才正式释放锁
可以查看最终执行Lua脚本的异步方法:
java"><T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {this.internalLockLeaseTime = unit.toMillis(leaseTime);return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId));
}
该方法尝试为当前线程加锁或续租锁。如果锁不存在,则创建锁并设置过期时间;如果锁已存在且由当前线程持有,则增加锁计数并更新过期时间;否则,返回锁的剩余有效时间
🚀具体的原理实现可以查看我之前的文章:Redisson分布式锁如何实现可重入-CSDN博客
除此之外,可重入锁还具有可重试和自动续期的功能
📌可重试机制
重试机制就是当线程尝试获取锁时,发现锁已经被其他线程获取,此时不会立马返回失败,而是不停的尝试获取锁,等待锁被释放
当我们加锁的时候,可以调用lock
方法或是tryLock
方法,这两种加锁方式他们的重试机制是不同的:
1.lock
方法
-
阻塞式加锁:会一直尝试获取锁,直到成功为止。
-
如果锁已被其他线程持有,当前线程会进入等待状态,直到锁被释放。
2.tryLock
方法
- 非阻塞式加锁:尝试获取锁,但不会无限等待。
- 提供两个方法重载:
- 无超时参数:立即尝试获取锁,获取不到时直接返回
false
。 - 带超时参数:在指定的时间范围内尝试获取锁,超时后仍未获取到,则返回
false
。
- 无超时参数:立即尝试获取锁,获取不到时直接返回
📌自动续期机制
如果负责储存这个分布式锁的Redisson节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗的检查锁的超时时间是30秒钟
🚀对于详解其源码的文章:带你读懂Redisson分布式锁原理-CSDN博客
⚖️2.公平锁
公平锁遵循 FIFO(先来先服务) 原则,确保锁的分配顺序是公平的
它保证了当多个Redisson客户端线程同时请求加锁时,优先分配给先发出请求的线程。所有请求线程会在一个队列中排队,当某个线程出现宕机时,Redisson会等待5秒后继续下一个线程,也就是说如果前面有5个线程都处于等待状态,那么后面的线程会等待至少25秒。
📌获取一个公平锁
在上面的可重入锁中,通过调用Redisson客户端redissonClient
的getLock
方法,而获取一个公平锁只需要把方法改成getFairLock
即可
java">RLock lock = redissonClient.getFairLock("myLock");
// 可调用lock方法或者tryLock方法获取锁
public void method() {lock.tryLock(); // 获取锁try {// 执行其他业务} finally {lock.unlock(); // 释放锁}
}
公平锁在需要保证严格访问顺序的场景中使用,确保资源的访问严格按照请求的先后顺序进行。这对于需要避免资源竞争导致不公平现象的业务非常重要,以下是常见的公平锁适用场景:
(1) ✅排队逻辑严格的场景:
当业务中存在用户或线程需要按照请求时间顺序排队的情况时,公平锁可以确保资源分配的公平性。
(2) ✅资源访问敏感的场景:
某些资源的访问必须公平分配,避免某些线程因非公平竞争而“垄断”资源。
(3) ✅用户体验至上的场景:
在需要优化用户体验,避免不公平现象带来负面影响的场景中,公平锁是一个很好的选择。
(4) ✅防止线程“饿死”的场景:
如果资源竞争特别激烈,非公平锁可能导致某些线程长期得不到资源,出现“饿死”现象。公平锁可以有效防止这种问题。
(5)✅高并发场景下的审计或合规要求:
某些业务对公平性有严格要求,并需要确保符合审计标准。
🔎公平机制实现原理
公平锁与可重入锁同样实现了RLock接口,公平锁的实现类是RedissonFairLock,其大部分加锁与释放锁的操作与可重入锁RedissonLock基本一致
之所以遵循先来先服务原则,是因为公平锁底层维护了一个等待队列,但是这个等待队列的声明并不在Java源码中,而是通过Redis的列表数据结构来实现的,新的线程尝试加锁时,会使用 RPUSH
将其线程标识加入队列末尾,释放锁时,通过 LPOP
或 LRANGE
获取队首的线程标识,进行调度
公平锁同样具有可重入锁的全部特性:可重入、可重试、自动续期,具体执行流程与可重入锁基本一致,因此我们只需要查看他的最终异步获取锁的方法,他重写了RedissonLock中的tryLockInnerAsync
方法,其中最重要的是执行的Lua脚本:
java"> <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {this.internalLockLeaseTime = unit.toMillis(leaseTime);long wait = this.threadWaitTime;if (waitTime != -1L) {wait = unit.toMillis(waitTime);}long currentTime = System.currentTimeMillis();if (command == RedisCommands.EVAL_NULL_BOOLEAN) {return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "while true " +"do local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +"if firstThreadId2 == false then " +"break;" +"end;" +"local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" +"if timeout <= tonumber(ARGV[3]) then " +"redis.call('zrem', KEYS[3], firstThreadId2);" +"redis.call('lpop', KEYS[2]);" +"else break;" +"end;" +"end;" +"if (redis.call('exists', KEYS[1]) == 0) and ((redis.call('exists', KEYS[2]) == 0) or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +"redis.call('lpop', KEYS[2]);" +"redis.call('zrem', KEYS[3], ARGV[2]);" +"local keys = redis.call('zrange', KEYS[3], 0, -1);" +"for i = 1, #keys, 1 " +"do redis.call('zincrby', KEYS[3], -tonumber(ARGV[4]), keys[i]);" +"end;" +"redis.call('hset', KEYS[1], ARGV[2], 1);" +"redis.call('pexpire', KEYS[1], ARGV[1]);" +"return nil;" +"end;" +"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1);" +"redis.call('pexpire', KEYS[1], ARGV[1]);" +"return nil;" +"end;" +"return 1;", Arrays.asList(this.getName(), this.threadsQueueName, this.timeoutSetName), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId), currentTime, wait});} else if (command == RedisCommands.EVAL_LONG) {return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "while true " +"do local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +"if firstThreadId2 == false then " +"break;" +"end;" +"local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" +"if timeout <= tonumber(ARGV[4]) then " +"redis.call('zrem', KEYS[3], firstThreadId2);" +"redis.call('lpop', KEYS[2]);" +"else break;" +"end;" + "end;" +"if (redis.call('exists', KEYS[1]) == 0) and ((redis.call('exists', KEYS[2]) == 0) or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +"redis.call('lpop', KEYS[2]);redis.call('zrem', KEYS[3], ARGV[2]);" +"local keys = redis.call('zrange', KEYS[3], 0, -1);" +"for i = 1, #keys, 1 " +"do redis.call('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);" +"end;" +"redis.call('hset', KEYS[1], ARGV[2], 1);" +"redis.call('pexpire', KEYS[1], ARGV[1]);" +"return nil;" +"end;" +"if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then " +"redis.call('hincrby', KEYS[1], ARGV[2],1);" +"redis.call('pexpire', KEYS[1], ARGV[1]);" +"return nil;" +"end;" +"local timeout = redis.call('zscore', KEYS[3], ARGV[2]);" +"if timeout ~= false then " +"return timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]);" +"end;" +"local lastThreadId = redis.call('lindex', KEYS[2], -1);" +"local ttl;" +"if lastThreadId ~= false and lastThreadId ~= ARGV[2] then " +"ttl = tonumber(redis.call('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);" +"else " +"ttl = redis.call('pttl', KEYS[1]);" +"end;" +"local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);" +"if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +"redis.call('rpush', KEYS[2], ARGV[2]);" +"end;" +"return ttl;", Arrays.asList(this.getName(), this.threadsQueueName, this.timeoutSetName), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId), wait, currentTime});} else {throw new IllegalArgumentException();}}
在这段源码中,两个 EVAL
脚本使用了 Redis 的 Lua 脚本来实现对公平锁的加锁逻辑,确保公平性并控制锁的获取。它们分别在两个不同的场景下执行:一个用于处理锁的获取,另一个用于检查锁的状态和排队机制
第一个 Lua 脚本:加锁和释放锁
执行流程:
- 检查队列中的第一个线程:
local firstThreadId2 = redis.call('lindex', KEYS[2], 0);
- 使用
lindex
获取队列中第一个线程的标识符,如果队列为空,跳出循环。
- 检查超时:
local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));
- 获取队列中第一个线程的超时时间。这个超时时间通过
zscore
从 Redis 的有序集合(timeoutSetName
)中读取。
- 如果超时,移除线程并释放资源:
- 如果该线程超时,即其
timeout <= currentTime
,那么将其从等待队列中移除(lpop
)并清除其超时记录(zrem
)。
- 如果该线程超时,即其
- 检查锁的可用性:
if (redis.call('exists', KEYS[1]) == 0)
:检查锁是否已经被其他线程占用。如果没有被占用,则执行加锁操作。
- 加锁:
redis.call('hset', KEYS[1], ARGV[2], 1);
:将当前线程的标识符存储在锁的hash
中(hset
)。这标志着当前线程已经获得锁。
- 设置锁的过期时间:
redis.call('pexpire', KEYS[1], ARGV[1]);
:设置锁的过期时间。通过pexpire
来确保锁不会一直被占用。
- 返回:
return nil;
:如果锁成功获取,返回nil
,表示没有错误。
第二个 Lua 脚本:等待和超时机制
执行流程:
- 检查当前线程是否已经持有锁:
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1)
:首先检查当前线程是否已经获得了锁。如果已经获得锁,直接增加计数(可重入性)。
- 检查锁的超时:
local timeout = redis.call('zscore', KEYS[3], ARGV[2]);
:如果当前线程还没有获取锁,检查它在队列中的超时记录。
- 判断队列中的最后一个线程:
local lastThreadId = redis.call('lindex', KEYS[2], -1);
:如果当前线程没有获取锁,检查队列中的最后一个线程,看看是否应该允许当前线程获取锁。
- 添加线程到等待队列:
redis.call('zadd', KEYS[3], timeout, ARGV[2]);
:将当前线程添加到有序集合(timeoutSetName
)中,设置超时信息。这样,线程将在超时后自动移除。
- 检查队首线程是否已到达超时并进行加锁:
- 如果队列中的首个线程是当前线程,执行加锁操作:
redis.call('rpush', KEYS[2], ARGV[2]);
。
- 如果队列中的首个线程是当前线程,执行加锁操作:
- 返回等待时间:
- 如果当前线程被加入到队列,返回它需要等待的时间。
它们共同保证了公平锁的公平性,确保队列中的线程按照它们请求的顺序依次获取锁
📊与非公平锁的区别
特性 | 公平锁(Fair Lock) | 非公平锁(Unfair Lock) |
---|---|---|
锁分配规则 | FIFO(按请求顺序获取锁) | 可能插队,后来的线程可以先获取锁 |
实现复杂度 | 较高(需要维护等待队列) | 较低 |
性能 | 较低(线程需要等待) | 较高 |
适用场景 | 公平性要求高的场景,例如金融、订单系统等 | 性能要求高,但对公平性要求不高的场景 |
📝3.读写锁
读写锁(Read-Write Lock)是一种允许多线程并发读、但只允许一个线程写的同步机制。它是一种优化的锁机制,适用于读操作远多于写操作的场景
⁉️什么是读写锁
读锁(共享锁):
- 多个线程可以同时持有读锁,前提是没有线程持有写锁。
- 适用于读取共享资源的场景,因为读操作不会修改资源的状态。
写锁(排他锁):
- 只有一个线程可以持有写锁,且持有写锁时不能有线程持有读锁。
- 适用于需要修改共享资源的场景。
特性 | 读写锁 | 排他锁 |
---|---|---|
访问粒度 | 区分读锁(共享)和写锁(独占) | 只提供独占模式 |
读操作并发 | 支持多个线程同时读资源 | 不允许任何线程访问资源 |
写操作独占 | 写锁是独占的(等同于排他锁) | 写操作独占资源 |
适用场景 | 读多写少场景 | 需要严格控制访问的场景 |
读写锁实现类为RReadWriteLock,其中读锁和写锁都继承了RLock接口。
读写锁常用于以下业务场景,它们的共同特点是读多写少,即大多数操作是只读的,写操作相对较少但要求独占访问
采用读写锁可以保证线程在读数据的时候可以允许其他线程同样进行读操作,但是不允许其他线程进行写操作,若是此时有线程正在进行写操作,则不允许进行任何操作,这样可以保证数据的强一致性,但是会损失一点点效率
📌获取一个读写锁
通过调用Redisson客户端redissonClient
的getReadWriteLock
方法即可获取一个读写锁,这个读写锁可以分别获取读锁或写锁
java">RReadWriteLock rwlock = redisson.getReadWriteLock("myLock");
// 获取读锁
rwlock.readLock().lock();
// 释放锁
rwlock.unlock();
// 获取写锁
rwlock.writeLock().lock();
// 释放锁
rwlock.unlock();
在开发中使用读写锁通常都会在两个不同业务中分别使用读锁和写锁,比如在获取商品信息的业务代码中使用读锁,更改商品信息的代码中使用写锁,但是要想保证一致性,两个锁的标识要保持一致:
java">// 读数据业务
public Item getById(Integer id){RReadWriteLock readWriteLock = redissonclient.getReadWriteLocK("ITEM_READ_WRITE_LOCK");//读之前加读锁,读锁的作用就是等待该lockkey释放写锁以后再读RLock readLock=readWriteLock.readLock();try {//开锁readLock.lock();System.out.println("readLock...");Item item =(Item)redisTemplate.opsForValue().get("item:"+id);if(item ! null){return item;}//查询业务数据item = new Item(id,"华为手机","华为手机", 5999.00);//写入缓存redisTemplate.opsForValue().set("item:"+id item);//返回数据return item;} finally {readLock.unlock();}
java">// 写数据业务
public void updateById(Integer id){RReadWriteLock readWriteLock = redissonclient.getReadWriteLock("ITEM_READ_WRITE_LOCK");// 写之前加写锁,写锁加锁成功,读锁只能等待RLock writeLock=readWriteLock.writeLock();try {//开锁writeLock.lock();System.out.printin("writeLock...");//更新业务数据Item item = new Item(id,"华为手机","华为手机",5299.00);try {Thread.sleep(10000);}catch(InterruptedException e){e.printStackTrace();}//删除缓存redisTemplate.delete( key:"item:"+id);} finally {writeLock.unlock();}
🔎读写锁实现原理
读写锁的共享与互斥特性是通过不同的计数器和线程状态管理来实现的
核心实现机制
-
✅状态变量(Lock State)
-
ReentrantReadWriteLock
的底层使用一个整数变量(称为state
)来表示锁的状态:一个整数(32 位)中按位分割的两部分:
- 高 16 位:表示读锁的计数(即有多少线程持有读锁)。
- 低 16 位:表示写锁的计数(是否有线程持有写锁)。
-
通过位运算对这两个部分进行解码:
- 读计数:
state >>> 16
- 写计数:
state & 0xFFFF
- 读计数:
-
-
✅写锁获取(互斥)
-
写线程尝试获取写锁时,会检查以下条件:
- 当前
state & 0xFFFF == 0
(写锁未被持有)。 - 当前
state >>> 16 == 0
(没有线程持有读锁)。
- 当前
-
如果条件满足:
- 将
state & 0xFFFF
加 1,表示当前线程持有写锁。
- 将
-
-
✅读锁获取(共享)
-
读线程尝试获取读锁时,会检查以下条件:
- 当前
state & 0xFFFF == 0
(写锁未被持有)。
- 当前
-
如果条件满足:
- 将
state >>> 16
加 1,表示一个线程成功获取了读锁。
- 将
-
-
✅释放锁
-
释放读锁:读线程完成任务后,将
state >>> 16
减 1。 -
释放写锁:写线程完成任务后,将
state & 0xFFFF
减 1。
-
关键逻辑示例
以下是伪代码描述读写锁的主要逻辑:
1️⃣写锁获取逻辑
java">// 尝试获取写锁
synchronized (lock) {while (state != 0) { // 读锁或写锁被占用wait(); // 当前线程等待}state = WRITE_LOCK; // 设置写锁标志
}
2️⃣读锁获取逻辑
java">// 尝试获取读锁
synchronized (lock) {while (state & WRITE_LOCK != 0) { // 写锁被占用wait(); // 当前线程等待}state += READ_LOCK; // 增加读锁计数
}
🛡4.红锁
红锁(Redlock) 是一种由 Redis 官方提出的分布式锁实现方案,用于确保在分布式系统中多个节点间的互斥性和一致性。它解决了经典分布式锁的一些问题,比如网络分区、节点故障等
通过部署多个独立的Redis主节点来确保分布式锁的高可用性。当客户端尝试获取锁时,需要向超过半数的节点申请锁,只有当多数节点都成功获取锁时,才认为锁获取成功。
💡红锁的特性
- 容错性:允许部分节点故障(最多N/2个节点故障)
- 安全性:获取锁需要超过半数的节点确认
- 自动续期:与可重入锁一样支持看门狗机制
📌使用场景
- ☑️金融交易系统
- ☑️医疗数据管理系统
- ☑️政务审批系统
📌获取红锁的代码示例
这里需要多个Redis的节点来尝试获取锁,并且redis节点要为单数,为了保证多数派原则(N/2+1),当有2个节点故障时,3节点集群仍能正常运作(需要至少2个节点存活)
这里三个节点分别获取一个对应的锁,再new一个红锁把这三个节点对应的锁传递过去
java">// 节点1
Config config1 = new Config();
config1.useSingleServer().setAddress("redis://192.168.0.1:6379");
RedissonClient client1 = Redisson.create(config1);// 节点2
Config config2 = new Config();
config2.useSingleServer().setAddress("redis://192.168.0.2:6379");
RedissonClient client2 = Redisson.create(config2);// 节点3
Config config3 = new Config();
config2.useSingleServer().setAddress("redis://192.168.0.3:6379");
RedissonClient client3 = Redisson.create(config3);RLock lock1 = client1.getLock("lock");
RLock lock2 = client2.getLock("lock");
RLock lock3 = client3.getLock("lock");RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
try {// 尝试获取锁,最多等待100秒,锁有效期30秒boolean res = redLock.tryLock(100, 30, TimeUnit.SECONDS);if (res) {// 业务处理}
} finally {redLock.unlock();
}
🔎红锁算法原理
在 RedissonRedLock
类中,核心方法通过遍历所有节点执行加锁操作
- 获取当前时间(毫秒精度)
- 依次向所有Redis节点请求加锁(设置相同的key和随机值)
- 计算获取锁消耗的总时间(必须小于锁的有效时间)
- 当且仅当在超过半数的节点上成功获取锁,并且总耗时小于锁有效时间时,才认为锁获取成功
- 如果获取失败,需要向所有节点发起解锁请求
🤝5.联锁
联锁要求同时获取多个独立的锁,只有当所有锁都成功获取时,才认为加锁成功。适用于需要同时锁定多个资源的场景。
📊与红锁的区别
特性 | 联锁 | 红锁 |
---|---|---|
节点关系 | 各锁对应不同业务资源 | 多个节点存储同一把锁 |
容错性 | 必须全部成功 | 允许部分失败 |
使用场景 | 多资源协同操作 | 高可用分布式锁 |
📌获取联锁示例
这里同样需要获取多把锁再交给RedissonMultiLock
类
java">RLock lock1 = redisson1.getLock("lock1");
RLock lock2 = redisson2.getLock("lock2");
RLock lock3 = redisson3.getLock("lock3");RedissonMultiLock multiLock = new RedissonMultiLock(lock1, lock2, lock3);
try {// 同时获取三个锁,最多等待100秒boolean res = multiLock.tryLock(100, TimeUnit.SECONDS);if (res) {// 操作多个资源}
} finally {multiLock.unlock();
}
🔎实现原理
在 RedissonMultiLock
类中,加锁操作严格按顺序执行,通过遍历所有锁依次尝试获取:
java">public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {for (RLock lock : locks) {if (!lock.tryLock(waitTime, unit)) {// 任意一个锁获取失败立即回滚unlockInner(locks);return false;}}return true;
⏱️同步工具
📡1.信号量
基于Redis的Redisson的分布式信号量(Semaphore)Java对象RSemaphore
采用了与java.util.concurrent.Semaphore
相似的接口和用法。同时还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。
💡核心用途
实现精准的并发流量控制,典型场景:
- ✅API网关限流(每秒1000请求)
- ✅数据库连接池管理(最大50连接)
- ✅分布式爬虫速率控制
📌分布式信号量特点
- ✅控制同时访问资源的线程数量
- ✅支持跨JVM的流量控制
- ✅支持许可证回收机制
java">RSemaphore semaphore = redisson.getSemaphore("semaphore");
// 初始化信号量(设置许可证数量)
semaphore.trySetPermits(5);// 获取许可(阻塞直到获取成功)
semaphore.acquire();// 释放许可
semaphore.release();
🔎源码实现
核心类 RedissonSemaphore
通过Redis的DECR
原子操作实现(源码位置:org.redisson.RedissonSemaphore#acquire
):
java">public void acquire() throws InterruptedException {// 执行Lua脚本尝试获取许可String script = "local value = redis.call('get', KEYS[1]); " +"if (value ~= false and tonumber(value) > 0) then " +" redis.call('decr', KEYS[1]); " +" return 1; " +"end; " +"return 0;";while (true) {Long res = evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,script, Collections.singletonList(getName()));if (res == 1) {break;}Thread.sleep(100); // 轮询间隔}
}
⌛2.可过期性信号量
基于Redis的Redisson可过期性信号量(PermitExpirableSemaphore)是在RSemaphore
对象的基础上,为每个信号增加了一个过期时间。每个信号可以通过独立的ID来辨识,释放时只能通过提交这个ID才能释放。它提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。
💡核心用途
解决传统信号量可能出现的许可证泄漏问题,典型场景:
- 容器化环境中的瞬时Pod故障
- 移动端网络不稳定导致的连接中断
- 长时间任务执行监控
在普通信号量基础上增加许可证有效期,防止因客户端崩溃导致的许可证泄漏。
java">RPermitExpirableSemaphore semaphore = redisson.getPermitExpirableSemaphore("semaphore");
// 获取许可(有效期30秒)
String permitId = semaphore.acquire(30, TimeUnit.SECONDS);try {// 执行业务逻辑
} finally {if (semaphore.tryRelease(permitId)) {System.out.println("许可已释放");}
}
📌自动续期机制
在之前的分布式锁中的自动续期机制中,就使用了可过期性信号量
🔎源码解析
在 RedissonPermitExpirableSemaphore
中通过双重数据结构实现(源码位置:org.redisson.RedissonPermitExpirableSemaphore#acquire
):
java">public String acquire(long leaseTime, TimeUnit unit) {// 生成唯一许可IDString permitId = UUID.randomUUID().toString();// 使用Hash结构存储许可信息String script = "if redis.call('exists', KEYS[1]) == 0 then " +" redis.call('hset', KEYS[1], ARGV[1], 1); " +" redis.call('pexpire', KEYS[1], ARGV[2]); " +" return 1; " +"end; " +"return 0;";Long res = evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,script, Collections.singletonList(getName()), permitId, unit.toMillis(leaseTime));return res == 1 ? permitId : null;
}
数据结构设计:
自动清理机制:
后台线程定期执行扫描脚本(源码位置:org.redisson.PermitExpirableSemaphoreCleaner
):
local keys = redis.call('hkeys', KEYS[1]);
for i, key in ipairs(keys) doif tonumber(redis.call('hget', KEYS[1], key)) < tonumber(ARGV[1]) thenredis.call('hdel', KEYS[1], key)end
end
return #keys
🔐3.闭锁
闭锁(CountDownLatch)就像是一个倒计时门闩,它的作用是让一群线程在某个地方“等着”,直到某个条件满足(比如倒计时结束),它们才能继续往下执行。
java">RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.trySetCount(1);
latch.await();// 在其他线程或其他JVM里
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.countDown();
💡核心用途
实现分布式系统的阶段同步,典型场景:
- 微服务启动依赖(数据库服务先于业务服务启动)
- 大数据批量任务执行(MapReduce阶段控制)
- 分布式事务提交协调
📌典型应用场景
- 微服务启动顺序控制
- 大数据批量任务协调
- 分布式事务提交协调
🔎实现原理
在 RedissonCountDownLatch
中通过Redis的计数器实现(源码位置:org.redisson.RedissonCountDownLatch#countDown
):
java">public void countDown() {// 原子递减操作String script = "local v = redis.call('get', KEYS[1]); " +"if v == false then " +" return 0; " +"end " +"if tonumber(v) > 0 then " +" redis.call('decr', KEYS[1]); " +"end " +"return redis.call('get', KEYS[1]);";Long res = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,script, Collections.singletonList(getName()));// 计数器归零时发布通知if (res == 0) {commandExecutor.getNow(subscription.publish(CountDownLatchPubSub.ZERO_COUNT_MESSAGE));}
}
等待机制实现:
关键特性:
- 跨JVM通知:基于Redis的Pub/Sub机制
- 动态调整:支持运行时修改计数器值
- 多重等待:支持多个线程同时await()