Chapter 5 Kafka Consumers


5.1 Kafka Consumption Modes

5.2 Kafka Consumer Workflow

5.2.1 Overall Consumer Workflow

5.2.2 Consumer Group Principles

5.2.3 Important Consumer Parameters

5.3 Consumer API

5.3.1 Standalone Consumer Example (Subscribing to a Topic)

package com.atguigu.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class CustomConsumer {
    public static void main(String[] args) {
        // 1. Create the consumer configuration object
        Properties properties = new Properties();
        // 2. Add parameters to the consumer configuration
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        // Configure deserializers (required)
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Configure the consumer group (required); any group name will do
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        // Create the consumer object
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
        // Register the topics to consume (multiple topics are allowed)
        ArrayList<String> topics = new ArrayList<>();
        topics.add("first");
        kafkaConsumer.subscribe(topics);
        // Pull and print data
        while (true) {
            // Poll a batch of data every 1s
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            // Print the consumed records
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }
    }
}

(2) In the Kafka cluster console, create a console producer and send data.
[atguigu@hadoop102 kafka]$ bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
>hello
(3) Observe the received data in the IDEA console.
ConsumerRecord(topic = first, partition = 1, leaderEpoch = 3,
offset = 0, CreateTime = 1629160841112, serialized key size = -1,
serialized value size = 5, headers = RecordHeaders(headers = [],
isReadOnly = false), key = null, value = hello)

5.3.2 Standalone Consumer Example (Subscribing to a Partition)

2) Implementation steps
(1) Write the code.
package com.atguigu.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
public class CustomConsumerPartition {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        // Configure deserializers (required)
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Configure the consumer group (required); any group name will do
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        // Consume data from one specific partition of one topic
        ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
        topicPartitions.add(new TopicPartition("first", 0));
        kafkaConsumer.assign(topicPartitions);
        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }
    }
}
3) Testing
(1) Run the consumer program in IDEA.
(2) Run the producer program CustomProducerCallback() in IDEA and watch its console for records produced to partition 0.
first 0 381
first 0 382
first 2 168
first 1 165
first 1 166
(3) Check the received data in the IDEA console; the result is correct if only data from partition 0 is consumed.
ConsumerRecord(topic = first, partition = 0, leaderEpoch = 14,
offset = 381, CreateTime = 1636791331386, serialized key size = -1,
serialized value size = 9, headers = RecordHeaders(headers = [],
isReadOnly = false), key = null, value = atguigu 0)
ConsumerRecord(topic = first, partition = 0, leaderEpoch = 14,
offset = 382, CreateTime = 1636791331397, serialized key size = -1,
serialized value size = 9, headers = RecordHeaders(headers = [],
isReadOnly = false), key = null, value = atguigu 1)

5.3.3 Consumer Group Example

2) Hands-on example
(1) Make a copy of the basic consumer code and start both copies in IDEA at the same time; this launches two consumers that belong to the same consumer group.
package com.atguigu.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
public class CustomConsumer1 {
    public static void main(String[] args) {
        // 1. Create the consumer configuration object
        Properties properties = new Properties();
        // 2. Add parameters to the consumer configuration
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        // Configure deserializers (required)
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Configure the consumer group (required)
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        // Create the consumer object
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
        // Register the topic
        ArrayList<String> topics = new ArrayList<>();
        topics.add("first");
        kafkaConsumer.subscribe(topics);
        // Pull and print data
        while (true) {
            // Poll a batch of data every 1s
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            // Print the consumed records
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }
    }
}
(2) Start the producer from the code to send messages. In the IDEA console you can then see the two consumers consuming data from different partitions (if everything lands in a single partition, add a short delay such as Thread.sleep(2); between sends).
ConsumerRecord(topic = first, partition = 0, leaderEpoch = 2,
offset = 3, CreateTime = 1629169606820, serialized key size = -1,
serialized value size = 8, headers = RecordHeaders(headers = [],
isReadOnly = false), key = null, value = hello1)
ConsumerRecord(topic = first, partition = 1, leaderEpoch = 3,
offset = 2, CreateTime = 1629169609524, serialized key size = -1,
serialized value size = 6, headers = RecordHeaders(headers = [],
isReadOnly = false), key = null, value = hello2)
ConsumerRecord(topic = first, partition = 2, leaderEpoch = 3,
offset = 21, CreateTime = 1629169611884, serialized key size = -1,
serialized value size = 6, headers = RecordHeaders(headers = [],
isReadOnly = false), key = null, value = hello3)
(3) Send to a brand-new topic instead. Because a topic created with default settings has only 1 partition, you will see that only one of the consumers receives data.
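To see both consumers in the group receive data again, the new topic needs more than one partition. Below is a minimal sketch that creates such a topic with Kafka's AdminClient; the topic name second, the 3 partitions, and the replication factor of 1 are illustrative assumptions, not values from the original steps.
package com.atguigu.kafka.admin;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class CreateTopicSketch {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        try (AdminClient adminClient = AdminClient.create(properties)) {
            // 3 partitions so every consumer in the group can be assigned data;
            // replication factor 1 is enough for a test environment (assumed values)
            NewTopic newTopic = new NewTopic("second", 3, (short) 1);
            adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
        }
    }
}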

5.4 Production Experience: Partition Assignment and Rebalancing

5.4.1 Range and Rebalancing

(3) Start the CustomProducer producer and send messages to different partitions (the code below sends one record to each of partitions 0-6).
package com.atguigu.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class CustomProducer {
    public static void main(String[] args) throws InterruptedException {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        // Send one record to each of partitions 0-6 with key "test"
        for (int i = 0; i < 7; i++) {
            kafkaProducer.send(new ProducerRecord<>("first", i, "test", "atguigu"));
        }
        kafkaProducer.close();
    }
}
Note: Kafka's default partition assignment strategy is Range + CooperativeSticky, so no change to the strategy is needed here.
(4) Observe which partitions each of the 3 consumers consumes.
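The strategy itself is controlled by the consumer parameter partition.assignment.strategy, which is what sections 5.4.2 and 5.4.3 vary. A minimal sketch of switching a consumer to the RoundRobin assignor, assuming the same configuration block as the consumers above; only the strategy line is new:
// Assumption: identical to the consumer configuration used earlier; only the strategy line differs.
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
// Replace the default Range + CooperativeSticky strategy with RoundRobin
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
        "org.apache.kafka.clients.consumer.RoundRobinAssignor");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);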

5.4.2 RoundRobin and Rebalancing

5.4.3 Sticky and Rebalancing

5.5 Offsets

5.5.1 Default Storage Location of Offsets

5.5.2 Automatically Committing Offsets

1) The consumer commits offsets automatically

package com.atguigu.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class CustomConsumerAutoOffset {
    public static void main(String[] args) {
        // 1. Create the Kafka consumer configuration
        Properties properties = new Properties();
        // 2. Add configuration parameters
        // Connection
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        // Configure deserializers (required)
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // Configure the consumer group
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        // Enable auto-commit of offsets
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        // Auto-commit interval: 1000 ms (default is 5s)
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
        // 3. Create the Kafka consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        // 4. Subscribe to topics (the parameter is a list)
        consumer.subscribe(Arrays.asList("first"));
        // 5. Consume data
        while (true) {
            // Read messages
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            // Print messages
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.value());
            }
        }
    }
}

5.5.3 Manually Committing Offsets

1) Synchronous offset commits
Because synchronous commits retry on failure, they are more reliable; however, the consumer blocks while waiting for the commit result, so commit throughput is lower. Below is an example of committing offsets synchronously.
package com.atguigu.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class CustomConsumerByHandSync {
    public static void main(String[] args) {
        // 1. Create the Kafka consumer configuration
        Properties properties = new Properties();
        // 2. Add configuration parameters
        // Connection
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        // Configure deserializers (required)
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // Configure the consumer group
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        // Disable auto-commit of offsets
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // 3. Create the Kafka consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        // 4. Subscribe to topics (the parameter is a list)
        consumer.subscribe(Arrays.asList("first"));
        // 5. Consume data
        while (true) {
            // Read messages
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            // Print messages
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.value());
            }
            // Commit offsets synchronously
            consumer.commitSync();
        }
    }
}
2) Asynchronous offset commits
Although synchronous commits are more reliable, they block the current thread until the commit succeeds, which hurts throughput significantly. For that reason, asynchronous offset commits are used in most cases.
Below is an example of committing offsets asynchronously:
package com.atguigu.kafka.consumer;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
public class CustomConsumerByHandAsync {
    public static void main(String[] args) {
        // 1. Create the Kafka consumer configuration
        Properties properties = new Properties();
        // 2. Add configuration parameters
        // Connection
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        // Configure deserializers (required)
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // Configure the consumer group
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        // Disable auto-commit of offsets
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        // 3. Create the Kafka consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        // 4. Subscribe to topics (the parameter is a list)
        consumer.subscribe(Arrays.asList("first"));
        // 5. Consume data
        while (true) {
            // Read messages
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            // Print messages
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.value());
            }
            // Commit offsets asynchronously
            consumer.commitAsync();
        }
    }
}
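commitAsync() also has an overload that accepts an OffsetCommitCallback, which helps when commit failures should at least be logged rather than silently dropped. A minimal sketch of replacing the plain consumer.commitAsync() call in the loop above; the failure handling shown is only an illustration:
// Replace the plain consumer.commitAsync() above with a callback variant;
// the logging in the callback body is an assumed example, not from the original text.
consumer.commitAsync(new OffsetCommitCallback() {
    @Override
    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
        if (exception != null) {
            System.err.println("Offset commit failed for " + offsets + ": " + exception.getMessage());
        }
    }
});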

5.5.4 Consuming from a Specified Offset

(4) Start consuming from an arbitrarily specified offset.
package com.atguigu.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
public class CustomConsumerSeek {
    public static void main(String[] args) {
        // 0. Configuration
        Properties properties = new Properties();
        // Connection
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        // Key and value deserializers
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test2");
        // 1. Create a consumer
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        // 2. Subscribe to a topic
        ArrayList<String> topics = new ArrayList<>();
        topics.add("first");
        kafkaConsumer.subscribe(topics);
        Set<TopicPartition> assignment = new HashSet<>();
        while (assignment.size() == 0) {
            kafkaConsumer.poll(Duration.ofSeconds(1));
            // Get the partition assignment (consumption can only start once partitions are assigned)
            assignment = kafkaConsumer.assignment();
        }
        // Iterate over all assigned partitions and start consuming from offset 1700
        for (TopicPartition tp : assignment) {
            kafkaConsumer.seek(tp, 1700);
        }
        // 3. Consume data from the topic
        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }
    }
}
Note: change the consumer group name each time before re-running the program.

5.5.5 Consuming from a Specified Time

Requirement: in production you may find that the data consumed over the last few hours is bad and needs to be re-consumed based on time. For example, suppose you need to consume data starting from one day ago; how do you do that?
Steps:
package com.atguigu.kafka.consumer;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*;
public class CustomConsumerForTime {
    public static void main(String[] args) {
        // 0. Configuration
        Properties properties = new Properties();
        // Connection
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        // Key and value deserializers
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test2");
        // 1. Create a consumer
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        // 2. Subscribe to a topic
        ArrayList<String> topics = new ArrayList<>();
        topics.add("first");
        kafkaConsumer.subscribe(topics);
        Set<TopicPartition> assignment = new HashSet<>();
        while (assignment.size() == 0) {
            kafkaConsumer.poll(Duration.ofSeconds(1));
            // Get the partition assignment (consumption can only start once partitions are assigned)
            assignment = kafkaConsumer.assignment();
        }
        HashMap<TopicPartition, Long> timestampToSearch = new HashMap<>();
        // For each assigned partition, store the timestamp of one day ago
        for (TopicPartition topicPartition : assignment) {
            timestampToSearch.put(topicPartition, System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
        }
        // Look up, for each partition, the offset corresponding to one day ago
        Map<TopicPartition, OffsetAndTimestamp> offsets = kafkaConsumer.offsetsForTimes(timestampToSearch);
        // For each partition, seek to the offset found for that timestamp
        for (TopicPartition topicPartition : assignment) {
            OffsetAndTimestamp offsetAndTimestamp = offsets.get(topicPartition);
            // Set the starting position based on the timestamp
            if (offsetAndTimestamp != null) {
                kafkaConsumer.seek(topicPartition, offsetAndTimestamp.offset());
            }
        }
        // 3. Consume data from the topic
        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }
    }
}

5.5.6 Missed and Duplicate Consumption

5.6 Production Experience: Consumer Transactions

5.7 Production Experience: Data Backlog (How Consumers Can Increase Throughput)
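When the backlog comes from the consumer side, the usual levers are adding partitions plus consumers in the group, and pulling more data per poll. Below is a minimal sketch of the fetch-related consumer parameters; the values shown are illustrative assumptions, not recommendations from the original text.
// Illustrative settings for pulling more data per poll(); tune to the actual workload.
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
// Maximum number of records returned by a single poll() (default 500)
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
// Maximum bytes fetched from the broker in one request (default 50 MB)
properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 100 * 1024 * 1024);
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);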

