文章目录
- 简单的分布式锁实现流程
- Lua脚本介绍
- Redisson实现分布式锁原理
- 基本使用
- 原理
- 首先是lock加锁逻辑
- 锁续命逻辑
- 自旋重试逻辑
- 释放锁唤醒其他阻塞线程逻辑
- RedLock红锁
- 介绍与基本使用
- 问题
- 分布式锁性能提升
简单的分布式锁实现流程
最初的版本,使用setnx命令加锁,判断加锁是否成功。–> 执行业务代码 —> 释放锁
问题: 业务代码出异常了就没有释放锁
**优化:**使用try{}finally{}包起来释放锁
**问题:**执行业务代码时服务器宕机了,锁就不会释放了
优化: 加过期时间,和setnx一起,保证操作的原子性
**问题:**业务代码执行耗时超过了锁过期时间,其他进程加锁了,前一个进程业务代码执行释放锁时把其他进程加的锁给释放掉了
**优化:**生成唯一id放value中,释放锁时判断是否相等
**问题:**校验value是否相等与释放锁不是原子性的,可能会出现高并发问题
**优化: ** 锁续命 + lua脚本保证校验value是否相等与释放锁的原子性
Lua脚本介绍
Redis2.6推出脚本功能
使用脚本的好处:
- 减少网络开销,可以一次执行多条命令
- 原子操作,保证了多条命令的原子性
- 代替redis事务功能,redis事务一般不用,官方推荐如果要使用redis的事务功能可以用redis lua替代。
在redis-cli中可以使用EVAL命令对lua脚本进行求值,EVAL命令格式如下:
EVAL script numbers key [key...] arg [arg...]
- script是一断lua脚本
- numbers的指定之后的多个参数,其中前面多少个是Key
- Key 从第三个参数开始算起,表示脚本中用到的哪些Key,这些Key是通过全局变量KEYS数组,用1为基数的访问形式KEYS[1]、KEYS[2]… …
- arg,这些不是键名参数的附加参数,可以用全局变量ARGV数组访问,ARGV[1],ARGV[2]… …
案例:
127.0.0.1:6379> eval "return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}" 2 key1 key2 arg1 arg2
1) "key1"
2) "key2"
3) "arg1"
4) "arg2"
java代码案例
// 一个扣减库存的操作,把剩余库存和要减的数量先变为能比较的数字型,然后在进行比较和减法操作jedis.set("product_stock_10016", "15"); //初始化商品10016的库存
String script = " local count = redis.call('get', KEYS[1]) " +" local a = tonumber(count) " +" local b = tonumber(ARGV[1]) " +" if a >= b then " +" redis.call('set', KEYS[1], a-b) " +" return 1 " +" end " +" return 0 ";
Object obj = jedis.eval(script, Arrays.asList("product_stock_10016"), Arrays.asList("10"));
System.out.println(obj);
Redisson实现分布式锁原理
基本使用
引入依赖
<!--使用redisson作为分布式锁-->
<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.6.5</version>
</dependency>
配置Redisson
@Configuration
public class RedissonConfig {@Beanpublic Redisson redissonClient() {// 创建配置 指定redis地址及节点信息// 我们要在地址前加上redis:// ,SSL连接则需要加上rediss://Config config = new Config();config.useSingleServer().setAddress("redis://82.156.9.191:6379").setPassword("XXX");return (Redisson) Redisson.create(config);}}
业务代码测试
@RunWith(SpringRunner.class)
@SpringBootTest
public class RedissonTest {@Autowiredprivate Redisson redisson;@Testpublic void redisson() {String myLock = "my_lock";// 1.获取一把锁,只要锁的名字一样,就是同一把锁RLock lock = redisson.getLock(myLock);// 加锁lock.lock();try {System.out.println("加锁成功,执行业务代码..." + Thread.currentThread().getId());Thread.sleep(1000);} catch (Exception e) {e.printStackTrace();} finally {// 释放锁lock.unlock();}}
}
原理
Redisson的核心流程图如下图所示
刚开始会有两个线程去调用lock()
方法加锁,但是只会有一个线程加锁成功,如果线程1加锁成功了那么就会另外开启一个线程,默认每隔10s去检查锁是否还存在,如果还存在则重新设置锁过期时间为30秒。默认锁的过期时间是30秒,看门狗间隔时间是 key过期时间的1/3
如果线程2没有加锁成功,那么它会进行自旋,阻塞一段时间不断去重试获取锁
当线程1执行完后,调用unlock()
方法释放锁后,会唤醒其他正在等待锁的线程。
首先是lock加锁逻辑
接下来点进tryAcquire()
方法,再会进入到tryAcquireAsync()
—> tryLockInnerAsync()
而tryLockInnerAsync()
方法的代码如下,其实就是使用的lua脚本去加锁,
第一段if是判断锁对象是否存在,如果=0就表示不存在,然后就使用hset存一个值
ARGV[2]
也就是 getLockName(threadId)
就是一个uuid+线程id。接下来再指定过期时间
第二段if就是可重入锁的逻辑,给hset
最后一个参数加1
最后一行就表示没有加锁成功,把当前锁的过期时间返回
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {internalLockLeaseTime = unit.toMillis(leaseTime);return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('hset', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"return redis.call('pttl', KEYS[1]);",Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
锁续命逻辑
当锁添加成功之后才会有锁续命的逻辑,当上面的tryLockInnerAsync()
方法尝试加锁之后,方法的返回值是Future对象,然后这里会添加一个监听器,当tryLockInnerAsync()
方法执行完有返回之后,如果加锁成功则返回null,加锁失败就返回锁过期时间,所以最终就会调用到scheduleExpirationRenewal
方法中去进行锁续命逻辑。
详细的scheduleExpirationRenewal()
代码如下
核心思想是首先等一段时间,延迟执行TimerTask类的run()方法,等待的时间是key过期时间的三分之一,默认是10s。
在run()方法中重新执行lua脚本为key设置默认30s的过期时间。
然后再递归调用自己scheduleExpirationRenewal()
,然后又等一段时间执行run()方法
自旋重试逻辑
从加锁逻辑中我们可以知道,如果某个线程调用tryLockInnerAsync()
方法没有加锁成功,那么返回的是这个锁的过期时间,那么接下来也就回到了加锁部分的第一张图中了
如果加锁成功是返回null,如果加锁没成功是返回的锁过期时间,所以这里接下来就是一个while(true)死循环,不断尝试获取锁
try {while (true) {// 每一次都去尝试加锁ttl = tryAcquire(leaseTime, unit, threadId);// lock acquiredif (ttl == null) {break;}// getEntry(threadId).getLatch()获取的是一个信号量对象,这个信号量对象在下面 释放锁唤醒其他阻塞线程 中会出现// tryAcquire()就是阻塞方法,会阻塞ttl时间if (ttl >= 0) {getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {getEntry(threadId).getLatch().acquire();}}
} finally {unsubscribe(future, threadId);
}
释放锁唤醒其他阻塞线程逻辑
实际上使用的是Redis的发布订阅功能来实现的,首先是在加锁的业务逻辑中,如果加锁失败了 则去订阅一个channel
进入到subscribe()
方法中就能发现实际上是调用了getChannelName()
方法得到一个ChannelName,并订阅它
protected RFuture<RedissonLockEntry> subscribe(long threadId) {return PUBSUB.subscribe(getEntryName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService());
}// 也就是说实际上所有加锁失败的线程都会订阅 redisson_lock__channel 名字的channel
String getChannelName() {return prefixName("redisson_lock__channel", getName());
}
接下来再就是解锁的逻辑
unlock()
方法的业务逻辑如下图所示
我们接下来再进入到unlockInnerAsync()
方法中
我们可以知道只要释放锁了那么就会往redisson_lock__channel
名字的channel 中发送一个 0 的消息。
这里发布了一条消息,接下来就会订阅者这边的代码,走到onMessage()
方法中
RedLock红锁
介绍与基本使用
我们redis生产环境一般都是以集群的方式存在的,而Redis主从数据复制是异步的,那么就会有可能出现master节点加锁成功了,但是在数据同步给从节点之前宕机了,然后从节点重新选举出主节点,这个时候其他线程就又能加锁了。
Redisson中提供了一种红锁的机制来解决这种主从异步复制数据导致的问题,但是RedLock并没有完全解决,它还存在一些缺陷。
RedLock的核心思想是往多个redis节点中同时执行加锁setnx命令,这些节点互相独立存在,没有主从关系,如果超过半数的节点加锁成功才会认为本次加锁成功
基于这种实现原理我们就能发现客户端在进行加锁时效率是变低了,因为需要往多个节点发送命令并且等待执行结果返回;并且还牺牲了一些AP,保证了一些CP,因为多个节点中如果挂了一半,那么就永远加锁不成功了。
RedLock的基本使用
@RestController
public class IndexController {@Autowiredprivate Redisson redisson1;@Autowiredprivate Redisson redisson2;@Autowiredprivate Redisson redisson3;@RequestMapping("/redlock")public String redlock() {String lockKey = "product_001";//这里需要自己实例化不同redis实例的redisson客户端连接// 要往ioc容器中注册多个Redisson的bean对象,这些多个redis节点是独立存在的,没有主从关系RLock lock1 = redisson1.getLock(lockKey);RLock lock2 = redisson2.getLock(lockKey);RLock lock3 = redisson3.getLock(lockKey);/*** 根据多个 RLock 对象构建 RedissonRedLock (最核心的差别就在这里)*/RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);try {/*** waitTimeout 尝试获取锁的最大等待时间,超过这个值,则认为获取锁失败* leaseTime 锁的持有时间,超过这个时间锁会自动失效(值应设置为大于业务处理的时间,确保在锁有效期内业务能处理完)*/boolean res = redLock.tryLock(10, 30, TimeUnit.SECONDS);if (res) {//成功获得锁,在这里处理业务}} catch (Exception e) {throw new RuntimeException("lock fail");} finally {//无论如何, 最后都要解锁redLock.unlock();}return "end";}}
问题
使用Redlock的一些问题:
-
这些多个redis节点,如果给他们各自也加一个slave节点,那么就有可能出现主从异步复制数据的问题,可以某个或多个master节点中加了lockKey,但是还没有同步给slave节点就宕机了,从节点变为主节点后这时它是没有lockKey的,就可能又会出现其他线程来加锁并超过半数节点加锁成功。
-
如果不给各个redis节点加Slave,那么如果挂了一半数量的节点,那么就永远不会加锁成功
-
如果多加一些redis节点,总不能挂那么多吧,但是影响加锁性能,加一次锁需要往这么多的节点发送命令,还有等待加锁成功超过半数的响应。我们使用Redis就是因为它的高性能,
-
其中还有持久化机制可能导致某个节点加锁丢失数据。假如使用aof持久化机制,一般我们采用的是每秒持久化一次。如果这个时候有三个节点,前两个加锁成功后一个加锁失败了,这个时候已经返回给客户端加锁成功,在这一秒内持久化前某个节点宕机了,然后又重启,那么这个时候三个节点中有两个节点没有lockKey。
分布式锁性能提升
分布式锁的本质是将多线程并行变为了串行,但是串行就有点违背高并发了。
对于并发要求较高的场景我们通过一些优化手段来提升分布式锁的效率
-
锁的粒度控制的越小越好,从业务功能上以及锁的代码段都是越少越好
-
考虑分段锁,比如扣减库存操作,库存有1000,我们之前就是一个lockKey来控制,我们可以进行拆分为10个lockKey,他们各自负责扣减100次。
但是这其中有很多细节性的问题需要考虑,比如客户端如何决定要使用哪一个lockKey、某个lockKey的库存减完后就不能再被客户端继续拿到使用、某个key库存只有1但是这次客户端要下单了5个,那么还需要使用下一个lockKey去减4…
-
读多写少的场景使用读写锁
-
对于类似于单例模式的双重检测机制这一类场景,可以使用tryLock()方法来指定一个最大的等待时长