Kafka AdminClient API 来获取特定 Kafka 消费组的消费延迟

ops/2024/12/26 14:52:47/

文章目录

      • 代码流程详解
        • 1. Kafka 配置与创建 `AdminClient`
        • 2. 获取 Topic 的所有分区
        • 3. 获取消费者组的偏移量
        • 4. 获取每个分区的 `log-end-offset`
        • 5. 获取消费者组成员信息
        • 6. 计算 Lag 并输出信息
        • 7. 关闭 `AdminClient`
        • 8. 完整代码
      • 代码功能总结:

这段代码的目标是通过 Kafka AdminClient API 获取特定消费者组在一个特定 Topic 中各个分区的消费延迟(Lag)信息,并输出消费者实例的信息(包括实例 ID 和主机)。该程序会计算每个分区的消费 Lag 并输出消费者的偏移量、日志结束偏移量(log-end-offset)以及每个消费者实例的相关信息。

代码流程详解

1. Kafka 配置与创建 AdminClient
String bootstrapServers = "";  // Kafka 集群的地址(需要根据实际情况调整)
String consumerGroupId = "";  // 消费者组 ID(需要根据实际情况替换)
String topicName = ""; // Topic 名称(根据实际情况替换)Properties adminProps = new Properties();
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
AdminClient adminClient = AdminClient.create(adminProps);
  • bootstrapServers:指定 Kafka 集群的地址,通常是一个或多个 Kafka broker 的地址。
  • consumerGroupId:要查询的消费者组 ID。
  • topicName:要查询的 Topic 名称。
  • AdminClient:用于与 Kafka 集群交互的客户端,用于执行诸如描述 Topic、获取消费者组的偏移量等操作。
2. 获取 Topic 的所有分区
DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singletonList(topicName));
Map<String, TopicDescription> topicDescriptions = describeTopicsResult.all().get();
TopicDescription topicDescription = topicDescriptions.get(topicName);
List<TopicPartition> topicPartitions = new ArrayList<>();for (TopicPartitionInfo partitionInfo : topicDescription.partitions()) {topicPartitions.add(new TopicPartition(topicName, partitionInfo.partition()));
}
  • describeTopics:用于获取 Topic 的元数据(如分区数量等)。
  • TopicPartition:每个 Topic 会有多个分区,TopicPartition 对象代表了某个 Topic 中的特定分区。
3. 获取消费者组的偏移量
ListConsumerGroupOffsetsResult offsetsResult = adminClient.listConsumerGroupOffsets(consumerGroupId);
Map<TopicPartition, OffsetAndMetadata> consumerOffsets = offsetsResult.partitionsToOffsetAndMetadata().get();
  • listConsumerGroupOffsets:返回消费者组在每个分区上的当前消费偏移量。返回的是每个 TopicPartition 对应的 OffsetAndMetadata(包括当前偏移量和元数据)。
4. 获取每个分区的 log-end-offset
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> topicPartitionListOffsetsResultInfoMap =adminClient.listOffsets(topicPartitions.stream().collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()))).all().get();
  • listOffsets:返回指定分区的 log-end-offset,即分区的最后消息的偏移量。OffsetSpec.latest() 表示获取当前最新的偏移量(log-end-offset)。
5. 获取消费者组成员信息
DescribeConsumerGroupsResult consumerGroupResult = adminClient.describeConsumerGroups(Collections.singletonList(consumerGroupId));
Map<String, ConsumerGroupDescription> consumerGroupDescriptionMap = consumerGroupResult.all().get();
ConsumerGroupDescription consumerGroupDescription = consumerGroupDescriptionMap.get(consumerGroupId);
  • describeConsumerGroups:获取消费者组的描述信息,包括该组内的消费者实例信息(例如,消费者的分区分配情况、消费者的主机名等)。
