目录
分布式锁
一:基本原理和实现方式
我们实现分布式锁有两个基本方法:
一个是获取锁
一个是删除锁
获取我们使用setnx,只有不存在才会set成功作为互斥锁;
删除直接删除这个锁的键就行了;
但是要考虑一种情况,就是获取锁之后服务宕机了,那么就无法释放锁,也就会出现死锁,服务挂了,为了预防这种情况的发生我们要设置过期时间,就算服务宕机,过了过期时间锁也会释放;
还有一种情况就是设置锁,还没去设置锁的过期时间这个时候服务就宕机了,那么也会出现死锁,我们要保证获取锁和设置过期时间这个操作的原子性,redis种就提供了方法可以同时设置过期时间和互斥锁:
set lock t1 nx ex 10
二:分布式锁的实现
我们去实现这个接口:
public class SimpleRedisLock implements ILock{//锁的名称private String name;private StringRedisTemplate stringRedisTemplate;public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}//锁的前缀是个常量private static final String KEY_VALUE ="lock:";/*** 获取锁* @param timeoutSec* @return*/@Overridepublic boolean trylock(long timeoutSec) {//获取当前线程id作为锁的valuelong id = Thread.currentThread().getId();//设值尝试获取锁,true是成功,false是失败Boolean b = stringRedisTemplate.opsForValue().setIfAbsent(KEY_VALUE + name, id + "", timeoutSec, TimeUnit.SECONDS);//因为直接返回b的话jvm会自动装箱拆箱,可能会造成空指针异常,如果b为true返回就是true,如果b为false返回就是false,如果为空返回也是false;return Boolean.TRUE.equals(b);}/*** 释放锁*/@Overridepublic void unlock() {stringRedisTemplate.delete(KEY_VALUE + name);}
}
我们在一人一单的问题使用分布式锁:
Long id = UserHolder.getUser().getId();
SimpleRedisLock simpleRedisLock = new SimpleRedisLock("order" + id, stringRedisTemplate);
boolean trylock = simpleRedisLock.trylock(1200);
if (!trylock){return Result.fail("一人只能下一单");
}
try {IVoucherOrderService orderService = (IVoucherOrderService) AopContext.currentProxy();return orderService.createVoucherOrder(voucherId);
} catch (IllegalStateException e) {throw new RuntimeException(e);
}finally {simpleRedisLock.unlock();
}
1:分布式锁的误删问题
存在的问题是这样的:
线程1去获取锁,但是呢,业务阻塞超时了,锁就自动释放了,这个时候线程2来了,因为锁释放了,线程2可以获取到锁,就在这时,线程1阻塞通过了,完成了业务之后就要释放锁了,这个时候释放的锁是线程2的锁,线程2还在执行,这个时候线程3也去获取锁成功了,那么就出现了线程2,3并发执行的情况,造成并发安全问题;
这里的问题是线程释放锁释放的不是自己的锁,所以我们解决办法就是在线程释放的时候判断一下是不是自己的锁,怎么判断呢,我们之前获取锁的时候存入的value是线程的id,这个就是线程的唯一标识,我们只需要判断当前线程id和存入的value是否一致就行;
2:解决误删问题
//锁的前缀是个常量
private static final String KEY_VALUE ="lock:";
//加上线程的前缀,保证不同集群不同线程的id是唯一
public static final String ID_VALUE= UUID.randomUUID().toString(true);/*** 获取锁* @param timeoutSec* @return*/
@Override
public boolean trylock(long timeoutSec) {//获取当前线程id作为锁的valueString id =ID_VALUE+ Thread.currentThread().getId();//设值尝试获取锁,true是成功,false是失败Boolean b = stringRedisTemplate.opsForValue().setIfAbsent(KEY_VALUE + name, id + "", timeoutSec, TimeUnit.SECONDS);//因为直接返回b的话jvm会自动装箱拆箱,可能会造成空指针异常,如果b为true返回就是true,如果b为false返回就是false,如果为空返回也是false;return Boolean.TRUE.equals(b);
}
只要在锁获取的时候给线程加上唯一标识就行,我们原来用的是线程id,这样不能保证唯一,因为线程id是在同一个jvm内部是递增的,不同jvm的线程id可能相同,那么就需要加上前缀保证线程标识的唯一性,那么就可以使用uuid来保证每一台jvm的uuid是不同的,标识就是唯一的;
释放锁:
@Override
public void unlock() {if (stringRedisTemplate.opsForValue().get(KEY_VALUE + name).equals(ID_VALUE+ Thread.currentThread().getId())){stringRedisTemplate.delete(KEY_VALUE + name);}
}
判断锁中的val和当前线程的标识是否一致;
这样就解决了误删的问题;
但是还有一个问题:
假设一个线程1获取锁之后执行业务,执行完之后,要释放锁,判断了当前锁是否是自己的(锁的value),在要执行删除操作的时候堵塞了,锁超时释放了,这个时候线程2获取锁,执行业务,就在这时,线程1阻塞结束,执行的代码在释放锁上,因为同一个业务锁的key都是一样的,所以线程一能够把锁释放,这时线程2还没执行结束,线程3来了,就出现了线程安全问题;
所以说我们要将判断锁和释放锁作为一个原子性事件,要么同时发生要么同时失败
这就要用lua了
三:lua脚本解决多条命令原子性问题
--获取当前key
local key =KEY[1]
--获取当前线程标识
local id =ARGV[1]--获取锁中的标识
local _id=redis.call('get',key)
--判断是否是同一个标识:
if(id==_id) thenreturn redis.call("del",key)
end
return 0
在idea中我们执行lua
脚本我们可以在resouce中创建一个lua文件:
要安装插件才行:
插件名叫emmylua;
调用lua脚本
//声明一个DefaultRedisScript,原来调取lua脚本
public static final DefaultRedisScript UNLOCK_LUA;
static {//初始化UNLOCK_LUA=new DefaultRedisScript<>();//定位到lua脚本的位置UNLOCK_LUA.setLocation(new ClassPathResource("unlock.lua"));//设置lua脚本的返回值UNLOCK_LUA.setResultType(Long.class);
}
@Override
public void unlock() {// Collections.singletonList创建单元素的集合stringRedisTemplate.execute(UNLOCK_LUA,Collections.singletonList(KEY_VALUE + name),(ID_VALUE+ Thread.currentThread().getId()));
}
这样将判断标识和释放锁的命令写在lua脚本中,就能够保证命令执行的原子性,就不会出现之前说的误删的情况
我们实现分布式锁的思路:
获取锁:使用setnx,互斥的特性,获取值相当于设置锁,使用uuid保证线程的唯一性;
释放锁:释放之前判断锁释放是当前线程的锁,并且将判断锁和释放锁的命令写在一个lua脚本中保证执行命令的原子性;
四:Redisson
redisson_254">1:redisson入门
步骤1:
引入依赖:
<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.33.0</version>
</dependency>
步骤二:
配置客户端:
@Configuration
public class RedissonConfig {@Beanpublic RedissonClient redissonClient(){Config config = new Config();config.useSingleServer().setAddress("redis://123.249.86.145:6379").setPassword("12345");return Redisson.create(config);}
}
redisson_290">2:redisson可重入锁的原理
可重入锁的逻辑:
获取锁:如果锁不存在就获取锁,然后设置过期时间,如果存在了,就判断锁的线程标识是否是自己,如果是自己,那么就让锁的value加一;
释放锁:释放锁的时候判断锁是否是自己的,如果不是自己的就返回空,如果是自己的就让value-1,然后再判断value是否大于0,如果大于0,就重置锁的时间,等于0就删除锁
3:解决尝试等待,超时释放的问题(看门狗)
redisson内部是如何解决重复尝试的呢?尝试解读一下:首先线程会尝试获取锁,获取锁成功就返回空,获取锁失败就返回ttl,也就是锁的超时释放时间,我们重复尝试肯定是建立在获取锁失败的基础上,当返回的是ttl时,我们会判断当前时间减去我们尝试获取锁的时刻的时间,也就是尝试获取锁使用的时间,是否大于我们设置的等待时间,如果大于我们设置的等待时间,那么就尝试获取锁失败,就返回false,如果还有时间,我们不是直接再去获取锁,而是通过订阅机制,我们等待释放锁的信号,如果等待时间超时就直接返回false,如果没抄时,我们在重新尝试锁,失败在判断是否还有时间然后定义然后重试,就这么循环下去,要么拿到锁,要么超过等待时间获取锁失败;这样就解决我们获取锁失败一次就返回失败,可以一直尝试;
然后再来讲一下超时释放问题,也就是过了设置的超时时间,业务还没执行完,锁就释放了,出现了并发安全问题,怎么解决的呢,我们如果没有设置这个锁的超时释放时间,那么redisson就会自动给我们的超时释放时间设置为-1,这个时候才会开启看门狗机制,具体是什么情况呢,就是内部会开启一个任务,这个任务会一直刷新锁的超时时间,一直无限的刷新,每10秒刷新一次,刷新一次延长30秒,也就会一直存在这个锁,知道完成任务释放锁才会取消这个看门狗任务;
那么释放锁的时候如果释放失败就返回异常,释放成功就要做两件事,一个是发送释放锁的信息给正在订阅尝试获取锁的,还有一件事就是将看门狗任务终止;
4:主从一致性问题
主从一致性问题:在redis集群中有主从关系,一个主节点有多个从节点,当主节点宕机时,会选一个从节点作为主节点;
主节点和从节点为了数据一样会做主从同步操作;有一个问题就是当java应用向主节点发送请求获取锁,获取锁成功了,但是在主从同步还没完成的时候,主节点宕机了,这个时候哨兵就会发现宕机,然后从从节点中选出一个作为主节点,而因为数据未同步,新的主节点中没有锁,也就是锁失效了,那么其他线程发送请求就能获取锁,这就是锁失效的问题
如何解决呢:我们不设置主从关系,只设置节点,每个节点都相当与主节点,而且获取锁的时候,必须要所有的节点都获取锁成功才算获取锁成功,如果有一个节点宕机了,他的从节点成为了新的节点,这个时候有线程来获取锁也是不成功的,因为要在所有节点中获取锁,而其他节点已经持有锁了;这就解决了主从一致性问题;
,获取锁成功了,但是在主从同步还没完成的时候,主节点宕机了,这个时候哨兵就会发现宕机,然后从从节点中选出一个作为主节点,而因为数据未同步,新的主节点中没有锁,也就是锁失效了,那么其他线程发送请求就能获取锁,这就是锁失效的问题
[外链图片转存中…(img-1BgOGPaG-1730426881642)]
如何解决呢:我们不设置主从关系,只设置节点,每个节点都相当与主节点,而且获取锁的时候,必须要所有的节点都获取锁成功才算获取锁成功,如果有一个节点宕机了,他的从节点成为了新的节点,这个时候有线程来获取锁也是不成功的,因为要在所有节点中获取锁,而其他节点已经持有锁了;这就解决了主从一致性问题;