Redis-分布式锁
如何使用分布式锁
正常在一个java服务中使用sync锁或lock锁完全可以满足线程安全问题的,但是在部署集群的情况下,不同的jvm不能锁同一个方法,因此需要分布式锁用来保护线程安全问题。
分布式锁实现
常见的分布式锁解决方案:
- Mysql:自带悲观锁,但是不太好维护
- redis:利用setnx实现互斥,操作方便,推荐使用
- zookeeper:利用节点实现互斥
本章主要采用redis的方式进行实现
public interface ILock {/*** 分布式-互斥锁*/boolean tryLock(String name, Long time, TimeUnit unit);/*** 分布式-释放互斥锁*/void unLock(String name);
}
/*** 分布式锁实现*/
@Component
public class DistributedLock implements ILock{@Resourceprivate StringRedisTemplate stringRedisTemplate;/** 分布式锁key */private final String DISTRIBUTED_LOCK = "distributed_lock:";/*** 分布式互斥锁*/@Overridepublic boolean tryLock(String name, Long time, TimeUnit unit) {// valueString value = Thread.currentThread().getId() + "";// keyString key = DISTRIBUTED_LOCK + name;Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent(key, value, time, unit);// 防止自动拆箱空指针return BooleanUtil.isTrue(aBoolean);}/*** 分布式释放锁*/@Overridepublic void unLock(String name) {String key = DISTRIBUTED_LOCK + name;stringRedisTemplate.delete(key);}
}
分布式锁误删问题
在设置互斥锁的时候为了解决redis宕机导致互斥锁永久失效的情况下,加了一个过期时间。此时如果缓存重建的时间比过期时间更长,会导致多个线程释放不同的锁资源导致分布式锁误删问题。
解决误删问题:
- 需要在获取锁时存入线程表示(uuid + 线程id)的方式
- 在释放锁时需要先获取锁中的线程标识,判断是否与当前线程标识一致
- 如果一致则释放锁
- 如果不一致则不释放锁
更新代码:
@Component
public class DistributedLock implements ILock{@Resourceprivate StringRedisTemplate stringRedisTemplate;/** 分布式锁key */private final String DISTRIBUTED_LOCK = "distributed_lock:";/** UUID */private String uuid = UUID.randomUUID(true).toString();/*** 分布式互斥锁*/@Overridepublic boolean tryLock(String name, Long time, TimeUnit unit) {// valueString value = uuid + Thread.currentThread().getId();// keyString key = DISTRIBUTED_LOCK + name;Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent(key, value, time, unit);// 防止自动拆箱空指针return BooleanUtil.isTrue(aBoolean);}/*** 分布式释放锁*/@Overridepublic void unLock(String name) {String key = DISTRIBUTED_LOCK + name;String value = uuid + Thread.currentThread().getId();// 获取互斥锁中的值String redisValue = stringRedisTemplate.opsForValue().get(key);if (StringUtils.equals(value,redisValue)){stringRedisTemplate.delete(key);}}}
Redisson入门
正常使用setnx实现分布式锁存在以下几种问题
- 不可重入锁:同一现成无法多次获取同一把锁
- 不可重试:获取锁只尝试一次就返回,无法重试
- 超时释放:业务执行耗时较长,会导致锁释放
- 主从一致性:集群的情况下主节点宕机后同步数据过程种,导致锁失效
Redisson 是一个 Java 高级 Redis 客户端,提供了基于 Redis 的分布式和可扩展的 Java 数据结构,如并发集合(Concurrent Collections)、同步器(Synchronizers)、分布式服务(Distributed Services)等。Redisson 构建于 Jedis 之上,旨在简化 Redis 的使用,尤其对于分布式环境中的应用程序而言,它提供了一种易于使用的 API 来处理 Redis 中的数据,并实现了多种分布式锁和其他高级功能。Redisson底层采用的是Netty 框架
案例:每个用户对一件商品只能下一单。
配置文件
redisson:# redis key前缀keyPrefix:# 线程池数量threads: 4# Netty线程池数量nettyThreads: 8# 单节点配置singleServerConfig:# 客户端名称clientName: ${ruoyi.name}# 最小空闲连接数connectionMinimumIdleSize: 8# 连接池大小connectionPoolSize: 32# 连接空闲超时,单位:毫秒idleConnectionTimeout: 10000# 命令等待超时,单位:毫秒timeout: 3000# 发布和订阅连接池大小subscriptionConnectionPoolSize: 50
/*** Redisson 配置属性**/
@Data
@ConfigurationProperties(prefix = "redisson")
public class RedissonProperties {/*** redis缓存key前缀*/private String keyPrefix;/*** 线程池数量,默认值 = 当前处理核数量 * 2*/private int threads;/*** Netty线程池数量,默认值 = 当前处理核数量 * 2*/private int nettyThreads;/*** 单机服务配置*/private SingleServerConfig singleServerConfig;/*** 集群服务配置*/private ClusterServersConfig clusterServersConfig;@Data@NoArgsConstructorpublic static class SingleServerConfig {/*** 客户端名称*/private String clientName;/*** 最小空闲连接数*/private int connectionMinimumIdleSize;/*** 连接池大小*/private int connectionPoolSize;/*** 连接空闲超时,单位:毫秒*/private int idleConnectionTimeout;/*** 命令等待超时,单位:毫秒*/private int timeout;/*** 发布和订阅连接池大小*/private int subscriptionConnectionPoolSize;}@Data@NoArgsConstructorpublic static class ClusterServersConfig {/*** 客户端名称*/private String clientName;/*** master最小空闲连接数*/private int masterConnectionMinimumIdleSize;/*** master连接池大小*/private int masterConnectionPoolSize;/*** slave最小空闲连接数*/private int slaveConnectionMinimumIdleSize;/*** slave连接池大小*/private int slaveConnectionPoolSize;/*** 连接空闲超时,单位:毫秒*/private int idleConnectionTimeout;/*** 命令等待超时,单位:毫秒*/private int timeout;/*** 发布和订阅连接池大小*/private int subscriptionConnectionPoolSize;/*** 读取模式*/private ReadMode readMode;/*** 订阅模式*/private SubscriptionMode subscriptionMode;}}
/*** redis配置**/
@Slf4j
@AutoConfiguration
@EnableCaching
@EnableConfigurationProperties(RedissonProperties.class)
public class RedisConfig {@Autowiredprivate RedissonProperties redissonProperties;@Autowiredprivate ObjectMapper objectMapper;@Beanpublic RedissonAutoConfigurationCustomizer redissonCustomizer() {return config -> {ObjectMapper om = objectMapper.copy();om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);// 指定序列化输入的类型,类必须是非final修饰的。序列化时将对象全类名一起保存下来om.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL);TypedJsonJacksonCodec jsonCodec = new TypedJsonJacksonCodec(Object.class, om);// 组合序列化 key 使用 String 内容使用通用 json 格式CompositeCodec codec = new CompositeCodec(StringCodec.INSTANCE, jsonCodec, jsonCodec);config.setThreads(redissonProperties.getThreads()).setNettyThreads(redissonProperties.getNettyThreads())// 缓存 Lua 脚本 减少网络传输(redisson 大部分的功能都是基于 Lua 脚本实现).setUseScriptCache(true).setCodec(codec);RedissonProperties.SingleServerConfig singleServerConfig = redissonProperties.getSingleServerConfig();if (ObjectUtil.isNotNull(singleServerConfig)) {// 使用单机模式config.useSingleServer()//设置redis key前缀.setNameMapper(new KeyPrefixHandler(redissonProperties.getKeyPrefix())).setTimeout(singleServerConfig.getTimeout()).setClientName(singleServerConfig.getClientName()).setIdleConnectionTimeout(singleServerConfig.getIdleConnectionTimeout()).setSubscriptionConnectionPoolSize(singleServerConfig.getSubscriptionConnectionPoolSize()).setConnectionMinimumIdleSize(singleServerConfig.getConnectionMinimumIdleSize()).setConnectionPoolSize(singleServerConfig.getConnectionPoolSize());}// 集群配置方式 参考下方注释RedissonProperties.ClusterServersConfig clusterServersConfig = redissonProperties.getClusterServersConfig();if (ObjectUtil.isNotNull(clusterServersConfig)) {config.useClusterServers()//设置redis key前缀.setNameMapper(new KeyPrefixHandler(redissonProperties.getKeyPrefix())).setTimeout(clusterServersConfig.getTimeout()).setClientName(clusterServersConfig.getClientName()).setIdleConnectionTimeout(clusterServersConfig.getIdleConnectionTimeout()).setSubscriptionConnectionPoolSize(clusterServersConfig.getSubscriptionConnectionPoolSize()).setMasterConnectionMinimumIdleSize(clusterServersConfig.getMasterConnectionMinimumIdleSize()).setMasterConnectionPoolSize(clusterServersConfig.getMasterConnectionPoolSize()).setSlaveConnectionMinimumIdleSize(clusterServersConfig.getSlaveConnectionMinimumIdleSize()).setSlaveConnectionPoolSize(clusterServersConfig.getSlaveConnectionPoolSize()).setReadMode(clusterServersConfig.getReadMode()).setSubscriptionMode(clusterServersConfig.getSubscriptionMode());}log.info("初始化 redis 配置");};}
@RequiredArgsConstructor
@Service
public class BookOrderServiceImpl implements IBookOrderService {private final BookOrderMapper baseMapper;private final SysUserMapper sysUserMapper;private final BooksMapper booksMapper;private final BookOrderDetailMapper bookOrderDetailMapper;/** 自定义分布式锁 */private final DistributedLock distributedLock;/** redission */private static final RedissonClient CLIENT = SpringUtils.getBean(RedissonClient.class);/*** 模拟库存扣减并发问题*/@Transactional(rollbackFor = Exception.class)@Overridepublic void inventory(String bookId,Long userId) {// 一人一单校验Long aLong = bookOrderDetailMapper.selectCount(Wrappers.lambdaQuery(BookOrderDetail.class).eq(BookOrderDetail::getNumber, userId).eq(BookOrderDetail::getBookId,bookId));if (aLong > 0){throw new ServiceException("下单失败");}// 自定义获取锁// boolean piker = distributedLock.tryLock("PIKER", 10L, TimeUnit.SECONDS);// redisClient分布式锁RLock lock = CLIENT.getLock("lock:order:");// 默认失败不等待锁时间,锁过期时间30秒boolean piker = lock.tryLock();if (piker){try {// 订单业务placingOrder(bookId, userId);}finally {// 自定义释放锁// distributedLock.unLock("PIKER");// redisson 释放锁lock.unlock();}}}/*** 业务操作*/private void placingOrder(String bookId, Long userId) {// 1.减少库存Books books = booksMapper.selectById(bookId);books.setStockQuantity(books.getStockQuantity() - 1);booksMapper.updateById(books);// 2.增加订单BookOrderDetail bookOrder = new BookOrderDetail();bookOrder.setBookId(Long.parseLong(bookId));bookOrder.setNumber(userId.intValue());bookOrderDetailMapper.insert(bookOrder);}
}
Redisson-分布式锁实现原理
1.可重入锁
方法1{获取锁调用方法2
}
方法2{获取锁
}
以上这种情况下使用自定义的setnx方式就会造成死锁的情况,比较经典的重入锁。
Rdisson使用Lua脚本来实现可重入锁的。
2.重试机制,超时释放
重试机制:在设置互斥锁时有两个线程A,B。A线程先获取锁资源,之后B在获取锁就会一直失败,因为锁的互斥性,没有重试的机制。
超时释放:给锁设置一个过期时间,防止redis宕机情况下锁一直没有办法被释放导致死锁情况,或者因为业务原因导致缓存重建时间大于锁过期时间导致数据丢失
注意:redisson不同版本的代码不同,但是整体流程是大差不大的,下面是结合黑马程序猿老师结合总结的流程图。
如果自己设置失效时间的话,锁过期时间就不是-1因此就不会触发看门狗机制了。
获取锁:
释放锁:
有了以上的机制可以实现:我有三个线程 A,B 设置等待时间3秒,线程A先获取到锁,由于业务原因进行阻塞,此时线程2开始获取锁。线程A业务执行了4秒,那么首先线程2获取锁失败。如果线程A执行业务在3秒内完成,那么线程2可以成功获取锁。