基于注解实现去重表消息防止重复消费

news/2025/1/22 19:19:01/

基于注解实现去重表消息防止重复消费

1. 背景/问题

分布式系统中,消息队列(如RocketMQ、Kafka)的 消息重复消费 是常见问题,主要原因包括:

  • 网络抖动:生产者或消费者因网络不稳定触发消息重发。
  • 消费者超时:消费者处理时间过长,消息队列误判为失败并重新投递。
  • 集群故障转移:消费者宕机后,未完成的消息会被其他节点重新拉取。

重复消费带来的问题

  • 业务逻辑多次执行(如重复扣款、重复生成订单)。
  • 数据一致性被破坏(如库存超卖、积分累加错误)。
  • 系统资源浪费,影响性能和稳定性。

为了避免这种情况发生,需要在客户端实现一些机制来确保消息不会被重复消费,例如记录消费者已经处理的消息 ID、使用分布式锁来控制消费进程的唯一性等。这些机制能够保证消息被成功处理,同时也能够提高系统的可靠性和稳定性。

2. 什么是幂等性

幂等性 是指对同一操作的多次执行所产生的影响与一次执行的影响相同。

  • 消息消费场景:无论消息被消费多少次,最终结果应与消费一次一致。
  • 实现目标:通过幂等设计,确保业务逻辑的重复执行不会产生副作用。

3. 幂等设计

核心思路

  1. 幂等标识:为每条消息生成唯一标识(如业务ID + 消息ID),记录其处理状态。
  2. 状态管理:通过数据库或Redis维护幂等标识的状态(如“消费中”“已消费”)。
  3. 过期时间:防止因系统崩溃导致状态长期滞留,需设置合理的超时时间(如10分钟)。
[消费者接收消息]  │  ▼  
[解析消息,生成唯一幂等标识]  │  ▼  
[查询幂等标识状态]  │  
┌───────┴───────┐  
│ 存在且已消费  │           [返回成功,丢弃消息]  
└───────┬───────┘  │  
┌───────┴───────┐  
│ 存在且消费中  │           [延迟消费,等待重试]  
└───────┬───────┘  │  
┌───────┴───────┐  
│   不存在      │  
└───────┬───────┘  │  
[设置幂等标识为“消费中”,并设置过期时间]  │  ▼  
[执行业务逻辑]  │  ▼  
[业务执行成功?]  │  
┌───────┴───────┐  
│     是        │           [更新标识为“已消费”]  
│               │           [删除或保留标识]  
└───────┬───────┘  │  
┌───────┴───────┐  
│     否        │           [删除标识,允许重试]  
└───────┬───────┘  │  ▼  
[流程结束]  

4.抽象通用幂等组件

消息防重复消费幂等组件是通用的通常会提取出来也可供其他模块/服务 使用

4.1自定义幂等注解

提供了一种通用的幂等注解,并通过 SpEL 的形式生成去重表全局唯一 Key

java">@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface NoMQDuplicateConsume {/*** 设置防重令牌 Key 前缀*/String keyPrefix() default "";/*** 通过 SpEL 表达式生成的唯一 Key*/String key();/*** 设置防重令牌 Key 过期时间,单位秒,默认 1 小时*/long keyTimeout() default 3600L;
}

4.2. 定义幂等枚举

幂等需要设置两个状态,消费中和已消费,创建对应的枚举

java">@RequiredArgsConstructor
public enum IdempotentMQConsumeStatusEnum {/*** 消费中*/CONSUMING("0"),/*** 已消费*/CONSUMED("1");@Getterprivate final String code;/*** 如果消费状态等于消费中,返回失败** @param consumeStatus 消费状态* @return 是否消费失败*/public static boolean isError(String consumeStatus) {return Objects.equals(CONSUMING.code, consumeStatus);}
}

4.3.通过 AOP 的方式进行增强注解

如果说方法上加了注解,会被这段 AOP 代码以环绕增强方式执行

