如何保障kafka的数据不会重复消费呢,如何防止漏掉呢

news/2025/3/30 20:56:19/

在 Kafka 中保障数据不重复消费且不丢失,需要从生产者、消费者和 Kafka 自身配置三个层面综合设计。以下是具体实现方案:

一、防止数据重复消费

1. 消费者端控制
  • 手动提交 Offset
    禁用自动提交(enable.auto.commit=false),在消息处理完成后手动提交 Offset。

    from kafka import KafkaConsumerconsumer = KafkaConsumer('topic',bootstrap_servers='localhost:9092',group_id='group',enable_auto_commit=False  # 禁用自动提交
    )for message in consumer:process_message(message.value)  # 处理消息consumer.commit()  # 手动提交 Offset
    

    注意:需确保消息处理逻辑的幂等性(如通过数据库唯一约束或业务 ID 去重)。

  • 幂等性消费者
    使用 Kafka 消费者的幂等性特性(isolation.level=read_committed),结合事务保证消息处理与 Offset 提交的原子性。

2. 生产者端控制
  • 幂等性生产者
    启用生产者幂等性(enable.idempotence=true),确保重复发送的消息不会被 Kafka 重复写入。
3. Kafka 配置
  • 事务支持
    使用 Kafka 事务(transactional.id),保证生产者发送消息与消费者提交 Offset 的原子性。

二、防止数据丢失

1. 生产者端配置
  • 强确认机制
    设置 acks=all(或 -1),确保消息被所有 ISR(In-Sync Replicas)副本接收后才确认成功。

    from kafka import KafkaProducerproducer = KafkaProducer(bootstrap_servers='localhost:9092',acks='all',  # 等待所有副本确认retries=3  # 重试次数
    )
    
  • 重试机制
    配置 retries 参数,当消息发送失败时自动重试(需结合 max.in.flight.requests.per.connection 控制并发请求数)。

2. 消费者端配置
  • 手动提交 Offset
    确保消息处理完成后再提交 Offset,避免自动提交导致未处理消息被标记为已消费。

  • 异常处理
    在消息处理逻辑中捕获异常,避免因程序崩溃导致未提交 Offset,从而触发重新消费。

3. Kafka 集群配置
  • 副本机制
    设置 replication.factor >= 2(建议 3),并配置 min.insync.replicas >= 2,确保消息至少被两个副本保存。

  • 日志保留策略
    合理设置 retention.ms(如 7 天),避免消息被过早删除。

三、最佳实践

  1. 幂等性设计
    在业务层通过唯一 ID(如 UUID)或数据库唯一索引,确保重复消息不会导致数据错误。

  2. 监控与报警

    • 监控消费者的 offset lagkafka-consumer-groups.sh 工具),确保消费速度与生产速度匹配。
    • 监控 Kafka 副本同步状态(ISR 列表),及时处理节点故障。
  3. 死信队列(DLQ)
    将无法处理的消息发送到死信队列(如 dead-letter-topic),避免阻塞正常消费流程。

总结

场景解决方案
重复消费手动提交 Offset + 幂等性消费者 + 业务层去重
数据丢失acks=all + 副本机制 + 手动提交 Offset + 异常重试
可靠性保障事务性生产者 + 消费者幂等性 + 监控与报警 + 死信队列

通过以上策略,可在 Kafka 中实现数据的 Exactly-Once 语义(需结合业务层幂等性),满足金融、电商等高可靠性场景的需求。

文章来源:https://blog.csdn.net/qq_35204012/article/details/146533853
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.ppmy.cn/news/1583429.html

相关文章

LangChain开发(二)LangChain提示词模板Template使用

文章目录 Prompt tempates(提示词模板)什么是提示词模板?创建一个提示词模板(prompt template)聊天消息提示词模板(chat prompt template)MessagesPlaceholder 提示词追加示例(Few-s…

ShardingSphere5.2.1+达梦数据库分表操作

上一篇使用的ShardingSphere版本为4.1.1&#xff0c;本次使用5.2.1 依赖引用 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://www.w3.org/2001/XMLSchema-instan…

JPA、Hibernate、 Spring Data JPA 以及Mybatis的关系(Java)

一、JPA是啥&#xff1f; 本质&#xff1a;是 Java EE 的 ORM 规范&#xff08;定义接口和注解&#xff0c;如 Entity, Table&#xff09;&#xff0c;不提供具体实现。 &#xff08;在我看来他就是个标准&#xff0c;就是你怎么写都得按人家标准来&#xff0c;不然就不行&…

蓝桥杯—最少操作数

一.题目 分析:每次可以进行三次操作&#xff0c;求在n步操作后可以达到目标数的最小n&#xff0c;和最短路径问题相似&#xff0c;分层遍历加记忆化搜索防止时间复杂度过高&#xff0c;还需要减枝操作 import java.util.HashSet; import java.util.LinkedList; import java.ut…

WebSocket 传输大量数据好不好?稳定不稳定

使用 WebSocket 传输大量数据 是可行的&#xff0c;但在实际应用中需要注意一些限制和优化策略。以下是关于 WebSocket 传输大量数据的详细分析&#xff1a; 1. WebSocket 传输大量数据的可行性 优点 实时性&#xff1a;WebSocket 是全双工通信协议&#xff0c;适合实时传输数…

抱法处势,用术御变-服务器漏洞-golang 语言漏洞

漏洞编号漏洞公告&#xff08;公告内会包含同一软件多个漏洞 CVE&#xff09;CVE-2022-27191Golong golang.org/x/crypto/ssh拒绝服务漏洞&#xff08;CVE-2022-27191&#xff09;CVE-2022-2989Podman 安全漏洞&#xff08;CVE-2022-2989&#xff09;CVE-2022-3064Go-Yaml 安全…

springboot body 转对象强验证属性多余属性抛错误

在Spring Boot中&#xff0c;当使用RequestBody注解来接收HTTP请求中的JSON数据并将其转换为Java对象时&#xff0c;Spring默认会忽略额外的属性。这意味着如果发送的JSON包含一些目标对象中没有定义的属性&#xff0c;Spring不会报错&#xff0c;这些额外的属性会被简单地忽略…

JavaScript基础巩固之小游戏练习

文章目录 一、前言二、练习 一、前言 加强巩固JavaScript基础语法&#xff0c;现记录一下JavaScript的练习过程。 需要的模块&#xff1a;npm install readline-sync 配置&#xff1a;node.js 二、练习 ▰▰▰▰▰▰▰▰▰▰▰▰▰▰▰猜拳游戏&#xff08;简易版&#xff09;…