Redisson分布式锁的源码解读

ops/2024/12/26 12:08:49/

之前秒杀项目中就用到了这个 Redisson 分布式锁 👇,这篇就一起来看看源码吧!

图片

tryLock 加锁 流程

图片

// RedissonLock.java
@Override
public boolean tryLock() {return get(tryLockAsync());
}@Override
public RFuture<Boolean> tryLockAsync() {return tryLockAsync(Thread.currentThread().getId());
}@Override
public RFuture<Boolean> tryLockAsync(long threadId) {return tryAcquireOnceAsync(-1, -1, null, threadId);
}private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {RFuture<Boolean> acquiredFuture;// 续租时间:锁的过期时间(没有设置的话就用默认的 internalLockLeaseTime 看门狗时间)if (leaseTime > 0) {acquiredFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);} else {acquiredFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);}CompletionStage<Boolean> f = acquiredFuture.thenApply(acquired -> {// lock acquiredif (acquired) {if (leaseTime > 0) {internalLockLeaseTime = unit.toMillis(leaseTime);} else {// 没配置过期时间就执行这里scheduleExpirationRenewal(threadId);}}return acquired;});return new CompletableFutureWrapper<>(f);
}

代码很长,主要看 tryLockInnerAsyncscheduleExpirationRenewal 方法。

前置知识

图片

图片

// EVAL 命令,用于在 Redis 服务器端执行 Lua 脚本。
RedisStrictCommand<Boolean> EVAL_NULL_BOOLEAN = new RedisStrictCommand<Boolean>("EVAL", new BooleanNullReplayConvertor());// BooleanNullReplayConvertor 判断是不是 NULL。
public class BooleanNullReplayConvertor implements Convertor<Boolean> {@Overridepublic Boolean convert(Object obj) {        return obj == null;     }
}

tryLockInnerAsync

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {// getRawName 即 锁的名称return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,// 锁不存在,添加 hash 数据,可重入次数加一,毫秒级别过期时间,返回 null"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +// 锁存在,可重入次数加一,毫秒级别过期时间,返回 null"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +// 锁被别人占有, 返回毫秒级别过期时间"return redis.call('pttl', KEYS[1]);",Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}

ARGV[1] 过期时间

ARGV[2] 即 getLockName(threadId) ,这里是 redisson 客户端id + 这个线程 ID , 如下 👇

图片

scheduleExpirationRenewal (看门狗机制)

上面加锁完,就来到这段代码。

没有设置过期时间的话,默认给你设置 30 s 过期,并每隔 10s 自动续期,确保锁不会在使用过程中过期。

同时,防止客户端宕机,留下死锁。

图片

// RedissonBaseLock.javaprotected void scheduleExpirationRenewal(long threadId) {ExpirationEntry entry = new ExpirationEntry();ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);if (oldEntry != null) {oldEntry.addThreadId(threadId);} else {entry.addThreadId(threadId);try {// 看这里 renewExpiration();} finally {if (Thread.currentThread().isInterrupted()) {cancelExpirationRenewal(threadId);}}}
}private void renewExpiration() {ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ee == null) {return;}// 延时任务,10s 续期一次。Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ent == null) {return;}Long threadId = ent.getFirstThreadId();if (threadId == null) {return;}// 续期操作CompletionStage<Boolean> future = renewExpirationAsync(threadId);future.whenComplete((res, e) -> {if (e != null) {log.error("Can't update lock " + getRawName() + " expiration", e);EXPIRATION_RENEWAL_MAP.remove(getEntryName());return;}if (res) {// reschedule itselfrenewExpiration();} else {cancelExpirationRenewal(null);}});}// 三分之一时间,30s /3= 10s}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);ee.setTimeout(task);
}// 续期脚本
protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return 1; " +"end; " +"return 0;",Collections.singletonList(getRawName()),internalLockLeaseTime, getLockName(threadId));
}

get

上面的加锁操作,最终返回的是 return new CompletableFutureWrapper<>(f);  这个异步操作。

还记得上面的 BooleanNullReplayConvertor 吗,当 eval 执行加锁脚本时,成功会返回 null,并在这里转成 True 。

@Override
public <V> V get(RFuture<V> future) {if (Thread.currentThread().getName().startsWith("redisson-netty")) {throw new IllegalStateException("Sync methods can't be invoked from async/rx/reactive listeners");}try {return future.toCompletableFuture().get();} catch (InterruptedException e) {future.cancel(true);Thread.currentThread().interrupt();throw new RedisException(e);} catch (ExecutionException e) {throw convertException(e);}
}

