【消息队列】聊一下如何避免消息的重复消费

news/2024/12/1 0:37:32/

什么是重复消费

一条消息在传输过程中,为了保证消息的不丢失,可能会多少量的消息进行重试,这样就可能导致Broker接受到的消息出现重复,如果说下游系统没有针对业务上的处理,那么可能导致同一笔借款或者支付订单出现重复扣款或者重复还款的情况。业务上是不允许出现的。
在MQTT 协议,给出了如下三种方式

  • 至多一次 (At Most Once) : 也就是针对每条消息即使出现异常的情况,也只会发送一次。应用场景:在一些硬件数据上传热点数据,可以使用。
  • 至少一次(At Least Once) : 为了保证消息不可靠传输,可能针对少量消息进行重复发送,那么就会出现同一个消息重复出现。
  • 恰好一次(Exactly Once) : 消息只会发送一次,不允许丢失也不会重复。

幂等性

幂等在计算机中是非常重要的概念,说白了就是多次执行一段函数的结果是一样的,比如说 * 1,那么 执行多次结果都是本身,但是如果是+1的操作,那么就不是幂等,每次结果都会加1。而数据库中增删改查,只有查是幂等,其他都会修改数据原有的状态。

在重试的场景下,我们一般都需要进行重试&幂等 进行配合使用,因为在数据在网络中传输,比如出现网络抖动,数据丢包以及挖掘机挖断网线(开玩笑😝了)这个时候 一般上游系统就会重试操作,而下游系统就需要支持幂等。
往大了说在分布式高可用设计架构中,幂等&重试 也是我们需要在架构设计中要考虑的要点,不仅仅设计中间件的应用中,所以你看知识都是相通的。

接着我们说,既然可能出现消息的重复消费,那么要么在Producer端保证消息的幂等发送,要么在Consumer端保证幂等消费。

Producer 幂等

在Kafka 0.11版本后,引入了幂等性和事务。具体就是通过在配置对象中添加如下
properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);
Producer 自动升级成幂等性 Producer
重复数据的判断标准:具有<PID, Partition, SeqNumber>相同主键的消息提交时,Broker只会持久化一条。其中PID是Kafka每次重启都会分配一个新的;Partition 表示分区号;Sequence Number是单调自增的。
在这里插入图片描述
所以幂等性只能保证的是在单分区单会话内不重复。 说白了就是,每次重启Broker,就会失效。
但是如果我们想保证在所有分区下Producer的幂等性,就需要考虑事务型Producer。这也是幂等型Producer和事务型Producer的最大区别。

Kafka事务原理

在这里插入图片描述

事务Producer

事务型 Producer 能够保证将消息原子性地写入到多个分区中。这批消息要么全部写入成功,要么全部失败。另外,事务型 Producer 也不惧进程的重启。Producer 重启回来后,Kafka 依然保证它们发送消息的精确一次处理。
设置事务型 Producer 的方法也很简单,满足两个要求即可:

  • 和幂等性 Producer 一样,开启 enable.idempotence = true
  • 设置 Producer 端参数 transctional. id。最好为其设置一个有意义的名字。
		kafkaProducer.initTransactions(); //初始化kafkaProducer.beginTransaction(); //开启事务try {for (int i = 0; i < 5; i++) {kafkaProducer.send(new ProducerRecord<>("test1","aaa"+i));}kafkaProducer.commitTransaction(); //提交事务} catch (Exception e) {kafkaProducer.abortTransaction(); //出现异常 事务回滚}

幂等性 Producer 只能保证单分区、单会话上的消息幂等性;而事务能够保证跨分区、跨会话间的幂等性。
但是事务型性能比较差。

Consumer幂等

上面我们说了生产端的幂等发送,但是只在生产端保证其实不够,消费端也需要保证幂等消费。
也即:At least once + 幂等消费 = Exactly once。

  • 数据库唯一约束/redis set Nx
  • 为更新数据添加条件
  • 记录并检查操作

