Overview
Picking up from the producer send flow in the previous post: once the preparation work is done, the Kafka producer relies on two components, Sender and KafkaClient, to actually send the data. Under the hood they wrap the Java NIO channel and selector; readers who are not yet familiar with the NIO components can consult the relevant documentation.
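As a quick refresher, here is a minimal sketch of the raw NIO pattern that Kafka wraps: a non-blocking SocketChannel registered with a Selector, with ready events handled in a poll loop. This is illustrative only; the address and the handler bodies are placeholders, not Kafka code.
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

public class NioClientSketch {
    public static void main(String[] args) throws Exception {
        Selector selector = Selector.open();
        SocketChannel channel = SocketChannel.open();
        channel.configureBlocking(false);                           // non-blocking: connect() returns immediately
        channel.connect(new InetSocketAddress("localhost", 9092));  // placeholder address
        channel.register(selector, SelectionKey.OP_CONNECT);        // be notified when the connect completes

        while (true) {                                              // a real client would have a shutdown condition
            selector.select(1000);                                  // wait up to 1s for ready events
            for (SelectionKey key : selector.selectedKeys()) {
                if (key.isConnectable()) {
                    ((SocketChannel) key.channel()).finishConnect(); // finish the TCP handshake
                    key.interestOps(SelectionKey.OP_READ);           // from now on, care about reads
                } else if (key.isReadable()) {
                    // read response bytes here
                }
            }
            selector.selectedKeys().clear();
        }
    }
}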
Below is the main flow I have put together for how Kafka sends data, shown in the following flow diagram:
Following that diagram, let's walk through the Kafka source code step by step.
Source Code Analysis
Continuing from the producer main flow in part one, the code here wakes up the Sender to carry out the actual send. All interaction between the producer and the Kafka brokers happens inside the Sender.
// 6. Send the message.
if (result.batchIsFull || result.newBatchCreated) {
    log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
    this.sender.wakeup();
}
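The wakeup fires when a batch fills up or a new batch has to be created, and how quickly that happens is governed by producer configuration. As a hedged illustration (the values below are examples, not recommendations), batch.size and linger.ms are the settings that most directly shape batching:
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class BatchingConfigExample {
    // Settings that influence when a batch is considered "full" and the Sender gets woken up.
    // The values are illustrative only.
    static Properties producerProps() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // max bytes per batch; a full batch wakes the Sender
        props.put(ProducerConfig.LINGER_MS_CONFIG, 5);      // extra wait for more records before a partial batch is sent
        return props;
    }
}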
Sender implements the Runnable interface, so we only need to look at its run method:
// started once, keeps running until the producer is closed
public void run() {
    log.debug("Starting Kafka producer I/O thread.");
    // main loop, runs until close is called
    while (running) {
        try {
            runOnce();
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }
    ..... // remainder of run() elided

void runOnce() {
    long currentTimeMs = time.milliseconds();
    // send the request
    long pollTimeout = sendProducerData(currentTimeMs);
    // poll NIO events
    client.poll(pollTimeout, currentTimeMs);
}
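The Sender runs on its own dedicated I/O thread, started when the producer is constructed. A hedged sketch of that pattern is below; the thread name and the lambda body are illustrative stand-ins, not copied from the Kafka source.
public class SenderThreadSketch {
    public static void main(String[] args) {
        // Minimal sketch of the pattern KafkaProducer uses: the Sender is a Runnable
        // executed on a dedicated daemon I/O thread. Names here are illustrative.
        Runnable sender = () -> { /* while (running) { runOnce(); } */ };
        Thread ioThread = new Thread(sender, "producer-network-thread"); // hypothetical thread name
        ioThread.setDaemon(true); // don't keep the JVM alive just for the I/O thread
        ioThread.start();         // close() would later flip the running flag and join this thread
    }
}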
Step into the sendProducerData method; the main steps are annotated below:
private long sendProducerData(long now) {
    // 1. Fetch the metadata
    Cluster cluster = metadata.fetch();
    // 2. Find the leader brokers that have data ready to send
    // get the list of partitions with data ready to send
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

    // if there are any partitions whose leaders are not known yet, force metadata update
    if (!result.unknownLeaderTopics.isEmpty()) {
        // The set of topics with unknown leader contains topics with leader election pending as well as
        // topics which may have expired. Add the topic again to metadata to ensure it is included
        // and request metadata update, since there are messages to send to the topic.
        for (String topic : result.unknownLeaderTopics)
            this.metadata.add(topic, now);

        log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}",
            result.unknownLeaderTopics);
        this.metadata.requestUpdate();
    }

    // remove any nodes we aren't ready to send to
    // 3. Iterate over the candidate nodes
    Iterator<Node> iter = result.readyNodes.iterator();
    long notReadyTimeout = Long.MAX_VALUE;
    while (iter.hasNext()) {
        Node node = iter.next();
        if (!this.client.ready(node, now)) {
            // 4. Remove nodes that are not ready to receive requests
            iter.remove();
            notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
        }
    }

    // create produce requests
    // 5. Merge batches heading to the same broker; on the first pass batches is empty
    Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
    addToInflightBatches(batches);
    if (guaranteeMessageOrder) {
        // Mute all the partitions drained
        for (List<ProducerBatch> batchList : batches.values()) {
            for (ProducerBatch batch : batchList)
                this.accumulator.mutePartition(batch.topicPartition);
        }
    }

    // 6. Handle expired batches
    accumulator.resetNextBatchExpiryTime();
    List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
    List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
    expiredBatches.addAll(expiredInflightBatches);

    // Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
    // for expired batches. see the documentation of @TransactionState.resetIdempotentProducerId to understand why
    // we need to reset the producer id here.
    if (!expiredBatches.isEmpty())
        log.trace("Expired {} batches in accumulator", expiredBatches.size());
    for (ProducerBatch expiredBatch : expiredBatches) {
        String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
            + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
        failBatch(expiredBatch, new TimeoutException(errorMessage), false);
        if (transactionManager != null && expiredBatch.inRetry()) {
            // This ensures that no new batches are drained until the current in flight batches are fully resolved.
            transactionManager.markSequenceUnresolved(expiredBatch);
        }
    }
    sensors.updateProduceRequestMetrics(batches);

    // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
    // loop and try sending more data. Otherwise, the timeout will be the smaller value between next batch expiry
    // time, and the delay time for checking data availability. Note that the nodes may have data that isn't yet
    // sendable due to lingering, backing off, etc. This specifically does not include nodes with sendable data
    // that aren't ready to send since they would cause busy looping.
    long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
    pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
    pollTimeout = Math.max(pollTimeout, 0);
    if (!result.readyNodes.isEmpty()) {
        log.trace("Nodes with data ready to send: {}", result.readyNodes);
        // if some partitions are already ready to be sent, the select time would be 0;
        // otherwise if some partition already has some data accumulated but not ready yet,
        // the select time will be the time difference between now and its linger expiry time;
        // otherwise the select time will be the time difference between now and the metadata expiry time;
        pollTimeout = 0;
    }
    // 7. Send the data
    sendProduceRequests(batches, now);
    return pollTimeout;
}
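Step 5 above is worth a closer look: drain groups per-partition batches by the node id of the broker that leads each partition, so everything headed for the same broker can go out in a single produce request. Below is a simplified, hedged illustration of that grouping, using stand-in types rather than Kafka's ProducerBatch/RecordAccumulator and ignoring the max request size limit the real drain enforces.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DrainSketch {
    // Stand-in for a ready batch: which partition it belongs to and which node leads that partition.
    record Batch(String topicPartition, int leaderNodeId) {}

    // Simplified version of the idea behind accumulator.drain(): group batches by leader node id
    // so that everything destined for the same broker can travel in one produce request.
    static Map<Integer, List<Batch>> drain(List<Batch> readyBatches) {
        Map<Integer, List<Batch>> byNode = new HashMap<>();
        for (Batch b : readyBatches) {
            byNode.computeIfAbsent(b.leaderNodeId(), id -> new ArrayList<>()).add(b);
        }
        return byNode; // mirrors the Map<Integer, List<ProducerBatch>> shape in sendProducerData
    }

    public static void main(String[] args) {
        List<Batch> ready = List.of(
            new Batch("orders-0", 1),
            new Batch("orders-1", 2),
            new Batch("payments-0", 1));
        System.out.println(drain(ready)); // batches for node 1 and node 2 are grouped separately
    }
}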
On the first request the code reaches if (!this.client.ready(node, now)): it checks whether the network connection is ready and, if not, initializes the connection through the wrapping NIO Selector and registers the channel for the connect event:
registerChannel(id, socketChannel, SelectionKey.OP_CONNECT);
public boolean ready(Node node, long now) {
    if (node.isEmpty())
        throw new IllegalArgumentException("Cannot connect to empty node " + node);
    // is the network connection already established?
    if (isReady(node, now))
        return true;
    // can we establish a connection to this node?
    if (connectionStates.canConnect(node.idString(), now))
        // if we are interested in sending to a node and we don't have a connection to it, initiate one
        // initiate the connection
        initiateConnect(node, now);
    return false;
}

private void initiateConnect(Node node, long now) {
    String nodeConnectionId = node.idString();
    try {
        connectionStates.connecting(nodeConnectionId, now, node.host());
        InetAddress address = connectionStates.currentAddress(nodeConnectionId);
        log.debug("Initiating connection to node {} using address {}", node, address);
        // wraps the NIO selector, effectively adding a proxy layer
        selector.connect(nodeConnectionId,
            new InetSocketAddress(address, node.port()),
            this.socketSendBuffer,
            this.socketReceiveBuffer);
    } catch (IOException e) {
        log.warn("Error connecting to node {}", node, e);
        // Attempt failed, we'll try again after the backoff
        connectionStates.disconnected(nodeConnectionId, now);
        // Notify metadata updater of the connection failure
        metadataUpdater.handleServerDisconnect(now, nodeConnectionId, Optional.empty());
    }
}
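The socketSendBuffer and socketReceiveBuffer arguments passed into selector.connect come from producer configuration, namely send.buffer.bytes and receive.buffer.bytes. A hedged illustration (the values are examples only; -1 leaves the choice to the OS):
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class SocketBufferConfigExample {
    // Illustrative only: the TCP socket buffer sizes handed to selector.connect(...) above
    // are set via send.buffer.bytes and receive.buffer.bytes on the producer.
    static Properties socketBufferProps() {
        Properties props = new Properties();
        props.put(ProducerConfig.SEND_BUFFER_CONFIG, 131072);   // send.buffer.bytes; -1 means use the OS default
        props.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, 65536); // receive.buffer.bytes; -1 means use the OS default
        return props;
    }
}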
With the connection now being set up we return to the main flow: ready has returned false for this node, so it is removed from the set of servers to send to.
if (!this.client.ready(node, now)) {
    // 4. Remove the node from the set of send targets
    iter.remove();
With no nodes ready, no batches can be drained, so execution falls back to runOnce and then enters the
client.poll(pollTimeout, currentTimeMs) method. This brings us to the network request handling component, NetworkClient. Its core call is this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs)); stepping into the (Kafka) Selector, the core method there is:
pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
This is where the various selector events are picked up and dispatched to the corresponding handling logic.
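pollSelectionKeys is essentially the classic selector dispatch loop. As a hedged, stripped-down sketch of that pattern (not the actual Kafka method, which also deals with authentication, throttling and buffered reads through its KafkaChannel/TransportLayer wrappers):
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class DispatchLoopSketch {
    // Check each ready key's event type and hand it to the matching handler.
    static void dispatch(Selector selector) throws IOException {
        Iterator<SelectionKey> it = selector.selectedKeys().iterator();
        while (it.hasNext()) {
            SelectionKey key = it.next();
            it.remove(); // a handled key must be removed, or it stays in the selected set
            SocketChannel ch = (SocketChannel) key.channel();
            if (key.isConnectable()) {
                ch.finishConnect();                    // complete the pending connect
                key.interestOps(SelectionKey.OP_READ); // from now on we care about reads
            } else if (key.isWritable()) {
                // write any pending outbound bytes to ch here
            } else if (key.isReadable()) {
                // read response bytes from ch here
            }
        }
    }
}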
Through the TransportLayer proxy object, the NIO socketChannel is ultimately called to finish the network connection, and a bitwise AND-NOT followed by an OR removes the connect event and registers the read event.
public boolean finishConnect() throws IOException {
    // complete the network connection
    boolean connected = socketChannel.finishConnect();
    if (connected)
        // remove OP_CONNECT and add OP_READ
        key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
    return connected;
}
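That interest-ops expression relies on Java operator precedence: & binds more tightly than |, so it reads as (current & ~OP_CONNECT) | OP_READ. A small illustration of the same bit manipulation on plain ints:
import java.nio.channels.SelectionKey;

public class InterestOpsBitsSketch {
    public static void main(String[] args) {
        // Start from a key that is currently only interested in OP_CONNECT.
        int ops = SelectionKey.OP_CONNECT;

        // Same expression as finishConnect(): & binds tighter than |,
        // so this is (ops & ~OP_CONNECT) | OP_READ: clear the connect bit, set the read bit.
        ops = ops & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ;

        System.out.println((ops & SelectionKey.OP_CONNECT) != 0); // false: connect interest removed
        System.out.println((ops & SelectionKey.OP_READ) != 0);    // true:  read interest registered
    }
}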
The while loop then goes around again and re-enters the sendProducerData method.
The call chain for sending the request data is:
sendProduceRequests(batches, now)
  >> client.send(clientRequest, now)
  >> doSend(request, false, now)
  >> doSend(clientRequest, isInternalRequest, now, builder.build(version))
The outgoing data is wrapped into an InFlightRequest, and then the selector is asked to send the request.
// how many requests may be outstanding without a response; the default is 5
private final InFlightRequests inFlightRequests;

private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
    String destination = clientRequest.destination();
    RequestHeader header = clientRequest.makeHeader(request.version());
    if (log.isDebugEnabled()) {
        log.debug("Sending {} request with header {} and timeout {} to node {}: {}",
            clientRequest.apiKey(), header, clientRequest.requestTimeoutMs(), destination, request);
    }
    Send send = request.toSend(header);
    InFlightRequest inFlightRequest = new InFlightRequest(
        clientRequest,
        header,
        isInternalRequest,
        request,
        send,
        now);
    this.inFlightRequests.add(inFlightRequest);
    selector.send(new NetworkSend(clientRequest.destination(), send));
}
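The limit on outstanding requests per connection is the producer setting max.in.flight.requests.per.connection (default 5). Below is a hedged sketch of the underlying idea, a bounded per-node queue of unacknowledged requests, using simplified stand-in types rather than Kafka's InFlightRequests class:
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

public class InFlightSketch {
    // Per destination node, keep a queue of requests that have been sent but not yet acknowledged,
    // and refuse to send more once the configured limit is hit.
    private final int maxInFlightPerConnection;
    private final Map<String, Deque<String>> inFlight = new HashMap<>();

    InFlightSketch(int maxInFlightPerConnection) {
        this.maxInFlightPerConnection = maxInFlightPerConnection;
    }

    boolean canSendMore(String node) {
        Deque<String> queue = inFlight.get(node);
        return queue == null || queue.size() < maxInFlightPerConnection;
    }

    void add(String node, String requestId) {
        inFlight.computeIfAbsent(node, n -> new ArrayDeque<>()).addFirst(requestId);
    }

    String completeOldest(String node) {
        Deque<String> queue = inFlight.get(node);
        return queue == null ? null : queue.pollLast(); // responses complete the oldest outstanding request
    }
}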
At the end, this registers interest in the write event (OP_WRITE) with the selector:
public void send(NetworkSend send) {
    String connectionId = send.destinationId();
    KafkaChannel channel = openOrClosingChannelOrFail(connectionId);
    if (closingChannels.containsKey(connectionId)) {
        // ensure notification via `disconnected`, leave channel in the state in which closing was triggered
        this.failedSends.add(connectionId);
    } else {
        try {
            // core method
            channel.setSend(send);
        } ...
    }
}

public void setSend(NetworkSend send) {
    if (this.send != null)
        throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress, connection id is " + id);
    this.send = send;
    // register interest in the write event
    this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}

public void addInterestOps(int ops) {
    key.interestOps(key.interestOps() | ops);
}
Execution then reaches the client's poll method again, and the selector writes the data:
// write the request data
attemptWrite(key, channel, nowNanos);
// remove the write event
transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
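A detail worth noting: a non-blocking write may not flush the whole buffer in one call, so the write interest should only be cleared once the send has been fully written. Here is a hedged sketch of that standard NIO pattern (not the Kafka code itself, which handles this inside its channel and Send abstractions):
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;

public class PartialWriteSketch {
    // write() may drain only part of the buffer, so keep OP_WRITE registered
    // until everything has been flushed, then clear it.
    static void attemptWrite(SelectionKey key, ByteBuffer pending) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        channel.write(pending); // may write fewer bytes than remaining
        if (!pending.hasRemaining()) {
            // send complete: stop asking the selector to wake us for writability
            key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
        }
        // otherwise leave OP_WRITE set; the selector fires again when the socket can take more
    }
}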
Finally comes reading the response data from the server:
// read server events; the read interest was registered earlier
if (channel.ready()
        && (key.isReadable() || channel.hasBytesBuffered())
        && !hasCompletedReceive(channel)
        && !explicitlyMutedChannels.contains(channel)) {
    attemptRead(channel);
}
When reading, Kafka uses a 4-byte length prefix to frame each response and avoid the TCP packet sticking and splitting problem.
The code below guarantees that a receive only completes once the full payload has been read.
// 4 bytes indicating the payload size
if (size.hasRemaining()) {
    int bytesRead = channel.read(size);
    if (bytesRead < 0)
        throw new EOFException();
    read += bytesRead;
    if (!size.hasRemaining()) {
        size.rewind();
        // the 4-byte size header
        int receiveSize = size.getInt();
        if (receiveSize < 0)
            throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")");
        if (maxSize != UNLIMITED && receiveSize > maxSize)
            throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")");
        requestedBufferSize = receiveSize; // may be 0 for some payloads (SASL)
        if (receiveSize == 0) {
            buffer = EMPTY_BUFFER;
        }
    }
}

public boolean complete() {
    // finished only once both the size header and the payload buffer are fully read
    return !size.hasRemaining() && buffer != null && !buffer.hasRemaining();
}
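To make the framing idea concrete, here is a hedged, self-contained sketch of 4-byte length-prefixed framing over a blocking channel; the real NetworkReceive does the same thing incrementally in non-blocking mode, as shown above.
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;

public class LengthPrefixFramingSketch {
    // Write one frame: a 4-byte big-endian length followed by the payload bytes.
    static void writeFrame(WritableByteChannel channel, byte[] payload) throws IOException {
        ByteBuffer frame = ByteBuffer.allocate(4 + payload.length);
        frame.putInt(payload.length);   // the size header the receiver will read first
        frame.put(payload);
        frame.flip();
        while (frame.hasRemaining())
            channel.write(frame);
    }

    // Read one frame: first the 4-byte size, then exactly that many payload bytes.
    // This is the same idea used to avoid sticky/split TCP packets.
    static byte[] readFrame(ReadableByteChannel channel) throws IOException {
        ByteBuffer size = ByteBuffer.allocate(4);
        while (size.hasRemaining())
            if (channel.read(size) < 0) throw new IOException("connection closed mid-header");
        size.flip();
        ByteBuffer payload = ByteBuffer.allocate(size.getInt());
        while (payload.hasRemaining())
            if (channel.read(payload) < 0) throw new IOException("connection closed mid-payload");
        return payload.array();
    }

    public static void main(String[] args) throws IOException {
        // Round-trip through an in-memory pipe just to show the framing works.
        java.nio.channels.Pipe pipe = java.nio.channels.Pipe.open();
        writeFrame(pipe.sink(), "hello kafka".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(readFrame(pipe.source()), StandardCharsets.UTF_8));
    }
}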