Redis-分布式锁

ops/2024/11/10 12:14:00/

Redis-分布式

如何使用分布式

正常在一个java服务中使用sync锁或lock锁完全可以满足线程安全问题的,但是在部署集群的情况下,不同的jvm不能锁同一个方法,因此需要分布式锁用来保护线程安全问题。

分布式锁实现

常见的分布式锁解决方案:

  1. Mysql:自带悲观锁,但是不太好维护
  2. redis:利用setnx实现互斥,操作方便,推荐使用
  3. zookeeper:利用节点实现互斥

本章主要采用redis的方式进行实现

public interface ILock {/*** 分布式-互斥锁*/boolean tryLock(String name, Long time, TimeUnit unit);/*** 分布式-释放互斥锁*/void unLock(String name);
}
/*** 分布式锁实现*/
@Component
public class DistributedLock implements ILock{@Resourceprivate StringRedisTemplate stringRedisTemplate;/** 分布式锁key */private final String DISTRIBUTED_LOCK = "distributed_lock:";/*** 分布式互斥锁*/@Overridepublic boolean tryLock(String name, Long time, TimeUnit unit) {// valueString value = Thread.currentThread().getId() + "";// keyString key = DISTRIBUTED_LOCK + name;Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent(key, value, time, unit);// 防止自动拆箱空指针return BooleanUtil.isTrue(aBoolean);}/*** 分布式释放锁*/@Overridepublic void unLock(String name) {String key = DISTRIBUTED_LOCK + name;stringRedisTemplate.delete(key);}
}

分布式锁误删问题

在设置互斥锁的时候为了解决redis宕机导致互斥锁永久失效的情况下,加了一个过期时间。此时如果缓存重建的时间比过期时间更长,会导致多个线程释放不同的锁资源导致分布式锁误删问题。
解决误删问题:

  1. 需要在获取锁时存入线程表示(uuid + 线程id)的方式
  2. 在释放锁时需要先获取锁中的线程标识,判断是否与当前线程标识一致
    • 如果一致则释放锁
    • 如果不一致则不释放锁

更新代码:

@Component
public class DistributedLock implements ILock{@Resourceprivate StringRedisTemplate stringRedisTemplate;/** 分布式锁key */private final String DISTRIBUTED_LOCK = "distributed_lock:";/** UUID */private String uuid = UUID.randomUUID(true).toString();/*** 分布式互斥锁*/@Overridepublic boolean tryLock(String name, Long time, TimeUnit unit) {// valueString value = uuid + Thread.currentThread().getId();// keyString key = DISTRIBUTED_LOCK + name;Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent(key, value, time, unit);// 防止自动拆箱空指针return BooleanUtil.isTrue(aBoolean);}/*** 分布式释放锁*/@Overridepublic void unLock(String name) {String key = DISTRIBUTED_LOCK + name;String value = uuid + Thread.currentThread().getId();// 获取互斥锁中的值String redisValue = stringRedisTemplate.opsForValue().get(key);if (StringUtils.equals(value,redisValue)){stringRedisTemplate.delete(key);}}}

Redisson入门

正常使用setnx实现分布式锁存在以下几种问题

