简单理解下基于 Redisson 库的分布式锁机制

ops/2024/11/23 17:38:40/

目录

    • 简单理解下基于 Redisson 库的分布式锁机制
      • 代码流程:
        • 方法的调用:
        • 具体锁的实现:
          • riderBalance 方法:
          • tryLock 方法(重载):
          • tryLock 方法(核心实现):

在这里插入图片描述

简单理解下基于 Redisson 库的分布式锁机制

这段代码实现了一个基于 Redisson 库的分布式锁机制

Redisson 是一个基于 Redis 的 Java 客户端库,它提供了丰富的分布式数据结构和工具,旨在使 Java 开发者能够轻松地在分布式环境下使用 Redis。

Redisson 是 Redis 的高级封装,除了常规的 Redis 命令支持外,它还提供了很多额外的功能,如分布式锁、分布式集合、分布式队列、分布式缓存等,使得开发者在分布式系统中实现高效、可靠的操作。

比方对一个用户所拥有的金额进行增减操作,肯定需要上锁才能保证一定的安全性。

代码流程:

方法的调用:

一个简单的流程:

LockUtil.riderBalance(riderId, () -> {…}) 是一个加锁操作,是一种使用分布式锁来确保线程安全的机制,确保在修改 余额时,避免并发冲突。

简要来说,它的作用是在执行余额操作时,确保只有一个线程能够对指定的骑手账户进行操作,避免多个线程同时修改余额,导致数据不一致的情况。

LockUtil.riderBalance(riderId, …) 通过 riderId 获取一个锁,这样就能确保在同一时刻,只有一个线程可以执行传入的操作逻辑。

riderId 是骑手的唯一标识,通过它来加锁,防止对同一个骑手账户的并发操作。

