Kafka核心参数与使用02

news/2025/1/11 4:31:17/

一、从基础的客户端说起

Kafka 提供了非常简单的生产者(Producer)和消费者(Consumer)API。通过引入相应依赖后,可以快速上手编写生产者和消费者的示例。

1. 消息发送者主流程

一个最基础的 Producer 发送消息的步骤如下:

  1. 设置 Producer 核心属性

    • 例如:bootstrap.servers(集群地址)、key.serializervalue.serializer 等。
    • 大多数核心配置在 ProducerConfig 中都有对应的注释说明。
  2. 构建消息

    • Kafka 消息是一个 Key-Value 结构,Key 常用于分区路由,Value 则是业务真正要传递的内容。
  3. 发送消息

    • 单向发送producer.send(record); 仅发出消息,不关心服务端响应。
    • 同步发送producer.send(record).get(); 获取服务端响应前会阻塞。
    • 异步发送producer.send(record, callback); 服务端响应时会回调。

示例核心代码示意:

java">Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());Producer<String,String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record =new ProducerRecord<>(TOPIC, key, value);
// 单向发送
producer.send(record);
// 同步发送
producer.send(record).get();
// 异步发送
producer.send(record, new Callback() { ... });

2. 消息消费者主流程

Consumer 侧,同样有三步:

  1. 设置 Consumer 核心属性

    • 例如:bootstrap.serversgroup.idkey.deserializervalue.deserializer 等。
  2. 拉取消息

    • Kafka 采用 Pull 模式:消费者主动调用 poll() 拉取消息。
  3. 处理消息,提交位点

    • 手动提交:consumer.commitSync()consumer.commitAsync()
    • 自动提交:设置 enable.auto.commit = true 及相应提交周期参数。

示例核心代码示意:

java">Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(TOPIC));while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {// 业务处理}// 手动提交 offsetconsumer.commitSync();
}

二、从客户端属性来梳理客户端工作机制

Kafka 的核心特色在于高并发、高吞吐以及在网络不稳定、服务随时会崩溃等复杂场景下仍能保证消息安全性。以下属性与机制能帮助我们从客户端视角理解 Kafka。

1. 消费者分组消费机制

  • Group 机制
    每个 Consumer 都会指定一个 group.id。同一个 Consumer Group 内,Topic 的每个 Partition 只会被同组里的一个 Consumer 消费。不同组之间则是互不影响、各自消费。
  • offset 提交
    • offset 保存在 Broker 端,但由 Consumer “主导”提交。
    • 提交方式有:
      • 同步:commitSync(),安全但速度慢;
      • 异步:commitAsync(),快但可能丢失或重复消费。
    • auto.offset.reset
      • 当 Broker 端没有找到该 Group 相应的 offset 时,可以根据配置(earliest, latest, none)决定从何处开始消费。

提示:Offset 提交与消息处理之间并非完全同步,一旦无法保证强一致性,可能出现消息重复与消息丢失。可根据业务需求与场景选择手动提交或自动提交,也可将 offset 存入外部存储(如 Redis)自行管理。

2. 生产者拦截器机制

  • 通过配置 interceptor.classes 可以指定一个或多个实现了 ProducerInterceptor 接口的拦截器。
  • 典型功能:在发送前统一添加/修改消息内容,或者在发送后做监控/统计等操作。

3. 消息序列化机制

  • Producer 端:
    • key.serializer / value.serializer:将对象序列化为 byte[]
    • 内置如 StringSerializerIntegerSerializer 等;可自定义自定义序列化类。
  • Consumer 端:
    • key.deserializer / value.deserializer:将 byte[] 反序列化为业务对象。
  • 如果使用自定义类型(POJO)进行传输,则需要编写自定义 Serializer/Deserializer。
    • 核心思想:定长字段不定长字段的序列化与反序列化。