java">@Slf4j
@Aspect
@RequiredArgsConstructor
public final class NoMQDuplicateConsumeAspect {private final StringRedisTemplate stringRedisTemplate;private static final String LUA_SCRIPT = """local key = KEYS[1]local value = ARGV[1]local expire_time_ms = ARGV[2]return redis.call('SET', key, value, 'NX', 'GET', 'PX', expire_time_ms)""";/*** 增强方法标记 {@link NoMQDuplicateConsume} 注解逻辑*/@Around("@annotation(com.nageoffer.onecoupon.framework.idempotent.NoMQDuplicateConsume)")public Object noMQRepeatConsume(ProceedingJoinPoint joinPoint) throws Throwable {NoMQDuplicateConsume noMQDuplicateConsume = getNoMQDuplicateConsumeAnnotation(joinPoint);String uniqueKey = noMQDuplicateConsume.keyPrefix() + SpELUtil.parseKey(noMQDuplicateConsume.key(), ((MethodSignature) joinPoint.getSignature()).getMethod(), joinPoint.getArgs());String absentAndGet = stringRedisTemplate.execute(RedisScript.of(LUA_SCRIPT, String.class),List.of(uniqueKey),IdempotentMQConsumeStatusEnum.CONSUMING.getCode(),String.valueOf(TimeUnit.SECONDS.toMillis(noMQDuplicateConsume.keyTimeout())));// 如果不为空证明已经有if (Objects.nonNull(absentAndGet)) {boolean errorFlag = IdempotentMQConsumeStatusEnum.isError(absentAndGet);log.warn("[{}] MQ repeated consumption, {}.", uniqueKey, errorFlag ? "Wait for the client to delay consumption" : "Status is completed");if (errorFlag) {throw new ServiceException(String.format("消息消费者幂等异常,幂等标识:%s", uniqueKey));}return null;}Object result;try {// 执行标记了消息队列防重复消费注解的方法原逻辑result = joinPoint.proceed();// 设置防重令牌 Key 过期时间,单位秒stringRedisTemplate.opsForValue().set(uniqueKey, IdempotentMQConsumeStatusEnum.CONSUMED.getCode(), noMQDuplicateConsume.keyTimeout(), TimeUnit.SECONDS);} catch (Throwable ex) {// 删除幂等 Key,让消息队列消费者重试逻辑进行重新消费stringRedisTemplate.delete(uniqueKey);throw ex;}return result;}/*** @return 返回自定义防重复消费注解*/public static NoMQDuplicateConsume getNoMQDuplicateConsumeAnnotation(ProceedingJoinPoint joinPoint) throws NoSuchMethodException {MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();Method targetMethod = joinPoint.getTarget().getClass().getDeclaredMethod(methodSignature.getName(), methodSignature.getMethod().getParameterTypes());return targetMethod.getAnnotation(NoMQDuplicateConsume.class);}

lua脚本解释

java">local key = KEYS[1] # 第一个 Key,即幂等唯一标识 uniqueKey
local value = ARGV[1] # 第一个参数,即初始化幂等消费状态,为消费中
local expire_time_ms = ARGV[2] # 第二个参数,即幂等 Key 过期时间return redis.call('SET', key, value, 'NX', 'GET', 'PX', expire_time_ms)

该脚本的主要作用是:在 Redis 中尝试以 NX 方式设置一个键,即如果键不存在,则设置新值,并返回设置之前的旧值,同时为该键设置过期时间(以毫秒为单位)。

获取到 Redis 里面的 Key 值后,可能会有三个流程执行:

absentAndGet 为空:代表消息是第一次到达,执行完 LUA 脚本后,会在 Redis 设置 Key 的 Value 值为 0,消费中状态。

absentAndGet 为 0:代表已经有相同消息到达并且还没有处理完,会通过抛异常的形式让 RocketMQ 重试。

absentAndGet 为 1:代表已经有相同消息消费完成,返回空表示不执行任何处理。

4.4.注册为 Spring Bean

另外可以看看另一篇基于分布式锁注解防重复提交

https://blog.csdn.net/sjsjsbbsbsn/article/details/145131305?spm=1001.2014.3001.5501

java">public class IdempotentConfiguration {/*** 防止消息队列消费者重复消费消息切面控制器*/@Beanpublic NoMQDuplicateConsumeAspect noMQDuplicateConsumeAspect(StringRedisTemplate stringRedisTemplate) {return new NoMQDuplicateConsumeAspect(stringRedisTemplate);}
}

4.5EL工具类

java">public class SpELUtil {/*** 校验并返回实际使用的 spEL 表达式** @param spEl spEL 表达式* @return 实际使用的 spEL 表达式*/public static Object parseKey(String spEl, Method method, Object[] contextObj) {List<String> spELFlag = ListUtil.of("#", "T(");Optional<String> optional = spELFlag.stream().filter(spEl::contains).findFirst();if (optional.isPresent()) {return parse(spEl, method, contextObj);}return spEl;}/*** 转换参数为字符串** @param spEl       spEl 表达式* @param contextObj 上下文对象* @return 解析的字符串值*/public static Object parse(String spEl, Method method, Object[] contextObj) {DefaultParameterNameDiscoverer discoverer = new DefaultParameterNameDiscoverer();ExpressionParser parser = new SpelExpressionParser();Expression exp = parser.parseExpression(spEl);String[] params = discoverer.getParameterNames(method);StandardEvaluationContext context = new StandardEvaluationContext();if (ArrayUtil.isNotEmpty(params)) {for (int len = 0; len < params.length; len++) {context.setVariable(params[len], contextObj[len]);}}return exp.getValue(context);}
}