() -> {…}: 这是一个 Lambda 表达式,表示当获得锁之后要执行的具体操作。这个操作包括:获取骑手的余额信息、执行余额扣除、记录操作日志等步骤。

    /*** 扣除余额*/@Override@Transactional(rollbackFor = Exception.class)public boolean deductionBalance(Integer riderId, BigDecimal amount, String title, Integer orderId, String orderCode, RiderCashlogType cashlogType, RiderCashlogStatus cashlogStatus) {return LockUtil.riderBalance(riderId, () -> {// 变更前RiderServiceDto riderBefore = this.getRiderById(riderId);// 减少余额 -------- ★★★★★★★★★★★★★★★★★★★★★★★★boolean flag = xxRiderService.deductionBalance(riderId, amount);// 变更后RiderServiceDto riderAfter = this.getRiderById(riderId);// 增加流水日志if (flag) {RiderCashlogServiceDto cashlog = new RiderCashlogServiceDto();cashlog.setMoneyType(RiderCashlogMoneyType.BALANCE);cashlog.setTitle(title);......return xxRiderCashlogService.addCashlog(cashlog);}return false;}, () -> {throw BizException.newInstance(ErrorCode.BUSY);});}

调用接口方法

    /*** 扣除余额*/boolean deductionBalance(Integer riderId, BigDecimal amount);

这里再进行详细的扣除方法,依然是用同个锁

    /*** 扣除余额*/@Overridepublic boolean deductionBalance(Integer riderId, BigDecimal amount) {if (amount.compareTo(BigDecimal.ZERO) == 0) {return true;}// 加锁return LockUtil.riderBalance(riderId, () -> {if (ObjectUtil.isEmpty(riderId) || amount.compareTo(BigDecimal.ZERO) < 0) {log.error("扣除xx余额失败,riderId:{},amount:{}", riderId, amount);return false;}RiderServiceDto rider = this.getRiderById(riderId);// 获取当前账号的余额BigDecimal valid = rider.getValidMoney();// 判断当前余额是否足够if (valid.compareTo(amount) < 0) {throw BizException.newInstance(ErrorCode.ARGUMENT_ERROR, "扣除余额失败,余额小于" + amount.toPlainString());}return this.update(new UpdateWrapper<RiderEntity>().lambda().eq(RiderEntity::getId, riderId).setSql("valid_money = valid_money - " + amount));}, () -> {throw BizException.newInstance(ErrorCode.BUSY);});}
具体锁的实现:
riderBalance 方法:
    /*** 锁骑手余额------* Integer riderId: 骑手的 ID,作为分布式锁的标识。* Supplier<T> supplier: 提供数据或执行操作的函数接口。*                       传入的 Supplier<T> 函数接口,用来提供需要执行的业务操作。如果获取锁成功,业务操作会在锁内部执行。* InvokeInter lockFail: 如果获取锁失败,执行的操作。*/public static <T> T riderBalance(Integer riderId, Supplier<T> supplier, InvokeInter lockFail) {return tryLock(RedisKey.LOCK_RIDER_BALANCE_XXXXX.getKey(riderId), supplier, lockFail);}
tryLock 方法(重载):
    /*** 设置分布式锁*/public static <T> T tryLock(String key, Supplier<T> supplier, InvokeInter lockFail) {//这是 tryLock 的一个重载方法,它会在尝试获取锁时默认设置超时时间为 10 秒//通过 key、supplier 和 lockFail,它会调用 tryLock 的另一个重载版本,并使用 10 秒的默认超时时间return tryLock(key, supplier, lockFail, 10, TimeUnit.SECONDS);}
tryLock 方法(核心实现):
/*** 设置分布式锁-----------* key: 锁的标识符,通常是与某个资源(如骑手余额)相关的唯一标识。* supplier: 需要执行的业务逻辑。* lockFail: 获取锁失败时的回调操作。* amount 和 unit: 锁的持有时间(amount)和时间单位(unit),用于控制获取锁的最长等待时间。*/public static <T> T tryLock(String key, Supplier<T> supplier, InvokeInter lockFail, long amount, TimeUnit unit) {//通过 Redisson 客户端获取一个 RLock 对象,RLock 是 Redisson 提供的分布式锁实现。RLock lock = _this.redissonClient.getLock(key);try {// lock.isLocked():判断锁是否已经被其他线程持有。//lock.isHeldByCurrentThread():判断当前线程是否已经持有该锁。如果当前线程已经持有锁,它不需要重复获取。if (lock.isLocked() && !lock.isHeldByCurrentThread()) {if (lockFail == null) {// 如果 lockFail 为空,则抛出 BizException 异常,表示当前资源忙(例如余额操作正在进行)throw BizException.newInstance(ErrorCode.BUSY);}// 如果锁已经被其他线程持有且不是当前线程持有,且 lockFail 不为空,就执行 lockFail.invoke() 来处理锁获取失败的情况。lockFail.invoke();//lock.tryLock(amount, unit):尝试在指定的时间内(amount 秒)获取锁。如果成功,执行后续的业务操作。}  else if (lock.tryLock(amount, unit)) {try {// 如果锁获取成功,调用 supplier.get() 执行传入的业务逻辑。return supplier.get();} finally {//无论业务逻辑执行成功与否,都要确保释放锁,lock.unlock() 用于释放锁,防止死锁。lock.unlock();}} else {//如果在 tryLock 的超时时间内无法获得锁(即返回 false),就执行 lockFail.invoke(),或者抛出 BizException 异常表示资源繁忙if (lockFail == null) {throw BizException.newInstance(ErrorCode.BUSY);}lockFail.invoke();}//最终返回 null,因为所有锁的操作是围绕着锁获取和释放展开的,所有的业务执行通过 supplier.get() 来完成,返回值由 supplier 提供return null;} catch (InterruptedException e) {//如果线程在等待锁的过程中被中断,则抛出 BizException 异常,表示当前资源忙(此时无法完成操作)。throw BizException.newInstance(ErrorCode.BUSY);}}

这段代码主要实现了一个分布式锁的功能,通过 Redisson 提供的 RLock 来确保在分布式环境下对资源的访问是互斥的,避免多个线程或进程同时访问同一资源(如xxx余额)。

tryLock 方法用于尝试获取锁,并在成功获取锁时执行指定的业务操作(supplier.get())。如果获取锁失败,依据 lockFail 参数的值执行相应的失败处理操作。

锁的获取有超时机制,超过超时时间仍无法获取锁时会触发失败处理


http://www.ppmy.cn/ops/136096.html

相关文章

Spring Boot 深度解析:快速构建高效、现代化的 Web 应用程序

Spring Boot 是由 Pivotal 团队提供的全新框架&#xff0c;其设计目的是用来简化新 Spring 应用的初始搭建以及开发过程。该框架使用了特定的方式来进行配置&#xff0c;从而使开发者能够更加迅速地开始编写代码。Spring Boot 的主要目标是&#xff1a; 简化创建独立的、生产级…

AIVA 技术浅析(四):捕捉音乐作品中的长期依赖关系

为了生成具有连贯性和音乐性的作品&#xff0c;AIVA 运用了多种深度学习模型&#xff0c;其中包括长短期记忆网络&#xff08;LSTM&#xff09;和门控循环单元&#xff08;GRU&#xff09;等循环神经网络&#xff08;RNN&#xff09;的变种。 如何使用 LSTM 和 GRU 来捕捉音乐…

【2024亚太杯亚太赛APMCM C题】数学建模竞赛|宠物行业及相关产业的发展分析与策略|建模过程+完整代码论文全解全析

第一个问题是&#xff1a;请基于附件 1 中的数据以及你的团队收集的额外数据&#xff0c;分析过去五年中国宠物行业按宠物类型的发展情况。并分析中国宠物行业发展的因素&#xff0c;预测未来三年中国宠物行业的发展。 第一个问题&#xff1a;分析中国宠物行业按宠物类型的发展…

sourceTree无效的源路径问题解决

1.点击工具 2.点击选项 3.修改ssh客户端为OpenSSH 4.点击确定&#xff0c;然后重新打开软件

Python小游戏28——水果忍者

首先&#xff0c;你需要安装Pygame库。如果你还没有安装&#xff0c;可以使用以下命令进行安装&#xff1a; 【bash】 pip install pygame 《水果忍者》游戏代码&#xff1a; 【python】 import pygame import random import sys # 初始化Pygame pygame.init() # 设置屏幕尺寸 …

无插件直播流媒体音视频播放器EasyPlayer.js播放器的g711系列的音频,听起来为什么都是杂音

在数字化时代&#xff0c;流媒体播放器已成为信息传播和娱乐消遣的重要工具。随着技术的进步&#xff0c;流媒体播放器的核心技术和发展趋势不断演变&#xff0c;以满足用户对于无缝播放、低延迟和高画质的需求。 EasyPlayer播放器属于一款高效、精炼、稳定且免费的流媒体播放…

Linux 使用gdb调试core文件

core文件和gdb调试 什么是 core 文件&#xff1f;产生core文件的原因&#xff1f;core 文件的控制和生成路径gdb 调试core 文件引用和拓展 什么是 core 文件&#xff1f; 当程序运行过程中出现Segmentation fault (core dumped)错误时&#xff0c;程序停止运行&#xff0c;并产…

“漫步北京”小程序及“气象景观数字化服务平台”上线啦

随着科技的飞速发展&#xff0c;智慧旅游已成为现代旅游业的重要趋势。近日&#xff0c;北京万云科技有限公司联合北京市气象服务中心&#xff0c;打造的“气象景观数字化服务平台“和“漫步北京“小程序已经上线&#xff0c;作为智慧旅游的典型代表&#xff0c;以其丰富的功能…