引言
在03篇中我们讲解了lua脚本的使用(Ps:如果你没跳过的话),目前我们通过api也好,lua脚本也好,已经基于redis实现了分布式锁,但看似完美的它其实还存在一些细节问题,本章节将会带领大家去探索这些细节并完善我们的分布式锁,并且在本章中还会为大家讲解一下“红锁”的算法原理。那么话不多说,开始我们今天的学习。
锁续期问题
先来回顾一下: 我们前面为了解决锁因异常情况(例如执行完加锁逻辑服务宕机了)未执行到释放,从而造成锁一直被占用的情况。而为了解决这个问题,我们给每个锁加上了过期时间,但是这又引申出了新的问题:如果锁到期了,而业务还没执行完,此时就给释放了,锁又被新的线程拿到了,那么就又会产生并发问题了。所以,我们是不希望锁在一定时间后自动过期掉的。那么,为了解决这个问题,我们应该在线程拿到锁后一直延长过期时间,直到业务执行完成后才释放这把锁。我们分析下可以怎么做:
1.单独起个服务来处理
我们可以单独起个服务来负责为锁续期,但是这有什么问题呢?如果加锁的服务挂掉了,这个独立的客户端如何感知,如果感知不到,就会一直给锁续期。
2.获取锁进程自己续期
我们还可以获取当前进程来进行锁续期,这样做就算锁挂掉了,续期的进程也随之结束了。进程A自己如果去实现,它需要一边执行业务逻辑,一边又要进行锁续期,那么我们单独起个线程去做这件事就很合适了。
3.异步线程解决锁续期
首先,我们需要编写自动续期的Lua脚本,如下:
if (redis.call('HEXISTS', KEYS[1], ARGV[1])) thenreturn 0;
elseredis.call('PEXPIRE', KEYS[1], ARGV[2])return 1;
end
这个脚本的逻辑很简单,相信各位同学已经明白了。就是先获取锁,获取成功之后则重新设置过期时间。我们来写下代码,如下:
@Overridepublic void lock(TimeUnit timeUnit, Long expireTime) {// 设置锁的过期时间this.expireTime = expireTime;// 循环尝试获取锁while (true) {// 检查并设置分布式锁// 1. 如果锁不存在,创建锁并设置过期时间// 2. 如果当前线程已持有锁,则重入并更新过期时间// 3. 如果其他线程持有锁,返回0表示获取失败String luaScript = "if(redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[1], 1) redis.call('pexpire', KEYS[1], ARGV[2]) return 1; end if (redis.call('hexists',KEYS[1], ARGV[1]) == 1) then redis.call('hincrby', KEYS[1], ARGV[1], 1) redis.call('pexpire', KEYS[1], ARGV[2]) return 1; else return 0; end";// 执行Lua脚本Long result = stringRedisTemplate.execute(new DefaultRedisScript<>(luaScript, Long.class), Collections.singletonList(this.lockName),uuid,expireTime.toString());// 获取锁成功if (result != null && result.equals(1L)) {// 启动守护线程,定期延长锁的过期时间new Thread(() -> {while (true) {// Lua脚本:检查并延长锁的过期时间// 如果锁仍然存在且被当前线程持有,则延长过期时间String expireLua = "if(redis.call('hexists', KEYS[1], ARGV[1]) == 0) then return 0; else redis.call('pexpire', KEYS[1], ARGV[2]) return 1; end";Long expireResult = stringRedisTemplate.execute(new DefaultRedisScript<>(expireLua, Long.class), Collections.singletonList(this.lockName),uuid,expireTime.toString());// 如果锁不存在或已经不属于当前线程,退出守护线程if (expireResult == null || expireResult.equals(0L)) {break;}try {// 休眠时间为过期时间的一半,定期唤醒执行延期Thread.sleep(expireTime / 2);} catch (InterruptedException e) {throw new RuntimeException(e);}}}).start();break;}try {// 获取锁失败,等待50ms后重试Thread.sleep(50);} catch (InterruptedException e) {throw new RuntimeException(e);}}}
这里把上章中的可重入Lua脚本替换我们的加锁逻辑,并通过另一个线程来不断地为锁进行续期,这里我们借鉴了Redisson的看门狗机制,在后续的章节中也会讲解到,这里关注我们的实现逻辑即可。
锁阻塞问题
看下我们上面的实现:如果获取锁失败,就会在睡50ms继续轮询,直到获取锁成功为止。这个实现相当于把锁阻塞住了,在某些业务场景下,我们其实需要在某个时间内,如果获取锁失败,就放弃,需要重新请求,比如秒杀抢购就是这样的。所以我们这里就需要在原本的基础上添加一个锁获取的超时时间,以此来解决锁阻塞问题,代码如下:
public boolean tryLock(long time, long expireTime, TimeUnit unit) throws InterruptedException {// 记录开始尝试获取锁的时间戳long startTime = System.currentTimeMillis();// 记录当前时间戳long currentTime = System.currentTimeMillis();boolean lockResult = false;// 在指定的等待时间内循环尝试获取锁// time表示最大等待时间,超过这个时间还未获得锁就返回falsewhile (currentTime - startTime <= time) {// 尝试获取锁boolean result = tryLockInternal(unit, expireTime);if (result) {// 获取锁成功,记录结果并退出循环lockResult = result;break;}// 更新当前时间戳currentTime = System.currentTimeMillis();}return lockResult;
}
那么在扣减库存的业务代码那里也需要加上这个tryLock的逻辑,代码如下:
public String deductStockRedisLock(Long goodsId, Integer count) {AbstractLock lock = null;try {// 创建基于Redis的分布式锁,锁的key为"lock"+商品IDlock = new RedisLock(template, "lock" + goodsId);// 尝试在5秒内获取锁boolean result = lock.tryLock(5000, TimeUnit.MILLISECONDS);if (result) {// 获取锁成功,执行库存扣减逻辑// 1. 查询商品库存数量String stock = template.opsForValue().get("stock" + goodsId);if (StringUtil.isNullOrEmpty(stock)) {return "商品不存在";}Integer lastStock = Integer.parseInt(stock);// 2. 判断库存是否充足if (lastStock < count) {return "库存不足";}// 3. 扣减库存template.opsForValue().set("stock" + goodsId, String.valueOf(lastStock - count));return "库存扣减成功";}// 获取锁超时的处理System.out.println("获取锁超时");return "系统繁忙";} catch (InterruptedException e) {throw new RuntimeException(e);} finally {// 确保在finally块中释放锁if (lock != null) {lock.unlock();}}
}
这里的Lock和tryLock的使用,大家根据自己的业务需要选择性使用即可。
RedLock分布式锁算法原理
此部分为加餐环节,在实际业务中并不推荐大家使用这套算法,这里只是做下知识扩展。那么先来看下我们目前通过redis如何实现加锁的,如下:
客户端发起请求到redis通过hset key指令来设置锁,看起来似乎没什么问题,但是这是建立在redis单机部署的情况下,如果redis挂了呢?那么就获取不到锁,进而导致后续的业务逻辑无法执行,那么对整个业务的影响非常大,假设此时是某个电商项目的下单接口出现这种问题,那么带来的资损将会无法估量。那么这个问题可以解决吗?当然可以,redis是支持主从模式的,如下:
通过主从模式来部署redis,master在执行命令后会通过异步线程向slave中同步数据,而如果master挂掉了,通过内部的哨兵机制也可以将一个slave选择成为新的master,从而保证redis的高可用。那么这样是否就没问题了呢?当然不是,新的master产生后,后续的请求都会到新的master上,如下:
所以我们考虑一下这种情况:来自客户端的请求首先到旧的master上成功执行了hset key命令,此时已经成功加锁,但是它还没来得及向从节点同步数据就挂掉了,新的master并没有同步数据,这个时候又来个新的客户端进行请求,执行了同样的命令加锁,最终导致两个客户端其实加了同一把锁,如下:
针对这个问题,redis的作者提出了著名的红锁算法来解决,如下:
但需要注意的是:RedLock算法在业务中是并不推荐使用的,本处这里只做知识扩展,不推荐使用的原因有很多,比如:
- 实例均需独立部署,实现成本高;
在实际业务中,我们根据业务规模通常要么单机部署,要么集群部署,现在为了实现一个分布式锁,独立部署多个redis,整体成本直线上升。
- 实现复杂,整体加锁效率有所降低;
过去加锁只需要一个hset指令就可以完成,现在有多少个redis实例,就得按序执行多少次。另外,前面我们说到分布式锁需要具备可重入性,假如此时加锁执行两次,但第二次的时候执行到某个实例失败了,也就是其他实例都加了两次,某个节点只加上了一次,后面释放一次的时候,其他节点锁的个数都为1,那个为0,这个时候就会触发向所有redis实例释放锁的请求,进而导致所有锁被释放,这又是新的问题。
- 时钟漂移造成加锁冲突
我们在设置锁的时候会设置过期时间,但如果由于某种原因(Ps:比如运维同学调整了时间、服务器时间同步等等)导致其中一个实例的服务器的时间发生变化进而导致锁的过期释放,进而导致多个客户端都获取锁成功,从而造成并发问题。
小结
至此,redis手撸分布式锁章节就完结了,在一些并发量相对不高的业务场景如果有分布式锁的需求,我们简单手撸下是可以支持的。但我们需要更成熟可靠的方案,因此接下来我们将开启基于Redisson框架实现分布式锁的章节,会给大家带来更为简单的实现方式,那么相对应简单的使用下必然是底层复杂的封装,所以也会对相关的源码进行讲解,敬请期待下吧!