基于Redis的分布式锁实现方案

news/2025/1/12 0:49:48/

1.分布式锁简介

简单来说,分布式锁是针对集群环境下多台机器竞争公共资源提出的方案。

单机环境下,线程共享堆内存,jdk提供了同步机制来应对资源竞争,比如synchronized关键字,AQS队列同步器等,只要我们设置内存标记,并且这个标记具有原子性和可见性,这样多线程环境下通过标记实现资源的同步操作,这个标记可以理解为锁的实现。所以我们通常会采取synchronized标记方法或代码块,或者RetreenLock去做互斥。

集群环境下,基于内存的锁机制只对单个节点有效,无法扩展,即节点1只能对自己做多线程同步,无法对节点2做限制,因为这时候已经是多进程了。那就需要一把公共的锁,由第三方实现,对所有节点的所有线程进行并发控制,所以这个锁也必须具有原子性和可见性。

分布式锁的设计应该是:

  1. 可以保证在分布式部署的应用集群中,同一个方法在同一时间只能被一台机器-上的一个线程执行。
  2. 这把锁要是一把可重入锁(避免死锁)
  3. 这把锁最好是一把阻塞锁(根据业务需求考虑)
  4. 这把锁最好是一把公平锁(根据业务需求考虑)
  5. 有高可用的获取锁和释放锁功能
  6. 获取锁和释放锁的性能要好

2.分布式锁实现

主要有三种,基于数据库、基于Redis、基于Zookeeper,本文介绍前两种;

2.1 基于数据库

  1. 设置唯一主键或字段,通过insert操作实现,这种操作具有幂等性,所以可以保证同一时间只能有一条记录插入,方法执行完后delete这条记录;

  2. 乐观锁和悲观锁:乐观锁就是加版本号,通过修改标记字段,判断是否获取锁成功;悲观锁就是用for update给记录加排它锁,锁住这条记录就可以保证同一时间只能由一条线程操作;

    注:由于性能问题,基于数据库的方案基本不会被采用;

2.2 基于Redis

由于Redis是单进程单线程,IO多路复用,所以线程安全问题和性能问题不用去做过多考虑和设计;
介绍Redis的几个原子操作:

  1. setnx(key, value):如果 key 不存在,则设置当前 key 成功,返回 1;如果当前 key 已经存在,则设置当前 key 失败,返回 0。
  2. getSet(key, value):设置新值并返回旧值。如果key不存在,把值设置成value并返回null;如果key存在,把值设置成value并返回之前存在的值。
  3. get(key):返回当前key对应的value。

下面是Springboot使用Redis的逻辑代码:

2.2.1.设计一个RedisLock类

RedisLock封装对Redis的操作,重点在tryLock方法,流程如下:

  1. 所有线程进入尝试获取锁,执行setNX方法,value为每个线程获取的当前时间+超时时间,执行成功则拿到锁,执行业务逻辑;执行失败,代表锁已经被占用,进行下一步判断;
  2. 为了防止超时产生死锁,要进行超时判断,如果拿到锁的线程崩溃了,后面的线程通过判断超时后,强制抢占锁;首先调用get()方法获取Redis里被setNX的值currentValue ,和当前时间比较,如果小于当前时间,代表已经超时,开始竞争锁资源;如果大于当前时间,代表未超时,进行下一次尝试;
  3. 在上一步判断超时后,调用getSet()方法进行替换并得到旧值oldValue,由于这个时候多条线程都在调用这个方法,但是只有最快的线程能拿到过期值currentValue ,判断oldValue是否等于currentValue,相等则代表最先拿到并且替换掉过期锁(当然大家都在getSet,当前的值也会很快被其他线程替换掉,即拿到锁,value也已经不是自己的原本的value,但是产生的误差可以忽略),如果不相等,则代表来晚了,锁已经被抢了,进行下一次尝试;
  4. 返回tryLock结果;

