手写分布式锁
qa
redis_4">redis除了做缓存,还有什么用法
redis_cap_9">redis 单机与集群的cap分析
锁的种类
一个分布式锁需要满足的条件和刚需
- 独占性:任何时间只能有一个线程占有
- 高可用:
- 在redis集群环境下,不能因为一个节点挂了而出现获取锁和释放锁失败的情况
- 高并发请求下,依旧性能 ok
- 防死锁:杜绝死锁,必须有超时控制机制与撤销操作,有个兜底终止跳出方案
- 不乱抢:防止张冠李戴,不能私下unlock别人的锁,只能自己加锁自己释放,自己约的锁含着泪也要自己解
- 重入性:同一个节点的同一个线程,获得锁之后,他也可以再次获取这个锁
分布式锁及其重点
setnx 来获取锁
redis__31">boot+redis 基础案例
使用场景
@Service
@Slf4j
public class InventoryService {@Autowiredprivate StringRedisTemplate stringRedisTemplate;@Value("${server.port}")private String port;private Lock lock = new ReentrantLock();public String sale() {String retMessage = "";lock.lock();try {//1 查询库存信息String result = stringRedisTemplate.opsForValue().get("inventory001");//2 判断库存是否足够Integer inventoryNumber = result == null? 0 : Integer.parseInt(result);//3 扣除库存,每次减少一个if (inventoryNumber > 0) {inventoryNumber--;stringRedisTemplate.opsForValue().set("inventory001", String.valueOf(inventoryNumber));retMessage = "成功卖出一个商品,剩余库存" + inventoryNumber;System.out.println(retMessage + ",端口号:" + port);}else {retMessage = "商品卖完了";}} finally {lock.unlock();}return retMessage;}
}
@RestController
@Api(tags = "redis 分布式锁测试")
public class InventoryController {@Autowiredprivate InventoryService inventoryService;@ApiOperation("扣除库存,一次卖一个")@GetMapping("/inventory/sale")public String sale() {return inventoryService.sale();}}
GET http://localhost:6000/inventory/sale
手写分布式锁分析
拷贝出来一份
nginx 负载均衡
upstream stock {
server 127.0.0.1:6001 weight=1;
server 127.0.0.1:6000 weight=1;
}server {
listen 9000;
server_name localhost;location / {
proxy_pass http://stock;
index index.html index.htm;
}
}
GET http://localhost:9000/inventory/sale
jmeter
添加线程组 100*1
添加取样器-http请求
添加 listener-聚合报告
jmeter 100个线程1s执行完毕
执行完毕查看 库存数
为什么加了synchronized 和 lock 但是没有控制住
3.1 版本 setnx
public String sale() {String retMessage = "";String key = "zzyRedisLock";String uuidValue = IdUtil.simpleUUID() + Thread.currentThread().getId();Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue);if (!flag) {// 暂停20s,进行递归重试try {TimeUnit.MICROSECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}sale();} else {// 抢锁成功的请求线程,进行正常的业务逻辑操作,扣减库存try {//1 查询库存信息String result = stringRedisTemplate.opsForValue().get("inventory001");//2 判断库存是否足够Integer inventoryNumber = result == null? 0 : Integer.parseInt(result);//3 扣除库存,每次减少一个if (inventoryNumber > 0) {inventoryNumber--;stringRedisTemplate.opsForValue().set("inventory001", String.valueOf(inventoryNumber));retMessage = "成功卖出一个商品,剩余库存" + inventoryNumber;System.out.println(retMessage + ",端口号:" + port);}else {retMessage = "商品卖完了";}} finally {stringRedisTemplate.delete(key);}}return retMessage + ",端口号:" + port;}
3.2 自旋代替递归重试 while替代if
public String sale() {String retMessage = "";String key = "zzyRedisLock";String uuidValue = IdUtil.simpleUUID() + Thread.currentThread().getId();// 不用递归了,高并发下使用自旋替代递归重试,使用whilewhile (!stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue)) {try {TimeUnit.MICROSECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}}// 抢锁成功的请求线程,进行正常的业务逻辑操作,扣减库存try {//1 查询库存信息String result = stringRedisTemplate.opsForValue().get("inventory001");//2 判断库存是否足够Integer inventoryNumber = result == null ? 0 : Integer.parseInt(result);//3 扣除库存,每次减少一个if (inventoryNumber > 0) {inventoryNumber--;stringRedisTemplate.opsForValue().set("inventory001", String.valueOf(inventoryNumber));retMessage = "成功卖出一个商品,剩余库存" + inventoryNumber;System.out.println(retMessage + ",端口号:" + port);} else {retMessage = "商品卖完了";}} finally {stringRedisTemplate.delete(key);}return retMessage + ",端口号:" + port;}
部署了微服务的Java程序挂了,代码层面根本没有走到 finally 这块,没办法保证解锁(无过期时间该key一直存在),这个key没有被删除,需要加入一个过期时间限定 key
4.1 setnx 添加过期时间
4.2 加锁和过期时间必须在同一行,保证原子性
5.0 防止误删key
锁超时误删除
6.0 lua 脚本实现删除时判断和删除操作的原子性
Distributed Locks with Redis
介绍 lua 脚本
调用 lua 脚本,教程
- 实现 hello lua
注意 0 作为唤醒词
- 实现 set expire get
redis.call 用来调用命令,最后一个需要返回使用 return
- mset 重在掌握 lua脚本中 args传参的用法
2 -> key argv 数量
k1 k2 -> key1 key2 前两个 key,后面都是 argv
lua1 lua2 -> argv1 argv2
- 官网lua脚本 如何执行呢
if redis.call("get",KEYS[1]) == ARGV[1] thenreturn redis.call("del",KEYS[1])
elsereturn 0
end
补充说明 if elseif else
// 改进点,修改为 Lua 脚本的 redis 分布式锁调用,必须保证原子性,参考官网脚本案例
String luaScript = "if redis.call('get',KEYS[1]) == ARGV[1] then " +"return redis.call('del',KEYS[1]) " +"else " +"return 0 " +"end";
stringRedisTemplate.execute(new DefaultRedisScript(luaScript, Boolean.class), Arrays.asList(key), uuidValue);
new DefaultRedisScript(luaScript, Boolean.class)
->Boolean.class
指定返回值,不指定会报错
7.0 可重入 + 设计模式
juc 中可重入的解释
可重入锁的解释
juc 中可重入锁的案例演示(同步代码块,同步方法,Lock重点)
同步代码块
同步方法
注意使用 Lock 的时候,重入的时候加锁解锁次数要匹配,解锁次数不足则会导致锁仍然被占用,记得这里有个计数器
redis__aqs__343">redis 中如何实现 aqs 中可重入规范
需要一个计数器,需要 kkv 这种结构才能满足 -> hset
实现思路分析
lua 脚本分析
加锁Lock
- 判断redis分布式锁key是否存在 EXISTS key
- 不存在,hset新建当前线程属于自己的锁 uuid:ThreadId
- 存在,说明已经有锁 HEXISTS key uuid:ThreadId
- 不是自己的
- 是自己的,自增一次
-- 加锁的lua的脚本,对标我们的lock方法
-- 加锁 v1
if redis.call('exists', 'key') == 0 thenredis.call('hset', 'key', 'uuid:threadid', 1)redis.call('expire', 'key', '50')return 1
elseif redis.call('hexists', 'key', 'uuid:threadid') == 1 thenredis.call('hincrby', 'key', 'uuid:threadid', 1)redis.call('expire', 'key', '50')return 1
elsereturn 0
end-- v2 合并相同的代码,用incrby替代hset 简化代码
if redis.call('exists', 'key') == 0 or redis.call('hexists', 'key', 'uuid:threadid') == 1 thenredis.call('hincrby', 'key', 'uuid:threadid', 1)redis.call('expire', 'key', '50')return 1
elsereturn 0
end-- v3 脚本ok了, 替换参数
if redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1 thenredis.call('hincrby', KEY[1], ARGV[1], 1)redis.call('expire', KEY[1], ARGV[2])return 1
elsereturn 0
end-- 合并到一行
if redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1 then redis.call('hincrby', KEY[1], ARGV[1], 1) redis.call('expire', KEY[1], ARGV[2]) return 1 else return 0 end
加锁 unLock
设计思路:解锁,还得是自己的锁
-- v1 解锁
if redis.call('hexists', 'key', 'uuid:threadid') == 0 thenreturn nil
elseif redis.call('hincrby', 'key', 'uuid:threadid', -1) == 0 thenreturn redis.call('del', 'key')
elsereturn 0
end
-- v2 替换下参数
if redis.call('hexists', KEYS[1], ARGV[1]) == 0 thenreturn nil
elseif redis.call('hincrby', KEYS[1], ARGV[1], -1) == 0 thenreturn redis.call('del', KEYS[1])
elsereturn 0
end
给出测试代码
加锁四次,解锁四次
EVAL "if redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1 then redis.call('hincrby', KEYS[1], ARGV[1], 1) redis.call('expire', KEYS[1], ARGV[2]) return 1 else return 0 end" 1 zzyyRedisLock 001:1 50
EVAL "if redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1 then redis.call('hincrby', KEYS[1], ARGV[1], 1) redis.call('expire', KEYS[1], ARGV[2]) return 1 else return 0 end" 1 zzyyRedisLock 001:1 50
EVAL "if redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1 then redis.call('hincrby', KEYS[1], ARGV[1], 1) redis.call('expire', KEYS[1], ARGV[2]) return 1 else return 0 end" 1 zzyyRedisLock 001:1 50
EVAL "if redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1 then redis.call('hincrby', KEYS[1], ARGV[1], 1) redis.call('expire', KEYS[1], ARGV[2]) return 1 else return 0 end" 1 zzyyRedisLock 001:1 50
ttl zzyyRedisLock
HGET zzyyRedisLock 001:1
EVAL "if redis.call('hexists', KEYS[1], ARGV[1]) == 0 then return nil elseif redis.call('hincrby', KEYS[1], ARGV[1], -1) == 0 then return redis.call('del', KEYS[1]) else return 0 end" 1 zzyyRedisLock 001:1
EVAL "if redis.call('hexists', KEYS[1], ARGV[1]) == 0 then return nil elseif redis.call('hincrby', KEYS[1], ARGV[1], -1) == 0 then return redis.call('del', KEYS[1]) else return 0 end" 1 zzyyRedisLock 001:1
EVAL "if redis.call('hexists', KEYS[1], ARGV[1]) == 0 then return nil elseif redis.call('hincrby', KEYS[1], ARGV[1], -1) == 0 then return redis.call('del', KEYS[1]) else return 0 end" 1 zzyyRedisLock 001:1
EVAL "if redis.call('hexists', KEYS[1], ARGV[1]) == 0 then return nil elseif redis.call('hincrby', KEYS[1], ARGV[1], -1) == 0 then return redis.call('del', KEYS[1]) else return 0 end" 1 zzyyRedisLock 001:1
lua脚本整合进程序->实现 Lock 接口
我们创建一个 Lock 实现类,重写lock(重载到 tryLock 中最终实现),unlock方法
@Overridepublic boolean tryLock(long time, TimeUnit unit) throws InterruptedException {if (time == -1L) {String luaScript = "if redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1 then " +" redis.call('hincrby', KEYS[1], ARGV[1], 1) " +" redis.call('expire', KEYS[1], ARGV[2]) " +" return 1 " +"else" +" return 0 " +"end";while (!(Boolean) stringRedisTemplate.execute(new DefaultRedisScript(luaScript, Boolean.class),Arrays.asList(lockName), uuidValue, String.valueOf(expireTime))){try {Thread.sleep(60);}catch (InterruptedException e) {e.printStackTrace();}}return true;}return false;}@Overridepublic void unlock() {String luaScript = "if redis.call('hexists', KEYS[1], ARGV[1]) == 0 then " +" return nil " +"elseif redis.call('hincrby', KEYS[1], ARGV[1], -1) == 0 then " +" return redis.call('del', KEYS[1]) " +"else " +" return 0 " +"end";// nil = false 1 = true 0 = false// 这里返回值不方便使用 BooleanLong flag = (Long) stringRedisTemplate.execute(new DefaultRedisScript(luaScript, Long.class),Arrays.asList(lockName), uuidValue);if (null == flag) {throw new RuntimeException("锁不存在");}}
7.1 工厂模式优化,方便整合其他锁实现
锁的实现写死了,以后如果引入别的类型的锁,zookeeper 等
@Component
public class DistributedLockFactory {@Autowiredprivate StringRedisTemplate stringRedisTemplate;private String lockName;public Lock getDistributedLock(String lockType) {if (lockType == null) {return null;}if (lockType.equalsIgnoreCase("REDIS")) {this.lockName = "zzyRedisLock";return new RedisDistributedLock(stringRedisTemplate, lockName);}else if (lockType.equalsIgnoreCase("ZOOKEEPER")) {this.lockName = "zzyZookeeperLock";// TODO zookeeper lockreturn null;} else if (lockType.equalsIgnoreCase("MYSQL")) {// TODO mysql lock}return null;}}
7.2 可重入测试重点->解决uuid不一致,最终实现可重入
解决重入后 uuid 不一致问题
uuid 直接从工厂获取
这块说的比较碎,结合起来理解下
8.0 自动实现锁续期
CAP
Redis 可能存在异步复制导致的锁丢失,主节点还没来得及把刚刚set进来的这条数据同步到从节点,master就挂了,从机上位,但从机上无该数据
zookeeper 只有整个同步全部成功,才会注册成功,速度会稍慢。主节点挂了,只有选举出新老大后,才可用
euraka
加钟的 lua
在拿到锁的同时添加一个后台程序,为ttl 的 1/3
续期后再调用自己添加 监视程序,保证能够一直续期
if redis.call('hexists', KEYS[1], ARGV[1]) == 1 then return redis.call('expire', KEYS[1], ARGV[1])
elsereturn 0
end
代码中实现->加一个延时定时器任务
递归调用,3/1 时间间隔,如果锁仍然持有的话
比如 expireTime = 30,第20秒触发,继续续到 30
// tryLock 返回之前
// 新建一个后台扫描程序,来监视key目前的ttl,是否到我们规定的 1/2 1/3 来实现续期
resetExpire();public void resetExpire() {String script = "if redis.call('hexists', KEYS[1], ARGV[1]) == 1 then " +"return redis.call('expire', KEYS[1], ARGV[2]) " +"else " +"return 0 " +"end";new Timer().schedule(new TimerTask() {@Overridepublic void run() {if ((boolean) stringRedisTemplate.execute(new DefaultRedisScript(script, Boolean.class),Arrays.asList(lockName), uuidValue, String.valueOf(expireTime))) {resetExpire();}}}, (this.expireTime * 1000) / 3);}
小总结&如果让你自研一把分布式锁,你要怎么做
- 按照 juc Lock 接口规范编写
- lock 加锁关键逻辑
- 加锁
- 给key设置一个值,为避免死锁,并给定一个过期时间
- 自旋
- 续期
- 加锁
- unlock 解锁关键逻辑
- 将 key 键删除,但也不能乱删,不能删除其他客户端的锁
redisson_614">redlock与 redisson
来由->我们手写的分布式锁有什么缺点
redis 单机故障
RedLock算法设计理念
容错公式
v9.0
Getting Started - Redisson Reference Guide
<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.20.1</version>
</dependency>
@Bean
public Redisson redisson() {Config config = new Config();config.useSingleServer().setAddress("redis://127.0.0.1:6379").setDatabase(0);return (Redisson) Redisson.create(config);
}RLock lock = redisson.getLock(key);
看门狗,续期
v9.1
Redisson 源码分析
多机案例->MultiLock 创建联锁
但 RedLock 被标记弃用
将多个 RLock 对象关联为一个联锁对象,只有所有节点加锁成功,联锁才成功
实操
我们创建三个 RedisClient,然后创建一个联锁即可