【kafka系列】At Least Once语义

embedded/2025/2/21 7:03:24/

目录

1. At-Least-Once语义的定义

2. Kafka实现At-Least-Once的机制

2.1 生产者端

2.2 消费者端

3. At-Least-Once示例

场景描述

3.1 生产者代码(可能重复发送)

3.2 消费者代码(可能重复处理)

4. 典型重复场景分析

场景1:生产者重试导致消息重复

场景2:消费者重复处理消息

5. 业务端如何应对重复

5.1 幂等性处理

5.2 消费者端去重

6. At-Least-Once vs Exactly-Once

7. 总结


1. At-Least-Once语义的定义

At-Least-Once(至少一次) 语义指:

  • 消息从生产者到Broker:确保消息至少成功写入一次(可能因重试导致重复)。
  • 消息从Broker到消费者:确保消息至少被消费一次(可能因消费者重复拉取导致处理多次)。

核心特点消息不丢失,但可能重复


2. Kafka实现At-Least-Once的机制

2.1 生产者端
  • 配置acks=all:确保消息被所有ISR副本写入后才返回成功。
  • 开启重试(retries > 0:若Broker未及时响应,生产者自动重试发送消息。
  • 风险:若Broker已写入但未返回ACK(如网络超时),生产者重试会导致消息重复。
2.2 消费者端
  • 手动提交Offset:消费者处理完消息后手动调用commitSync()提交Offset。
  • 风险:若消费者处理消息后崩溃,未提交Offset,下次启动时会重新拉取消息,导致重复处理。

3. At-Least-Once示例

场景描述

一个用户积分系统:

  • 生产者:发送用户积分增加消息到Topic user_points
  • 消费者:消费消息,为用户增加积分,并提交Offset。
    要求:积分必须至少增加一次(允许重复增加,但需业务端处理重复)。

3.1 生产者代码(可能重复发送)
// 生产者配置(At-Least-Once)
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092");
props.put("acks", "all");      // 确保消息写入所有ISR副本
props.put("retries", 3);       // 重试3次KafkaProducer<String, String> producer = new KafkaProducer<>(props);// 发送积分消息(可能重复)
producer.send(new ProducerRecord<>("user_points", "user-1001", "+10"));

潜在问题
若Broker写入成功但网络超时,生产者重试会导致消息重复发送到Topic。


3.2 消费者代码(可能重复处理)
// 消费者配置(At-Least-Once)
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092");
props.put("group.id", "points-group");
props.put("enable.auto.commit", "false"); // 关闭自动提交KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("user_points"));while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {// 处理消息:为用户增加积分addUserPoints(record.key(), Integer.parseInt(record.value()));// 手动提交Offset(可能失败)consumer.commitSync();}
}

潜在问题
addUserPoints()执行成功但commitSync()失败(如消费者崩溃),下次启动时会重新拉取并处理同一消息,导致积分重复增加。


4. 典型重复场景分析

场景1:生产者重试导致消息重复
  • 原因:Broker已写入消息但ACK丢失,生产者重试发送。
  • 结果:Topic中存在两条相同消息(如user-1001, +10)。
场景2:消费者重复处理消息
  • 原因:消费者处理消息后未提交Offset(如崩溃)。
  • 结果:重启后重新拉取并处理同一消息,积分被重复增加。

5. 业务端如何应对重复

5.1 幂等性处理
  • 数据库唯一约束:为每条消息生成唯一ID,插入时去重。
INSERT INTO user_points (user_id, points, msg_id) 
VALUES ('user-1001', 10, 'uuid-123') 
ON CONFLICT (msg_id) DO NOTHING;
  • 业务逻辑幂等:设计接口支持多次调用结果一致。
// 幂等积分增加方法
public void addPoints(String userId, int points, String msgId) {if (!isMessageProcessed(msgId)) {updateUserPoints(userId, points);markMessageAsProcessed(msgId);}
}
5.2 消费者端去重
  • 本地记录已处理消息:使用缓存或数据库记录已处理的Offset或消息ID。
// 消费者处理逻辑(伪代码)
for (ConsumerRecord record : records) {if (!processedMessages.contains(record.offset())) {addUserPoints(record.key(), record.value());processedMessages.add(record.offset());consumer.commitSync();}
}

6. At-Least-Once vs Exactly-Once

