【kafka实战】04 Kafka生产者发送消息过程源码剖析

ops/2025/2/12 8:58:25/

Kafka生产者发送消息过程源码剖析

1. 概述

Kafka生产者(Producer)是Kafka系统中负责将消息发送到Kafka集群的客户端组件。生产者发送消息的过程涉及多个步骤,包括消息的序列化、分区选择、消息累加、批次发送等。本文将深入剖析Kafka生产者发送消息的源码,并结合相关原理图进行讲解。

Kafka 基本概念与术语

  • Topic(主题):Kafka 中的消息分类逻辑单元,类似于数据库中的表。生产者将消息发送到特定的主题,消费者则从相应主题订阅并接收消息。例如,在一个电商系统里,可以有 “订单主题” 用于传递订单相关信息,“用户行为主题” 记录用户浏览、购买等操作,不同类型的业务数据通过主题进行区分,方便管理与处理。
  • Partition(分区):主题进一步细分的物理存储单元,一个主题可以包含多个分区。分区的存在实现了数据的并行读写,提升了 Kafka 的吞吐量。每个分区在存储层面是一个有序的、不可变的消息序列,消息在分区内按照追加的方式写入,通过分区号来标识。比如一个拥有高并发写入需求的 “日志主题”,可以划分多个分区,让不同的日志数据分散到各个分区,避免单点写入瓶颈。
  • Broker(代理):Kafka 集群中的服务器实例,负责存储和转发消息。一个 Kafka 集群通常由多个 Broker 组成,它们协同工作,实现数据的高可用性与负载均衡。每个 Broker 都有自己的 ID,存储着主题的部分或全部分区数据,当生产者发送消息或消费者获取消息时,需要与 Broker 进行交互。
  • Producer(生产者):如前文所述,是消息的生产者,负责将外部系统的数据封装成消息,发送到 Kafka 集群的指定主题。它要处理消息序列化、缓冲、发送策略以及与集群的交互等诸多复杂任务,确保消息高效可靠传输,像实时数据采集系统中的传感器数据采集模块,就可以作为 Kafka 生产者将采集到的数据推送给集群。
  • Consumer(消费者):与生产者相对,是从 Kafka 集群的主题中拉取消息并进行处理的客户端。消费者可以以不同的消费模式运行,如单个消费者独立消费、多个消费者组成消费组共同消费一个主题,消费组内的消费者通过分区分配策略,协同消费主题下的各个分区,实现数据的并行处理,常见于大数据实时分析场景,不同的分析任务作为消费者从相应主题获取数据进行运算。
  • Consumer Group(消费组):多个消费者组成的逻辑分组,主要用于实现消息的负载均衡与容错。同一消费组内的消费者不会重复消费同一个分区的消息,而是按照一定策略分摊主题下各分区的消费任务,当组内某个消费者出现故障时,其他消费者能自动接管其负责的分区消费,保证数据处理的连续性,例如在一个大规模日志分析系统中,多个日志处理进程组成消费组,共同处理来自 “日志主题” 的海量数据。

2. Kafka生产者发送消息的核心流程

在这里插入图片描述

Kafka生产者发送消息的核心流程可以分为以下几个步骤:

  • 消息创建与序列化:生产者创建消息对象,并将消息的键和值进行序列化。

  • 分区选择:根据分区策略选择消息要发送到的分区。

  • 消息累加:将消息添加到消息累加器(RecordAccumulator)中,等待批量发送。

  • 批次发送:当满足一定条件时,将消息批次发送到Kafka集群。

  • 响应处理:处理Kafka集群返回的响应,确保消息发送成功。

下面我们将结合源码详细分析每个步骤。

Kafka 生产者主要由以下几个重要部分构成:

  • RecordAccumulator:消息收集器,用于缓存待发送的消息。生产者会先将消息批量存入这里,而非一条条直接发送,以提升传输效率。
  • Sender:真正负责将消息发送到 Kafka 集群的组件,它从 RecordAccumulator 中获取批量消息,并与集群建立连接,执行发送操作。
  • Metadata:维护 Kafka 集群的元数据信息,例如集群中有哪些 broker,各个主题的分区分布等。生产者依据这些信息决定消息该发往何处。

3. 源码剖析

3.1 消息创建与序列化

生产者发送消息的第一步是创建消息对象,并将消息的键和值进行序列化。Kafka消息的键和值可以是任意类型的数据,但最终需要序列化为字节数组才能通过网络传输。

