「Kafka」Kafka消息可靠性和重复消费问题(五)

ops/2024/10/9 5:05:28/

在 Kafka 中,实现消息的可靠性避免重复消费是保证数据一致性和系统稳定性的关键。Kafka 提供了多种机制来实现这两个目标。

1. Kafka 消息可靠性

Kafka 的可靠性主要体现在消息的投递存储上,以确保消息不会丢失。具体来说,有以下几个措施:

1.1. 副本机制(Replication)
  • Kafka 中的每个分区可以配置多个副本。一个分区的主副本(Leader)处理消息的读写请求,其他副本(Follower)复制 Leader 的数据。当 Leader 出现故障时,Kafka 会选择另一个可用的 Follower 作为新的 Leader,从而保证分区的数据不丢失。
  • 副本因子的配置 replication.factor 可以设为 2 或 3,通常在生产环境中建议至少使用 3。
1.2. ACK机制
  • 生产者发送消息时可以配置 acks 参数来控制消息写入的可靠性:
    • acks=0:生产者发送消息后不会等待确认,即消息发送后可能立即丢失。
    • acks=1:生产者会等待 Leader 副本的写入确认,减少网络延迟,但如果 Leader 写入后立即宕机,消息可能会丢失。
    • acks=all:所有同步副本完成写入确认,生产者才认为消息成功写入,这可以保证最高级别的可靠性。
  • 可靠性要求较高的情况下,建议使用 acks=all
1.3. 持久化机制
  • Kafka 使用分区日志来存储消息,数据一旦写入分区就会持久化到磁盘。Kafka 依赖操作系统的页缓存进行高效磁盘写入,提高吞吐量并确保数据持久化。
  • 可以通过配置 flush.messagesflush.ms 控制消息写入磁盘的频率。
1.4. 生产者重试机制
  • 生产者可以设置 retries 参数,指定消息发送失败时的重试次数,从而保证在临时网络或 Leader 故障时,消息不会轻易丢失。
  • 与之配合使用的还有 max.in.flight.requests.per.connection,用于限制并发请求数,以避免顺序性问题。

2. 重复消费

尽管 Kafka 设计为“至少一次”投递的系统,这意味着消费者可能会重复消费消息。以下方法可以用来减少或避免重复消费:

2.1. 幂等生产者
  • Kafka 在 0.11 版本后引入了幂等生产者,通过设置 enable.idempotence=true 实现。幂等生产者能确保在重试的情况下,消息不会被重复写入分区。这使得同一消息在生产者端只被写入一次,避免了重复生产。
2.2. 事务性生产者和消费者
  • Kafka 支持端到端事务,允许生产者和消费者在事务模式下进行消息的写入和消费。事务可以确保消息消费和处理是原子性的,防止重复消费。
  • 生产者可以使用 initTransactionscommitTransaction 等方法,将多条消息当作一个事务写入多个分区。
  • 消费者端可以开启 read_committed 模式,以确保仅消费已提交的消息。
2.3. 手动提交偏移量
  • 在 Kafka 中,消费者可以通过手动提交偏移量来控制重复消费问题。手动提交偏移量后,只有在确认处理成功后提交,避免因消费者宕机导致重复消费。
  • 例如,消费者处理消息成功后,可以调用 commitSync() 提交偏移量;遇到异常时则不提交,从而在下次重试时重新处理该消息。
2.4. 消费幂等性
  • 为了完全消除重复消费的影响,消费者应用程序需要具备幂等性设计。即使消息重复消费,也能确保消息处理的最终结果不变。例如,在数据库更新时使用唯一主键或进行重复校验,以避免重复写入。

小结

Kafka 提供了副本机制、ACK 机制、幂等生产者、事务性消费者等多种机制来提升消息的可靠性和减少重复消费。但从应用层面上来看,为了彻底避免重复消费的影响,还需消费者端具备幂等性设计。


http://www.ppmy.cn/ops/123003.html

相关文章

注册安全分析报告:惠农网

前言 由于网站注册入口容易被黑客攻击,存在如下安全问题: 暴力破解密码,造成用户信息泄露短信盗刷的安全问题,影响业务及导致用户投诉带来经济损失,尤其是后付费客户,风险巨大,造成亏损无底洞…

【C++指南】类和对象(二):类的默认成员函数——全面剖析 :构造函数

💓 博客主页:倔强的石头的CSDN主页 📝Gitee主页:倔强的石头的gitee主页 ⏩ 文章专栏:《C指南》 期待您的关注 ​ 阅读本篇文章之前,你需要具备的前置知识:类和对象的基础 点击下方链接 【C指南…

G2O 通过工厂函数类 OptimizationAlgorithmFactory 来生成固定搭配的优化算法

OptimizationAlgorithmFactory 类位于 optimization_algorithm_factory.h //***g2o源码 g2o/g2o/core/optimization_algorithm_factory.h ***// /*** \brief create solvers based on their short name** Factory to allocate solvers based on their short name.* The Factor…

使用CANFD路由实现CAN与CANFD互通

随着科技的发展,汽车电子和工业领域中CAN通信需要承载数据量也越来越大,传统CAN通信有了向CANFD通信过渡的倾向。在实现过渡的过程中可能会出现自己设备是CAN通信,客户设备是CANFD通信的情况,或者自己设备是CANFD通信,…

Street Gaussians 学习笔记

目录 3D Gaussian Splatting 依赖项: diff-gaussian-rasterization 浙大和理想汽车提出Street Gaussians:用于动态城市场景建模 3D Gaussian Splatting 最近的一项工作3D Gaussian Splatting (3D GS),在3D世界中定义了一组各向异性的高斯…

从0开始下载安装并使用unity

首先我们要在浏览器上找到unity的官网 这一个就是了,我们点进去后是这个界面: 然后我们点击上面这张图的左下角的“下载Unity Hub”,推荐后续安装都装在D盘: 这里他会让我们注册一个账号,如果之前有的话登录就行了&am…

【TypeScript】抽象类 interface type的异同

#抽象类 interface type相似点很多,容易混淆使用# 抽象类(abstract) 定义类的格式,既可以包含抽象方法,也可以包含具体方法 一个类只能继承(extends)一个抽象类 abstract class Person {constructor(public name: s…

MySQL运维

MySQL运维 创建健壮的MySQL健康检查Python类 在本文中,我们将介绍如何创建一个强大而灵活的Python类,用于封装MySQL运维命令并提供易用的接口。这个类不仅支持后续扩展,还提供完备的响应和错误信息,同时要求必要的登录信息以确保…