我们来总体说一下上面的三种方案,
第一种,一般就是在消费端消费的时候将消费数据插入到DB中,并且保证唯一约束,当消费一次后,即使同一条消息在消费一次,那么数据库层面唯一约束也会保证存在记录,不会执行后面的逻辑。或者使用redis的setNx进行保证。
第二种,在消费端消费消息的时候,可以判断一下当前的状态或者数据是否是已经处理过的状态,如果是直接丢弃,比如生产端发送的消息是初始化订单状态,但是消费端如果接受到的是处理中订单,说明已经处理过,没必要进行处理。

小结

本节主要介绍了消息重复消费,以及引出幂等性,然后分别是生产端(幂等/事务)以及消费端的幂等,但是在开篇已经说了,幂等超时重试不仅仅在消息队列中存在,在HTTP服务设计,保证表单或者APP重复提交,以及在微服务设计幂等,保证RPC自动重试也是同样适用的。


http://www.ppmy.cn/news/39067.html

相关文章

【网口交换机:交换机KSZ9897学习-笔记-资料汇总-记录】

【网口交换机&#xff1a;交换机KSZ9897学习-笔记-资料汇总-记录】1、概述2、 自己的学习与摸索之路第一阶段&#xff1a;随意在网上查找相关资料第二阶段&#xff1a;针对性在网上资料第三阶段&#xff1a;测试并且使用开发板第四阶段&#xff1a;针对性使用工具进行测试。2、…

操作系统-内存管理

一、总论 1.1 硬件术语 ​ 为了不让读者懵逼&#xff08;主要是我自己也懵逼&#xff09;&#xff0c;所以特地整理一下在后面会用到术语。 ​ 我们电脑上有个东西叫做内存&#xff0c;他的大小比较小&#xff0c;像我的电脑就是 16 GB 的。它是由 ROM 和 RAM 组成的&#x…

(5)(5.9) 推力损失和偏航不平衡警告

文章目录 前言 1 潜在的推力损失 2 偏航不平衡 前言 如果你看到推力损失或偏航不平衡的警告&#xff0c;这个页面概述了一些应该做的检查和修改来解决这个问题。在大多数情况下&#xff0c;这些警告是由于错误的硬件选择或设置造成的。 这些警告是为了检测推进系统的硬件故…

【C语言】预处理和程序环境

目录 程序的环境 运行环境 翻译环境 编译的过程 预编译阶段 编译阶段 汇编阶段 链接阶段&#xff08;不属于编译阶段&#xff09; 预处理详解 预定义符号 #define #define定义标识符 #define定义宏 #define的替换规则 #和##的使用 带副作用的宏参数 宏和…

二叉树的5个性质【要点:完全二叉树的性质】

只讲不会的 普通二叉树就要讲排列顺序了&#xff01;&#xff01;&#xff01; 预备&#xff1a;满二叉树&#xff1a;1.前提是它必须是二叉树 2.每个结点&#xff08;除了终端结点外&#xff09;都是2个子女。 要点1&#xff1a;关于普通的树的结点的计算&#xff0…

Transformer 笔记目录

一、介绍 导论&#xff1a;Transformer 背景介绍&#xff0c;Transformer 能胜任的任务介绍。相关知识&#xff1a;深度学习基础&#xff08;神经网络&#xff0c;回归&#xff0c;分类&#xff0c;优化&#xff0c;激活函数等&#xff09;&#xff0c;具体介绍序列到序列模型…

[linux]基础IO

文章目录基础IO1. 重新谈论文件1.1 准备工作1.1.1 提出问题1.1.2 达成共识1.2 回忆C语言文件操作1.2.1 写文件辨析fprintfsnprintf1.2.2 读文件1.2.3 向文件追加1.3 文件操作的系统调用1.3.1 OS接口open的介绍(比特位标记)1.3.2 写入操作1.3.3 追加操作1.3.4 只读操作1.4 回答问…

Spring 6 IOC容器加载过程与核心方法refresh源码浅析

前言&#xff1a;本篇只对主线核心逻辑进行梳理分析&#xff0c;本篇以AnnotationConfigApplicationContext容器为例进行切入分析【Spring版本为: v6.0.2】 一、实例化容器AnnotationConfigApplicationContext 我们启动容器的时候&#xff0c;虽然只是new了一个AnnotationConf…