  1. 不可重入锁:同一现成无法多次获取同一把锁
  2. 不可重试:获取锁只尝试一次就返回,无法重试
  3. 超时释放:业务执行耗时较长,会导致锁释放
  4. 主从一致性:集群的情况下主节点宕机后同步数据过程种,导致锁失效

Redisson 是一个 Java 高级 Redis 客户端,提供了基于 Redis 的分布式和可扩展的 Java 数据结构,如并发集合(Concurrent Collections)、同步器(Synchronizers)、分布式服务(Distributed Services)等。Redisson 构建于 Jedis 之上,旨在简化 Redis 的使用,尤其对于分布式环境中的应用程序而言,它提供了一种易于使用的 API 来处理 Redis 中的数据,并实现了多种分布式锁和其他高级功能。Redisson底层采用的是Netty 框架

案例:每个用户对一件商品只能下一单。

配置文件

redisson:# redis key前缀keyPrefix:# 线程池数量threads: 4# Netty线程池数量nettyThreads: 8# 单节点配置singleServerConfig:# 客户端名称clientName: ${ruoyi.name}# 最小空闲连接数connectionMinimumIdleSize: 8# 连接池大小connectionPoolSize: 32# 连接空闲超时,单位:毫秒idleConnectionTimeout: 10000# 命令等待超时,单位:毫秒timeout: 3000# 发布和订阅连接池大小subscriptionConnectionPoolSize: 50
/*** Redisson 配置属性**/
@Data
@ConfigurationProperties(prefix = "redisson")
public class RedissonProperties {/*** redis缓存key前缀*/private String keyPrefix;/*** 线程池数量,默认值 = 当前处理核数量 * 2*/private int threads;/*** Netty线程池数量,默认值 = 当前处理核数量 * 2*/private int nettyThreads;/*** 单机服务配置*/private SingleServerConfig singleServerConfig;/*** 集群服务配置*/private ClusterServersConfig clusterServersConfig;@Data@NoArgsConstructorpublic static class SingleServerConfig {/*** 客户端名称*/private String clientName;/*** 最小空闲连接数*/private int connectionMinimumIdleSize;/*** 连接池大小*/private int connectionPoolSize;/*** 连接空闲超时,单位:毫秒*/private int idleConnectionTimeout;/*** 命令等待超时,单位:毫秒*/private int timeout;/*** 发布和订阅连接池大小*/private int subscriptionConnectionPoolSize;}@Data@NoArgsConstructorpublic static class ClusterServersConfig {/*** 客户端名称*/private String clientName;/*** master最小空闲连接数*/private int masterConnectionMinimumIdleSize;/*** master连接池大小*/private int masterConnectionPoolSize;/*** slave最小空闲连接数*/private int slaveConnectionMinimumIdleSize;/*** slave连接池大小*/private int slaveConnectionPoolSize;/*** 连接空闲超时,单位:毫秒*/private int idleConnectionTimeout;/*** 命令等待超时,单位:毫秒*/private int timeout;/*** 发布和订阅连接池大小*/private int subscriptionConnectionPoolSize;/*** 读取模式*/private ReadMode readMode;/*** 订阅模式*/private SubscriptionMode subscriptionMode;}}
/*** redis配置**/
@Slf4j
@AutoConfiguration
@EnableCaching
@EnableConfigurationProperties(RedissonProperties.class)
public class RedisConfig {@Autowiredprivate RedissonProperties redissonProperties;@Autowiredprivate ObjectMapper objectMapper;@Beanpublic RedissonAutoConfigurationCustomizer redissonCustomizer() {return config -> {ObjectMapper om = objectMapper.copy();om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);// 指定序列化输入的类型,类必须是非final修饰的。序列化时将对象全类名一起保存下来om.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL);TypedJsonJacksonCodec jsonCodec = new TypedJsonJacksonCodec(Object.class, om);// 组合序列化 key 使用 String 内容使用通用 json 格式CompositeCodec codec = new CompositeCodec(StringCodec.INSTANCE, jsonCodec, jsonCodec);config.setThreads(redissonProperties.getThreads()).setNettyThreads(redissonProperties.getNettyThreads())// 缓存 Lua 脚本 减少网络传输(redisson 大部分的功能都是基于 Lua 脚本实现).setUseScriptCache(true).setCodec(codec);RedissonProperties.SingleServerConfig singleServerConfig = redissonProperties.getSingleServerConfig();if (ObjectUtil.isNotNull(singleServerConfig)) {// 使用单机模式config.useSingleServer()//设置redis key前缀.setNameMapper(new KeyPrefixHandler(redissonProperties.getKeyPrefix())).setTimeout(singleServerConfig.getTimeout()).setClientName(singleServerConfig.getClientName()).setIdleConnectionTimeout(singleServerConfig.getIdleConnectionTimeout()).setSubscriptionConnectionPoolSize(singleServerConfig.getSubscriptionConnectionPoolSize()).setConnectionMinimumIdleSize(singleServerConfig.getConnectionMinimumIdleSize()).setConnectionPoolSize(singleServerConfig.getConnectionPoolSize());}// 集群配置方式 参考下方注释RedissonProperties.ClusterServersConfig clusterServersConfig = redissonProperties.getClusterServersConfig();if (ObjectUtil.isNotNull(clusterServersConfig)) {config.useClusterServers()//设置redis key前缀.setNameMapper(new KeyPrefixHandler(redissonProperties.getKeyPrefix())).setTimeout(clusterServersConfig.getTimeout()).setClientName(clusterServersConfig.getClientName()).setIdleConnectionTimeout(clusterServersConfig.getIdleConnectionTimeout()).setSubscriptionConnectionPoolSize(clusterServersConfig.getSubscriptionConnectionPoolSize()).setMasterConnectionMinimumIdleSize(clusterServersConfig.getMasterConnectionMinimumIdleSize()).setMasterConnectionPoolSize(clusterServersConfig.getMasterConnectionPoolSize()).setSlaveConnectionMinimumIdleSize(clusterServersConfig.getSlaveConnectionMinimumIdleSize()).setSlaveConnectionPoolSize(clusterServersConfig.getSlaveConnectionPoolSize()).setReadMode(clusterServersConfig.getReadMode()).setSubscriptionMode(clusterServersConfig.getSubscriptionMode());}log.info("初始化 redis 配置");};}
@RequiredArgsConstructor
@Service
public class BookOrderServiceImpl implements IBookOrderService {private final BookOrderMapper baseMapper;private final SysUserMapper sysUserMapper;private final BooksMapper booksMapper;private final BookOrderDetailMapper bookOrderDetailMapper;/** 自定义分布式锁 */private final DistributedLock distributedLock;/** redission  */private static final RedissonClient CLIENT = SpringUtils.getBean(RedissonClient.class);/*** 模拟库存扣减并发问题*/@Transactional(rollbackFor = Exception.class)@Overridepublic void inventory(String bookId,Long userId) {// 一人一单校验Long aLong = bookOrderDetailMapper.selectCount(Wrappers.lambdaQuery(BookOrderDetail.class).eq(BookOrderDetail::getNumber, userId).eq(BookOrderDetail::getBookId,bookId));if (aLong > 0){throw new ServiceException("下单失败");}// 自定义获取锁// boolean piker = distributedLock.tryLock("PIKER", 10L, TimeUnit.SECONDS);// redisClient分布式RLock lock = CLIENT.getLock("lock:order:");// 默认失败不等待锁时间,锁过期时间30秒boolean piker = lock.tryLock();if (piker){try {// 订单业务placingOrder(bookId, userId);}finally {// 自定义释放锁// distributedLock.unLock("PIKER");// redisson 释放锁lock.unlock();}}}/*** 业务操作*/private void placingOrder(String bookId, Long userId) {// 1.减少库存Books books = booksMapper.selectById(bookId);books.setStockQuantity(books.getStockQuantity() - 1);booksMapper.updateById(books);// 2.增加订单BookOrderDetail bookOrder = new BookOrderDetail();bookOrder.setBookId(Long.parseLong(bookId));bookOrder.setNumber(userId.intValue());bookOrderDetailMapper.insert(bookOrder);}
}

Redisson-分布式锁实现原理

1.可重入锁
方法1{获取锁调用方法2
}
方法2{获取锁
}

以上这种情况下使用自定义的setnx方式就会造成死锁的情况,比较经典的重入锁。

Rdisson使用Lua脚本来实现可重入锁的。
在这里插入图片描述

2.重试机制,超时释放

重试机制:在设置互斥锁时有两个线程A,B。A线程先获取锁资源,之后B在获取锁就会一直失败,因为锁的互斥性,没有重试的机制。

超时释放:给锁设置一个过期时间,防止redis宕机情况下锁一直没有办法被释放导致死锁情况,或者因为业务原因导致缓存重建时间大于锁过期时间导致数据丢失

注意:redisson不同版本的代码不同,但是整体流程是大差不大的,下面是结合黑马程序猿老师结合总结的流程图。
如果自己设置失效时间的话,锁过期时间就不是-1因此就不会触发看门狗机制了。

获取锁:

在这里插入图片描述

释放锁:

在这里插入图片描述

有了以上的机制可以实现:我有三个线程 A,B 设置等待时间3秒,线程A先获取到锁,由于业务原因进行阻塞,此时线程2开始获取锁。线程A业务执行了4秒,那么首先线程2获取锁失败。如果线程A执行业务在3秒内完成,那么线程2可以成功获取锁。


http://www.ppmy.cn/ops/5237.html

相关文章

uni-app vue3 setup 如何使用 onShow

在uni-app中&#xff0c;onShow是uni.onAppShow的别名&#xff0c;用于监听当前小程序被用户切换到前台运行时触发。在Vue 3中&#xff0c;你可以通过以下方式使用onShow&#xff1a; 在页面的vue文件中添加onShow方法&#xff1a; javascript <button click“onShow”&g…

C# .NET 中的反应式系统

概述&#xff1a;反应式系统已成为构建健壮、可扩展和响应迅速的应用程序的强大范式。这些系统被设计为更具弹性、弹性和消息驱动性&#xff0c;确保它们在各种条件下保持响应&#xff0c;包括高负载、网络延迟和故障。在本文中&#xff0c;我们将探讨 .NET 生态系统中反应式系…

ElasticSearch可视化工具:kibana + elasticsearch-head

kibana 下载 地址&#xff1a;https://www.elastic.co/cn/downloads/kibana 下载别的版本&#xff1a;https://www.elastic.co/cn/downloads/past-releases#kibana 将Kibana安装包解压缩 进入config目录&#xff0c;在kibana.yml中添加es服务器地址。&#xff08;如果之前没…

AppBuilder升级!工作流编排正式上线!AssistantsAPI开放邀测!

>>【v0.5.3版本】 上线时间&#xff1a;2024/4/14 关键发版信息&#xff1a; 低代码态&#xff1a;新增工作流&#xff0c;低代码制作组件 自定义组件&#xff1a;支持用户自定义创建组件&#xff0c;并被Agent自动编排调用
 工作流框架&#xff1a;组件支持流式编排…

【HCIP学习】OSPF协议基础

一、OSPF基础 1、技术背景&#xff08;RIP中存在的问题&#xff09; RIP中存在最大跳数为15的限制&#xff0c;不能适应大规模组网 周期性发送全部路由信息&#xff0c;占用大量的带宽资源 路由收敛速度慢 以跳数作为度量值 存在路由环路可能性 每隔30秒更新 2、OSPF协议…

OWASP发布10大开源软件风险清单

3月20日&#xff0c;xz-utils 项目被爆植入后门震惊了整个开源社区&#xff0c;2021 年 Apache Log4j 漏洞事件依旧历历在目。倘若该后门未被及时发现&#xff0c;那么将很有可能成为影响最大的软件供应链漏洞之一。近几年爆发的一系列供应链漏洞和风险&#xff0c;使得“加强开…

简介:Asp.Net Core进阶高级编程教程

课程简介目录 &#x1f680;前言一、课程背景二、课程目的三、课程特点四、课程适合人员六、最后 &#x1f680;前言 本文是《.Net Core进阶编程课程》教程专栏的导航站&#xff08;点击链接&#xff0c;跳转到专栏主页&#xff0c;欢迎订阅&#xff0c;持续更新…&#xff09…

【笔记】Android 网络漫游更新网络状态、运营商名称等信息日志分析

业务知识 漫游有国内和国际漫游之分,Android代码定义如下: //frameworks/base/telephony/java/android/telephony/ServiceState.java/*** registered in a domestic roaming network* @hide*/@SystemApipublic static final int ROAMING_TYPE_DOMESTIC = 2;/*** registered…