Kafka【八】如何保证消息发送的可靠性、重复性、有序性

news/2024/9/17 7:21:32/ 标签: kafka, 分布式

【1】消息发送的可靠性保证

对于生产者发送的数据,我们有的时候是不关心数据是否已经发送成功的,我们只要发送就可以了。在这种场景中,消息可能会因为某些故障或问题导致丢失,我们将这种情况称之为消息不可靠。虽然消息数据可能会丢失,但是在某些需要高吞吐,低可靠的系统场景中,这种方式也是可以接受的,甚至是必须的。

但是在更多的场景中,我们是需要确定数据是否已经发送成功了且Kafka正确接收到数据的,也就是要保证数据不丢失,这就是所谓的消息可靠性保证。

而这个确定的过程一般是通过Kafka给我们返回的响应确认结果(Acknowledgement)来决定的,这里的响应确认结果我们也可以简称为ACK应答。根据场景,Kafka提供了3种应答处理,可以通过配置对象进行配置。

在 Apache Kafka 中,ACK(Acknowledgment)指的是生产者在发送消息后,从 Kafka Broker 接收到的确认信号。这种确认机制是用来保证消息发送的可靠性的。Kafka 支持不同的 ACK 策略,这些策略允许生产者根据自己的需求来配置不同的确认级别。以下是 Kafka 中关于 ACK 的几个选项:

  1. No Acknowledgment (acks = 0)

    • 在这种模式下,生产者在发送消息后不会等待任何确认就认为消息已经被成功发送。这意味着如果 Broker 在写入消息之前崩溃,消息可能会丢失。这种方式提供了最高的吞吐量,但是没有可靠性保障。
  2. Leader Acknowledgment (acks = 1)

    • 在这种模式下,生产者在发送消息后会等待 Leader 副本确认消息已被写入。如果在确认之后 Leader 崩溃,那么消息仍然可能会丢失,因为还没有同步到 Follower 副本。这种方式提供了较好的吞吐量,但仍然存在一定的数据丢失风险。
  3. All In-Sync Replicas Acknowledgment (acks = all 或 acks = -1)

    • 这是最严格的确认策略,生产者在发送消息后会等待所有 ISR(In-Sync Replicas)的确认。这意味着消息不仅写入了 Leader,还同步到了所有的 Follower 副本。这种方式虽然降低了吞吐量,但是提供了最强的数据持久性和可靠性保障。

选择哪种 ACK 策略取决于应用的具体需求。如果对数据丢失有严格的要求,那么通常会选择 acks=all,以确保消息的持久性和可靠性;如果对性能要求较高,并且可以接受一定程度的数据丢失风险,那么可以选择较低级别的确认策略。

需要注意的是,使用 acks=all 时,如果 ISR 中的任何一个副本无法同步消息,那么生产者将无法发送新的消息,直到问题解决。因此,在配置 ACK 时,也需要考虑集群的健康状况和副本的数量。

假设我们的分区有5个follower副本,编号为1,2,3,4,5:

在这里插入图片描述

但是此时只有3个副本处于和Leader副本之间处于数据同步状态,那么此时分区就存在一个同步副本列表,我们称之为In Syn Replica,简称为ISR。此时,Kafka只要保证ISR中所有的4个副本接收到了数据,就可以对数据请求进行响应了。无需5个副本全部收到数据。

【2】消息发送的重复性

kafka为了提高数据可靠性提供了重试机制用来解决消息丢失问题。如果禁用重试机制,那么一旦数据发送失败,数据就丢失了。而数据重复,恰恰是因为开启重试机制后,如果因为网络阻塞或不稳定,导致数据重新发送。那么数据就有可能是重复的。

kafka提供了幂等性操作解决数据重复,并且幂等性操作要求必须开启重试功能和ACK取值为-1。

在 Apache Kafka 中,解决消息重复发送的问题通常涉及以下几个方面:

1. 幂等性生产者

Kafka 0.10.1 版本引入了幂等性生产者(Idempotent Producers)。启用幂等性后,生产者可以保证消息不会被重复发送。幂等性生产者依赖于事务日志来跟踪已发送的消息,并确保即使生产者崩溃,消息也只会被发送一次。

  • 实现原理
    • 生产者为每条消息附加一个序列号。
    • Broker 使用序列号来检查消息是否已经被发送过。
    • 如果 Broker 发现序列号冲突,则拒绝该消息。