// org.apache.kafka.clients.producer.KafkaProducer#send
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {// 1. 序列化消息的键和值byte[] serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());byte[] serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());// 2. 分区选择int partition = partition(record, serializedKey, serializedValue, cluster);// 3. 将消息添加到累加器RecordAccumulator.RecordAppendResult result = accumulator.append(record, serializedKey, serializedValue, headers, partition, maxTimeToBlock);// 4. 如果批次已满或创建了新批次,则唤醒发送线程if (result.batchIsFull || result.newBatchCreated) {this.sender.wakeup();}return result.future;
}

在send方法中,首先通过序列化器将消息的键和值序列化为字节数组。Kafka提供了多种内置的序列化器,如StringSerializerByteArraySerializer等,用户也可以自定义序列化器。

3.2 分区选择

Kafka消息发送到哪个分区是由分区器(Partitioner)决定的。默认情况下,Kafka使用DefaultPartitioner,它根据消息的键进行哈希计算,然后根据哈希值选择分区。如果消息没有键,则采用轮询的方式选择分区。

// org.apache.kafka.clients.producer.internals.DefaultPartitioner#partition
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);int numPartitions = partitions.size();if (keyBytes == null) {// 如果消息没有键,则采用轮询方式选择分区int nextValue = counter.getAndIncrement();List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);if (availablePartitions.size() > 0) {return Utils.toPositive(nextValue) % availablePartitions.size();} else {return Utils.toPositive(nextValue) % numPartitions;}} else {// 如果消息有键,则根据键的哈希值选择分区return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;}
}

3.3 消息累加

消息累加器(RecordAccumulator)是Kafka生产者中的一个重要组件,它负责将消息按分区进行缓存,并等待批量发送。每个分区对应一个消息批次(RecordBatch),当批次大小达到一定阈值或等待时间超过一定阈值时,批次会被发送到Kafka集群。

// org.apache.kafka.clients.producer.internals.RecordAccumulator#append
public RecordAppendResult append(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Header[] headers, int partition, long maxTimeToBlock) {// 获取或创建对应分区的DequeDeque<ProducerBatch> deque = getOrCreateDeque(partition);// 尝试将消息添加到批次中synchronized (deque) {RecordAppendResult result = tryAppend(deque, record, serializedKey, serializedValue, headers);if (result != null) {return result;}}// 如果批次已满或创建了新批次,则返回结果return appendNewBatch(deque, partition, record, serializedKey, serializedValue, headers, maxTimeToBlock);
}

3.4 批次发送

当消息累加器中的批次满足发送条件时,发送线程(Sender)会将批次发送到Kafka集群。发送线程会从累加器中获取准备好的批次,并将其封装成ProducerRequest,然后通过网络发送到Kafka集群。