那么,加锁的部分到这里就结束, 解锁 的就简单过一下 👇

unlock 解锁

图片

// RedissonLock.java
protected RFuture<Boolean> unlockInnerAsync(long threadId) {return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,// 不存在,直接返回 null"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +"return nil;" +"end; " +// 减一"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +// 大于0,设置毫秒级过期时间,并返回0"if (counter > 0) then " +"redis.call('pexpire', KEYS[1], ARGV[2]); " +"return 0; " +// 删除锁,并向指定channel发布 0 这个消息,并返回1"else " +"redis.call('del', KEYS[1]); " +"redis.call('publish', KEYS[2], ARGV[1]); " +"return 1; " +"end; " +// 返回 null"return nil;",Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

KEYS[1] 为锁名,KEYS[2] channel 名 👇

图片

ARGV[1] 为0 👇, ARGV[2] 过期时间,ARGV[3] 为 redisson 客户端id + 这个线程 ID

图片

解锁后,取消续期任务。

图片

结尾

通过源码,我们了解到上文提到的 redisson 框架的几个特点:自动续期可重入锁lua脚本


http://www.ppmy.cn/ops/144629.html

相关文章

【国产NI替代】基于国产FPGA+兆易创新GD32F450的全国产16振动+2转速(24bits)高精度终端采集板卡

16振动2转速&#xff08;24bits&#xff09;高精度终端采集板卡 采用AG16KF256国产FPGA兆易创新GD32F450 国产ARM的硬件架构&#xff0c;虽然比T3处理器的运算 能力弱&#xff0c;但是具备成本更低&#xff0c;代码更易维护 的特点。 内置算法可以完成特征值的计算以及请求…

深入理解 JVM 垃圾回收机制

在 Java 开发领域&#xff0c;JVM&#xff08;Java 虚拟机&#xff09;的垃圾回收机制是保障程序高效稳定运行的关键环节。它自动处理内存管理中繁琐且易错的垃圾回收任务。 一、垃圾回收的基本概念 在程序运行过程中&#xff0c;会不断创建各种对象&#xff0c;这些对象占用…

小程序租赁系统的优势与未来发展潜力分析

内容概要 小程序租赁系统正在成为租赁行业的热门工具&#xff0c;大家都在谈论它的优势和未来潜力。让我们简单分析一下这些优势&#xff1a; “便捷性和高效性是小程序租赁系统的两个杀手锏&#xff0c;让我们一起揭开它们的神秘面纱&#xff01;” 在许多情况下&#xff0c;…

Android Bootable Recovery 中的 applypatch.cpp 文件解析

Android Bootable Recovery 中的 applypatch.cpp 文件解析 目录 引言Android Recovery 模式概述applypatch.cpp 文件的作用核心功能解析 4.1 补丁应用的基本原理4.2 文件差异算法4.3 内存管理与优化4.4 错误处理与恢复机制代码结构与关键函数分析 5.1 main() 函数5.2 apply_pa…

关于生活的事

作者&#xff1a;丢丢 链接&#xff1a;https://www.zhihu.com/question/361343495/answer/21351211308 来源&#xff1a;知乎 著作权归作者所有。商业转载请联系作者获得授权&#xff0c;非商业转载请注明出处。 告诉你一个锻炼好身体的方法&#xff0c;我发现越是让人健康…

Pytorch | 从零构建MobileNet对CIFAR10进行分类

Pytorch | 从零构建MobileNet对CIFAR10进行分类 CIFAR10数据集MobileNet设计理念网络结构技术优势应用领域 MobileNet结构代码详解结构代码代码详解DepthwiseSeparableConv 类初始化方法前向传播 forward 方法 MobileNet 类初始化方法前向传播 forward 方法 训练过程和测试结果…

[计算机网络]RIP协议

RIP协议 1&#xff09;什么是RIP RIP是一种分布式的&#xff0c;基于距离向量的路由选择协议 运行RIP协议的路由器&#xff0c;维护从它自己到其他每一个目的网络的距离记录。 2&#xff09;距离的定义 从一个路由器到直接连接的网络的距离定义为0。 从一个路由器到非直接连接…

Rust之抽空学习系列(五)—— 所有权(上)

Rust之抽空学习系列&#xff08;五&#xff09;—— 所有权&#xff08;上&#xff09; 1、什么是所有权 所有权是确保Rust程序安全的一种机制 安全则是指程序中没有未定义的行为未定义的行为是指在执行一段代码时&#xff0c;结果不可预测且未被编程语言指定的情况Rust的基…