1.分布式锁简介
简单来说,分布式锁是针对集群环境下多台机器竞争公共资源提出的方案。
单机环境下,线程共享堆内存,jdk提供了同步机制来应对资源竞争,比如synchronized关键字,AQS队列同步器等,只要我们设置内存标记,并且这个标记具有原子性和可见性,这样多线程环境下通过标记实现资源的同步操作,这个标记可以理解为锁的实现。所以我们通常会采取synchronized标记方法或代码块,或者RetreenLock去做互斥。
集群环境下,基于内存的锁机制只对单个节点有效,无法扩展,即节点1只能对自己做多线程同步,无法对节点2做限制,因为这时候已经是多进程了。那就需要一把公共的锁,由第三方实现,对所有节点的所有线程进行并发控制,所以这个锁也必须具有原子性和可见性。
分布式锁的设计应该是:
- 可以保证在分布式部署的应用集群中,同一个方法在同一时间只能被一台机器-上的一个线程执行。
- 这把锁要是一把可重入锁(避免死锁)
- 这把锁最好是一把阻塞锁(根据业务需求考虑)
- 这把锁最好是一把公平锁(根据业务需求考虑)
- 有高可用的获取锁和释放锁功能
- 获取锁和释放锁的性能要好
2.分布式锁实现
主要有三种,基于数据库、基于Redis、基于Zookeeper,本文介绍前两种;
2.1 基于数据库
-
设置唯一主键或字段,通过insert操作实现,这种操作具有幂等性,所以可以保证同一时间只能有一条记录插入,方法执行完后delete这条记录;
-
乐观锁和悲观锁:乐观锁就是加版本号,通过修改标记字段,判断是否获取锁成功;悲观锁就是用for update给记录加排它锁,锁住这条记录就可以保证同一时间只能由一条线程操作;
注:由于性能问题,基于数据库的方案基本不会被采用;
2.2 基于Redis
由于Redis是单进程单线程,IO多路复用,所以线程安全问题和性能问题不用去做过多考虑和设计;
介绍Redis的几个原子操作:
- setnx(key, value):如果 key 不存在,则设置当前 key 成功,返回 1;如果当前 key 已经存在,则设置当前 key 失败,返回 0。
- getSet(key, value):设置新值并返回旧值。如果key不存在,把值设置成value并返回null;如果key存在,把值设置成value并返回之前存在的值。
- get(key):返回当前key对应的value。
下面是Springboot使用Redis的逻辑代码:
2.2.1.设计一个RedisLock类
RedisLock封装对Redis的操作,重点在tryLock方法,流程如下:
- 所有线程进入尝试获取锁,执行setNX方法,value为每个线程获取的当前时间+超时时间,执行成功则拿到锁,执行业务逻辑;执行失败,代表锁已经被占用,进行下一步判断;
- 为了防止超时产生死锁,要进行超时判断,如果拿到锁的线程崩溃了,后面的线程通过判断超时后,强制抢占锁;首先调用get()方法获取Redis里被setNX的值currentValue ,和当前时间比较,如果小于当前时间,代表已经超时,开始竞争锁资源;如果大于当前时间,代表未超时,进行下一次尝试;
- 在上一步判断超时后,调用getSet()方法进行替换并得到旧值oldValue,由于这个时候多条线程都在调用这个方法,但是只有最快的线程能拿到过期值currentValue ,判断oldValue是否等于currentValue,相等则代表最先拿到并且替换掉过期锁(当然大家都在getSet,当前的值也会很快被其他线程替换掉,即拿到锁,value也已经不是自己的原本的value,但是产生的误差可以忽略),如果不相等,则代表来晚了,锁已经被抢了,进行下一次尝试;
- 返回tryLock结果;
public class RedisLock {/*** 锁默认超时时间60s*/private static final long DEFAULT_EXPIRE_TIME = 60 * 1000;private long expireTime;private String lockKey;private volatile boolean locked = false;private RedisTemplate<String, String> redisTemplate;public RedisLock(RedisTemplate<String, String> redisTemplate, String lockKey) {this.redisTemplate = redisTemplate;this.lockKey = lockKey;this.expireTime = DEFAULT_EXPIRE_TIME;}public RedisLock(RedisTemplate<String, String> redisTemplate, String lockKey, long expireTime) {this(redisTemplate, lockKey);this.expireTime = expireTime;}public String get(String key) {return redisTemplate.opsForValue().get(key);}public void set(String key, String value) {redisTemplate.opsForValue().set(key, value);}private boolean setNX(String key, String value) {return redisTemplate.opsForValue().setIfAbsent(key, value);}private String getSet(String key, String value) {return redisTemplate.opsForValue().getAndSet(key, value);}public boolean tryLock() {try {//模拟每条线程尝试三次int i = 3;while (i > 0) {String value = String.valueOf(System.currentTimeMillis() + expireTime);if (this.setNX(lockKey, value)) {// 获得锁成功并返回locked = true;return true;}/* 如果获取失败,下面判断是否超时 */String currentValue = this.get(lockKey);// 如果从redis取出的值小于当前时间,代表已经超时if (currentValue != null && Long.valueOf(currentValue) < System.currentTimeMillis()) {String oldValue = this.getSet(lockKey, value);if (oldValue != null && oldValue.equals(currentValue)) {// 这种情况存在于,多个线程同步getSet后,只有最快的线程能拿到过期值currentValue,但是自己set的值可能很快被覆盖,这里忽略相差的时间值locked = true;return true;}}i --;//这里可以设置随机等待时间后再尝试try {Thread.sleep(new Random().nextInt(100));} catch (InterruptedException e) {e.printStackTrace();}}return false;} catch (Throwable e) {e.printStackTrace();return false;}}public void unlock() {if (locked) {redisTemplate.delete(lockKey);locked = false;}}}
2.2.2.服务层业务调用
拿到锁后执行业务代码,无论执行结果是什么,在finally里面释放锁,防止死锁。
@Service
public class BossService {@Autowiredprivate RedisTemplate redisTemplate;public boolean killBoss(String boss) {String name = Thread.currentThread().getName();RedisLock redisLock = new RedisLock(redisTemplate, "bossKey");if (redisLock.tryLock()) {try {String bossAmount = redisLock.get(boss);System.out.println("当前数量" + bossAmount);if (Integer.valueOf(bossAmount) > 0) {redisLock.set(boss, String.valueOf(Integer.valueOf(bossAmount) - 1));System.out.println(name + "成功杀死一个boss");} else {System.out.println("boos死完了");}return true;} catch (Exception e) {e.printStackTrace();return false;} finally {redisLock.unlock();}} else {System.out.println(name + "再试一下");return false;}}
}
2.2.3.单元测试
使用CountdownLatch计数器模拟多线程并发:调用await()方法阻塞当前线程,当计数完成后,唤醒所有线程并发执行;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = DotACloudApplication.class)
public class BossServiceTest {@Autowiredprivate BossService bossService;ExecutorService exec = Executors.newCachedThreadPool();@Testpublic void killBossTest() throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(1);for (int i = 0; i < 100; i ++){Runnable runnable = () -> {try {countDownLatch.await();boolean b = bossService.killBoss("bank");} catch (InterruptedException e) {e.printStackTrace();}};exec.submit(runnable);}countDownLatch.countDown();Thread.sleep(120 * 1000);}@Afterpublic void after() {exec.shutdown();}}
这里有一个本菜鸡踩的坑,那就是线程池提交任务后,子线程任务还未执行完时,主线程就结束了(单元测试和main方法不一样,main是非守护线程,所以main结束了子线程可以继续运行)。导致我以为每次都有线程超时挂掉,因为执行结果不对,去Redis查看lockKey每次都有值(正常情况每次释放锁删除lockKey),解决办法就是让主线程睡一会,确保其他线程执行完,当然为了完整保证时序,可以再加一个CountdownLatch,count为线程数,每个线程执行完后count-1,所有线程执行完后主线程await后面输出一句话就可以了。
3.结论
网上使用Redis的基本上都这种方案,弊端也很明显,需要每个节点的系统时间一致,至少误差时间不能超过设置的超时时间,否则每次判断都超时,由于误差导致锁失效,那么并发操作就会出问题。
所以推荐使用redis官方推荐的Redisson,可以参考我另一篇文章:https://blog.csdn.net/unclecoco/article/details/99442998