// org.apache.kafka.clients.producer.internals.Sender#run
public void run() {while (running) {// 从累加器中获取准备好的批次RecordAccumulator.ReadyCheckResult result = accumulator.ready(cluster, now);// 发送批次sendProduceRequests(result.readyNodes, now);}
}

3.5 响应处理

Kafka集群在接收到消息后,会返回一个响应(ProducerResponse)。发送线程会处理这个响应,并根据响应结果更新消息的状态。如果消息发送成功,则调用用户提供的回调函数(Callback);如果发送失败,则根据配置的重试策略进行重试。

// org.apache.kafka.clients.producer.internals.Sender#handleProduceResponse
private void handleProduceResponse(ClientResponse response, Map<TopicPartition, ProducerBatch> batches, long now) {for (Map.Entry<TopicPartition, ProducerBatch> entry : batches.entrySet()) {TopicPartition tp = entry.getKey();ProducerBatch batch = entry.getValue();if (response.wasDisconnected()) {// 处理网络断开的情况handleDisconnection(batch, tp, now);} else if (response.hasResponse()) {// 处理成功响应handleSuccessfulResponse(batch, tp, response, now);} else {// 处理其他错误handleErrorResponse(batch, tp, response, now);}}
}

4. 原理图

以下是Kafka生产者发送消息的核心流程示意图:
在这里插入图片描述

5. 总结

现将发送消息的详细流程总结如下

  • 当应用程序调用生产者的 send 方法发送一条消息时,消息首先会被序列化。Kafka 支持多种序列化方式,如常见的 StringSerializer、AvroSerializer 等,确保消息能以合适的二进制格式传输。序列化后的消息被封装成 ProducerRecord 对象,该对象包含了目标主题、分区信息(若有指定)、键值对等关键数据。
  • 接着,ProducerRecord 进入 RecordAccumulator。这里采用了一种类似缓冲池的机制,消息按照分区分类存放,每个分区都有自己独立的缓冲区。RecordAccumulator 会持续监测各个分区缓冲区的消息数量,一旦达到设定的批量大小(batch.size 参数配置),或者距离上次发送时间超过 linger.ms 设定的时长,就标记该分区的消息为可发送状态。
  • Sender 线程一直在后台运行,它周期性地轮询 RecordAccumulator,查找那些已标记为可发送的分区消息。当发现可发送的消息批次后,Sender 会从 Metadata 组件获取对应的 broker 地址信息,建立与目标 broker 的连接。这里涉及到 TCP 连接的建立、维护以及连接池的管理等复杂逻辑,以确保连接的高效复用与可靠性。
  • 与 broker 成功建立连接后,Sender 使用 Kafka 的协议格式,将消息批次组装成请求发送过去。在这个过程中,需要处理诸如请求超时、重试机制等异常情况。如果发送失败,根据生产者配置的重试次数(retries 参数),会自动进行重试,直到达到重试上限或者成功为止。
  • 一旦 broker 成功接收并处理了消息批次,它会返回一个响应给生产者。Sender 负责解析这个响应,确认消息是否被正确写入 Kafka 日志文件。若出现错误,例如写入副本不足导致的写入失败,生产者可能会根据配置的策略进行调整,如增加重试次数、调整消息发送策略等。

Kafka生产者发送消息的过程涉及多个步骤,包括消息的序列化、分区选择、消息累加、批次发送和响应处理。通过源码剖析,我们可以更深入地理解Kafka生产者的工作原理。希望本文能够帮助你更好地理解Kafka生产者的内部机制。

6. 参考

  • Kafka官方文档
  • Kafka源码

http://www.ppmy.cn/ops/157447.html

相关文章

C# 线程与同步介绍

.NET学习资料 .NET学习资料 .NET学习资料 在 C# 编程中&#xff0c;线程和同步是实现高效、可靠应用程序的关键概念。随着计算机硬件的发展&#xff0c;多核处理器的普及&#xff0c;充分利用多线程技术可以显著提升应用程序的性能和响应速度。而同步机制则是确保多线程环境下…

C#+Redis接收数据并定时3秒钟频率异步保存到数据库

要在C#中实现从Redis接收数据,并以每3秒的频率异步保存到数据库,你可以使用System.Threading.Tasks.Task.Delay或System.Timers.Timer来创建一个定时任务。不过,对于更复杂的定时和调度需求,System.Threading.Tasks.Timer或Quartz.NET等库可能更合适。 在这个场景中,由于…

打家劫舍3

今天和打家讲一下打家劫舍3 题目&#xff1a; 题目链接&#xff1a;337. 打家劫舍 III - 力扣&#xff08;LeetCode&#xff09; 小偷又发现了一个新的可行窃的地区。这个地区只有一个入口&#xff0c;我们称之为root。 除了 root 之外&#xff0c;每栋房子有且只有一个“父“…

LM Studio本地调用模型的方法

首先需要下载LM Studio&#xff0c;&#xff08;LM Studio - Discover, download, and run local LLMs&#xff09;安装好后&#xff0c;需要对index.js文件进行修改&#xff0c;主要是对相关源hugging face的地址修改。 以macOS为例&#xff1a; cd /Applications/LM\ Studi…

网络安全ITP是什么 网络安全产品ips

DS/IPS都是专门针对计算机病毒和黑客入侵而设计的网络安全设备 1、含义不同 IDS &#xff1a;入侵检测系统&#xff08;发现非法入侵只能报警不能自己过滤&#xff09; 做一个形象的比喻&#xff1a;假如防火墙是一幢大楼的门锁&#xff0c;那么IDS就是这幢大楼里的监视系统…

DeepSeekMoE 论文解读:混合专家架构的效能革新者

论文链接&#xff1a;DeepSeekMoE: Towards Ultimate Expert Specialization in Mixture-of-Experts Language Models 目录 一、引言二、背景知识&#xff08;一&#xff09;MoE架构概述&#xff08;二&#xff09;现有MoE架构的问题 三、DeepSeekMoE架构详解&#xff08;一&a…

【Matlab优化算法-第14期】基于智能优化算法的VMD信号去噪项目实践

基于智能优化算法的VMD信号去噪项目实践 一、前言 在信号处理领域&#xff0c;噪声去除是一个关键问题&#xff0c;尤其是在处理含有高斯白噪声的复杂信号时。变分模态分解&#xff08;VMD&#xff09;作为一种新兴的信号分解方法&#xff0c;因其能够自适应地分解信号而受到…

C# 创建 Windows 应用程序教程

.NET学习资料 .NET学习资料 .NET学习资料 在 C# 编程领域中&#xff0c;创建 Windows 应用程序是一项基础且重要的技能。通过 C#&#xff0c;开发者能够构建出功能丰富、用户体验良好的桌面应用。接下来&#xff0c;我们将详细介绍如何使用 C# 创建 Windows 应用程序。 一、…