6. 计算 Lag 并输出信息
for (TopicPartition partition : topicPartitions) {OffsetAndMetadata consumerOffset = consumerOffsets.get(partition);if (consumerOffset != null) {long consumerOffsetValue = consumerOffset.offset(); // 当前消费者的偏移量ListOffsetsResult.ListOffsetsResultInfo logEndOffsetInfo = topicPartitionListOffsetsResultInfoMap.get(partition);long logEndOffset = logEndOffsetInfo.offset(); // Kafka 中该分区的 log-end-offset// 计算 Lag = log-end-offset - consumerOffsetlong lag = logEndOffset - consumerOffsetValue;String consumerInstance = "";// 获取每个消费者实例的信息for (MemberDescription member : consumerGroupDescription.members()) {for (TopicPartition topicPartition : member.assignment().topicPartitions()) {if (topicPartition.topic().equals(partition.topic())) {Field field = MemberDescription.class.getDeclaredField("memberId");  // 获取成员 ID 字段field.setAccessible(true);  // 设置该字段为可访问String memberIdValue = (String) field.get(member);  // 通过反射获取该字段的值consumerInstance = memberIdValue + ":" + member.host();  // 组合消费者 ID 和主机信息break;}}}// 输出每个分区的 Lag 以及消费者实例信息System.out.println("Topic: " + partition.topic() + ", Partition: " + partition.partition() +", Consumer Offset: " + consumerOffsetValue + ", Log End Offset: " + logEndOffset + ", Lag: " + lag + ", consumerInstance : " + consumerInstance);} else {System.out.println("No consumer offset found for partition: " + partition);}
}
  • Lag 计算:Lag 是指 Kafka 中某个分区的 log-end-offset 和消费者的当前偏移量(consumerOffset)之间的差距。即:

    • Lag = log-end-offset - consumerOffset,表示当前消费者尚未消费的消息数量。
  • 反射访问消费者实例信息

    • 通过反射访问 MemberDescription 类中的私有字段 memberId(该字段表示消费者的唯一 ID)。
    • 使用 setAccessible(true) 方法绕过访问控制,使得可以访问私有字段。
    • 获取到 memberId 后,组合消费者的 ID 和主机地址,作为消费者实例的标识。
7. 关闭 AdminClient
adminClient.close();
  • 关闭 AdminClient 实例,释放相关资源。

8. 完整代码
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;import java.lang.reflect.Field;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;public class KafkaLagChecker {public static void main(String[] args) throws ExecutionException, InterruptedException, NoSuchFieldException, IllegalAccessException {// Kafka 配置String bootstrapServers = "";  // 请根据实际情况调整String consumerGroupId = "";  // 请替换为你的 consumer groupString topicName = ""; // 请替换为你的 Topic 名称// 创建 AdminClientProperties adminProps = new Properties();adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);AdminClient adminClient = AdminClient.create(adminProps);// 获取 topic 中所有分区DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singletonList(topicName));Map<String, TopicDescription> topicDescriptions = describeTopicsResult.all().get();TopicDescription topicDescription = topicDescriptions.get(topicName);List<TopicPartition> topicPartitions = new ArrayList<>();for (TopicPartitionInfo partitionInfo : topicDescription.partitions()) {topicPartitions.add(new TopicPartition(topicName, partitionInfo.partition()));}// 获取 consumer group 的偏移量ListConsumerGroupOffsetsResult offsetsResult = adminClient.listConsumerGroupOffsets(consumerGroupId);Map<TopicPartition, OffsetAndMetadata> consumerOffsets = offsetsResult.partitionsToOffsetAndMetadata().get();// 获取 topic 分区的 log-end-offsetMap<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> topicPartitionListOffsetsResultInfoMap =adminClient.listOffsets(topicPartitions.stream().collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()))).all().get();// 获取 consumer group 成员信息DescribeConsumerGroupsResult consumerGroupResult = adminClient.describeConsumerGroups(Collections.singletonList(consumerGroupId));Map<String, ConsumerGroupDescription> consumerGroupDescriptionMap = consumerGroupResult.all().get();ConsumerGroupDescription consumerGroupDescription = consumerGroupDescriptionMap.get(consumerGroupId);// 输出消费者的偏移量与 log-end-offset 比较for (TopicPartition partition : topicPartitions) {OffsetAndMetadata consumerOffset = consumerOffsets.get(partition);if (consumerOffset != null) {long consumerOffsetValue = consumerOffset.offset(); // 消费者的当前偏移量// 获取 Kafka 中该分区的 log-end-offsetListOffsetsResult.ListOffsetsResultInfo logEndOffsetInfo = topicPartitionListOffsetsResultInfoMap.get(partition);long logEndOffset = logEndOffsetInfo.offset(); // Kafka 中该分区的 log-end-offset// 计算 Laglong lag = logEndOffset - consumerOffsetValue;String consumerInstance = "";// 输出每个消费实例的信息for (MemberDescription member : consumerGroupDescription.members()) {for (TopicPartition topicPartition : member.assignment().topicPartitions()) {if (topicPartition.topic().equals(partition.topic())) {Field field = MemberDescription.class.getDeclaredField("memberId");// 设置可以访问私有字段field.setAccessible(true);// 通过反射获取 final 字段的值String memberIdValue = (String) field.get(member);consumerInstance =  memberIdValue + ":" + member.host();break;}}}// 输出每个分区的 Lag,并输出每个消费者实例信息System.out.println("Topic: " + partition.topic() + ", Partition: " + partition.partition() +", Consumer Offset: " + consumerOffsetValue + ", Log End Offset: " + logEndOffset + ", Lag: " + lag + ", consumerInstance : " + consumerInstance);} else {System.out.println("No consumer offset found for partition: " + partition);}}// 关闭 AdminClientadminClient.close();}
}

代码功能总结:

  1. 查询 Kafka 分区的 log-end-offset 和消费者的 consumerOffset
  2. 计算每个分区的消费延迟(Lag)。
  3. 使用反射访问消费者实例的 memberId 字段和主机名。
  4. 输出每个 Topic 分区的消费偏移量、日志结束偏移量、Lag 和消费者实例信息。

http://www.ppmy.cn/ops/139337.html

相关文章

第十一课 Unity编辑器创建的资源优化_预制体和材质篇(Prefabs和Materials)详解

预制体(Prefabs) Unity中的预制体是用来存储游戏对象、子对象及其所需组件的可重用资源&#xff0c;一般来说预制体资源可充当资源模版&#xff0c;在此模版基础上可以在场景中创建新的预制体实例。 使用预制体的好处 由于预制体系统可以自动保持所有实例副本同步&#xff0c…

神经网络和支持向量机的基础——感知机模型

一、感知机模型的原理 感知机模型&#xff0c;也被称为神经元模型&#xff0c;其设计灵感来源于生物神经元的运行机制。它模拟了神经元的信息接收、处理和输出的过程&#xff0c;从而实现了对未知数据的分类。感知机模型的核心是线性回归与符号函数的结合。具体来说&#xff0…

预训练模型与ChatGPT:自然语言处理的革新与前景

目录 一、ChatGPT整体背景认知 &#xff08;一&#xff09;ChatGPT引起关注的原因 &#xff08;二&#xff09;与其他公司的竞争情况 二、NLP学习范式的发展 &#xff08;一&#xff09;规则和机器学习时期 &#xff08;二&#xff09;基于神经网络的监督学习时期 &…

<工具 Claude Desktop> 配置 MCP server 连接本地 SQLite, 本机文件夹(目录) 网络驱动器 Windows 11 系统

也是在学习中... 起因&#xff1a; 抖音博客 艾克AI分享 他的视频 #143《Claude开源MCP彻底打破AI的信息孤岛》 提到: Claude开源的MCP太强了&#xff0c;视频后面是快速演示&#xff0c;反正看了好几遍也没弄明白。菜单都不一样&#xff0c;感觉用的不是同一家 Claude. 探…

如何将快捷指令添加到启动台

如何将快捷指令添加到启动台/Finder/访达&#xff08;Mac&#xff09; 1. 打开快捷指令创建快捷指令 示例创建了一个文件操作测试的快捷指令。 2. 右键选择添加到程序坞 鼠标放在待添加的快捷指令上。 3. 右键添加到访达 鼠标放在待添加的快捷指令上。 之后就可以在启…

C#VB.NET开发整体一键国际化显示

第一章链接 第二章 窗口多国语言显示 在第一章时我们已经了解如何对内容进行多语言化下面讲解如何对窗口多语言显示 在实际开发中单个窗体内可能有很多控件,如果我们对每个控件使用Mu方法进行赋值是异常繁琐的如下 Button1.Text"显示".Mu(); 在对窗口进行多语言化SG…

C++中的操作系统级信号处理——signal与sigaction

在多进程的编程中,信号是一种非常重要的多进程通讯手段。而进程间的信号很大情况是和操作系统是相关的,或者说很多信号是从操作系统中过来的。 我们这一篇就来说一下操作系统的信号。 操作系统中的信号其实在操作系统中可以称作是中断,可以理解为一个循环执行的程序中突然…

第2章:CSS基本语法 --[CSS零基础入门]

CSS&#xff08;层叠样式表&#xff0c;Cascading Style Sheets&#xff09;是用来描述HTML或XML&#xff08;包括各种XML&#xff1a;SVG, MathML 或 XHTML&#xff09;等文档的外观和格式的语言。以下是CSS的基本语法&#xff1a; 1.选择器 1.元素选择器 元素选择器是基于…