5.实战使用

使用天机学堂项目来进行实战

5.1写入common模块

在这里插入图片描述

5.2使用

在这里插入图片描述

直接加上注解就可以

但是实际上这里不存在幂等问题,因为userId和courseId设置了唯一索引,所以这里不存在幂等性,不需要加上幂等注解


http://www.ppmy.cn/news/1565284.html

相关文章

谈谈MySQL中的索引和事务

目录 1. 索引 1.1 索引介绍 1.2 缺陷 1.3 使用 1.3.1 查看索引 1.3.2 创建索引 1.3.3 删除索引 2. 索引底层的数据结构 2.1 B树 3. 事务 3.1 为什么使用事务 3.2 事务的使用 3.3 事务的基本特性 1. 索引 1.1 索引介绍 索引相当于一本书的目录(index), 在一…

GIT的常规使用

分别如果提交了两次git,如 sepolicy$ git log commit 695ceb9d8726d1faa72eda7dea1feccf4805b606 (HEAD -> master) Author: kang <xxx.com> Date: Tue Jan 21 11:21:22 2025 0800 usb disk ok commit 3b3ff9f6c7b30370a8a0c2c7f33013995a808641 Author: kang <…

第2章:Python TDD构建Dollar类基础

写在前面 这本书是我们老板推荐过的&#xff0c;我在《价值心法》的推荐书单里也看到了它。用了一段时间 Cursor 软件后&#xff0c;我突然思考&#xff0c;对于测试开发工程师来说&#xff0c;什么才更有价值呢&#xff1f;如何让 AI 工具更好地辅助自己写代码&#xff0c;或许…

Wi-Fi 7、Wi-Fi 6 与 5G、4G 的全方位对比

随着无线通信技术的飞速发展&#xff0c;Wi-Fi 7、Wi-Fi 6&#xff0c;以及5G、4G 已经成为人们生活和工作中不可或缺的网络技术。无论是家庭网络、高速移动通信&#xff0c;还是工业物联网&#xff0c;这些技术都在发挥各自的作用。那么&#xff0c;它们之间有什么区别&#x…

网络安全行业岗位职责

系统安全需求分析师 综合能力 掌握常见的IT系统安全需求 具备总结分析整体网络安全需求及子系统安全需求的能力 具有良好的沟通、团队协作和主动性思考的能力 具备良好的技术文档编制能力 专业知识 熟悉网络、终端、数据、威胁情报、态势感知、流量威胁分析等产品的技术方…

详解Rust 中 String 和 str 的用途与区别

文章目录 1. 基本定义1.1 String1.2 str2. 存储位置与内存模型2.1 String2.2 str3. 用法与区别4. 使用场景4.1 使用 String 的场景4.2 使用 str 的场景5. String 和 str 的关系6. 代码示例分析6.1 从 &str 创建 String6.2 从 String 获取 &str6.3 拼接字符串6.4 静态存…

大华Java开发面试题及参考答案 (上)

TCP 的三次握手和四次挥手过程中各个状态的细节是怎样的&#xff1f; TCP&#xff08;Transmission Control Protocol&#xff09;是一种面向连接的、可靠的传输层协议&#xff0c;其三次握手和四次挥手过程涉及多个状态&#xff0c;以下是详细的状态细节&#xff1a; 三次握手…

Ubuntu22.4挂载大于2.2T磁盘(27T大磁盘)

一、查看磁盘信息 sudo fdisk -l 二、创建文件系统 sudo mkfs.xfs /dev/sdX 三、获取磁盘的 UUID sudo blkid /dev/sdX 四、创建挂载点 sudo mkdir -p /data /dev/sdb: UUID"d01bbb50-28be-4328-bf18-111111111" BLOCK_SIZE"4096" TYPE"xfs&quo…