public class RedisLock {/*** 锁默认超时时间60s*/private static final long DEFAULT_EXPIRE_TIME = 60 * 1000;private long expireTime;private String lockKey;private volatile boolean locked = false;private RedisTemplate<String, String> redisTemplate;public RedisLock(RedisTemplate<String, String> redisTemplate, String lockKey) {this.redisTemplate = redisTemplate;this.lockKey = lockKey;this.expireTime = DEFAULT_EXPIRE_TIME;}public RedisLock(RedisTemplate<String, String> redisTemplate, String lockKey, long expireTime) {this(redisTemplate, lockKey);this.expireTime = expireTime;}public String get(String key) {return redisTemplate.opsForValue().get(key);}public void set(String key, String value) {redisTemplate.opsForValue().set(key, value);}private boolean setNX(String key, String value) {return redisTemplate.opsForValue().setIfAbsent(key, value);}private String getSet(String key, String value) {return redisTemplate.opsForValue().getAndSet(key, value);}public boolean tryLock() {try {//模拟每条线程尝试三次int i = 3;while (i > 0) {String value = String.valueOf(System.currentTimeMillis() + expireTime);if (this.setNX(lockKey, value)) {// 获得锁成功并返回locked = true;return true;}/* 如果获取失败,下面判断是否超时 */String currentValue = this.get(lockKey);// 如果从redis取出的值小于当前时间,代表已经超时if (currentValue != null && Long.valueOf(currentValue) < System.currentTimeMillis()) {String oldValue = this.getSet(lockKey, value);if (oldValue != null && oldValue.equals(currentValue)) {// 这种情况存在于,多个线程同步getSet后,只有最快的线程能拿到过期值currentValue,但是自己set的值可能很快被覆盖,这里忽略相差的时间值locked = true;return true;}}i --;//这里可以设置随机等待时间后再尝试try {Thread.sleep(new Random().nextInt(100));} catch (InterruptedException e) {e.printStackTrace();}}return false;} catch (Throwable e) {e.printStackTrace();return false;}}public void unlock() {if (locked) {redisTemplate.delete(lockKey);locked = false;}}}

2.2.2.服务层业务调用

拿到锁后执行业务代码,无论执行结果是什么,在finally里面释放锁,防止死锁。

@Service
public class BossService {@Autowiredprivate RedisTemplate redisTemplate;public boolean killBoss(String boss) {String name = Thread.currentThread().getName();RedisLock redisLock = new RedisLock(redisTemplate, "bossKey");if (redisLock.tryLock()) {try {String bossAmount = redisLock.get(boss);System.out.println("当前数量" + bossAmount);if (Integer.valueOf(bossAmount) > 0) {redisLock.set(boss, String.valueOf(Integer.valueOf(bossAmount) - 1));System.out.println(name + "成功杀死一个boss");} else {System.out.println("boos死完了");}return true;} catch (Exception e) {e.printStackTrace();return false;} finally {redisLock.unlock();}} else {System.out.println(name + "再试一下");return false;}}
}

2.2.3.单元测试

使用CountdownLatch计数器模拟多线程并发:调用await()方法阻塞当前线程,当计数完成后,唤醒所有线程并发执行;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = DotACloudApplication.class)
public class BossServiceTest {@Autowiredprivate BossService bossService;ExecutorService exec = Executors.newCachedThreadPool();@Testpublic void killBossTest() throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(1);for (int i = 0; i < 100; i ++){Runnable runnable = () -> {try {countDownLatch.await();boolean b = bossService.killBoss("bank");} catch (InterruptedException e) {e.printStackTrace();}};exec.submit(runnable);}countDownLatch.countDown();Thread.sleep(120 * 1000);}@Afterpublic void after() {exec.shutdown();}}

这里有一个本菜鸡踩的坑,那就是线程池提交任务后,子线程任务还未执行完时,主线程就结束了(单元测试和main方法不一样,main是非守护线程,所以main结束了子线程可以继续运行)。导致我以为每次都有线程超时挂掉,因为执行结果不对,去Redis查看lockKey每次都有值(正常情况每次释放锁删除lockKey),解决办法就是让主线程睡一会,确保其他线程执行完,当然为了完整保证时序,可以再加一个CountdownLatch,count为线程数,每个线程执行完后count-1,所有线程执行完后主线程await后面输出一句话就可以了。

3.结论

网上使用Redis的基本上都这种方案,弊端也很明显,需要每个节点的系统时间一致,至少误差时间不能超过设置的超时时间,否则每次判断都超时,由于误差导致锁失效,那么并发操作就会出问题。
所以推荐使用redis官方推荐的Redisson,可以参考我另一篇文章:https://blog.csdn.net/unclecoco/article/details/99442998


http://www.ppmy.cn/news/246494.html

相关文章

Netty由浅入深的学习指南(NIO基础)

本章节会重点讲解NIO的Selector、ByteBuffer和Channel三大组件。NIO即非阻塞IO。 1.1 三大组件 1.1.1 Channel(通道) & Buffer(缓冲区) ​ channel有一点类似于stream&#xff0c;他就是读写数据的双向通道&#xff0c;可以从channel将数据读入buffer,也可以将buffer的数…

NIO基础,帮助入门Netty

NIO基础 1、三大组件1.1 Channel & Buffer1.2 Selector 2.ByteBuffer2.1ByteBuffer正确使用2.2 ByteBuffer结构2.3 ByteBuffer 常用方法调试工具类分配空间向buffer中写入数据从buffer中读取数据代码演示get(),get(int i),allocate(),mark(),reset()方法的使用字符串和Byte…

Day76-Netty

title: Day76-Netty date: 2021-07-23 18:03:30 author: Liu_zimo NIO基础 non-blocking io 非阻塞io 三大组件 通道&#xff08;Channel&#xff09;、缓冲区&#xff08;Buffer&#xff09;、选择器&#xff08;Selector&#xff09; Channel & Buffer channel有点类…

NIO多路复用之Selector的使用

Selector的使用 文章目录 Selector的使用一、阻塞 & 非阻塞1. 阻塞2. 非阻塞 二、selector 介绍及常用API1. 多路复用2. 常用API 三、处理 accept 事件四、处理 read 事件1. 为什么事件必须删除2. 处理客户端断开问题2.1 客户端强制断开2.2 客户端正常断开 3. 处理消息边界…

NIO网络编程

一、阻塞模式 服务端 public class server {public static void main(String[] args) throws IOException {//用 nio 来理解阻塞模式单线程//ByteBufferByteBuffer buffer ByteBuffer.allocate(16);//1.创建服务器ServerSocketChannel ssc ServerSocketChannel.open();//2.…

NIO的理解和使用

一、概述 NIO是 non-blocking-io的简称&#xff0c;非阻塞IO&#xff0c;由于它是后续出来的IO模型&#xff0c;有时也叫做 new-IO。NIO是后续比如React等的多路复用的基础模型。它是UNIX的五种IO模型中的一种。 NIO有三大组件&#xff1a;buffer、channel和selector&#xff…

NIO基础笔记

Netty深入浅出笔记 1. NIO基础1.1 三大组件1.1.1 Channel & Buffer1.1.2 Selector 1.2 ByteBuffer1.2.1 ByteBuffer 正确使用姿势1.2.2 ByteBuffer结构1.2.3 ByteBuffer核心属性1.2.4 ByteBuffer常见方法1.2.5 字符串与 ByteBuffer 互转1.2.6 Scattering Reads(分散读取)1.…

Netty学习(一)-- Netty 底层 Java NIO

视频地址&#xff0c;建议观看&#xff0c;老师由浅入深&#xff0c;循序渐进&#xff1a; https://www.bilibili.com/video/BV1py4y1E7oA 前面的学习&#xff1a;https://blog.csdn.net/weixin_43989102/article/details/126078132 目录 1、NIO1.1、Selector1&#xff09;多线…