基于注解实现去重表消息防止重复消费

ops/2025/1/22 8:13:23/

基于注解实现去重表消息防止重复消费

1. 背景/问题

分布式系统中,消息队列(如RocketMQ、Kafka)的 消息重复消费 是常见问题,主要原因包括:

  • 网络抖动:生产者或消费者因网络不稳定触发消息重发。
  • 消费者超时:消费者处理时间过长,消息队列误判为失败并重新投递。
  • 集群故障转移:消费者宕机后,未完成的消息会被其他节点重新拉取。

重复消费带来的问题

  • 业务逻辑多次执行(如重复扣款、重复生成订单)。
  • 数据一致性被破坏(如库存超卖、积分累加错误)。
  • 系统资源浪费,影响性能和稳定性。

为了避免这种情况发生,需要在客户端实现一些机制来确保消息不会被重复消费,例如记录消费者已经处理的消息 ID、使用分布式锁来控制消费进程的唯一性等。这些机制能够保证消息被成功处理,同时也能够提高系统的可靠性和稳定性。

2. 什么是幂等性

幂等性 是指对同一操作的多次执行所产生的影响与一次执行的影响相同。

  • 消息消费场景:无论消息被消费多少次,最终结果应与消费一次一致。
  • 实现目标:通过幂等设计,确保业务逻辑的重复执行不会产生副作用。

3. 幂等设计

核心思路

  1. 幂等标识:为每条消息生成唯一标识(如业务ID + 消息ID),记录其处理状态。
  2. 状态管理:通过数据库或Redis维护幂等标识的状态(如“消费中”“已消费”)。
  3. 过期时间:防止因系统崩溃导致状态长期滞留,需设置合理的超时时间(如10分钟)。
[消费者接收消息]  │  ▼  
[解析消息,生成唯一幂等标识]  │  ▼  
[查询幂等标识状态]  │  
┌───────┴───────┐  
│ 存在且已消费  │           [返回成功,丢弃消息]  
└───────┬───────┘  │  
┌───────┴───────┐  
│ 存在且消费中  │           [延迟消费,等待重试]  
└───────┬───────┘  │  
┌───────┴───────┐  
│   不存在      │  
└───────┬───────┘  │  
[设置幂等标识为“消费中”,并设置过期时间]  │  ▼  
[执行业务逻辑]  │  ▼  
[业务执行成功?]  │  
┌───────┴───────┐  
│     是        │           [更新标识为“已消费”]  
│               │           [删除或保留标识]  
└───────┬───────┘  │  
┌───────┴───────┐  
│     否        │           [删除标识,允许重试]  
└───────┬───────┘  │  ▼  
[流程结束]  

4.抽象通用幂等组件

消息防重复消费幂等组件是通用的通常会提取出来也可供其他模块/服务 使用

4.1自定义幂等注解

提供了一种通用的幂等注解,并通过 SpEL 的形式生成去重表全局唯一 Key

java">@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface NoMQDuplicateConsume {/*** 设置防重令牌 Key 前缀*/String keyPrefix() default "";/*** 通过 SpEL 表达式生成的唯一 Key*/String key();/*** 设置防重令牌 Key 过期时间,单位秒,默认 1 小时*/long keyTimeout() default 3600L;
}

4.2. 定义幂等枚举

幂等需要设置两个状态,消费中和已消费,创建对应的枚举

java">@RequiredArgsConstructor
public enum IdempotentMQConsumeStatusEnum {/*** 消费中*/CONSUMING("0"),/*** 已消费*/CONSUMED("1");@Getterprivate final String code;/*** 如果消费状态等于消费中,返回失败** @param consumeStatus 消费状态* @return 是否消费失败*/public static boolean isError(String consumeStatus) {return Objects.equals(CONSUMING.code, consumeStatus);}
}

4.3.通过 AOP 的方式进行增强注解

如果说方法上加了注解,会被这段 AOP 代码以环绕增强方式执行