Map<String, Object> configMap = new HashMap<>();
configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// TODO 对生产的数据K, V进行序列化的操作
configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configMap.put(ProducerConfig.ACKS_CONFIG, "-1");//ACK应答
configMap.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);//开启幂等性
configMap.put(ProducerConfig.RETRIES_CONFIG, 5);
configMap.put(ProducerConfig.BATCH_SIZE_CONFIG, 5);
configMap.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 3000);KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configMap);

kafka提供的幂等性操作只能保证同一个生产者会话中同一个分区中的数据不会重复,一旦数据发送过程中,生产者对象重启,那么幂等性操作就会失效。那么此时就需要使用Kafka的事务功能来解决跨会话的幂等性操作。但是跨分区的幂等性操作是无法实现的。

2. 事务支持

Kafka 从 0.11 版本开始支持事务,这使得生产者可以在事务上下文中发送消息。事务可以确保消息要么全部发送成功,要么全部不发送,从而避免部分消息丢失或重复发送的问题。

  • 实现原理

    • 生产者开启事务,并在一个事务中发送一系列消息。
    • 生产者在消息发送完成后提交事务。
    • 如果生产者崩溃或出现其他异常,则可以回滚事务,取消未完成的消息发送。

    事务支持可以用于确保消息的一致性和完整性。

【3】幂等性与事务支持

幂等性生产者(Idempotent Producers)和事务支持(Transactional Support)是两种不同的机制,它们各自解决了不同的问题,但在实际应用中可以结合起来使用。

幂等性生产者(Idempotent Producers)

幂等性生产者的设计目的是为了确保即使生产者崩溃或重试消息发送,消息也只被写入一次,从而避免重复消息。幂等性生产者不需要开启事务,而是通过以下机制来实现这一目标:

  • 消息序列化:生产者为每个分区的消息生成一个唯一的序列号。
  • 校验重复:Broker 会在接收到消息时检查序列号,如果发现序列号已经存在,则会拒绝这条消息。

幂等性生产者适用于那些希望避免重复消息,但又不需要事务一致性的情况。也就是说,它保证了即使生产者崩溃或重试发送,消息依然只被写入一次,但它并不保证消息的全局顺序或跨分区的一致性。

事务支持(Transactional Support)

事务支持则是为了实现更高级别的消息一致性和原子性,确保消息要么全部发送成功,要么全部不发送。事务支持可以用来处理跨多个分区甚至跨不同系统的复杂操作,确保这些操作作为一个整体成功或失败。

  • 事务上下文:生产者在事务上下文中发送消息,确保消息的发送是原子性的。
  • 提交或回滚:生产者可以在消息处理成功后提交事务,或者在处理失败时回滚事务。

事务支持适用于需要确保消息处理的原子性和一致性的场景,特别是在涉及到跨多个分区或多系统协调的情况下。

幂等性与事务的结合

在一些场景中,你可能会结合使用幂等性生产和事务支持,以达到更高的可靠性和一致性。例如:

  • 幂等性生产者 可以用来防止消息的重复发送。
  • 事务支持 可以用来确保跨多个分区或系统的操作的一致性。

在这种情况下,幂等性生产者确保单个消息不会重复写入,而事务支持则确保整个操作的原子性。

总结

  • 幂等性生产者:防止消息重复发送,适用于单个消息级别的去重。
  • 事务支持:确保操作的原子性和一致性,适用于需要跨分区或系统的一致性操作。

因此,幂等性生产者并不是必须与事务隔离使用才能保证消息的唯一性。相反,幂等性生产者本身就是为了解决消息重复发送问题而设计的。事务支持则是为了实现更高级别的数据一致性和操作的原子性。两者可以独立使用,也可以结合使用以满足不同的需求。

【4】消息发送的有序性保证

在 Apache Kafka 中,保证消息发送的有序性主要依赖于以下几种机制和策略:

1. 单一分区内的消息有序

Kafka 默认保证在一个主题(Topic)的单个分区(Partition)内部的消息是有序的。这是因为消息是按顺序追加到分区的日志文件中的。因此,如果你需要确保消息在主题内的顺序,可以将所有相关消息都发送到同一个分区。

如何实现单一分区内消息有序:

  • 固定分区器:你可以通过设置固定的分区器(Partitioner),使所有具有相同键的消息都被发送到同一个分区。例如,使用相同的键(Key)可以使消息被发送到同一分区,从而在该分区内保持顺序。

2. 使用幂等性生产者

尽管幂等性生产者的主要目的是防止消息重复发送,但如果你使用相同的键发送消息,并且启用了幂等性生产者,那么所有具有相同键的消息将被发送到同一个分区,并且在这个分区内保持顺序。