4. 消息分区路由机制

  • Producer 侧:
    • 通过 partitioner.class 指定自定义的分区器(Partitioner 接口)。Kafka 内置默认逻辑:
      • 若无 key,则采用黏性分区策略(Sticky Partition);
      • 若指定 key,则对 key 进行哈希得到分区;
      • 也可改为轮询策略(RoundRobinPartitioner)。
  • Consumer 侧:
    • 通过 partition.assignment.strategy 指定分区分配策略,内置 RangeAssignor, RoundRobinAssignor, StickyAssignor, CooperativeStickyAssignor 等。
      • Range:按顺序将分区切块分配。
      • RoundRobin:轮询分配。
      • Sticky:尽可能保持现有分配不变,同时保证分配均匀。

5. 生产者消息缓存机制

生产者为了提高吞吐量,会将消息先写入一个本地缓存(RecordAccumulator),然后 sender 线程批量发送到 Broker:

  • buffer.memory:缓存总大小,默认 32MB。
  • batch.size:每个分区发送批次大小,默认 16KB。
  • linger.ms:即便 batch.size 未填满,等待 linger.ms 毫秒后也会将消息批量发送。
  • max.in.flight.requests.per.connection:同一连接上未收到响应的请求数上限。

6. 发送应答机制

  • acks 用于控制生产者发送完消息后何时认为消息“成功”:
    • acks=0:不等待 Broker 确认,吞吐量高,安全性低。
    • acks=1:只等待 Leader 分区写入,常见设置。
    • acks=all-1:等待所有副本写入,安全性最高,吞吐量相对低。
  • 还需配合 Broker 端 min.insync.replicas 参数,控制副本个数不足时直接返回错误。

7. 生产者消息幂等性

  • 为保证 Exactly-once 语义,需要开启 enable.idempotence(幂等性)。
  • 幂等性主要依赖 PID + SequenceNumber 机制:
    • Producer 向同一分区发送消息时,每条消息都有一个单调递增的序列号。
    • Broker 针对 <PID, Partition> 维护序列号,只接收递增消息,防止消息重复写入。
  • 幂等性要求:
    • acks=all
    • retries>0
    • max.in.flight.requests.per.connection<=5

8. 生产者消息事务

  • 幂等性只能保证单个分区的 Exactly-once,如果涉及 多个分区/Topic 则需要“事务”来保证一批消息的一致性。
  • 主要 API:
    • initTransactions()
    • beginTransaction()
    • commitTransaction()
    • abortTransaction()
  • 事务依赖于 transaction.id 来区分不同的 Producer 实例,以便在崩溃重启后继续补偿或回滚先前未完成的事务,保证多分区的一致写入。

三、客户端流程总结

  1. Producer:

    • 属性配置(序列化、分区器、拦截器、幂等性/事务等) → 将消息提交到 RecordAccumulatorSender 线程批量发送到 Broker → 按 acks 等待 Broker 响应 → 提交或重试。
  2. Consumer:

    • 属性配置(反序列化、消费组、分区分配策略等) → poll() 拉取消息 → 业务处理 → 提交 offset(手动或自动),与 Broker 同步消费进度。
  3. 重点:

    • 消息在 Producer 端的缓存发送机制消息在 Consumer 端的主动拉取、分组消费、offset 提交 是理解 Kafka 高并发、高吞吐、高可用的关键。
    • 其他如 幂等性(保证单分区 Exactly-once)和 事务(保证多分区一致性)是针对数据安全性和业务需求的更深入扩展。

四、Spring Boot 集成 Kafka

Spring Boot 中集成 Kafka 本质也是对上述 Producer/Consumer 的封装。

  1. 引入依赖

    java"><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
    </dependency>
    
  2. application.properties 中配置 Kafka 相关参数

    • 和原生 Kafka 参数名称基本一致,如 spring.kafka.producer.*spring.kafka.consumer.* 等。
    • 典型参数:bootstrap-servers, acks, batch-size, enable-auto-commit, auto-offset-reset 等。
  3. 使用 KafkaTemplate 发送消息

    java">@RestController
    public class KafkaProducerController {@Autowiredprivate KafkaTemplate<String, Object> kafkaTemplate;@GetMapping("/kafka/send/{message}")public void sendMessage(@PathVariable("message") String msg) {kafkaTemplate.send("topic1", msg);}
    }
    
  4. 使用 @KafkaListener 声明消息消费者

    java">@Component
    public class KafkaConsumerListener {@KafkaListener(topics = {"topic1"})public void onMessage(ConsumerRecord<?, ?> record) {System.out.println("消费内容:" + record.value());}
    }
    

 