java">@Slf4j
@Aspect
@RequiredArgsConstructor
public final class NoMQDuplicateConsumeAspect {private final StringRedisTemplate stringRedisTemplate;private static final String LUA_SCRIPT = """local key = KEYS[1]local value = ARGV[1]local expire_time_ms = ARGV[2]return redis.call('SET', key, value, 'NX', 'GET', 'PX', expire_time_ms)""";/*** 增强方法标记 {@link NoMQDuplicateConsume} 注解逻辑*/@Around("@annotation(com.nageoffer.onecoupon.framework.idempotent.NoMQDuplicateConsume)")public Object noMQRepeatConsume(ProceedingJoinPoint joinPoint) throws Throwable {NoMQDuplicateConsume noMQDuplicateConsume = getNoMQDuplicateConsumeAnnotation(joinPoint);String uniqueKey = noMQDuplicateConsume.keyPrefix() + SpELUtil.parseKey(noMQDuplicateConsume.key(), ((MethodSignature) joinPoint.getSignature()).getMethod(), joinPoint.getArgs());String absentAndGet = stringRedisTemplate.execute(RedisScript.of(LUA_SCRIPT, String.class),List.of(uniqueKey),IdempotentMQConsumeStatusEnum.CONSUMING.getCode(),String.valueOf(TimeUnit.SECONDS.toMillis(noMQDuplicateConsume.keyTimeout())));// 如果不为空证明已经有if (Objects.nonNull(absentAndGet)) {boolean errorFlag = IdempotentMQConsumeStatusEnum.isError(absentAndGet);log.warn("[{}] MQ repeated consumption, {}.", uniqueKey, errorFlag ? "Wait for the client to delay consumption" : "Status is completed");if (errorFlag) {throw new ServiceException(String.format("消息消费者幂等异常,幂等标识:%s", uniqueKey));}return null;}Object result;try {// 执行标记了消息队列防重复消费注解的方法原逻辑result = joinPoint.proceed();// 设置防重令牌 Key 过期时间,单位秒stringRedisTemplate.opsForValue().set(uniqueKey, IdempotentMQConsumeStatusEnum.CONSUMED.getCode(), noMQDuplicateConsume.keyTimeout(), TimeUnit.SECONDS);} catch (Throwable ex) {// 删除幂等 Key,让消息队列消费者重试逻辑进行重新消费stringRedisTemplate.delete(uniqueKey);throw ex;}return result;}/*** @return 返回自定义防重复消费注解*/public static NoMQDuplicateConsume getNoMQDuplicateConsumeAnnotation(ProceedingJoinPoint joinPoint) throws NoSuchMethodException {MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();Method targetMethod = joinPoint.getTarget().getClass().getDeclaredMethod(methodSignature.getName(), methodSignature.getMethod().getParameterTypes());return targetMethod.getAnnotation(NoMQDuplicateConsume.class);}

lua脚本解释

java">local key = KEYS[1] # 第一个 Key,即幂等唯一标识 uniqueKey
local value = ARGV[1] # 第一个参数,即初始化幂等消费状态,为消费中
local expire_time_ms = ARGV[2] # 第二个参数,即幂等 Key 过期时间return redis.call('SET', key, value, 'NX', 'GET', 'PX', expire_time_ms)

该脚本的主要作用是:在 Redis 中尝试以 NX 方式设置一个键,即如果键不存在,则设置新值,并返回设置之前的旧值,同时为该键设置过期时间(以毫秒为单位)。

获取到 Redis 里面的 Key 值后,可能会有三个流程执行:

absentAndGet 为空:代表消息是第一次到达,执行完 LUA 脚本后,会在 Redis 设置 Key 的 Value 值为 0,消费中状态。

absentAndGet 为 0:代表已经有相同消息到达并且还没有处理完,会通过抛异常的形式让 RocketMQ 重试。

absentAndGet 为 1:代表已经有相同消息消费完成,返回空表示不执行任何处理。

4.4.注册为 Spring Bean

另外可以看看另一篇基于分布式锁注解防重复提交

https://blog.csdn.net/sjsjsbbsbsn/article/details/145131305?spm=1001.2014.3001.5501

java">public class IdempotentConfiguration {/*** 防止消息队列消费者重复消费消息切面控制器*/@Beanpublic NoMQDuplicateConsumeAspect noMQDuplicateConsumeAspect(StringRedisTemplate stringRedisTemplate) {return new NoMQDuplicateConsumeAspect(stringRedisTemplate);}
}

4.5EL工具类

java">public class SpELUtil {/*** 校验并返回实际使用的 spEL 表达式** @param spEl spEL 表达式* @return 实际使用的 spEL 表达式*/public static Object parseKey(String spEl, Method method, Object[] contextObj) {List<String> spELFlag = ListUtil.of("#", "T(");Optional<String> optional = spELFlag.stream().filter(spEl::contains).findFirst();if (optional.isPresent()) {return parse(spEl, method, contextObj);}return spEl;}/*** 转换参数为字符串** @param spEl       spEl 表达式* @param contextObj 上下文对象* @return 解析的字符串值*/public static Object parse(String spEl, Method method, Object[] contextObj) {DefaultParameterNameDiscoverer discoverer = new DefaultParameterNameDiscoverer();ExpressionParser parser = new SpelExpressionParser();Expression exp = parser.parseExpression(spEl);String[] params = discoverer.getParameterNames(method);StandardEvaluationContext context = new StandardEvaluationContext();if (ArrayUtil.isNotEmpty(params)) {for (int len = 0; len < params.length; len++) {context.setVariable(params[len], contextObj[len]);}}return exp.getValue(context);}
}

