业务背景
考虑这样两个场景:
- 消息 M 被发送到消息中间件并被消费者 A 接收,但在消费过程中系统重启,由于消息未被标记为成功消费,中间件会重复投递,直到消费成功,但可能导致消息被多次投递。
-
消费者 A 在消费完成消息 M 后准备通知中间件成功时系统重启,中间件认为消息未被消费,继续投递,导致消息被重复消费。
在分布式场景下,保证消息不丢和避免消息重复投递是矛盾的,但是消息重复投递是可以解决的,而消息丢失则非常麻烦。
幂等设计
下述方案的优点在于,使用 Redis 消息去重表,不依赖事务,针对消息表本身做了状态的区分:消费中、消费完成。
如果消息已经在消费中,抛出异常,消息会触发延迟消费,在 RocketMQ 的场景下如果消息消费失败,会间隔时间后再次发起消费流程。
通过该方案可以解决什么问题?
- 消息已经消费成功了,第二条消息将被直接幂等处理掉(消费成功)。
- 并发场景下的消息,依旧能满足不会出现消息重复,即穿透幂等挡板的问题。
- 支持上游业务生产者重发的业务重复的消息幂等问题。
为什么要给初始化的幂等标识新增 10 分钟过期时间?
在并发场景下,我们使用消息状态来实现并发控制,以使第二条消息被不断延迟消费(即重试)。但如果在此期间第一条消息也因某些异常原因(例如机器重启或外部异常)未成功消费,该怎么办呢?因为每次查询时都会显示消费中的状态,所以延迟消费会一直进行下去,直到最终被视为消费失败并被投递到死信 Topic 中(RocketMQ 默认最多可以重复消费 16 次)。
针对这个问题,我们采取了一种解决方案:在插入消息表时,必须为每条消息设置一个最长消费过期时间,例如 10 分钟。这意味着,如果某个消息在消费过程中超过了 10 分钟,就会被视为消费失败并从消息表中删除。
设计思路
对于幂等性的实现,首先想到的就是第4节的Spring AOP环绕通知,思路基本一致。
1. 自定义幂等注解
java">@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface NoMQDuplicateConsume {
/*** 设置防重令牌 Key 前缀*/String keyPrefix() default "";
/*** 通过 SpEL 表达式生成的唯一 Key*/String key();
/*** 设置防重令牌 Key 过期时间,单位秒,默认 1 小时*/long keyTimeout() default 3600L;
}
2. 定义 AOP 逻辑方法增强
幂等需要设置两个状态,消费中和已消费,创建对应的枚举:
java">@RequiredArgsConstructor
public enum IdempotentMQConsumeStatusEnum {/*** 消费中*/CONSUMING("0"),/*** 已消费*/CONSUMED("1");@Getterprivate final String code;/*** 如果消费状态等于消费中,返回失败** @param consumeStatus 消费状态* @return 是否消费失败*/public static boolean isError(String consumeStatus) {return Objects.equals(CONSUMING.code, consumeStatus);}
}
接下来通过 AOP 的方式进行增强注解,如果说方法上加了注解,会被这段 AOP 代码以环绕增强方式执行。具体可以参考第四节(1)(2)内容~
java">@Slf4j
@Aspect
@RequiredArgsConstructor
public final class NoMQDuplicateConsumeAspect {
private final StringRedisTemplate stringRedisTemplate;
private static final String LUA_SCRIPT = """local key = KEYS[1]local value = ARGV[1]local expire_time_ms = ARGV[2]return redis.call('SET', key, value, 'NX', 'GET', 'PX', expire_time_ms)""";
/*** 增强方法标记 {@link NoMQDuplicateConsume} 注解逻辑*/@Around("@annotation(com.nageoffer.onecoupon.framework.idempotent.NoMQDuplicateConsume)")public Object noMQRepeatConsume(ProceedingJoinPoint joinPoint) throws Throwable {NoMQDuplicateConsume noMQDuplicateConsume = getNoMQDuplicateConsumeAnnotation(joinPoint);String uniqueKey = noMQDuplicateConsume.keyPrefix() + SpELUtil.parseKey(noMQDuplicateConsume.key(), ((MethodSignature) joinPoint.getSignature()).getMethod(), joinPoint.getArgs());
String absentAndGet = stringRedisTemplate.execute(RedisScript.of(LUA_SCRIPT, String.class),List.of(uniqueKey),IdempotentMQConsumeStatusEnum.CONSUMING.getCode(),String.valueOf(TimeUnit.SECONDS.toMillis(noMQDuplicateConsume.keyTimeout())));
// 如果不为空证明已经有if (Objects.nonNull(absentAndGet)) {boolean errorFlag = IdempotentMQConsumeStatusEnum.isError(absentAndGet);log.warn("[{}] MQ repeated consumption, {}.", uniqueKey, errorFlag ? "Wait for the client to delay consumption" : "Status is completed");if (errorFlag) {throw new ServiceException(String.format("消息消费者幂等异常,幂等标识:%s", uniqueKey));}return null;}
Object result;try {// 执行标记了消息队列防重复消费注解的方法原逻辑result = joinPoint.proceed();
// 设置防重令牌 Key 过期时间,单位秒stringRedisTemplate.opsForValue().set(uniqueKey, IdempotentMQConsumeStatusEnum.CONSUMED.getCode(), noMQDuplicateConsume.keyTimeout(), TimeUnit.SECONDS);} catch (Throwable ex) {// 删除幂等 Key,让消息队列消费者重试逻辑进行重新消费stringRedisTemplate.delete(uniqueKey);throw ex;}return result;}
/*** @return 返回自定义防重复消费注解*/public static NoMQDuplicateConsume getNoMQDuplicateConsumeAnnotation(ProceedingJoinPoint joinPoint) throws NoSuchMethodException {MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();Method targetMethod = joinPoint.getTarget().getClass().getDeclaredMethod(methodSignature.getName(), methodSignature.getMethod().getParameterTypes());return targetMethod.getAnnotation(NoMQDuplicateConsume.class);}
}
被环绕增强的方法代码如下:
java">@Slf4j(topic = "CouponTaskExecuteConsumer")
public class CouponTaskExecuteConsumer implements RocketMQListener<MessageWrapper<CouponTaskExecuteEvent>> {
@NoMQDuplicateConsume(keyPrefix = "coupon_task_execute:idempotent:",key = "#messageWrapper.message.couponTaskId",keyTimeout = 120)@Overridepublic void onMessage(MessageWrapper<CouponTaskExecuteEvent> messageWrapper) {// ......}
}
可以看到消费者的onMessage方法被刚才的自定义注解标记,那么当请求路由/api/merchant-admin/coupon-task/create后经过一系列操作,CouponTaskExecuteConsumer准备消费消息时会被AOP拦截,执行前后环绕。
流程图如下
- 目标方法:访问/api/merchant-admin/coupon-task/create对应的createCouponTask方法,触发环绕拦截,等待环绕结束。
- 环绕通知前:通过SpEL生成全局唯一key,执行LUA脚本:
- 缓存中获取该key对应的值value
- 若key存在?则直接返回值(value = 1表明该消息已经处于消费完成状态;value=0表明消息正在被消费状态)抛出错误,逻辑结束
- 若key不存在则设置value=0(过期时间2min),并返回NULL
- 调用目标方法:继续执行目标方法
- 环绕通知后:设置key的value为1(即消费完成状态),过期时间2min
一些疑问?
1. 为什么要设置 2 分钟幂等?10 分钟不行么?
这个 2 分钟是个经验值,也就是说你这个消息消费的时间是否能够在 2 分钟内执行完成,如果不行需要设置更长的时间。
2. 如果 2 分钟幂等结束后有新的一模一样的请求呢?
这是个伪命题,一般的幂等都是因为网络抖动同时到达,不太可能一个消息都执行完了挺长时间,然后又有一模一样的消息再来消费。
如果面试官非要揪着这个点不放的话,可以把这个幂等标识存放到 MySQL 数据库,进行分表存储。这样存个 10 天半个月也不怕。但是要注意,MySQL 是没有到期删除机制的,还得配合定时任务删除之前的无效数据。