Redis-分布式锁

devtools/2024/10/18 1:14:35/

手写分布式

qa

redis_4">redis除了做缓存,还有什么用法

redis_cap_9">redis 单机与集群的cap分析

锁的种类

一个分布式锁需要满足的条件和刚需

  • 独占性:任何时间只能有一个线程占有
  • 高可用:
    • redis集群环境下,不能因为一个节点挂了而出现获取锁和释放锁失败的情况
    • 高并发请求下,依旧性能 ok
  • 防死锁:杜绝死锁,必须有超时控制机制与撤销操作,有个兜底终止跳出方案
  • 不乱抢:防止张冠李戴,不能私下unlock别人的锁,只能自己加锁自己释放,自己约的锁含着泪也要自己解
  • 重入性:同一个节点的同一个线程,获得锁之后,他也可以再次获取这个锁

分布式锁及其重点

setnx 来获取锁

redis__31">boot+redis 基础案例

使用场景

@Service
@Slf4j
public class InventoryService {@Autowiredprivate StringRedisTemplate stringRedisTemplate;@Value("${server.port}")private String port;private Lock lock = new ReentrantLock();public String sale() {String retMessage = "";lock.lock();try {//1 查询库存信息String result = stringRedisTemplate.opsForValue().get("inventory001");//2 判断库存是否足够Integer inventoryNumber = result == null? 0 : Integer.parseInt(result);//3 扣除库存,每次减少一个if (inventoryNumber > 0) {inventoryNumber--;stringRedisTemplate.opsForValue().set("inventory001", String.valueOf(inventoryNumber));retMessage = "成功卖出一个商品,剩余库存" + inventoryNumber;System.out.println(retMessage + ",端口号:" + port);}else {retMessage = "商品卖完了";}} finally {lock.unlock();}return retMessage;}
}
@RestController
@Api(tags = "redis 分布式锁测试")
public class InventoryController {@Autowiredprivate InventoryService inventoryService;@ApiOperation("扣除库存,一次卖一个")@GetMapping("/inventory/sale")public String sale() {return inventoryService.sale();}}

GET http://localhost:6000/inventory/sale

手写分布式锁分析

拷贝出来一份

nginx 负载均衡

upstream stock {
server 127.0.0.1:6001 weight=1;
server 127.0.0.1:6000 weight=1;
}server {
listen 9000;
server_name localhost;location / {
proxy_pass http://stock;
index index.html index.htm;
}
}
GET http://localhost:9000/inventory/sale

jmeter

添加线程组 100*1

添加取样器-http请求

添加 listener-聚合报告

jmeter 100个线程1s执行完毕

执行完毕查看 库存数

为什么加了synchronized 和 lock 但是没有控制住

3.1 版本 setnx
public String sale() {String retMessage = "";String key = "zzyRedisLock";String uuidValue = IdUtil.simpleUUID() + Thread.currentThread().getId();Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue);if (!flag) {// 暂停20s,进行递归重试try {TimeUnit.MICROSECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}sale();} else {// 抢锁成功的请求线程,进行正常的业务逻辑操作,扣减库存try {//1 查询库存信息String result = stringRedisTemplate.opsForValue().get("inventory001");//2 判断库存是否足够Integer inventoryNumber = result == null? 0 : Integer.parseInt(result);//3 扣除库存,每次减少一个if (inventoryNumber > 0) {inventoryNumber--;stringRedisTemplate.opsForValue().set("inventory001", String.valueOf(inventoryNumber));retMessage = "成功卖出一个商品,剩余库存" + inventoryNumber;System.out.println(retMessage + ",端口号:" + port);}else {retMessage = "商品卖完了";}} finally {stringRedisTemplate.delete(key);}}return retMessage + ",端口号:" + port;}

3.2 自旋代替递归重试 while替代if
public String sale() {String retMessage = "";String key = "zzyRedisLock";String uuidValue = IdUtil.simpleUUID() + Thread.currentThread().getId();// 不用递归了,高并发下使用自旋替代递归重试,使用whilewhile (!stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue)) {try {TimeUnit.MICROSECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}}// 抢锁成功的请求线程,进行正常的业务逻辑操作,扣减库存try {//1 查询库存信息String result = stringRedisTemplate.opsForValue().get("inventory001");//2 判断库存是否足够Integer inventoryNumber = result == null ? 0 : Integer.parseInt(result);//3 扣除库存,每次减少一个if (inventoryNumber > 0) {inventoryNumber--;stringRedisTemplate.opsForValue().set("inventory001", String.valueOf(inventoryNumber));retMessage = "成功卖出一个商品,剩余库存" + inventoryNumber;System.out.println(retMessage + ",端口号:" + port);} else {retMessage = "商品卖完了";}} finally {stringRedisTemplate.delete(key);}return retMessage + ",端口号:" + port;}

部署了微服务的Java程序挂了,代码层面根本没有走到 finally 这块,没办法保证解锁(无过期时间该key一直存在),这个key没有被删除,需要加入一个过期时间限定 key

4.1 setnx 添加过期时间

4.2 加锁和过期时间必须在同一行,保证原子性

5.0 防止误删key

锁超时误删除

6.0 lua 脚本实现删除时判断和删除操作的原子性

Distributed Locks with Redis

介绍 lua 脚本

调用 lua 脚本,教程

  1. 实现 hello lua

注意 0 作为唤醒词

  1. 实现 set expire get

redis.call 用来调用命令,最后一个需要返回使用 return

  1. mset 重在掌握 lua脚本中 args传参的用法

2 -> key argv 数量

k1 k2 -> key1 key2 前两个 key,后面都是 argv

lua1 lua2 -> argv1 argv2

  1. 官网lua脚本 如何执行呢
if redis.call("get",KEYS[1]) == ARGV[1] thenreturn redis.call("del",KEYS[1])
elsereturn 0
end

补充说明 if elseif else

// 改进点,修改为 Lua 脚本的 redis 分布式锁调用,必须保证原子性,参考官网脚本案例
String luaScript = "if redis.call('get',KEYS[1]) == ARGV[1] then " +"return redis.call('del',KEYS[1]) " +"else " +"return 0 " +"end";
stringRedisTemplate.execute(new DefaultRedisScript(luaScript, Boolean.class), Arrays.asList(key), uuidValue);
  • new DefaultRedisScript(luaScript, Boolean.class) -> Boolean.class 指定返回值,不指定会报错

7.0 可重入 + 设计模式
juc 中可重入的解释

可重入锁的解释

juc 中可重入锁的案例演示(同步代码块,同步方法,Lock重点)

同步代码块

同步方法

注意使用 Lock 的时候,重入的时候加锁解锁次数要匹配,解锁次数不足则会导致锁仍然被占用,记得这里有个计数器

redis__aqs__343">redis 中如何实现 aqs 中可重入规范

需要一个计数器,需要 kkv 这种结构才能满足 -> hset

实现思路分析

lua 脚本分析
加锁Lock
  • 判断redis分布式锁key是否存在 EXISTS key
    • 不存在,hset新建当前线程属于自己的锁 uuid:ThreadId
    • 存在,说明已经有锁 HEXISTS key uuid:ThreadId
      • 不是自己的
      • 是自己的,自增一次
-- 加锁的lua的脚本,对标我们的lock方法
-- 加锁 v1
if redis.call('exists', 'key') == 0 thenredis.call('hset', 'key', 'uuid:threadid', 1)redis.call('expire', 'key', '50')return 1
elseif redis.call('hexists', 'key', 'uuid:threadid') == 1 thenredis.call('hincrby', 'key', 'uuid:threadid', 1)redis.call('expire', 'key', '50')return 1
elsereturn 0
end-- v2 合并相同的代码,用incrby替代hset 简化代码
if redis.call('exists', 'key') == 0 or redis.call('hexists', 'key', 'uuid:threadid') == 1 thenredis.call('hincrby', 'key', 'uuid:threadid', 1)redis.call('expire', 'key', '50')return 1
elsereturn 0
end-- v3 脚本ok了,	替换参数
if redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1 thenredis.call('hincrby', KEY[1], ARGV[1], 1)redis.call('expire', KEY[1], ARGV[2])return 1
elsereturn 0
end-- 合并到一行
if redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1 then redis.call('hincrby', KEY[1], ARGV[1], 1) redis.call('expire', KEY[1], ARGV[2]) return 1 else return 0 end
加锁 unLock

设计思路:解锁,还得是自己的锁

-- v1 解锁
if redis.call('hexists', 'key', 'uuid:threadid') == 0 thenreturn nil
elseif redis.call('hincrby', 'key', 'uuid:threadid', -1) == 0 thenreturn redis.call('del', 'key')
elsereturn 0
end
-- v2 替换下参数
if redis.call('hexists', KEYS[1], ARGV[1]) == 0 thenreturn nil
elseif redis.call('hincrby', KEYS[1], ARGV[1], -1) == 0 thenreturn redis.call('del', KEYS[1])
elsereturn 0
end

给出测试代码

加锁四次,解锁四次

EVAL "if redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1 then redis.call('hincrby', KEYS[1], ARGV[1], 1) redis.call('expire', KEYS[1], ARGV[2]) return 1 else return 0 end" 1 zzyyRedisLock 001:1 50
EVAL "if redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1 then redis.call('hincrby', KEYS[1], ARGV[1], 1) redis.call('expire', KEYS[1], ARGV[2]) return 1 else return 0 end" 1 zzyyRedisLock 001:1 50
EVAL "if redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1 then redis.call('hincrby', KEYS[1], ARGV[1], 1) redis.call('expire', KEYS[1], ARGV[2]) return 1 else return 0 end" 1 zzyyRedisLock 001:1 50
EVAL "if redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1 then redis.call('hincrby', KEYS[1], ARGV[1], 1) redis.call('expire', KEYS[1], ARGV[2]) return 1 else return 0 end" 1 zzyyRedisLock 001:1 50
ttl zzyyRedisLock
HGET zzyyRedisLock 001:1
EVAL "if redis.call('hexists', KEYS[1], ARGV[1]) == 0 then return nil elseif redis.call('hincrby', KEYS[1], ARGV[1], -1) == 0 then return redis.call('del', KEYS[1]) else return 0 end" 1 zzyyRedisLock 001:1
EVAL "if redis.call('hexists', KEYS[1], ARGV[1]) == 0 then return nil elseif redis.call('hincrby', KEYS[1], ARGV[1], -1) == 0 then return redis.call('del', KEYS[1]) else return 0 end" 1 zzyyRedisLock 001:1
EVAL "if redis.call('hexists', KEYS[1], ARGV[1]) == 0 then return nil elseif redis.call('hincrby', KEYS[1], ARGV[1], -1) == 0 then return redis.call('del', KEYS[1]) else return 0 end" 1 zzyyRedisLock 001:1
EVAL "if redis.call('hexists', KEYS[1], ARGV[1]) == 0 then return nil elseif redis.call('hincrby', KEYS[1], ARGV[1], -1) == 0 then return redis.call('del', KEYS[1]) else return 0 end" 1 zzyyRedisLock 001:1
lua脚本整合进程序->实现 Lock 接口

我们创建一个 Lock 实现类,重写lock(重载到 tryLock 中最终实现),unlock方法

@Overridepublic boolean tryLock(long time, TimeUnit unit) throws InterruptedException {if (time == -1L) {String luaScript = "if redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1 then " +"    redis.call('hincrby', KEYS[1], ARGV[1], 1) " +"    redis.call('expire', KEYS[1], ARGV[2]) " +"    return 1 " +"else" +"    return 0 " +"end";while (!(Boolean) stringRedisTemplate.execute(new DefaultRedisScript(luaScript, Boolean.class),Arrays.asList(lockName), uuidValue, String.valueOf(expireTime))){try {Thread.sleep(60);}catch (InterruptedException e) {e.printStackTrace();}}return true;}return false;}@Overridepublic void unlock() {String luaScript = "if redis.call('hexists', KEYS[1], ARGV[1]) == 0 then " +"    return nil " +"elseif redis.call('hincrby', KEYS[1], ARGV[1], -1) == 0 then " +"    return redis.call('del', KEYS[1]) " +"else " +"    return 0 " +"end";// nil = false 1 = true 0 = false// 这里返回值不方便使用 BooleanLong flag = (Long) stringRedisTemplate.execute(new DefaultRedisScript(luaScript, Long.class),Arrays.asList(lockName), uuidValue);if (null == flag) {throw new RuntimeException("锁不存在");}}
7.1 工厂模式优化,方便整合其他锁实现

锁的实现写死了,以后如果引入别的类型的锁,zookeeper 等

@Component
public class DistributedLockFactory {@Autowiredprivate StringRedisTemplate stringRedisTemplate;private String lockName;public Lock getDistributedLock(String lockType) {if (lockType == null) {return null;}if (lockType.equalsIgnoreCase("REDIS")) {this.lockName = "zzyRedisLock";return new RedisDistributedLock(stringRedisTemplate, lockName);}else if (lockType.equalsIgnoreCase("ZOOKEEPER")) {this.lockName = "zzyZookeeperLock";// TODO zookeeper lockreturn null;} else if (lockType.equalsIgnoreCase("MYSQL")) {// TODO mysql lock}return null;}}
7.2 可重入测试重点->解决uuid不一致,最终实现可重入

解决重入后 uuid 不一致问题

uuid 直接从工厂获取

这块说的比较碎,结合起来理解下

8.0 自动实现锁续期
CAP

Redis 可能存在异步复制导致的锁丢失,主节点还没来得及把刚刚set进来的这条数据同步到从节点,master就挂了,从机上位,但从机上无该数据

zookeeper 只有整个同步全部成功,才会注册成功,速度会稍慢。主节点挂了,只有选举出新老大后,才可用

euraka

加钟的 lua

在拿到锁的同时添加一个后台程序,为ttl 的 1/3

续期后再调用自己添加 监视程序,保证能够一直续期

if redis.call('hexists', KEYS[1], ARGV[1]) == 1 then return redis.call('expire', KEYS[1], ARGV[1])
elsereturn 0
end
代码中实现->加一个延时定时器任务

递归调用,3/1 时间间隔,如果锁仍然持有的话

比如 expireTime = 30,第20秒触发,继续续到 30

// tryLock 返回之前
// 新建一个后台扫描程序,来监视key目前的ttl,是否到我们规定的 1/2 1/3 来实现续期
resetExpire();public void resetExpire() {String script = "if redis.call('hexists', KEYS[1], ARGV[1]) == 1 then " +"return redis.call('expire', KEYS[1], ARGV[2]) " +"else " +"return 0 " +"end";new Timer().schedule(new TimerTask() {@Overridepublic void run() {if ((boolean) stringRedisTemplate.execute(new DefaultRedisScript(script, Boolean.class),Arrays.asList(lockName), uuidValue, String.valueOf(expireTime))) {resetExpire();}}}, (this.expireTime * 1000) / 3);}

小总结&如果让你自研一把分布式锁,你要怎么做

  1. 按照 juc Lock 接口规范编写
  2. lock 加锁关键逻辑
    1. 加锁
      1. 给key设置一个值,为避免死锁,并给定一个过期时间
    2. 自旋
    3. 续期

  1. unlock 解锁关键逻辑
    1. 将 key 键删除,但也不能乱删,不能删除其他客户端的锁

redisson_614">redlock与 redisson

来由->我们手写的分布式锁有什么缺点

redis 单机故障

RedLock算法设计理念

容错公式

v9.0

Getting Started - Redisson Reference Guide

<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.20.1</version>
</dependency>  
@Bean
public Redisson redisson() {Config config = new Config();config.useSingleServer().setAddress("redis://127.0.0.1:6379").setDatabase(0);return (Redisson) Redisson.create(config);
}RLock lock = redisson.getLock(key);

看门狗,续期

v9.1

Redisson 源码分析

多机案例->MultiLock 创建联锁

但 RedLock 被标记弃用

将多个 RLock 对象关联为一个联锁对象,只有所有节点加锁成功,联锁才成功

实操

我们创建三个 RedisClient,然后创建一个联锁即可


http://www.ppmy.cn/devtools/125408.html

相关文章

C++ include头文件的顺序以及双引号““和尖括号<>的区别

本文章进一步详细解释 #include 的头文件包含机制&#xff0c;包括搜索路径的处理、双引号 "" 和尖括号 <> 在不同环境中的使用差异&#xff0c;以及它们的底层机制。 1. 头文件包含机制和搜索路径详解 #include 是一个预处理指令&#xff0c;用于在编译前将…

Git 工作区、暂存区和仓库

在使用 Git 进行版本控制时&#xff0c;工作区、暂存区和仓库概念的详细解释&#xff1a; 1. 工作区&#xff08;Working Directory&#xff09; 工作区是你在计算机上实际编辑文件的地方。当你克隆一个 Git 仓库或在现有目录中初始化一个 Git 仓库时&#xff0c;这个目录就是…

vue 请求竞态 中断请求 解决切换表格数据,数据发生错乱

//1&#xff0c;声明缓存请求的集合 const pendingRequest new Map(); //2,请求url和method生成key const generateRequestKey <T extends AxiosRequestConfig>(config: T) > {const { method, url } configreturn [method, url].join("&") } //3,缓…

【Android】在安卓中使用 `mobile-ffmpeg` 压缩后的视频,浏览器在线播放提示“没有找到支持的视频格式和 MIME 类型”的解决方案

在安卓中使用 mobile-ffmpeg 压缩后的视频&#xff0c;浏览器在线播放提示“没有找到支持的视频格式和 MIME 类型”的解决方案 你可能在安卓开发中使用了 mobile-ffmpeg 进行视频压缩&#xff0c;而当你尝试在浏览器中在线播放压缩后的视频时&#xff0c;看到提示&#xff1a;…

【算法思想·二叉树】用「遍历」思维解题 II

本文参考labuladongsuanfa笔记[【强化练习】用「遍历」思维解题 II | labuladong 的算法笔记] 如果让你在二叉树中的某些节点上做文章&#xff0c;一般来说也可以直接用遍历的思维模式。 270. 最接近的二叉搜索树值 | 力扣 | LeetCode | 给你二叉搜索树的根节点 root 和一个目…

python:假的身份信息生成模块faker

前言 发现一个有趣的python模块&#xff08;faker&#xff09;&#xff0c;他支持生成多个国家语言下的假身份信息&#xff0c;包含人名、地址、邮箱、公司名、电话号码、甚至是个人简历&#xff01; 你可以拿它做一些自动化测试&#xff0c;或一些跟假数据有关的填充工作。 代…

机器学习笔记(五)--神经网络

神经元网络&#xff1a;M-P神经元模型 神经元接收来自其他n个神经元传递过来的信号&#xff0c;这些信号通过有权重的连接进行传递&#xff0c;神经元接收到的总输入值将与神经元的阈值进行对比&#xff0c;通过激活函数的处理产生神经元的输出。 感知机与多层网络 感知机&a…

如何恢复笔记本电脑上误删除的谷歌浏览器数据

在使用笔记本电脑的过程中&#xff0c;有时我们可能会不小心删除了重要的谷歌浏览器数据&#xff0c;如书签、历史记录或保存的密码。本文将详细介绍如何在笔记本上恢复这些误删除的数据&#xff0c;帮助你找回丢失的信息。&#xff08;本文由https://www.gugeliulanqi.com.cn/…