5.实战使用

使用天机学堂项目来进行实战

5.1写入common模块

在这里插入图片描述

5.2使用

在这里插入图片描述

直接加上注解就可以

但是实际上这里不存在幂等问题,因为userId和courseId设置了唯一索引,所以这里不存在幂等性,不需要加上幂等注解


http://www.ppmy.cn/ops/152143.html

相关文章

深度学习基础--LSTM学习笔记(李沐《动手学习深度学习》)

前言 LSTM是RNN模型的升级版&#xff0c;神经网络模型较为复杂&#xff0c;这里是学习笔记的记录&#xff1b;LSTM比较复杂&#xff0c;可以先看&#xff1a; 深度学习基础–一文搞懂RNN 深度学习基础–GRU学习笔记(李沐《动手学习深度学习》) RNN&#xff1a;RNN讲解参考&am…

「2024 博客之星」自研Java框架 Sunrays-Framework 使用教程

文章目录 0.序言我的成长历程遇到挫折&#xff0c;陷入低谷重拾信心&#xff0c;迎接未来开源与分享我为何如此看重这次评选最后的心声 1.概述1.主要功能2.相关链接 2.系统要求构建工具框架和语言数据库与缓存消息队列与对象存储 3.快速入门0.配置Maven中央仓库1.打开settings.…

医院管理系统小程序设计与实现(LW+源码+讲解)

专注于大学生项目实战开发,讲解,毕业答疑辅导&#xff0c;欢迎高校老师/同行前辈交流合作✌。 技术范围&#xff1a;SpringBoot、Vue、SSM、HLMT、小程序、Jsp、PHP、Nodejs、Python、爬虫、数据可视化、安卓app、大数据、物联网、机器学习等设计与开发。 主要内容&#xff1a;…

资料03:【TODOS案例】微信小程序开发bilibili

样式 抽象数据类型 页面数据绑定 事件传参

2025美赛数学建模B题思路+模型+代码+论文

2025美赛数学建模A题B题C题D题E题思路模型代码&#xff08;1.24第一时间更新&#xff0c;更新见文末名片&#xff09; 论文数学建模感想 纪念逝去的大学数学建模&#xff1a;两次校赛&#xff0c;两次国赛&#xff0c;两次美赛&#xff0c;一次电工杯。从大一下学期组队到现在…

经验收录/用复盘的心态去学习

1.日拱一卒&#xff0c;想法积极。每次解决一点眼前的现实问题&#xff0c;长远来看是最高效的方法&#xff0c;一开始目标太远大&#xff0c;反而增加负担&#xff0c;在能力不够时想得太多反而会不愿意努力。 2.摆脱之前的思路。养成批判性思维&#xff0c;旁观者视角&#…

Linux C\C++方式下的文件I/O编程

【图书推荐】《Linux C与C一线开发实践&#xff08;第2版&#xff09;》_linux c与c一线开发实践pdf-CSDN博客 《Linux C与C一线开发实践&#xff08;第2版&#xff09;&#xff08;Linux技术丛书&#xff09;》(朱文伟&#xff0c;李建英)【摘要 书评 试读】- 京东图书 Lin…

Data Filtering Network 论文阅读和理解

目录 一、TL&#xff1b;DR 二、Introduction 2.1 apple的结论 2.2 业界做法&#xff1a; 2.3 我们的做法&#xff08;Apple&#xff09; 2.4 如何获取好的DFN 三、未完待续&#xff08;这周出去购物了&#xff0c;下周继续补充&#xff09; 一、TL&#xff1b;DR 核心…