特性

At-Least-Once

Exactly-Once

消息丢失风险

不丢失

不丢失

消息重复风险

可能重复

不重复

性能开销

低(无需事务)

高(事务与协调开销)

适用场景

允许重复的业务(如日志采集)

金融交易、精准统计

实现复杂度

简单

复杂(需生产者、Broker、消费者协同)


7. 总结

  • At-Least-Once是Kafka的默认语义:通过acks=all和手动提交Offset实现,简单高效。
  • 业务端必须处理重复:通过幂等性设计或去重机制避免数据不一致。
  • 适用场景:日志采集、指标上报等允许少量重复但对丢失敏感的场景。

通过合理配置和业务设计,At-Least-Once可平衡可靠性与性能,是大多数场景的推荐选择。


http://www.ppmy.cn/embedded/163493.html

相关文章

ubuntu安装docker 无法拉取问题

sudo docker run hello-world [sudo] ubuntu 的密码&#xff1a; Unable to find image hello-world:latest locally docker: Error response from daemon: Get "https://registry-1.docker.io/v2/": context deadline exceeded (Client.Timeout exceeded while awai…

增发股票(Seasoned Equity Offering, SEO):企业融资的关键方式(中英双语)

增发股票&#xff1a;企业融资的关键方式 引言 在资本市场中&#xff0c;增发股票&#xff08;Seasoned Equity Offering, SEO&#xff09;是企业通过发行新股票来筹集资金的一种方式。它不仅影响公司的财务状况&#xff0c;也直接影响股东权益、股价走势和市场流动性。 &am…

无人机集群路径规划:5种最新优化算法(CBSO、ECO、AOA、SFOA、PLO)求解无人机集群路径规划,提供MATLAB代码

一、单个无人机路径规划模型介绍 无人机三维路径规划是指在三维空间中为无人机规划一条合理的飞行路径&#xff0c;使其能够安全、高效地完成任务。路径规划是无人机自主飞行的关键技术之一&#xff0c;它可以通过算法和模型来确定无人机的航迹&#xff0c;以避开障碍物、优化…

【嵌入式Linux应用开发基础】进程实战开发

目录 一、进程基础概念回顾 1.1. 进程本质特征 1.2. 进程与程序的区别 1.3. 进程的组成 1.4. 进程 ID&#xff08;PID&#xff09; 1.5. 进程状态 1.6. 嵌入式系统中的特殊性 1.7. 关键命令 1.8. 示例场景 二、进程创建与终止 2.1. 进程创建 - fork()函数 2.2. 进…

SQLMesh 系列教程5- 详解SQL模型

本文将详细介绍 SQLMesh 的 SQL 模型组成要素及其在实际项目中的应用。SQLMesh 是一个强大的数据工程工具&#xff0c;其 SQL 模型由 MODEL DDL、预处理语句、主查询、后处理语句以及可选的 ON VIRTUAL UPDATE 语句组成。我们将通过一个电商平台每日销售报告的实例&#xff0c;…

新手小白学习docker第十弹-------Docker微服务实战

目录 0 引言1 IDEA创建微服务1.1 IDEA配置maven3.91.2 新建项目docker_serve1.3 编写docker_serve代码1.4 启动项目1.5 打包项目1.6 上传项目到虚拟机 2 部署到docker容器2.1 编写Dockerfile2.2 构建镜像2.3 运行容器2.4 访问测试 3 后记 0 引言 真的&#xff0c;这个看着就好…

DeepSeek横空出世,真的拯救了算力焦虑吗?

目录 DeepSeek横空出世&#xff0c;真的拯救了算力焦虑吗&#xff1f; 一、为什么会有算力焦虑 二、来自硅谷四大科技巨头的决策 1、Deepseek在24年底的突然崛起 2、利好算力的大背景下&#xff0c;硅谷四大科技巨头的“落后”加码 3、在算法博弈中加强算力基建的战略 三…

【Linux】文件系统:文件fd

&#x1f525;个人主页&#xff1a;Quitecoder &#x1f525;专栏&#xff1a;linux笔记仓 目录 01.回顾C文件接口02.系统文件I/O02.1 openflags 参数&#xff08;文件打开模式&#xff09;标记位传参1. 访问模式&#xff08;必须指定一个&#xff09;2. 额外控制标志&#xf…