结语

  • 想要真正掌握 Kafka,重点在于建立整体的数据流转模型
    • Producer 端如何将消息分区、缓存、发送、应答、重试、保证幂等与事务;
    • Consumer 端如何分组消费、订阅分区、拉取消息、提交 offset。
  • 熟悉这些机制后,再去看各种客户端配置就会轻松许多,能够结合实际业务场景做灵活配置与调优。
  • Spring Boot 也只是对原生 Kafka 客户端的进一步封装,一旦理解 Kafka 底层机制与各项参数原理,使用 Spring Boot 时只需“对号入座”地进行配置即可。

http://www.ppmy.cn/news/1562154.html

相关文章

【每日学点鸿蒙知识】跳转三方地图、getStringSync性能、键盘避让模式等

1、跳转三方地图导航页 类似于Android 跳转到地图APP 导航页面&#xff1a; // 目标地点的经纬度和名称 double destinationLat 36.547901; double destinationLon 104.258354; String destinationName "目的地名称"; // 构建URI Uri uri Uri.parse("…

运行vue项目,显示“npm”无法识别为 cmdlet、函数、脚本文件或可操作程序的名称

PS D:\weduproject\wedu1\wedu\wedu-fast-vue> npm run dev&#xff0c;运行时出现像下面这样的报红信息&#xff0c; npm : The term npm is not recognized as the name of a cmdlet, function, script file, or operable program. Check the spelling of the name, or …

Nginx:HTTP 方法控制

什么是 HTTP 方法控制? HTTP 方法控制 是指在 Nginx 中配置规则,以限制哪些 HTTP 请求方法被允许访问特定资源。HTTP 定义了多种请求方法,每种方法都有其特定用途: GET:用于请求获取指定资源。POST:用于向指定资源提交数据,通常用于提交表单或上传文件。PUT:用于更新指…

pandas系列----DataFrame简介

DataFrame是Pandas库中最常用的数据结构之一&#xff0c;它是一个类似于二维数组或表格的数据结构。DataFrame由多个列组成&#xff0c;每个列可以是不同的数据类型&#xff08;如整数、浮点数、字符串等&#xff09;。每列都有一个列标签&#xff08;column label&#xff09;…

【微服务】6、限流 熔断

线程隔离与容错处理 本视频主要讲解了在购物车业务中&#xff0c;因商品微服务响应慢导致的问题及解决方案&#xff0c;重点介绍了线程隔离后查询购物车业务不可用的情况&#xff0c;以及如何通过Fallback逻辑进行缓解&#xff0c;包括配置Feign调用为簇点资源、添加Fallback逻…

QT-TCP-server

为了实现高性能的TCP通讯&#xff0c;以下是一个基于Qt的示例&#xff0c;展示如何利用多个线程、非阻塞I/O、数据分块和自定义协议进行优化。该示例以TCP服务器和客户端的形式展示&#xff0c;能够承受高负载并实现快速数据传输。 高性能TCP Server示例 #include <QTcpSe…

算法5--位运算

目录 基础经典例题[面试题 01.01. 判定字符是否唯一](https://leetcode.cn/problems/is-unique-lcci/description/)[268. 丢失的数字](https://leetcode.cn/problems/missing-number/description/)[371. 两整数之和](https://leetcode.cn/problems/sum-of-two-integers/descrip…

Redis数据结构ZipList和QuickList原理解析

大家好&#xff0c;我是袁庭新。 在数据库的世界里&#xff0c;Redis 以其高效和灵活备受瞩目。而其中的 ZipList 和 QuickList 数据结构更是独具魅力。它们在内存管理和数据存储方面有着独特的设计理念&#xff0c;深入探究这些结构&#xff0c;能让我们更好地理解 Redis 的强…