【kafka实战】05 Kafka消费者消费消息过程源码剖析

embedded/2025/2/13 3:01:59/

1. 概述

Kafka消费者(Consumer)是Kafka系统中负责从Kafka集群中拉取消息的客户端组件。消费者消费消息的过程涉及多个步骤,包括消费者组的协调、分区分配、消息拉取、消息处理等。本文将深入剖析Kafka消费者消费消息的源码,并结合相关原理图进行讲解。

以下是一个使用 Java 编写的 KafkaConsumer 的示例。在这个示例中,我们将创建一个简单的 Kafka 消费者,连接到 Kafka 集群,订阅一个主题,并消费该主题中的消息。

1.1 消费者代码使用示例

  • 已经安装并启动了 Kafka 集群。
  • 你已经添加了 Kafka 客户端依赖到你的项目中。如果你使用 Maven,可以在 pom.xml 中添加以下依赖:
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.6.0</version>
</dependency>

示例代码

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class KafkaConsumerExample {private static final String TOPIC_NAME = "test-topic";private static final String BOOTSTRAP_SERVERS = "localhost:9092";private static final String GROUP_ID = "test-group";public static void main(String[] args) {// 配置消费者属性Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);// 自动提交偏移量props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");// 键和值的反序列化器props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 创建 Kafka 消费者实例KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);try {// 订阅主题consumer.subscribe(Collections.singletonList(TOPIC_NAME));// 持续消费消息while (true) {// 从 Kafka 拉取消息ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}}} catch (Exception e) {e.printStackTrace();} finally {// 关闭消费者consumer.close();}}
}

代码解释

  1. 配置消费者属性

    • BOOTSTRAP_SERVERS_CONFIG:指定 Kafka 集群的地址。
    • GROUP_ID_CONFIG:指定消费者所属的消费组。
    • ENABLE_AUTO_COMMIT_CONFIG:设置是否自动提交偏移量。
    • AUTO_COMMIT_INTERVAL_MS_CONFIG:设置自动提交偏移量的时间间隔。
    • KEY_DESERIALIZER_CLASS_CONFIGVALUE_DESERIALIZER_CLASS_CONFIG:指定键和值的反序列化器。
  2. 创建 Kafka 消费者实例:使用配置好的属性创建 KafkaConsumer 实例。

  3. 订阅主题:使用 subscribe 方法订阅指定的主题。

  4. 持续消费消息:使用 poll 方法从 Kafka 拉取消息,并遍历消费记录,打印消息的偏移量、键和值。

  5. 关闭消费者:在消费完成后,使用 close 方法关闭消费者。

注意事项

  • 确保 Kafka 集群的地址和主题名称正确。
  • 如果需要手动提交偏移量,可以将 ENABLE_AUTO_COMMIT_CONFIG 设置为 false,并使用 commitSync()commitAsync() 方法手动提交偏移量。

2. Kafka消费者消费消息的核心流程

Kafka消费者消费消息的核心流程可以分为以下几个步骤:

  1. 消费者组协调:消费者加入消费者组,并与组协调器(GroupCoordinator)进行协调。
  2. 分区分配:组协调器为消费者分配分区。
  3. 消息拉取:消费者从分配的分区中拉取消息。
  4. 消息处理:消费者处理拉取到的消息。
  5. 提交偏移量:消费者提交已处理消息的偏移量。

下面我们将结合源码详细分析每个步骤。
在这里插入图片描述

3. 源码剖析


关键组件说明

  1. ConsumerCoordinator

    • 负责消费者组的协调和分区分配。
    • 管理消费者的心跳和重平衡。
  2. Fetcher

    • 负责从 Kafka Broker 拉取消息。
    • 管理拉取请求和响应的处理。
  3. SubscriptionState

    • 管理消费者订阅的主题和分区。
    • 记录消费者的消费偏移量。
  4. PartitionAssignor

    • 负责分区分配策略的实现。
  5. OffsetCommitCallback

    • 处理偏移量提交的回调逻辑。

3.1 消费者组协调

消费者在启动时,首先需要加入消费者组,并与组协调器进行协调。组协调器负责管理消费者组的成员和分区分配。

// org.apache.kafka.clients.consumer.KafkaConsumer#subscribe
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {// 1. 订阅主题this.subscriptions.subscribe(new HashSet<>(topics), listener);// 2. 加入消费者组coordinator.subscribe(subscriptions);
}

subscribe方法中,消费者首先订阅指定的主题,然后通过coordinator.subscribe方法加入消费者组。组协调器会为消费者分配一个唯一的memberId,并将其加入到消费者组中。

3.2 分区分配

组协调器在消费者加入消费者组后,会为消费者分配分区。分区分配策略由PartitionAssignor决定,Kafka提供了多种内置的分区分配策略,如RangeAssignorRoundRobinAssignor等。

// org.apache.kafka.clients.consumer.internals.AbstractCoordinator#ensurePartitionAssignment
private void ensurePartitionAssignment() {// 1. 获取分区分配结果Map<String, List<TopicPartition>> assignments = partitionAssignor.assign(metadata.fetch(), subscriptions.subscription());// 2. 更新消费者的分区分配subscriptions.assignFromSubscribed(assignments.get(consumerId));
}

ensurePartitionAssignment方法中,组协调器通过partitionAssignor.assign方法为消费者分配分区,并将分配结果更新到消费者的订阅信息中。

3.3 消息拉取

消费者在分配到分区后,会从分配的分区中拉取消息。Kafka消费者采用拉取模式(Pull Model),即消费者主动从Kafka集群中拉取消息。

// org.apache.kafka.clients.consumer.KafkaConsumer#poll
public ConsumerRecords<K, V> poll(Duration timeout) {// 1. 拉取消息Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchRecords(timeout);// 2. 返回拉取到的消息return new ConsumerRecords<>(records);
}

poll方法中,消费者通过fetcher.fetchRecords方法从Kafka集群中拉取消息。fetcher是Kafka消费者中的一个重要组件,负责管理消息的拉取和偏移量的提交。

3.4 消息处理

消费者在拉取到消息后,会对消息进行处理。消息处理的具体逻辑由用户自定义,通常包括消息的反序列化、业务逻辑处理等。

// org.apache.kafka.clients.consumer.KafkaConsumer#poll
public ConsumerRecords<K, V> poll(Duration timeout) {// 1. 拉取消息Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchRecords(timeout);// 2. 处理消息for (Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry : records.entrySet()) {for (ConsumerRecord<K, V> record : entry.getValue()) {// 用户自定义的消息处理逻辑processRecord(record);}}// 3. 返回拉取到的消息return new ConsumerRecords<>(records);
}

poll方法中,消费者通过processRecord方法处理每条消息。processRecord方法的具体实现由用户自定义。

3.5 提交偏移量

消费者在处理完消息后,需要提交已处理消息的偏移量。偏移量的提交可以手动或自动进行,Kafka提供了多种偏移量提交策略,如自动提交、同步提交、异步提交等。

// org.apache.kafka.clients.consumer.KafkaConsumer#commitSync
public void commitSync() {// 1. 提交偏移量coordinator.commitOffsetsSync(subscriptions.allConsumed());
}

commitSync方法中,消费者通过coordinator.commitOffsetsSync方法同步提交偏移量。同步提交会阻塞当前线程,直到偏移量提交成功。

4. 原理图

以下是Kafka消费者消费消息的核心流程示意图:

+-------------------+       +-------------------+       +-------------------+
|                   |       |                   |       |                   |
|  消费者组协调      | ----> |     分区分配       | ----> |     消息拉取       |
|                   |       |                   |       |                   |
+-------------------+       +-------------------+       +-------------------+|v
+-------------------+       +-------------------+       +-------------------+
|                   |       |                   |       |                   |
|     消息处理       | <---- |     提交偏移量     | <---- |     网络传输       |
|                   |       |                   |       |                   |
+-------------------+       +-------------------+       +-------------------+

5. 总结

Kafka消费者消费消息的过程涉及多个步骤,包括消费者组的协调、分区分配、消息拉取、消息处理和偏移量提交。通过源码剖析,我们可以更深入地理解Kafka消费者的工作原理。希望本文能够帮助你更好地理解Kafka消费者的内部机制。

6. 参考

  • Kafka官方文档
  • Kafka源码

http://www.ppmy.cn/embedded/161085.html

相关文章

C++使用Json保存配置参数

文章目录 1. **设计配置文件的结构**示例配置文件 `config.json`:2. **解析 JSON 配置文件**解析过程:示例:3. **修改配置**修改流程:示例:4. **保存配置文件**示例:5. **完整的操作流程**6. **示例代码分析**主要部分:7. **优势**8. **总结**使用 JSON 来管理配置文件的…

[权限提升] Linux 提权 — 系统内核溢出漏洞提权

关注这个专栏的其他相关笔记&#xff1a;[内网安全] 内网渗透 - 学习手册-CSDN博客 0x01&#xff1a;系统内核溢出漏洞提权介绍 注意&#xff1a;提权很容易让系统崩溃&#xff0c;所以如果是测试的话&#xff0c;提权前最好做好系统备份。 Linux 系统内核溢出提权漏洞与之前的…

【学Rust写CAD】5 三维转换矩阵解析及应用示例

三维转换矩阵是指将一个三维空间中的坐标系转换为另一个三维空间中的坐标系所需要的矩阵。在计算机图形学、计算机视觉等领域&#xff0c;三维转换矩阵是非常重要的基础知识。完整的三维转换矩阵为一个4x4的方阵。 [ 1 0 0 0 d x x x y x z x d y x y y y z y d z x z y z z z…

【JVM详解四】执行引擎

一、概述 Java程序运行时&#xff0c;JVM会加载.class字节码文件&#xff0c;但是字节码并不能直接运行在操作系统之上&#xff0c;而JVM中的执行引擎就是负责将字节码转化为对应平台的机器码让CPU运行的组件。 执行引擎是JVM核心的组成部分之一。可以把JVM架构分成三部分&am…

C++ 设计模式 - 访问者模式

一&#xff1a;概述 访问者模式将作用于对象层次结构的操作封装为一个对象&#xff0c;并使其能够在不修改对象层次结构的情况下定义新的操作。 《设计模式&#xff1a;可复用面向对象软件的基础》一书中的访问者模式因两个原因而具有传奇色彩&#xff1a;一是因为它的复杂性&a…

大模型推理——MLA实现方案

1.整体流程 先上一张图来整体理解下MLA的计算过程 2.实现代码 import math import torch import torch.nn as nn# rms归一化 class RMSNorm(nn.Module):""""""def __init__(self, hidden_size, eps1e-6):super().__init__()self.weight nn.Pa…

1分钟基于腾讯云TI平台搭建DeepSeek大模型

前言 蛇年王炸消息&#xff0c;由深度求索人工智能基础技术研究有限公司开发的先进人工智能模型DeepSeek&#xff0c;最近持续火爆&#xff0c;DeepSeek大模型以其强大的自然语言处理能力和高效的学习速度&#xff0c;为各行各业带来了前所未有的创新机遇。而腾讯云TI平台&…

Unity笔试常考

线程同步的几种方式 1.信号量pv操作 2.互斥加锁 3.条件变量 五层网络协议指的是哪五层 1.应用层 2.运输层 3.网络层 4.链路层 5.物理层 TCP和UDP区别 tcp 面向连接&#xff0c;保证发送顺序&#xff0c;速度慢&#xff0c;必须在线&#xff0c;三次握手&#xff0c;4次挥手…