Kafka是如何实现幂等性的??

embedded/2025/3/29 17:20:26/

Kafka通过幂等生产者(Idempotent Producer)机制来实现消息的幂等性,确保每条消息在Kafka中只被处理一次,即使在生产者重试发送的情况下也不会导致重复消息。以下是Kafka实现幂等性的详细说明:

1. 幂等生产者的基本概念

幂等性(Idempotence):指操作的结果不会因为多次执行而改变。在Kafka中,幂等性确保每条消息在Topic的Partition中只被写入一次。

2. 幂等生产者的启用

要启用幂等生产者,需要在生产者配置中设置以下参数:

  • enable.idempotence: 设置为 true 以启用幂等生产者。
  • transactional.id: 可选参数,用于事务性生产者。如果需要事务支持,必须设置此参数。
示例配置:
bootstrap.servers=localhost:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
enable.idempotence=true

3. 幂等生产者的内部机制

3.1. 生产者ID(Producer ID)
  • 唯一标识:每个幂等生产者在启动时会从Kafka集群获取一个唯一的 Producer ID
  • 持久化Producer ID 由Kafka集群持久化存储,确保在生产者重启后仍然有效。
3.2. 序列号(Sequence Number)
  • 递增序列:每个Partition维护一个递增的序列号。
  • 唯一性:每条消息在发送时会携带一个唯一的序列号,确保消息在Partition中的唯一性。
3.3. 请求重试
  • 自动重试:幂等生产者会自动重试发送失败的消息。
  • 幂等性保证:即使消息被重试多次,Kafka也会确保每条消息只被写入一次。

4. 幂等生产者的使用示例

4.1. 配置生产者
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class IdempotentProducerExample {public static void main(String[] args) {Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");KafkaProducer<String, String> producer = new KafkaProducer<>(props);try {for (int i = 0; i < 10; i++) {ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key-" + i, "value-" + i);producer.send(record, (metadata, exception) -> {if (exception != null) {exception.printStackTrace();} else {System.out.printf("Sent message to topic %s partition %d with offset %d%n",metadata.topic(), metadata.partition(), metadata.offset());}});}} finally {producer.close();}}
}
4.2. 验证幂等性
  • 发送重复消息:即使生产者发送了重复的消息,Kafka也会确保每条消息只被写入一次。
  • 验证结果:可以通过Kafka的消费者来验证Topic中的消息是否重复。

5. 幂等生产者的限制

  • 单分区幂等性:幂等性保证的是单个Partition内的消息唯一性,而不是整个Topic。
  • 顺序保证:幂等生产者保证消息在Partition内的顺序,但不保证消息在Topic内的全局顺序。
  • 性能影响:启用幂等性可能会带来一定的性能开销,尤其是在高吞吐量的场景下。

6. 幂等生产者与事务性生产者的关系

  • 幂等生产者:确保单条消息的幂等性。
  • 事务性生产者:提供更高级的事务支持,确保多条消息的原子性。
6.1. 启用事务性生产者
  • transactional.id:必须设置此参数。
  • enable.idempotence:默认为 true,不需要显式设置。
6.2. 示例配置
bootstrap.servers=localhost:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
transactional.id=my-transactional-id
6.3. 示例代码
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class TransactionalProducerExample {public static void main(String[] args) {Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");KafkaProducer<String, String> producer = new KafkaProducer<>(props);producer.initTransactions();try {producer.beginTransaction();producer.send(new ProducerRecord<>("my-topic", "key1", "value1"));producer.send(new ProducerRecord<>("my-topic", "key2", "value2"));producer.commitTransaction();} catch (Exception e) {producer.abortTransaction();e.printStackTrace();} finally {producer.close();}}
}

7. 总结

Kafka通过幂等生产者机制确保每条消息在Partition中只被写入一次,即使在生产者重试发送的情况下也不会导致重复消息。启用幂等生产者需要设置 enable.idempotence=true,并且Kafka会自动处理消息的唯一性和顺序性。对于更高级的事务支持,可以使用事务性生产者,设置 transactional.id 参数。

通过合理配置和使用幂等生产者,可以有效避免因Rebalance或其他原因引起的重复消费问题,确保消息的可靠性和一致性。


http://www.ppmy.cn/embedded/176845.html

相关文章

【Linux-驱动开发-驱动分类】

Linux-驱动开发-驱动分类 ■ Linux-驱动分类■ Linux-字符设备■ 字符设备-注册与注销函数■ 字符设备-具体操作函数■ 字符设备-LICENSE 和作者信息■ 示例一&#xff1a;■ 示例一&#xff1a; 寄存器物理地址映射■ 新字符设备驱动■ 示例一&#xff1a;新字符设备驱动 ■ L…

单片机中C++的局部static变量的初始化仍然遵循控制流

实验 执行如下测试代码 class Test { public:Test(){bsp::di::Console().WriteLine("构造");} };void test_func() {bsp::di::Console().WriteLine("第一条语句");static Test test; }执行两次 test_func &#xff0c;在串口观察输出 可以看到 static …

基于STC89C51的太阳自动跟踪系统的设计与实现—单片机控制步进电机实现太阳跟踪控制(仿真+程序+原理图+PCB+文档)

摘 要 随着我国经济的飞速发展&#xff0c;促使各种能源使用入不敷出&#xff0c;尤其是最主要的能源&#xff0c;煤炭石油资源不断消耗与短缺&#xff0c;因此人类寻找其他替代能源的脚步正在加快。而太阳能则具有无污染﹑可再生﹑储量大等优点&#xff0c;且分布范围广&…

TDengine 3.3.2.0 集群报错 Post “http://buildkitsandbox:6041/rest/sql“

原因&#xff1a; 初始化时处于内网环境下&#xff0c;Post “http://buildkitsandbox:6041/rest/sql“ 无法访问 修复&#xff1a; vi /etc/hosts将buildkitsandbox映射为本机节点 外网环境下初始化时没有该问题

DeepSeek V3 0324:在Mac Studio上点燃的AI核爆

一、一场没有烟花的核爆&#xff1a;DeepSeek的无声颠覆 1.1 静默发布背后的核聚变能量 2025年3月25日&#xff0c;DeepSeek在Hugging Face上悄然上传了一个名为V3 0324的模型&#xff0c;没有预热、没有发布会&#xff0c;甚至没有一张产品海报。但6850亿参数的庞然大物&…

如何理解 Apache Iceberg 与湖仓一体(Lakehouse)?

一、什么是湖仓一体&#xff08;Lakehouse&#xff09;&#xff1f; 湖仓一体是一种融合了数据湖的灵活存储能力与数据仓库的高效分析功能的现代数据架构。它通过整合两者的优势&#xff0c;解决了传统架构的局限性&#xff0c;为企业数据处理提供了更全面的解决方案。 数据湖…

蓝桥杯算法精讲:二分查找实战与变种解析

适合人群&#xff1a;蓝桥杯备考生 | 算法竞赛入门者 | 二分查找进阶学习者 目录 一、二分查找核心要点 1. 算法思想 2. 适用条件 3. 算法模板 二、蓝桥杯真题实战 例题&#xff1a;分巧克力&#xff08;蓝桥杯2017省赛&#xff09; 三、二分查找变种与技巧 1. 查找左边…

Ubuntu部署Docker搭建靶场

前言 我们需要部署Docker来搭建靶场题目&#xff0c;他可以提供一个隔离的环境&#xff0c;方便在不同的机器上部署&#xff0c;接下来&#xff0c;我会记录我的操作过程&#xff0c;简单的部署一道题目 Docker安装 不推荐在物理机上部署&#xff0c;可能会遇到一些问题&…