示例配置

假设你需要确保所有消息在一个主题内是有序的,你可以这样配置生产者:

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); // 无限重试
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); // 保证消息发送顺序
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 启用幂等性// 创建生产者
KafkaProducer<String, String> producer = new KafkaProducer<>(props);

这里的关键配置点是 max.in.flight.requests.per.connection 设置为 1,这可以确保在单个连接上一次只发送一条消息,从而在单一分区内保持消息的顺序。

注意事项

  • 单一分区限制:虽然单一分区内部可以保证消息有序,但这意味着所有相关消息都需要发送到同一个分区,这可能会导致性能瓶颈。
  • 并发处理:如果你需要高并发处理,而不仅仅关注消息的顺序,那么可能需要在多个分区之间平衡负载,并且在客户端实现适当的逻辑来处理顺序问题。

通过上述方法,Kafka 可以在不同程度上保证消息的有序性,但通常需要在性能和有序性之间做出权衡。


http://www.ppmy.cn/news/1522607.html

相关文章

proxy代理解决vue中跨域问题

vue.config.js module.exports {...// webpack-dev-server 相关配置devServer: {host: 0.0.0.0,port: port,open: true,proxy: {/api: {target: https://vfadmin.insistence.tech/prod-api,changeOrigin: true,pathRewrite: {//[^ process.env.VUE_APP_BASE_API]: ^/api: / …

【 html+css 绚丽Loading 】000044 两仪穿行轮

前言&#xff1a;哈喽&#xff0c;大家好&#xff0c;今天给大家分享htmlcss 绚丽Loading&#xff01;并提供具体代码帮助大家深入理解&#xff0c;彻底掌握&#xff01;创作不易&#xff0c;如果能帮助到大家或者给大家一些灵感和启发&#xff0c;欢迎收藏关注哦 &#x1f495…

【sql】评估数据迁移复杂度调查汇报240904

难度判断标准&#xff1a; - 高难度&#xff1a;使用多个表&#xff08;TBL&#xff09;或有多个join操作的工具 - 低难度&#xff1a;表数量少且没有join操作的简单工具 - 中等难度&#xff1a;介于高低之间&#xff0c;有少量join操作的工具 5. 最后说明不需要仔细…

25届计算机毕业设计:3步打造北部湾助农平台,Java SpringBoot实践

✍✍计算机编程指导师 ⭐⭐个人介绍&#xff1a;自己非常喜欢研究技术问题&#xff01;专业做Java、Python、微信小程序、安卓、大数据、爬虫、Golang、大屏等实战项目。 ⛽⛽实战项目&#xff1a;有源码或者技术上的问题欢迎在评论区一起讨论交流&#xff01; ⚡⚡ Java实战 |…

AF透明模式/虚拟网线模式组网部署

透明模式组网 实验拓扑 防火墙基本配置 接口配置 eth1 eth3 放通策略 1. 内网用户上班时间&#xff08;9:00-17:00&#xff09;不允许看视频、玩游戏及网上购物&#xff0c;其余时 间访问互联网不受限制&#xff1b;&#xff08;20 分&#xff09; 应用控制策略 2. 互联…

[论文笔记]RAFT: Adapting Language Model to Domain Specific RAG

引言 今天带来一篇结合RAG和微调的论文&#xff1a;RAFT: Adapting Language Model to Domain Specific RAG。 为了简单&#xff0c;下文中以翻译的口吻记录&#xff0c;比如替换"作者"为"我们"。 本文介绍了检索增强微调(Retrieval Augmented Fine Tunin…

【Impala SQL 造数(一)】

前言 SQL 造数即生成测试数据&#xff0c;一般是编码完成之后的测试阶段所需&#xff0c;测试数据可以用于多种目的&#xff0c;包括测试应用程序的功能、业务场景测试、性能测试、数据恢复测试等。在测试阶段特别是数据类需求&#xff0c;需要很多造数场景&#xff0c;像 Hiv…

尚品汇-支付宝支付同步异步回调实现(四十七)

目录&#xff1a; &#xff08;1&#xff09;订单支付码有效时间 &#xff08;2&#xff09;支付后回调—同步回调 &#xff08;3&#xff09;支付宝回调—异步回调 &#xff08;1&#xff09;订单支付码有效时间 &#xff08;2&#xff09;支付后回调—同步回调 static修饰…

【Jupyter Notebook】安装与使用

打开Anaconda Navigator点击"Install"(Launch安装前是Install)点击"Launch"点击"File"-"New"-"Notebook"​ 5.点击"Select"选择Python版本 6.输入测试代码并按"Enter+Shift"运行代码: 代码如下: …

考研系列-408真题数据结构篇(18-23)

写在前面 此文章是本人在备考过程中408真题数据结构部分(2018年-2023年)的易错题及相应的知识点整理,后期复习也尝尝用到,对于知识提炼归纳理解起到了很大的作用,分享出来希望帮助到大家~ # 2018年 1.堆的建立(从后往前进行调整) 2.算法(正整数和数组之间关系的建立)

k8s API资源对象ingress

有了Service之后&#xff0c;我们可以访问这个Service的IP&#xff08;clusterIP&#xff09;来请求对应的Pod&#xff0c;但是这只能是在集群内部访问。 要想让外部用户访问此资源&#xff0c;可以使用NodePort&#xff0c;即在node节点上暴漏一个端口出来&#xff0c;但是这…

pytorch+深度学习实现图像的神经风格迁移

本文的完整代码和部署教程已上传至本人的GitHub仓库&#xff0c;欢迎各位朋友批评指正&#xff01; 1.各代码文件详解 1.1 train.py train.py 文件负责训练神经风格迁移模型。 加载内容和风格图片&#xff1a;使用 utils.load_image 函数加载并预处理内容和风格图片。初始化…

Banana Pi BPI-SM9 AI 计算模组采用算能科技BM1688芯片方案设计

产品概述 香蕉派 Banana Pi BPI-SM9 16-ENC-A3 深度学习计算模组搭载算能科技高集成度处理器 BM1688&#xff0c;功耗低、算力强、接口丰富、兼容性好。支持INT4/INT8/FP16/BF16/FP32混合精度计算&#xff0c;可支持 16 路高清视频实时分析&#xff0c;灵活应对图像、语音、自…

Java中等题-摆动序列(力扣)

如果连续数字之间的差严格地在正数和负数之间交替&#xff0c;则数字序列称为 摆动序列 。第一个差&#xff08;如果存在的话&#xff09;可能是正数或负数。仅有一个元素或者含两个不等元素的序列也视作摆动序列。 例如&#xff0c; [1, 7, 4, 9, 2, 5] 是一个 摆动序列 &…

数据库锁之行级锁、记录锁、间隙锁和临键锁

1. 行级锁 InnoDB 引擎支持行级锁&#xff0c;而MyISAM 引擎不支持行级锁&#xff0c;只支持表级锁。行级锁是基于索引实现的。 对于普通的select语句&#xff0c;是不会加记录锁的&#xff0c;因为它属于快照读&#xff0c;通过在MVCC中的undo log版本链实现。如果要在查询时对…

Python 安装selenium的办法

之前一直安装python以为要进入python的菜单进行输入 如下 老是提示错误,原来是我搞错了,安装这个直接进入cmd即可 如下 pip install selenium 再用pip list查看一下是否安装成功

git 提交代码由原先账号密码调整为ssh

如果你希望将 Git 提交代码的身份验证方式从用户名和密码切换到 SSH&#xff0c;你需要进行以下几个步骤&#xff1a; 1. 生成 SSH 密钥对 如果你还没有 SSH 密钥对&#xff0c;可以使用以下命令生成一个新的密钥对&#xff1a; ssh-keygen -t rsa -b 4096 -C "your_em…

基于SpringBoot校园快递代取系统

基于springbootvue实现的校园快递代取系统&#xff08;源码L文ppt&#xff09;4-049 3系统设计 3.1.1系统结构图 系统结构图可以把杂乱无章的模块按照设计者的思维方式进行调整排序&#xff0c;可以让设计者在之后的添加&#xff0c;修改程序内容…

C++设计模式——Chain of Responsibility职责链模式

一&#xff0c;职责链模式的定义 职责链模式&#xff0c;又被称为责任链模式&#xff0c;是一种行为型设计模式&#xff0c;它让多个对象依次处理收到的请求&#xff0c;直到处理完成为止。 职责链模式需要使用多个对象&#xff0c;其中的每个对象要么处理请求&#xff0c;要…

iPhone手机清理软件:照片清理功能全解析

在数字化生活中&#xff0c;智能手机成为我们记录生活点滴的主要工具&#xff0c;尤其是iPhone&#xff0c;以其卓越的相机功能备受用户青睐。然而&#xff0c;成千上万的照片迅速堆积&#xff0c;不仅占用了大量存储空间&#xff0c;还使得设备运行缓慢。在众多解决方案中&…