文章目录
- 集群下的锁失效问题
- Redis中的setnx命令实现分布式锁
- setnx基本原理
- 死锁问题
- 利用Redis实现的简单分布式锁流程
- setnx的分布式锁的问题
- 锁误删问题
- 超时释放问题
- 其它问题
- Redisson
- 基于注解的分布式锁
- 工厂模式 选择锁类型
- 策略模式提供 重试策略 + 失败策略组合
- 基于SPEL的动态锁名
- 相关知识点
- 自定义注解
- Redis Pub/Sub 的工作原理
- 失败重试机制在 Redis Pub/Sub 中的应用
- 1. **消息确认和重试机制**
- 2. ==使用 Redis Streams 替代 Pub/Sub==
- ~~3. **使用 Redis Pub/Sub 的失败重试方案**~~
- 总结
集群下的锁失效问题
Synchronized中的重量级锁,底层就是基于锁监视器(Monitor)来实现的。简单来说就是锁对象头会指向一个锁监视器,而在监视器中则会记录一些信息,比如:
- _owner:持有锁的线程
- _recursions:锁重入次数
每一个锁对象,都会指向一个锁监视器,而每一个锁监视器,同一时刻只能被一个线程持有,这样就实现了互斥效果。但前提是,多个线程使用的是同一把锁。
但问题来了,我们的服务将来肯定会多实例不是,形成集群。每一个实例都会有一个自己的JVM运行环境,因此即便是同一个用户,如果并发的发起了多个请求,由于请求进入了多个JVM,就会出现多个锁对象(用户id对象),自然就有多个锁监视器。此时就会出现每个JVM内部都有一个线程获取锁成功的情况,没有达到互斥的效果,并发安全问题就可能再次发生了:
我们不能让每个实例去使用各自的JVM内部锁监视器,而是应该在多个实例外部寻找一个锁监视器,多个实例争抢同一把锁。像这样的锁,就称为分布式锁。
分布式锁必须要满足的特征:
- 多JVM实例都可以访问
- 互斥
能满足上述特征的组件有很多,因此实现分布式锁的方式也非常多,例如:
- 基于MySQL
- 基于Redis
- 基于Zookeeper
- 基于ETCD
但目前使用最广泛的还应该是基于Redis的分布式锁。
Redis中的setnx命令实现分布式锁
Redis本身可以被任意JVM实例访问,同时Redis中的setnx命令具备互斥性,因此符合分布式锁的需求
setnx基本原理
Redis的setnx命令是对string类型数据的操作,语法如下:
给key赋值为value: SETNX key value
当前仅当key不存在的时候,setnx才能执行成功,并且返回1,其它情况都会执行失败,并且返回0.我们就可以认为返回值是1就是获取锁成功,返回值是0就是获取锁失败,实现互斥效果。
而当业务执行完成时,我们只需要删除这个key即可释放锁。这个时候其它线程又可以再次获取锁(执行setnx成功)了。
删除指定key,用来释放锁: DEL key
死锁问题
不过我们要考虑一种极端情况,比如我们获取锁成功,还未释放锁呢当前实例突然宕机了!那么释放锁的逻辑自然就永远不会被执行,这样lock就永远存在,再也不会有其它线程获取锁成功了!出现了死锁问题: 利用Redis的KEY过期时间机制,在获取锁时给锁添加一个超时时间:
获取锁,并记录持有锁的线程: SETNX lock thread1
设置过期时间,避免死锁: EXPIRE lock 20
这里我们设置超时时间为20秒,远超任务执行时间。当业务正常执行时,这个过期时间不起作用
但是如果当前服务实例宕机,DEL无法执行。但由于我们设置了20秒的过期时间,当超过这个时间时,锁会因为过期被删除,因此就等于释放锁了,从而避免了死锁问题。这种策略就是超时释放锁策略。
但新的问题来了,SETNX和EXPIRE是两条命令,如果我执行完SETNX,还没来得急执行EXPIRE时服务已经宕机了,这样加锁成功,但锁超时时间依然没能设置!死锁问题岂不是再次发生了?!
所以,必须保证SETNX和EXPIRE两个操作的原子性。事实上,Redis中的set命令就能同时实现setnx和expire的效果:
NX 等同于SETNX lock thread1效果, EX 20 等同于 EXPIRE lock 20效果
SET lock thread1 NX EX 20
利用Redis实现的简单分布式锁流程
java">public class RedisLock {private final String key;private final StringRedisTemplate redisTemplate;/*** 尝试获取锁* @param leaseTime 锁自动释放时间* @param unit 时间单位* @return 是否获取成功,true:获取锁成功;false:获取锁失败*/public boolean tryLock(long leaseTime, TimeUnit unit){// 1.获取线程名称String threadValue = Thread.currentThread().getName();// 2.获取锁Boolean success = redisTemplate.opsForValue().setIfAbsent(key, threadValue, leaseTime, unit);// 3.返回结果return BooleanUtils.isTrue(success);}/*** 释放锁*/public void unlock(){redisTemplate.delete(key);}
}
setnx的分布式锁的问题
锁误删问题
例如,有线程1获取锁成功,并且执行完任务,正准备释放锁,但是因为某种原因导致线程1释放锁的操作被阻塞了,直到锁被超时释放。就在此时,有一个新的线程2来尝试获取锁。因为线程1的锁被超时释放,因此线程2是可以获取锁成功的。而就在此时,线程1醒来,继续执行释放锁的操作,也就是DEL.结果就把线程2的锁给删除了。然而此时线程2还在执行任务,如果有其它线程再来获取锁,就会认为无人持有锁从而获取锁成功,于是多个线程再次并行执行,并发安全问题就可能再次发生了
解决方法:释放锁前要检查是不是自己的锁
超时释放问题
线程1获取锁成功,并且执行业务完成,并且也判断了锁标示,确实与自己一致:
接下来,线程1应该去释放自己的锁了,可就在此时发生了阻塞!直到锁超时释放:然后,线程2来获取锁,又和上面一样了。
总结一下,误删的原因归根结底是因为什么?
- 超时释放
- 判断锁标示、删除锁两个动作不是原子操作
操作锁的多行命令又该如何确保原子性?
其它问题
除了上述问题以外,分布式锁还会碰到一些其它问题:
- 锁的重入问题:同一个线程多次获取锁的场景,目前不支持,可能会导致死锁
- 锁失败的重试问题:获取锁失败后要不要重试?目前是直接失败,不支持重试
- Redis主从的一致性问题:由于主从同步存在延迟,当线程在主节点获取锁后,从节点可能未同步锁信息。如果此时主宕机,会出现锁失效情况。此时会有其它线程也获取锁成功。从而出现并发安全问题。
- …
当然,上述问题并非无法解决,只不过会比较麻烦。例如:
- 原子性问题:可以利用Redis的LUA脚本来编写锁操作,确保原子性
- 超时问题:利用WatchDog(看门狗)机制,获取锁成功时开启一个定时任务,在锁到期前自动续期,避免超时释放。而当服务宕机后,WatchDog跟着停止运行,不会导致死锁。
- 锁重入问题:可以模拟Synchronized原理,放弃setnx,而是利用Redis的Hash结构来记录锁的持有者以及重入次数,获取锁时重入次数+1,释放锁是重入次数-1,次数为0则锁删除
- 主从一致性问题:可以利用Redis官网推荐的RedLock机制来解决
这些解决方案实现起来比较复杂,因此我们通常会使用一些开源框架来实现分布式锁,而不是自己来编码实现。目前对这些解决方案实现的比较完善的一个第三方组件:Redisson
Redisson
首先引入依赖:
<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId>
</dependency>
然后是配置:
java">@Configuration
public class RedisConfig {@Beanpublic RedissonClient redissonClient() {// 配置类Config config = new Config();// 添加redis地址,这里添加了单点的地址,也可以使用config.useClusterServers()添加集群地址 config.useSingleServer().setAddress("redis://192.168.150.101:6379").setPassowrd("123321");// 创建客户端return Redisson.create(config);}
}
tj-common里面已经配置了,所以不需要重复配置
最后是基本用法:
java">@Autowired
private RedissonClient redissonClient;@Test
void testRedisson() throws InterruptedException {// 1.获取锁对象,指定锁名称RLock lock = redissonClient.getLock("anyLock");//anylock是锁的名字也是redis的键值try {// 2.尝试获取锁,参数:waitTime、leaseTime、时间单位boolean isLock = lock.tryLock(1, 10, TimeUnit.SECONDS);if (!isLock) {// 获取锁失败处理 ..} else {// 获取锁成功处理}} finally {// 4.释放锁lock.unlock();//判断锁是否属于自己+原子性都有实现}
}
Watch Dog看门狗不能设置失效时间,会设置默认的失效时间。
Redisson解决上面的问题:
- 原子性: Lua保证 判断锁是不是自己的,操作的原子性
- 超时问题:Watch Dog看门狗,会专门创建一个线程,监控当前的分布式锁有没有结束,(如果正在使用着锁)没有结束的话会每10s调整过期时间。就检测到正在使用着会把过期时间重置回30s, 不用担心“我正在用着锁释放了”导致的安全问题
- 不可重入:Redis的Hash结构来记录锁的持有者以及重入次数
- 失败重试:redis的发布订阅(Pub/Sub)
- 主从一致性问题:可以利用Redis官网推荐的RedLock机制来解决。
向games频道发送消息,上面订阅了games频道的就会收到消息’hello’
基于注解的分布式锁
基于AOP的思想,将业务部分作为切入点,将业务前后的锁操作作为环绕增强。注解的核心作用是两个:
- 标记切入点
- 传递锁参数
注解本身起到标记作用,同时还要带上锁参数:
- 锁名称
- 锁等待时间
- 锁超时时间
- 时间单位
Step1:自定义注解锁:
java">package com.tianji.promotion.annotation;import com.tianji.promotion.enums.MyLockStrategy;
import com.tianji.promotion.enums.MyLockType;import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.concurrent.TimeUnit;@Retention(RetentionPolicy.RUNTIME) // 运行时生效
@Target(ElementType.METHOD) // 作用于方法
public @interface MyLock {String name(); // 锁名称long waitTime() default 1; // 申请锁的等待时间long leaseTime() default -1; // 持有锁的TTL有效时间TimeUnit unit() default TimeUnit.SECONDS; // 时间单位
}
Step2:定义切面类:
Step3:定义好了锁注解和切面,接下来就可以改造业务了:
怎么定义@Transactional和@MyLock的顺序,默认事务的执行顺序比较靠后(其注解里面的order值较高顺序靠后)。 所以 能保证是 先获取锁再执行事务
工厂模式 选择锁类型
Step1: 在注解 锁MyLock里面加入一个属性
锁的类型,默认为可重入锁,由工厂模式根据lockType进行创建
java">MyLockType lockType() default MyLockType.RE_ENTRANT_LOCK;
Step2: 在切面类中创建锁对象(更新为工厂模式创建)
java">private final MyLockFactory myLockFactory;//RLock lock = redissonClient.getLock(myLock.name()); // 只能获取可重入锁
RLock lock = myLockFactory.getLock(myLock.lockType(), lockName);
工厂模式MyLockFactory:
java">import com.tianji.promotion.enums.MyLockType;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;@Component
public class MyLockFactory {// 锁对象类型,方法引用private final Map<MyLockType, Function<String, RLock>> lockHandlers;/*** 使用工厂模式,来实现不同的锁类型** @param redissonClient Redisson 客户端实例*/public MyLockFactory(RedissonClient redissonClient) {// 初始化锁处理器映射表this.lockHandlers = new EnumMap<>(MyLockType.class);// 添加不同类型的锁处理器到映射表中this.lockHandlers.put(MyLockType.RE_ENTRANT_LOCK, redissonClient::getLock);this.lockHandlers.put(MyLockType.FAIR_LOCK, redissonClient::getFairLock);this.lockHandlers.put(MyLockType.READ_LOCK, name -> redissonClient.getReadWriteLock(name).readLock());this.lockHandlers.put(MyLockType.WRITE_LOCK, name -> redissonClient.getReadWriteLock(name).writeLock());}/*** 获取指定类型的锁实例** @param lockType 锁类型* @param name 锁名称* @return 对应类型的锁实例*/public RLock getLock(MyLockType lockType, String name){// get获取锁类型的引用,apply调用对应的创建方法return lockHandlers.get(lockType).apply(name);}
}
java">public enum MyLockType {RE_ENTRANT_LOCK, // 可重入锁FAIR_LOCK, // 公平锁READ_LOCK, // 读锁WRITE_LOCK, // 写锁;
}
策略模式提供 重试策略 + 失败策略组合
定义枚举类,枚举Redisson分布式锁的锁失败的处理策略:
java">import com.tianji.promotion.annotation.MyLock;
import org.redisson.api.RLock;
public enum MyLockStrategy {SKIP_FAST(){ // 枚举项,快速结束 = 不重试+快速失败@Overridepublic boolean tryLock(RLock lock, MyLock prop) throws InterruptedException {return lock.tryLock(0, prop.leaseTime(), prop.unit());}},FAIL_FAST(){ // 枚举项,快速失败 = 不重试+抛出异常@Overridepublic boolean tryLock(RLock lock, MyLock prop) throws InterruptedException {boolean isLock = lock.tryLock(0, prop.leaseTime(), prop.unit());if (!isLock) {throw new BizIllegalException("请求太频繁");}return true;}},KEEP_TRYING(){ // 枚举项,无限重试@Overridepublic boolean tryLock(RLock lock, MyLock prop) throws InterruptedException {lock.lock( prop.leaseTime(), prop.unit());return true;}},SKIP_AFTER_RETRY_TIMEOUT(){ // 枚举项,重试超时后结束 = 有限重试+直接结束@Overridepublic boolean tryLock(RLock lock, MyLock prop) throws InterruptedException {return lock.tryLock(prop.waitTime(), prop.leaseTime(), prop.unit());}},FAIL_AFTER_RETRY_TIMEOUT(){ // 枚举项,重试超时后失败 = 有限重试+抛出异常@Overridepublic boolean tryLock(RLock lock, MyLock prop) throws InterruptedException {boolean isLock = lock.tryLock(prop.waitTime(), prop.leaseTime(), prop.unit());if (!isLock) {throw new BizIllegalException("请求太频繁");}return true;}},;public abstract boolean tryLock(RLock lock, MyLock prop) throws InterruptedException;
}
和工厂模式 选择锁类型
一样, 在注解 锁MyLock里面加入一个属性,实现可选策略:
java">// 锁的失败策略,默认为重试超时后失败(有限重试,失败后抛出异常),由工厂模式根据lockType进行创建
MyLockStrategy lockStrategy() default MyLockStrategy.FAIL_AFTER_RETRY_TIMEOUT;
修改切面代码,基于用户选择的策略来处理:
就可以在使用锁的时候自由选择锁类型、锁策略了:
基于SPEL的动态锁名
现在实现的锁版本还没有userID
在当前业务中,我们的锁对象本来应该是当前登录用户,是动态获取的。而加锁是基于注解参数添加的,在编码时就需要指定。怎么办?
Spring中提供了一种表达式语法,称为SPEL表达式,可以执行java代码,获取任意参数。
思路:
我们可以让用户指定锁名称参数时不要写死,而是基于SPEL表达式。在创建锁对象时,解析SPEL表达式,动态获取锁名称。
首先,在使用锁注解时,锁名称可以利用SPEL表达式,例如我们指定锁名称中要包含参数中的用户id,则可以这样写:
而如果是通过UserContext.getUser()获取,则可以利用下面的语法:
在这里插入图片描述
这里T(类名).方法名()就是调用静态方法。
获取锁名称用的是getLockName()这个方法:
java">
/*** SPEL的正则规则*/
private static final Pattern pattern = Pattern.compile("\\#\\{([^\\}]*)\\}");
/*** 方法参数解析器*/
private static final ParameterNameDiscoverer parameterNameDiscoverer = new DefaultParameterNameDiscoverer();/*** 解析锁名称* @param name 原始锁名称* @param pjp 切入点* @return 解析后的锁名称*/
private String getLockName(String name, ProceedingJoinPoint pjp) {// 1.判断是否存在spel表达式if (StringUtils.isBlank(name) || !name.contains("#")) {// 不存在,直接返回return name;}// 2.构建context,也就是SPEL表达式获取参数的上下文环境,这里上下文就是切入点的参数列表EvaluationContext context = new MethodBasedEvaluationContext(TypedValue.NULL, resolveMethod(pjp), pjp.getArgs(), parameterNameDiscoverer);// 3.构建SPEL解析器ExpressionParser parser = new SpelExpressionParser();// 4.循环处理,因为表达式中可以包含多个表达式Matcher matcher = pattern.matcher(name);while (matcher.find()) {// 4.1.获取表达式String tmp = matcher.group();String group = matcher.group(1);// 4.2.这里要判断表达式是否以 T字符开头,这种属于解析静态方法,不走上下文Expression expression = parser.parseExpression(group.charAt(0) == 'T' ? group : "#" + group);// 4.3.解析出表达式对应的值Object value = expression.getValue(context);// 4.4.用值替换锁名称中的SPEL表达式name = name.replace(tmp, ObjectUtils.nullSafeToString(value));}return name;
}private Method resolveMethod(ProceedingJoinPoint pjp) {// 1.获取方法签名MethodSignature signature = (MethodSignature)pjp.getSignature();// 2.获取字节码Class<?> clazz = pjp.getTarget().getClass();// 3.方法名称String name = signature.getName();// 4.方法参数列表Class<?>[] parameterTypes = signature.getMethod().getParameterTypes();return tryGetDeclaredMethod(clazz, name, parameterTypes);
}private Method tryGetDeclaredMethod(Class<?> clazz, String name, Class<?> ... parameterTypes){try {// 5.反射获取方法return clazz.getDeclaredMethod(name, parameterTypes);} catch (NoSuchMethodException e) {Class<?> superClass = clazz.getSuperclass();if (superClass != null) {// 尝试从父类寻找return tryGetDeclaredMethod(superClass, name, parameterTypes);}}return null;
}
解析SPEL
在切面中,我们需要基于注解中的锁名称做动态解析,而不是直接使用名称:
相关知识点
自定义注解
java">@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
@Documented
public @interface xxxx{
}
通知类:里面的切点表达式,public是方法返回类型,路径…*是service及其子包下的类 后面的.*是任意方法 (…)指任意参数
不用切点表达式而是用注解 控制仅实现类中的一个方法:(@annotation(路径.实现类中方法的注解)
注解直接写到方法的参数上:@Around("@annotation(xxx类)")
以上图为例,只要方法加了@printTime注解,方法就是切点,再走这个方法之前就会走环绕通知@Around()
around()方法
Redis 的 Pub/Sub (发布/订阅) 是一种消息传递机制,它允许客户端订阅一个或多个频道,并接收其他客户端发布到这些频道的消息。在使用 Redis Pub/Sub 的过程中,可能会遇到由于网络故障、订阅客户端崩溃或其他原因导致消息接收失败的情况。因此,失败重试机制可以帮助保证消息在分布式环境下的可靠性。
Redis Pub/Sub 的工作原理
- 发布者 (Publisher) 将消息发布到指定的频道。
- 订阅者 (Subscriber) 订阅频道,并监听来自该频道的消息。
- 当有消息发布到订阅的频道时,Redis 会将这些消息推送到所有订阅了该频道的客户端。
失败重试机制在 Redis Pub/Sub 中的应用
在 Redis Pub/Sub 中,如果出现网络问题或客户端挂掉导致的消息丢失,默认情况下消息不会被重试或保存(Redis 本身不支持持久化消息)。因此,如果需要实现失败重试机制,可以采取以下几种策略:
1. 消息确认和重试机制
- 问题:如果消息在订阅者接收时失败(比如网络中断、订阅者崩溃等),这些消息会丢失。
- 解决方案:一种常见的做法是通过消息确认机制来实现重试。每个消息可以通过客户端进行确认,如果未成功处理消息,则将其重新发布到一个队列或另一个频道,等待下一次重试。
实现方式:
- 订阅者在接收到消息时,需要向发布者或消息队列发送确认信号。如果在一定时间内没有收到确认,可以将该消息重新推送到某个死信队列(Dead Letter Queue,DLQ)或者一个等待重试的队列中,等到订阅者恢复正常后,再进行重试。
示例:
java">// Redis Pub/Sub 订阅者代码(使用 Jedis 客户端)
public class MySubscriber extends JedisPubSub {@Overridepublic void onMessage(String channel, String message) {try {// 处理消息processMessage(message);// 发送确认信号sendAcknowledgment(message);} catch (Exception e) {// 处理失败,重试机制handleFailure(message);}}private void handleFailure(String message) {// 如果处理失败,可以将消息重新推送到一个队列或保存到死信队列redisClient.lpush("retryQueue", message);}// 确认消息已处理private void sendAcknowledgment(String message) {redisClient.publish("acknowledgmentChannel", message); // 可选的确认机制}
}
2. 使用 Redis Streams 替代 Pub/Sub
Redis Streams 是一种基于日志的消息队列结构,适合需要消息持久化和重试的场景。与传统的 Pub/Sub 模式不同,Redis Streams 可以存储消息,并且订阅者可以从流的任意位置读取消息,避免了消息丢失的问题。
特点:
- 持久化:Redis Streams 会持久化消息到磁盘,避免了消息丢失的风险。
- 消息确认和重试:消息的消费者可以通过
XACK
命令显式地确认消息,如果没有确认,Redis 可以将消息重新分配给其他消费者或重新发送给原消费者进行重试。
示例:
java">// 发布消息到 Redis Stream
redisClient.xadd("mystream", Map.of("message", "hello"));// 订阅者处理消息
while (true) {List<Map.Entry<String, List<StreamEntry>>> messages = redisClient.xread(StreamEntryID.UNRECEIVED, Map.of("mystream", "0"));for (Map.Entry<String, List<StreamEntry>> streamEntry : messages) {for (StreamEntry entry : streamEntry.getValue()) {try {processMessage(entry.getFields().get("message"));redisClient.xack("mystream", "consumerGroup", entry.getID()); // 消息确认} catch (Exception e) {// 处理失败,可以重新发送消息进行重试redisClient.xadd("retryQueue", Map.of("message", entry.getFields().get("message")));}}}
}
使用 Redis Streams 的优势:
- 消息不丢失:Stream 中的消息会持久化在 Redis 中,订阅者可以在后续任何时候读取。
- 自动重试:可以通过消费组的方式来确保如果某个消费者失败,其他消费者会接管任务。
- 确认机制:消费者可以确认已处理的消息,如果没有确认,Redis 会重新分配任务。
3. 使用 Redis Pub/Sub 的失败重试方案
如果仍然希望使用 Redis 的传统 Pub/Sub 模式并实现某种程度的消息重试,可以结合一些外部机制,例如将消息发布到 Redis Pub/Sub 频道后,同时将消息也存储到一个队列中(如 Redis List、Redis Stream),然后通过定时任务或后台进程来检查未确认的消息。
步骤:
- 订阅者从 Redis 频道获取消息时,在处理前将消息的 ID 记录下来。
- 如果处理失败,订阅者会将消息 ID 添加到一个待重试的队列中(例如 Redis List 或 Stream)。
- 定期检查待重试队列并重试这些消息。
总结
虽然 Redis Pub/Sub 本身并不直接支持消息的失败重试机制,但可以通过以下几种方式来实现:
- 使用 Redis Streams 代替 Pub/Sub,利用其消息持久化和消费确认功能来实现失败重试。
- 使用 消息确认机制,结合 Redis 队列(如 List、Stream)将失败的消息重新推送,进行后续重试。
- 如果不希望改变现有的 Pub/Sub 模式,可以通过后台任务周期性地重试失败消息,将消息记录在专门的队列中。