【快速解决】kafka崩了,重启之后,想继续消费,怎么做?

embedded/2024/11/17 6:01:09/

目录

一、怎么寻找我们关心的主题在崩溃之前消费到了哪里? 

1、一个问题:

 2、查看消费者消费主题__consumer_offsets

3、一个重要前提:消费时要提交offset

二、指定 Offset 消费


假如遇到kafka崩了,你重启kafka之后,想要继续消费,应该怎么办?

  1. 首先确定要消费的主题是哪几个
  2. 其次使用命令或者其他的组件查看 __consumer_offset 主题下的偏移量信息,找到我们关心的主题在崩溃之前消费到了哪里。
  3. 最后使用 java 代码,里面有一个非常重要的方法 seek,指定需要消费的主题,分区以及偏移量,就可以继续消费了。

下面是解决这个问题的具体步骤!!! 

一、怎么寻找我们关心的主题在崩溃之前消费到了哪里? 

        因为__consumer_offset 主题下记录了主题的偏移量信息,所以提交offset之后,消费__consumer_offset 主题便可查看所有主题的偏移量信息

1、一个问题:

__consumer_offset 主题下的数据是不能查看的,怎么解决?

解决方案:

在配置文件 config/consumer.properties 中添加配置 exclude.internal.topics=false。然后分发一下

分发:(这里我使用了脚本)

xsync.sh /opt/installs/kafka3/config/consumer.properties

注意:修改之前要先关闭kafka和zookeeper,修改完毕后再开启!

说明:默认是 true,表示不能消费系统主题。为了查看该系统主题数据,所以该参数修改为 false。如果不修改是无法查看offset的值的,因为这些都是加密数据。

 2、查看消费者消费主题__consumer_offsets

kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server bigdata01:9092 --consumer.config /opt/installs/kafka3/config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" 

查询结果如下:

图中:1是消费者组名;2是Topic主题名;3是分区;4是偏移量,说明该主题崩溃前消费到了这里

此时便查询到了偏移量信息!!!

3、一个重要前提:消费时要提交offset

能查询到偏移量的前提是消费时要自动提交 offset(默认开启)或者手动提交 offset

一般不用管,因为自动提交会默认开启!!!

二、指定 Offset 消费

 kafka提供了seek方法,可以让我们从分区的固定位置开始消费。

seek (TopicPartition Partition,offset offset):指定分区和偏移量

package com.bigdata._03offsetTest;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Set;public class _03CustomConsumerSeek {public static void main(String[] args) {Properties properties = new Properties();//连接kafka setProperty和put都行properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"bigdata01:9092");// 字段反序列化   key 和  valueproperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());// 配置消费者组(组名任意起名) 必须properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test5");// 是否自动提交 offset  通过这个字段设置properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);// 消费者消费的是kafka集群的数据,消费哪个主题的数据呢?List<String> topics = new ArrayList<>();topics.add("bigdata");// list可以设置多个主题的名称kafkaConsumer.subscribe(topics);// 执行计划// 此时的消费计划是空的,因为没有时间生成Set<TopicPartition> assignment = kafkaConsumer.assignment();while(assignment.size() == 0){// 这个本身是拉取数据的代码,此处可以帮助快速构建分区方案出来kafkaConsumer.poll(Duration.ofSeconds(1));// 一直获取它的分区方案,什么时候有了,就什么时候跳出这个循环assignment = kafkaConsumer.assignment();}// 获取分区0的offset =100 以后的数据kafkaConsumer.seek(new TopicPartition("bigdata",0),100);// 因为消费者是不停的消费,所以是while truewhile(true){// 每隔一秒钟,从kafka 集群中拉取一次数据,有可能拉取多条数据ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));// 循环打印每一条数据for (ConsumerRecord record:records) {// 打印一条数据System.out.println(record);// 打印数据中的值System.out.println(record.value());}}}
}

执行这个java代码就可以从精确的指定位置继续消费了!!!

结果如下:

从上图可以看出,确实是从指定的主题、分区、偏移量开始消费的! 


http://www.ppmy.cn/embedded/138181.html

相关文章

大数据实训室建设的必要性

一、大数据发展的背景 大数据作为当今信息技术领域的核心驱动力&#xff0c;正在深刻地改变着社会的各个方面。它不仅仅是指数据量庞大&#xff0c;更重要的是指数据的多样性、实时性和复杂性。随着云计算、物联网等技术的迅猛发展&#xff0c;大数据已成为推动经济社会发展的…

无人机图传系统介绍——CKESC电调小课堂11.0

1. 概述 无人机图传系统是无人机的一个关键组成部分&#xff0c;其主要功能是将无人机在空中拍摄到的视频或图像数据实时传输到地面接收设备。这使得操作人员能够在地面实时查看无人机拍摄的内容&#xff0c;从而有效地控制无人机的拍摄角度、位置等参数&#xff0c;以满足各种…

Ubuntu20.04 为脚本文件创建桌面快捷方式 ubuntu

Ubuntu20.04 为脚本文件创建桌面快捷方式 在Ubuntu 20.04中&#xff0c;为脚本文件&#xff08;如 .sh 文件&#xff09;创建桌面快捷方式是一种提升用户效率的实用方法&#xff0c;允许用户通过图形用户界面直接执行重要或常用的脚本。以下是一种详细、专业且逻辑清晰的通用方…

TCP最后一次握⼿连接阶段,如果ACK包丢失会怎样?

2024年10月NJSD技术盛典暨第十届NJSD软件开发者大会、第八届IAS互联网架构大会在南京召开。百度文心快码总经理臧志分享了《AI原生研发新范式的实践与思考》&#xff0c;探讨了大模型赋能下的研发变革及如何在公司和行业中落地&#xff0c;AI原生研发新范式的内涵和推动经验。 …

流程图图解@RequestBody @RequestPart @RequestParam @ModelAttribute

RequestBody 只能用一次&#xff0c;因为只有一个请求体 #mermaid-svg-8WZfkzl0GPvOiNj3 {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-8WZfkzl0GPvOiNj3 .error-icon{fill:#552222;}#mermaid-svg-8WZfkzl0GPvOiNj…

邻接多重表、十字链表、边集数组

关于数据结构的一个整理&#xff1a; 1、链式有序表的合并 2、栈 3、队列 4、二叉树、哈夫曼报文 5、图论 6、十大排序 7、校园导航系统 文章目录 1、邻接多重表2、十字链表泛型3、边集数组 1、邻接多重表 1、顶点头文件&#xff1a;VertexNode.h #pragma once #inclu…

蓝领招聘二期笔记

1. 先在md文件中定义大致的规范 2. 修改package.json中不同环境的运行指令 1.如何在组件上传的时候重新命名 files.forEach((file, index) > {const renameBaseName 基础文件const fileName renameBaseName? ${renameBaseName}${index 1 fileList.length}.${file.name…

猎板PCB罗杰斯板材的应用案例

以下是几个猎板 PCB 与罗杰斯板材结合的具体案例&#xff1a; 案例一&#xff1a;5G 通信基站天线 PCB 在 5G 通信基站的天线系统中&#xff0c;对高频信号的传输和处理要求极高。猎板 PCB 采用罗杰斯板材&#xff0c;凭借其稳定的低介电常数&#xff08;如 RO4003C 板材&…