Kafka Source Code Analysis: The Producer Data Sending Flow (Part 4)

2024/11/28 21:41:18

Overview

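Continuing the producer sending flow from the previous part: once the preparation work is done, the Kafka producer sends the data through two main components, Sender and KafkaClient (implemented by NetworkClient). Under the hood they wrap the Java NIO components Channel and Selector; readers unfamiliar with NIO may want to consult the relevant documentation first.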

Below I have summarized the main flow Kafka follows to send data, shown in this flowchart:

Following the flowchart above, let's dissect the Kafka source code step by step.

Source Code Analysis

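Picking up the producer main flow from Part 1, the code below wakes up the sender so that it sends the messages. All of the producer's interaction with the Kafka brokers takes place in the sender.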

    // 6. Send the message: wake up the sender
    if (result.batchIsFull || result.newBatchCreated) {
        log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
        this.sender.wakeup();
    }
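The wakeup matters because the sender thread normally sits blocked in the NIO select call made from client.poll. As far as I can tell from the client source, Sender.wakeup() delegates to NetworkClient.wakeup(), which calls wakeup() on Kafka's Selector and ultimately on java.nio.channels.Selector. The JDK-only sketch below is not Kafka code (the class name WakeupDemo and the timeouts are illustrative assumptions); it only shows the effect: a thread blocked in select(10000) returns almost immediately once another thread calls wakeup().

```java
import java.io.IOException;
import java.nio.channels.Selector;
import java.util.concurrent.CountDownLatch;

// Hypothetical demo class, not part of Kafka: shows how Selector.wakeup()
// unblocks a thread that is waiting in select(timeout).
class WakeupDemo {

    /** Returns roughly how many milliseconds the blocked select() actually waited. */
    static long measureBlockedSelect(long selectTimeoutMs, long wakeupAfterMs) {
        try (Selector selector = Selector.open()) {
            CountDownLatch started = new CountDownLatch(1);
            long[] waitedMs = new long[1];

            // Plays the role of the sender I/O thread blocked inside client.poll()
            Thread ioThread = new Thread(() -> {
                started.countDown();
                long start = System.nanoTime();
                try {
                    // No channels are registered, so only the timeout or a wakeup ends this call
                    selector.select(selectTimeoutMs);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
                waitedMs[0] = (System.nanoTime() - start) / 1_000_000;
            });
            ioThread.start();
            started.await();

            // Plays the role of the producer thread after a batch fills up
            Thread.sleep(wakeupAfterMs);
            selector.wakeup(); // if select() has not started yet, the next select() returns at once
            ioThread.join();   // join() makes waitedMs[0] visible to this thread
            return waitedMs[0];
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        long waited = measureBlockedSelect(10_000, 100);
        System.out.println("select() returned after ~" + waited + " ms instead of 10000 ms");
    }
}
```

Without the wakeup() call, the I/O thread in this sketch would stay blocked for the full 10 seconds, since no channel ever becomes ready.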

Sender implements the Runnable interface, so we only need to look at its run method:

    // Started once; keeps running until the producer is closed
    public void run() {
        log.debug("Starting Kafka producer I/O thread.");

        // main loop, runs until close is called
        while (running) {
            try {
                runOnce();
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }
    .....

    void runOnce() {
        long currentTimeMs = time.milliseconds();
        // Send the requests
        long pollTimeout = sendProducerData(currentTimeMs);
        // Poll NIO events
        client.poll(pollTimeout, currentTimeMs);
    }

Next, step into sendProducerData; the main steps are annotated below:

    private long sendProducerData(long now) {
        // 1. Fetch the cluster metadata
        Cluster cluster = metadata.fetch();
        // 2. Find the leader hosts that messages will be sent to
        // get the list of partitions with data ready to send
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

        // if there are any partitions whose leaders are not known yet, force metadata update
        if (!result.unknownLeaderTopics.isEmpty()) {
            // The set of topics with unknown leader contains topics with leader election pending as well as
            // topics which may have expired. Add the topic again to metadata to ensure it is included
            // and request metadata update, since there are messages to send to the topic.
            for (String topic : result.unknownLeaderTopics)
                this.metadata.add(topic, now);

            log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}",
                result.unknownLeaderTopics);
            this.metadata.requestUpdate();
        }

        // remove any nodes we aren't ready to send to
        // 3. Iterate over the hosts
        Iterator<Node> iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;
        while (iter.hasNext()) {
            Node node = iter.next();
            if (!this.client.ready(node, now)) {
                // 4. Remove this host from the nodes we will send to
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
            }
        }

        // create produce requests
        // 5. Group the batches bound for the same host; on the very first request, batches is empty
        Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
        addToInflightBatches(batches);
        if (guaranteeMessageOrder) {
            // Mute all the partitions drained
            for (List<ProducerBatch> batchList : batches.values()) {
                for (ProducerBatch batch : batchList)
                    this.accumulator.mutePartition(batch.topicPartition);
            }
        }

        // 6. Handle expired batches (timeouts)
        accumulator.resetNextBatchExpiryTime();
        List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
        List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
        expiredBatches.addAll(expiredInflightBatches);

        // Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
        // for expired batches. see the documentation of @TransactionState.resetIdempotentProducerId to understand why
        // we need to reset the producer id here.
        if (!expiredBatches.isEmpty())
            log.trace("Expired {} batches in accumulator", expiredBatches.size());
        for (ProducerBatch expiredBatch : expiredBatches) {
            String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
                + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
            failBatch(expiredBatch, new TimeoutException(errorMessage), false);
            if (transactionManager != null && expiredBatch.inRetry()) {
                // This ensures that no new batches are drained until the current in flight batches are fully resolved.
                transactionManager.markSequenceUnresolved(expiredBatch);
            }
        }
        sensors.updateProduceRequestMetrics(batches);

        // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
        // loop and try sending more data. Otherwise, the timeout will be the smaller value between next batch expiry
        // time, and the delay time for checking data availability. Note that the nodes may have data that isn't yet
        // sendable due to lingering, backing off, etc. This specifically does not include nodes with sendable data
        // that aren't ready to send since they would cause busy looping.
        long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
        pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
        pollTimeout = Math.max(pollTimeout, 0);
        if (!result.readyNodes.isEmpty()) {
            log.trace("Nodes with data ready to send: {}", result.readyNodes);
            // if some partitions are already ready to be sent, the select time would be 0;
            // otherwise if some partition already has some data accumulated but not ready yet,
            // the select time will be the time difference between now and its linger expiry time;
            // otherwise the select time will be the time difference between now and the metadata expiry time;
            pollTimeout = 0;
        }
        // 7. Send the data
        sendProduceRequests(batches, now);
        return pollTimeout;
    }
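The tail of sendProducerData decides how long the following client.poll may block. The pure function below restates only that arithmetic so it can be checked in isolation. PollTimeoutSketch and its parameter names are hypothetical; they are chosen to mirror result.nextReadyCheckDelayMs, notReadyTimeout, accumulator.nextExpiryTimeMs() and result.readyNodes in the code above.

```java
// Hypothetical helper, not a Kafka class: restates the poll-timeout arithmetic
// at the end of Sender.sendProducerData.
class PollTimeoutSketch {

    static long pollTimeout(long nextReadyCheckDelayMs,
                            long notReadyTimeoutMs,
                            long nextExpiryTimeMs,
                            long nowMs,
                            boolean hasReadyNodes) {
        // Wait no longer than the next readiness check or the delay reported for not-ready nodes
        long timeout = Math.min(nextReadyCheckDelayMs, notReadyTimeoutMs);
        // ...and no longer than the time left until the next batch expires
        timeout = Math.min(timeout, nextExpiryTimeMs - nowMs);
        // Never negative: an overdue expiry means "poll without blocking"
        timeout = Math.max(timeout, 0);
        // Some node has sendable data: don't block, loop straight back to sending
        if (hasReadyNodes)
            timeout = 0;
        return timeout;
    }

    public static void main(String[] args) {
        // Data lingering for 30 ms more, no connection problems, next expiry in 120 s
        System.out.println(pollTimeout(30, Long.MAX_VALUE, 120_000, 0, false)); // 30
        // A node is ready to receive a batch: poll returns immediately
        System.out.println(pollTimeout(30, Long.MAX_VALUE, 120_000, 0, true));  // 0
    }
}
```

The clamp to zero matters because nextExpiryTimeMs - now can go negative once a batch is already overdue.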

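On the first request, execution enters if (!this.client.ready(node, now)), which checks whether the network connection is ready. If it is not, the client initiates the connection through Kafka's Selector (a proxy around the NIO Selector) and registers the channel for the connect event: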

registerChannel(id, socketChannel, SelectionKey.OP_CONNECT);
    public boolean ready(Node node, long now) {
        if (node.isEmpty())
            throw new IllegalArgumentException("Cannot connect to empty node " + node);

        // Is the connection already established and usable?
        if (isReady(node, now))
            return true;

        // Is a new connection attempt allowed right now?
        if (connectionStates.canConnect(node.idString(), now))
            // if we are interested in sending to a node and we don't have a connection to it, initiate one
            // Initiate the connection
            initiateConnect(node, now);

        return false;
    }

    private void initiateConnect(Node node, long now) {
        String nodeConnectionId = node.idString();
        try {
            connectionStates.connecting(nodeConnectionId, now, node.host());
            InetAddress address = connectionStates.currentAddress(nodeConnectionId);
            log.debug("Initiating connection to node {} using address {}", node, address);
            // Kafka's Selector wraps the NIO Selector, effectively adding a proxy layer
            selector.connect(nodeConnectionId,
                new InetSocketAddress(address, node.port()),
                this.socketSendBuffer,
                this.socketReceiveBuffer);
        } catch (IOException e) {
            log.warn("Error connecting to node {}", node, e);
            // Attempt failed, we'll try again after the backoff
            connectionStates.disconnected(nodeConnectionId, now);
            // Notify metadata updater of the connection failure
            metadataUpdater.handleServerDisconnect(now, nodeConnectionId, Optional.empty());
        }
    }
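Underneath the wrappers, selector.connect comes down to plain NIO calls. The sketch below is a minimal JDK-only illustration of that pattern, not a reproduction of Kafka's Selector (which also sets socket buffer sizes, keep-alive and TCP_NODELAY): open a non-blocking SocketChannel, start the connect, register OP_CONNECT, and later call finishConnect once the key becomes connectable. The local ServerSocketChannel stands in for a broker; ConnectSketch and the 5-second timeout are assumptions made for the demo.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// JDK-only sketch (not Kafka code) of a non-blocking connect driven by a Selector.
class ConnectSketch {

    /** Connects to a local listener and returns the interest set left on the key afterwards. */
    static int connectAndSwitchToRead() {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open();
             SocketChannel channel = SocketChannel.open()) {

            // Stand-in for a broker: listen on an ephemeral loopback port (the kernel completes the handshake)
            server.bind(new InetSocketAddress("127.0.0.1", 0));

            // Like initiateConnect -> selector.connect: non-blocking connect, then register OP_CONNECT
            channel.configureBlocking(false);
            boolean connected = channel.connect(server.getLocalAddress());
            SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT);

            // Later, inside poll(): wait until the key is connectable, then finish the connection
            while (!connected) {
                if (selector.select(5_000) == 0)
                    throw new IllegalStateException("connect timed out");
                selector.selectedKeys().clear();
                if (key.isConnectable())
                    connected = channel.finishConnect();
            }

            // Same bit manipulation as Kafka's finishConnect: drop OP_CONNECT, add OP_READ
            key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
            return key.interestOps();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        int ops = connectAndSwitchToRead();
        System.out.println("only OP_READ registered: " + (ops == SelectionKey.OP_READ));
    }
}
```

Because connect() usually returns false for a non-blocking channel, the caller cannot use the connection right away, which is why ready() returns false on the first pass.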

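Once the connection has been initiated, control returns to the main flow. The connection is not usable yet, so ready returns false and each such node is removed from the set of nodes to send to: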

    if (!this.client.ready(node, now)) {
        // 4. Remove this host from the nodes we will send to
        iter.remove();

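No batches can be drained at this point, so control falls back to runOnce, which then enters the method

client.poll(pollTimeout, currentTimeMs)

This brings us to NetworkClient, the component that processes network requests. Its core call is:

this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));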

Stepping into Kafka's Selector, the core method here is:

pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);

It walks through the selected keys and handles each kind of ready event (connect, write, read) accordingly.

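Through the TransportLayer proxy object, it ultimately calls the NIO SocketChannel to complete the connection, then clears OP_CONNECT with a bitwise AND against the complement (& ~) and registers OP_READ with a bitwise OR (|):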

    public boolean finishConnect() throws IOException {
        // Complete the network connection
        boolean connected = socketChannel.finishConnect();
        if (connected)
            // Remove OP_CONNECT and add OP_READ
            key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
        return connected;
    }
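Because & binds more tightly than | in Java, the expression in finishConnect reads as (ops & ~OP_CONNECT) | OP_READ. The helper below is a hypothetical class, not Kafka code; it spells out that arithmetic together with the add/remove operations the transport layer performs on OP_WRITE later in the flow.

```java
import java.nio.channels.SelectionKey;

// Hypothetical helper (not Kafka code) spelling out the interest-set arithmetic
// used by finishConnect, addInterestOps and removeInterestOps.
class InterestOps {

    // & binds tighter than |, so this is (ops & ~OP_CONNECT) | OP_READ
    static int afterConnect(int ops) {
        return ops & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ;
    }

    // Set the given bits, as addInterestOps does
    static int add(int ops, int bits) {
        return ops | bits;
    }

    // Clear the given bits, as removeInterestOps does
    static int remove(int ops, int bits) {
        return ops & ~bits;
    }

    public static void main(String[] args) {
        int ops = SelectionKey.OP_CONNECT;        // 8, right after registerChannel
        ops = afterConnect(ops);                  // 1 = OP_READ
        ops = add(ops, SelectionKey.OP_WRITE);    // 5 = OP_READ | OP_WRITE while a send is pending
        ops = remove(ops, SelectionKey.OP_WRITE); // 1 = back to OP_READ once the send completes
        System.out.println(ops);                  // prints 1
    }
}
```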

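The while loop then runs again and re-enters sendProducerData; once the connection to the node is ready, its batches can be drained and sent.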

Below is the call chain for sending the request data:

sendProduceRequests(batches, now); >> 
client.send(clientRequest, now); >>
doSend(request, false, now); >>
doSend(clientRequest, isInternalRequest, now, builder.build(version)); >>

The request to be sent is wrapped in an InFlightRequest and added to inFlightRequests, and then the selector is called to send it.

    // Tracks requests still awaiting a response; at most 5 per connection by default
    private final InFlightRequests inFlightRequests;

    private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
        String destination = clientRequest.destination();
        RequestHeader header = clientRequest.makeHeader(request.version());
        if (log.isDebugEnabled()) {
            log.debug("Sending {} request with header {} and timeout {} to node {}: {}",
                clientRequest.apiKey(), header, clientRequest.requestTimeoutMs(), destination, request);
        }
        Send send = request.toSend(header);
        InFlightRequest inFlightRequest = new InFlightRequest(
                clientRequest,
                header,
                isInternalRequest,
                request,
                send,
                now);
        this.inFlightRequests.add(inFlightRequest);
        selector.send(new NetworkSend(clientRequest.destination(), send));
    }
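InFlightRequests keeps, per node, the requests that have been sent but not yet answered, and the client only sends another request to a node while that count is below max.in.flight.requests.per.connection (5 by default). The class below is a simplified sketch of that bookkeeping under those assumptions; InFlightSketch, its String request ids and its method names are hypothetical, not Kafka's API. It mirrors the idea that new requests go to the head of a per-node deque and the oldest one completes first, because responses on one connection arrive in order.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical model of per-node in-flight request tracking (not Kafka's class).
class InFlightSketch {
    private final int maxInFlightPerConnection;
    // node id -> requests sent but not yet answered; newest at the head, oldest at the tail
    private final Map<String, Deque<String>> requests = new HashMap<>();

    InFlightSketch(int maxInFlightPerConnection) {
        this.maxInFlightPerConnection = maxInFlightPerConnection;
    }

    // Record a request that has just been handed to the selector
    void add(String node, String requestId) {
        requests.computeIfAbsent(node, n -> new ArrayDeque<>()).addFirst(requestId);
    }

    // Responses on one connection arrive in order, so the oldest request completes first
    String completeNext(String node) {
        Deque<String> queue = requests.get(node);
        if (queue == null || queue.isEmpty())
            throw new IllegalStateException("No in-flight requests for node " + node);
        return queue.pollLast();
    }

    // Kafka additionally requires the newest request to be fully written; omitted in this sketch
    boolean canSendMore(String node) {
        Deque<String> queue = requests.get(node);
        return queue == null || queue.size() < maxInFlightPerConnection;
    }

    int count(String node) {
        Deque<String> queue = requests.get(node);
        return queue == null ? 0 : queue.size();
    }

    public static void main(String[] args) {
        InFlightSketch inFlight = new InFlightSketch(5);
        for (int i = 1; i <= 5; i++)
            inFlight.add("broker-1", "produce-" + i);
        System.out.println(inFlight.canSendMore("broker-1"));  // false: 5 requests unanswered
        System.out.println(inFlight.completeNext("broker-1")); // produce-1, the oldest
        System.out.println(inFlight.canSendMore("broker-1"));  // true again
    }
}
```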

Finally, this registers interest in the write event (OP_WRITE) on the channel's selection key:

    public void send(NetworkSend send) {
        String connectionId = send.destinationId();
        KafkaChannel channel = openOrClosingChannelOrFail(connectionId);
        if (closingChannels.containsKey(connectionId)) {
            // ensure notification via `disconnected`, leave channel in the state in which closing was triggered
            this.failedSends.add(connectionId);
        } else {
            try {
                // Core call
                channel.setSend(send);
            } ...
        }
    }

    public void setSend(NetworkSend send) {
        if (this.send != null)
            throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress, connection id is " + id);
        this.send = send;
        // Register interest in the write event (OP_WRITE)
        this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
    }

    public void addInterestOps(int ops) {
        key.interestOps(key.interestOps() | ops);
    }

Execution then reaches client.poll again, where the selector writes the data:

    // Write the request data
    attemptWrite(key, channel, nowNanos);
    // Remove the write event (once the whole send has been written)
    transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
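A non-blocking write may accept only part of the buffer, so, as I read the client code, OP_WRITE stays registered until the whole send has been written and is removed only afterwards. The sketch below models that with a hypothetical channel that accepts at most a few bytes per call; SlowChannel, WriteSketch and the 3-byte limit are illustrative assumptions, not Kafka code, and each loop iteration stands for one select() that reported the key writable.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;

// Hypothetical channel that accepts at most maxPerWrite bytes per call, like a nearly full socket send buffer.
class SlowChannel implements WritableByteChannel {
    private final int maxPerWrite;
    final ByteArrayOutputStream written = new ByteArrayOutputStream();

    SlowChannel(int maxPerWrite) {
        this.maxPerWrite = maxPerWrite;
    }

    @Override
    public int write(ByteBuffer src) {
        int n = Math.min(maxPerWrite, src.remaining());
        for (int i = 0; i < n; i++)
            written.write(src.get());
        return n;
    }

    @Override public boolean isOpen() { return true; }
    @Override public void close() { }
}

class WriteSketch {
    /** Writes a pending send and returns how many "writable" rounds it took. */
    static int drain(ByteBuffer pending, WritableByteChannel channel) {
        int interestOps = SelectionKey.OP_READ | SelectionKey.OP_WRITE; // after setSend -> addInterestOps(OP_WRITE)
        int rounds = 0;
        try {
            while ((interestOps & SelectionKey.OP_WRITE) != 0) {
                channel.write(pending); // may accept only part of the buffer
                rounds++;
                if (!pending.hasRemaining())
                    interestOps &= ~SelectionKey.OP_WRITE; // send complete: removeInterestOps(OP_WRITE)
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return rounds;
    }

    public static void main(String[] args) {
        SlowChannel channel = new SlowChannel(3);
        ByteBuffer request = ByteBuffer.wrap("0123456789".getBytes(StandardCharsets.US_ASCII));
        int rounds = drain(request, channel);
        System.out.println(rounds + " rounds: " + channel.written); // prints "4 rounds: 0123456789"
    }
}
```

Removing OP_WRITE as soon as the buffer is drained keeps select() from reporting an always-writable socket over and over, which would otherwise cause a busy loop.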

Finally, the response data from the server is read:

    // Handle read events from the server (OP_READ was registered earlier)
    if (channel.ready() && (key.isReadable() || channel.hasBytesBuffered()) && !hasCompletedReceive(channel)
            && !explicitlyMutedChannels.contains(channel)) {
        attemptRead(channel);
    }

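When reading, Kafka prefixes every message with a 4-byte length field, which solves the TCP framing problem of several messages arriving merged together or one message arriving split across reads (so-called sticky and half packets).

The code below ensures that a message is returned only after it has been read in full: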

    // The first 4 bytes hold the payload size
    if (size.hasRemaining()) {
        int bytesRead = channel.read(size);
        if (bytesRead < 0)
            throw new EOFException();
        read += bytesRead;
        if (!size.hasRemaining()) {
            size.rewind();
            // Read the 4-byte size
            int receiveSize = size.getInt();
            if (receiveSize < 0)
                throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")");
            if (maxSize != UNLIMITED && receiveSize > maxSize)
                throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")");
            requestedBufferSize = receiveSize; //may be 0 for some payloads (SASL)
            if (receiveSize == 0) {
                buffer = EMPTY_BUFFER;
            }
        }
    }

    public boolean complete() {
        // Has the whole message been read?
        return !size.hasRemaining() && buffer != null && !buffer.hasRemaining();
    }
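To see why a 4-byte size prefix handles both merged and split packets, here is a self-contained sketch of the same two-phase read (size first, then exactly that many payload bytes) against a channel that hands out only a few bytes per read call. FrameReader and ChunkedChannel are hypothetical names; unlike Kafka's NetworkReceive, this sketch has no memory pool and no maxSize check, and it reports a negative size with a plain IOException.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical channel that returns at most `chunk` bytes per read, simulating split TCP segments.
class ChunkedChannel implements ReadableByteChannel {
    private final ByteBuffer data;
    private final int chunk;

    ChunkedChannel(byte[] bytes, int chunk) {
        this.data = ByteBuffer.wrap(bytes);
        this.chunk = chunk;
    }

    @Override
    public int read(ByteBuffer dst) {
        if (!data.hasRemaining())
            return -1; // end of stream
        // Never copy more than dst can hold, so the next frame's bytes stay in the channel
        int n = Math.min(chunk, Math.min(dst.remaining(), data.remaining()));
        for (int i = 0; i < n; i++)
            dst.put(data.get());
        return n;
    }

    @Override public boolean isOpen() { return true; }
    @Override public void close() { }
}

// Two-phase reader modeled on NetworkReceive: a 4-byte size first, then the payload.
class FrameReader {
    private final ByteBuffer size = ByteBuffer.allocate(4);
    private ByteBuffer buffer; // allocated once the size is known

    void readFrom(ReadableByteChannel channel) throws IOException {
        if (size.hasRemaining()) {
            if (channel.read(size) < 0)
                throw new EOFException();
            if (!size.hasRemaining()) {
                size.rewind();
                int receiveSize = size.getInt(); // advances position back to 4, so size is "full" again
                if (receiveSize < 0)
                    throw new IOException("Invalid receive (size = " + receiveSize + ")");
                buffer = ByteBuffer.allocate(receiveSize);
            }
        }
        if (buffer != null && buffer.hasRemaining() && channel.read(buffer) < 0)
            throw new EOFException();
    }

    // Same condition as NetworkReceive.complete(): size read and payload buffer full
    boolean complete() {
        return !size.hasRemaining() && buffer != null && !buffer.hasRemaining();
    }

    // Encodes messages as [4-byte big-endian length][UTF-8 bytes], back to back
    static byte[] encode(String... messages) {
        List<byte[]> payloads = new ArrayList<>();
        int total = 0;
        for (String m : messages) {
            byte[] p = m.getBytes(StandardCharsets.UTF_8);
            payloads.add(p);
            total += 4 + p.length;
        }
        ByteBuffer out = ByteBuffer.allocate(total);
        for (byte[] p : payloads)
            out.putInt(p.length).put(p);
        return out.array();
    }

    // Decodes `count` frames, starting a fresh reader after each complete one
    static List<String> readAll(ReadableByteChannel channel, int count) {
        List<String> frames = new ArrayList<>();
        FrameReader reader = new FrameReader();
        try {
            while (frames.size() < count) {
                reader.readFrom(channel);
                if (reader.complete()) {
                    frames.add(new String(reader.buffer.array(), StandardCharsets.UTF_8));
                    reader = new FrameReader();
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return frames;
    }
}
```

For example, FrameReader.readAll(new ChunkedChannel(FrameReader.encode("hello", "kafka"), 3), 2) returns [hello, kafka] even though no single 3-byte read ever delivers a whole 9-byte frame.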


http://www.ppmy.cn/news/41748.html
