Kafka面试题----如何保证Kafka消费者在消费过程中不丢失消息

devtools/2025/2/25 5:29:57/

合理配置消费者参数

  • enable.auto.commit:设置为 false,关闭自动提交偏移量。自动提交偏移量存在一定的时间间隔,在这个间隔内如果消费者出现异常,可能会导致部分消息被重复消费或者丢失。关闭自动提交后,由开发者手动控制偏移量的提交,确保消息处理完成后再提交偏移量。
enable.auto.commit=false
  • auto.offset.reset:根据业务需求合理设置该参数。该参数指定当消费者没有有效的偏移量时(例如,消费者首次启动或者偏移量已过期),从何处开始消费消息。

    • earliest:从分区的起始位置开始消费消息,确保不会遗漏任何消息,但可能会导致重复消费。
    • latest:从分区的最新位置开始消费消息,会忽略分区中已有的旧消息。如果需要处理所有消息,不建议使用该值。
auto.offset.reset=earliest

正确处理消息和提交偏移量

  • 手动提交偏移量:在消息处理完成后,手动提交偏移量。可以选择同步提交(commitSync())或异步提交(commitAsync())。
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class ManualOffsetCommitConsumer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "test-group");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("enable.auto.commit", "false");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);String topic = "test-topic";consumer.subscribe(Collections.singletonList(topic));try {while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {// 处理消息System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}// 同步提交偏移量consumer.commitSync();}} finally {consumer.close();}}
}
  • 批量处理和批量提交:如果消息处理速度较慢,可以采用批量处理的方式,处理完一批消息后再统一提交偏移量。但要注意,批量处理过程中如果出现异常,可能需要回滚并重新处理这批消息。
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;public class BatchOffsetCommitConsumer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "test-group");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("enable.auto.commit", "false");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);String topic = "test-topic";consumer.subscribe(Collections.singletonList(topic));try {while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();for (ConsumerRecord<String, String> record : records) {// 处理消息System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());TopicPartition partition = new TopicPartition(record.topic(), record.partition());// 记录每个分区的最新偏移量offsetsToCommit.put(partition, new OffsetAndMetadata(record.offset() + 1));}if (!offsetsToCommit.isEmpty()) {// 同步提交偏移量consumer.commitSync(offsetsToCommit);}}} finally {consumer.close();}}
}

处理消费者异常和故障恢复

  • 异常处理:在消息处理过程中,捕获并处理可能出现的异常,确保在异常情况下不会丢失消息。例如,如果处理消息时出现数据库写入失败的异常,可以进行重试或者记录日志,待问题解决后重新处理。
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class ExceptionHandlingConsumer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "test-group");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("enable.auto.commit", "false");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);String topic = "test-topic";consumer.subscribe(Collections.singletonList(topic));try {while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {try {// 处理消息System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());} catch (Exception e) {// 记录异常日志System.err.println("Error processing message: " + e.getMessage());// 可以进行重试或者其他处理}}consumer.commitSync();}} finally {consumer.close();}}
}
  • 故障恢复:当消费者发生故障重启时,要确保能够从上次处理的位置继续消费消息。由于关闭了自动提交偏移量,并且手动控制偏移量的提交,消费者重启后会从上次提交的偏移量位置开始消费,从而避免消息丢失。

监控和日志记录

  • 监控消费者状态:使用 Kafka 提供的监控工具或者第三方监控系统,实时监控消费者的状态,包括消费偏移量、消费速率等。及时发现异常情况并进行处理。
  • 日志记录:详细记录消息处理过程中的关键信息,如消息的偏移量、处理结果、异常信息等。当出现问题时,可以通过日志进行排查和恢复。

http://www.ppmy.cn/devtools/161499.html

相关文章

鸿蒙开发深入浅出04(首页数据渲染、搜索、Stack样式堆叠、Grid布局、shadow阴影)

鸿蒙开发深入浅出04&#xff08;首页数据渲染、搜索、Stack样式堆叠、Grid布局、shadow阴影&#xff09; 1、效果展示2、ets/pages/Home.ets3、ets/views/Home/SearchBar.ets4、ets/views/Home/NavList.ets5、ets/views/Home/TileList.ets6、ets/views/Home/PlanList.ets7、后端…

PCF8591一次读取多条通道导致测量值不准确的原因及解决方法

使用PCF8591测量通道电压的时候&#xff0c;只测量一个通道电压是正常的&#xff0c;但是要测量两个通道的电压时&#xff0c;会异常显示。 产生原因 时序精度不够 PCF8591通过选择不同的通道进行模拟信号采样。每次转换前&#xff0c;通道的选择需要一定的时间&#xff0c;…

便携式动平衡仪Qt应用层详细设计说明书

便携式动平衡仪Qt应用层详细设计说明书 (DDD) 版本&#xff1a;1.1 日期&#xff1a;2023年10月 一、文档目录 系统概述应用层架构设计模块详细设计接口定义与数据流关键数据结构代码框架与实现测试计划附录 二、系统概述 2.1 功能需求 开机流程&#xff1a;长按电源键启动…

DeepSeek写俄罗斯方块手机小游戏

DeepSeek写俄罗斯方块手机小游戏 提问 根据提的要求&#xff0c;让DeepSeek整理的需求&#xff0c;进行提问&#xff0c;内容如下&#xff1a; 请生成一个包含以下功能的可运行移动端俄罗斯方块H5文件&#xff1a; 核心功能要求 原生JavaScript实现&#xff0c;适配手机屏幕 …

第六次作业

一.对比 LVS 负载均衡群集的 NAT 模式和 DR 模式&#xff0c;比较其各自的优势 。 LVS-NAT模式的优势 配置简单&#xff1a;NAT模式的配置相对容易&#xff0c;无需复杂的网络设置&#xff0c;适合初学者和小型网络环境。 网络架构灵活&#xff1a;由于使用了NAT技术&#xf…

Deepseek快速做PPT

背景&#xff1a; DeepSeek大纲生成 → Kimi结构化排版 → 数据审查&#xff0c;细节调整 DeepSeek 拥有深度思考能力&#xff0c;擅长逻辑构建与内容生成&#xff0c;它会根据我们的问题进行思考&#xff0c;其深度思考能力当前测试下来&#xff0c;不愧为国内No.1&#xff0…

【JavaEE进阶】数据库连接池

目录 &#x1f334;数据库连接池 &#x1f38b;数据库连接池的使用 &#x1f332;MySQL企业开发规范 &#x1f334;数据库连接池 数据库连接池负责分配、管理和释放数据库连接&#xff0c;它允许应⽤程序重复使⽤⼀个现有的数据库连接&#xff0c;⽽不是再重新建⽴⼀个. 没…

1.1 go环境搭建及基本使用

golang下载地址&#xff1a; Download and install - The Go Programming Language (google.cn) 验证安装是否成功&#xff1a; go version 查看go环境 go env 注意&#xff1a;Go1.11版本之后无需手动配置环境变量,使用go mod 管理项目&#xff0c;也不需要把项目放到GO…