Kafka由浅入深(6) Sender线程执行源码解析

张开发
2026/4/16 4:34:10 15 分钟阅读

分享文章

Kafka由浅入深(6) Sender线程执行源码解析
一、KafkaProducer消息流程图1.1 KafkaProducer 消息架构图1.2 KafkaProducer 消息架构分为两部分第一部分是KafkaProducer主线程主要逻辑提供消息拦截器、序列化器、和分区器的默认实现和对外自定义扩展功能已经将消息追加并缓存到累加器RecordAccumulator中为Sender线程提供批量数据队列和发送分区的准备工作。备注1、对于主线程的原理分析可以看之前的文章Kafka由浅入深2 生产者主线程工作原理上_架构源启的博客-CSDN博客_kafka 生产者线程Kafka由浅入深4生产者主线程工作原理下_架构源启的博客-CSDN博客2、详细解说RecordAccumulator可以看这边文章Kafka由浅入深5RecordAccumulator的工作原理_架构源启的博客-CSDN博客第二部分是Sender线程当ProducerBatch已经满了或者是新创建的ProducerBatch则唤醒Sender线程执行发送任务,Sender从RecordAccumulator缓存中取出发送的数据通过NetwordClient将数据发送到Kafka服务器中。Sender线程详细的原理和实现也就是本篇文章详细分析1.3 Sender线程执行流程图二、Sender线程源码分析Sender 是一个 Kafka 生产者消息执行的网络线程无限循环运行在后台将 ProduceRequests 发送到 Kafka 集群中。2.1 Sender线程的定义和创建2.1.1 Sender线程定义Sender继承了Runnable接口而Sender线程是在KafkaProducer 的构造器中进行实例化和线程启动。当KafkaProducer被创建时也会在构造器中实例化以 “kafka-producer-network-thread |[clientId]”命名的线程而这个线程作为守护线程伴随着KafkaProducer整个生命周期。从下面的源码可以看出Sender线程的run()方法中核心发送入口方法为runOnce()。每一次执行runOnce()方法Sender 将从RecordAccumulator记录累加器中获取1~max.request.size 个 ProducerBatch的消息数据并最终将 ProduceRequests 发送到对应活动的 Kafka broker服务器中。public class Sender implements Runnable { /** * The main run loop for the sender thread */ Override public void run() { log.debug(Starting Kafka producer I/O thread.); // main loop, runs until close is called while (running) { try { runOnce(); } catch (Exception e) { log.error(Uncaught error in kafka producer I/O thread: , e); } } log.debug(Beginning shutdown of Kafka producer I/O thread, sending remaining records.); ....省略.... } }2.1.2 KafkaProducer中实例化Sender线程public class KafkaProducerK, V implements ProducerK, V { ...省略 // KafkaProducer构造器中对Sender线程进行创建和线程启动 KafkaProducer(ProducerConfig config, SerializerK keySerializer, SerializerV valueSerializer, ProducerMetadata metadata, KafkaClient kafkaClient, ProducerInterceptorsK, V interceptors, Time time) { ...省略 // Sender线程的创建和启动 this.sender newSender(logContext, kafkaClient, this.metadata); String ioThreadName NETWORK_THREAD_PREFIX | clientId; this.ioThread new KafkaThread(ioThreadName, this.sender, true); // Sender线程启动 this.ioThread.start(); ...省略 } // Sender线程的创建 Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) { // 最大InFlightConnection请求链接数 int maxInflightRequests producerConfig.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION); // 请求超时时间 int requestTimeoutMs producerConfig.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG); // 创建ChannelBuilder,提供服务Channel ChannelBuilder channelBuilder ClientUtils.createChannelBuilder(producerConfig, time, logContext); ProducerMetrics metricsRegistry new ProducerMetrics(this.metrics); // kafka数据传感器 Sensor throttleTimeSensor Sender.throttleTimeSensor(metricsRegistry.senderMetrics); // 创建KafkaClient客户端默认为NetworkClient KafkaClient client kafkaClient ! null ? kafkaClient : new NetworkClient( new Selector(producerConfig.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, producer, channelBuilder, logContext), metadata, clientId, maxInflightRequests, producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG), producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG), producerConfig.getInt(ProducerConfig.SEND_BUFFER_CONFIG), producerConfig.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG), requestTimeoutMs, producerConfig.getLong(ProducerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG), producerConfig.getLong(ProducerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG), time, true, apiVersions, throttleTimeSensor, logContext); short acks Short.parseShort(producerConfig.getString(ProducerConfig.ACKS_CONFIG)); // 创建Sender线程 return new Sender(logContext, client, metadata, this.accumulator, maxInflightRequests 1, producerConfig.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG), acks, producerConfig.getInt(ProducerConfig.RETRIES_CONFIG), metricsRegistry.senderMetrics, time, requestTimeoutMs, producerConfig.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG), this.transactionManager, apiVersions); } }2.1.3 Sender实例化参数说明参数和实现功能说明InFlightRequest: 飞行途中的请求Sender线程发送到Kafka之前会保存数据到InFlightRequest中主要作用是缓存已经发送但未接收到响应的请求保存的数据模型为MapNodeId,DequeRequest。KafkaProducer 在创建Sender时会设置如下属性1、max.in.flight.requests.per.connection(maxInflightRequests) 设置每一个InFlightRequest链接最多缓存多少个未收到响应的请求默认值为5。如果缓存的数据到达了最大值则消息发送处于阻塞状态。如果需要保证消息顺序发送可以将配置设置为1但会影响到消息发送的执行效率降低消息发送的吞吐量2、request.timeout.ms(requestTimeoutMs):KafkaProducer 等待请求响应的最长时间默认值为 30000 ms。 请求超时之后可以选择进行重试。 注意这个参数需要比 broker 端参数 replica.lag.time.max.ms 的值要大这样可以减少因客户端重试而引起的消息重复的概率。3、max.request.size这个参数用来限制生产者客户端能够发送的消息的最大值默认值为 1048576 B即 1 MB。 一般情况下这个默认值就可以满足大多数的应用场景了。 不建议盲目地增大这个参数的配置值尤其是在对 Kafka 整体脉络没有足够把控的时候。 因为这个参数还涉及一些其他参数的联动比如 broker 端的 message.max.bytes 参数如果配置错误可能会引起一些不必要的一场。 比如讲 broker 端端 message.max.bytes 参数配置为 10 而 max.request.size 参数配置为 20那么当我们发送一条消息大小为 15 的消息时生产者客户端就会报出异常 org.apache.kafka.common.errors.RecordTooLargeException: The reqeust included a message larger than the max message size the server will accept.4、retries和 retry.backoff.ms retries 参数用来配置生产者重试的次数默认值为 0即发生异常的时候不进行任何的重试动作。 消息在从生产者发出道成功写入服务器之前可能发生一些临时性的异常比如网络抖动、Leader 副本的选举等这种异常往往是可以自行恢复的生产者可以通过配置 retries 大于 0 的值以此通过内部重试来恢复而不是一味的将异常抛给生产者的应用程序。 如果重试达到设定的次数那么生产者就会放弃重试并返回异常。 不过并不是所有的异常都是可以通过重试来解决的比如消息太大超过 max.request.size 参数配置的值时这种方式就不行了。 重试还和另一个参数 retry.backoff.ms 有关这个参数的默认值为 100它用来设定两次重试之间的时间间隔避免无效的频繁重试。 在配置 retries 和 retry.backoff.ms 之前最好先估算一下可能的异常恢复时间这样可以设定总的重试时间大于这个异常恢复时间以此来避免生产者过早地放弃重试。 Kafka 可以保证同一个分区中的消息时有序的。 如果生产者按照一定的顺序发送消息那么这些消息也会顺序的写入分区进而消费者也可以按照同样的顺序消费它们。5、acks 消息应答机制默认值即为 1ack 1—— 生产者发送消息后只要分区的leader副本写入消息成功则生产者客户端就会收到服务器的成功响应视为消息发送成功。ack 为1 的时候保证客消息发送的吞吐量但是在消息可靠性上不能完全保证。会出现如下的情况a、如果消息无法写入leader副本则生产者会收到错误响应无法写入的情况通常情况为leader副本崩溃、leader副本的重新选举期间。所以为了避免消息发送丢失的情况生产者可以进行消息发送的失败重试。b、在极端情况下可能会出现消息写入成功写入leader副本并且生产者客户端收到了服务器的成功响应。而此时leader副本崩溃但是其他follower副本未及时更新到leader副本的数据在新选举出的leader副本中则服务查询到这一条数据的记录也就出现了消息丢失的情况。acks 0—— 生成者发送消息后不用等待任何服务端的成功与否的响应。如果消息写入到Kafak服务端出现异常从而可能导致消息丢失的情况。设置为0 可以达到消息发送的最大吞吐量但是也可能出现消息丢失的情况。acks -1 或 acks all——生产者发送后需要等待 leader副本将消息同步到所有follower副本则返回消息成功从而保证消息的可靠性。2.2 Sender的线程的唤起执行wakeup首先我们需要知道Sender线程的执行是在哪里阻塞的。Kafka客户端的底层网络通信是通过NIO实现Kafka 的Selecotor类中我们可以看到java.nio.channels.Selector nioSelector对Kafak服务器的通信核心是通过nioSelector 来实现。当调用nioSelector.select()方法时调用者线程会进入阻塞状态直到有就绪的Channel才会返回。而java.nio.channels.Selector 也提供了从阻塞状态唤起的方式则是可以通过其他的线程调用nioSelector.wakeup进行阻塞线程的唤醒则select()方法也会立即返回。public class Selector implements Selectable, AutoCloseable { private final java.nio.channels.Selector nioSelector; /** * Interrupt the nioSelector if it is blocked waiting to do I/O. */ Override public void wakeup() { this.nioSelector.wakeup(); } }wake()被调用的地方KafkaProducer 的doSend(),initTransactions(), sendOffsetsToTransaction(), commitTransaction(), abortTransaction(), waitOnMetadata()和 flush()方法Sender类的 initiateClose()方法三、Sender线程核心流程源码分析3.1 Sender线程run()方法逻辑Sender线程的状态通过两个变量控制分别是running和forceCloserunning: Sender线程是否正在运行中forceClose: 是否强制关闭正在发送和待发送的消息强制关闭状态忽略未发送和正在发送中的消息分三种情况1、状态一 running状态:sender线程处于running状态则一直循环执行runOnce()2、状态二 非强制关闭状态a、停止接受新的请求如果事务管理器、累加器或等待确认的过程中可能仍有请求请等到这些完成。b、如果任何提交或中止未通过事务管理器的队列则中止事务3、强制关闭状态a、将所有未完成的事务请求和batchs置为失败并唤醒正在等待的线程b、我们需要使所有不完整的事务请求和批处理失败并唤醒等待未来的线程。Sender.run()源码分析// sender线程运行状态 private volatile boolean running; // 强制关闭状态忽略未发送和正在发送中的消息 private volatile boolean forceClose; Override public void run() { log.debug(Starting Kafka producer I/O thread.); // 状态一 running状态: sender线程处于running状态则一直循环执行runOnce() while (running) { try { runOnce(); } catch (Exception e) { log.error(Uncaught error in kafka producer I/O thread: , e); } } log.debug(Beginning shutdown of Kafka producer I/O thread, sending remaining records.); // 状态二 非强制关闭状态 //停止接受新的请求如果事务管理器、累加器或等待确认的过程中可能仍有请求请等到这些完成。 while (!forceClose ((this.accumulator.hasUndrained() || this.client.inFlightRequestCount() 0) || hasPendingTransactionalRequests())) { try { runOnce(); } catch (Exception e) { log.error(Uncaught error in kafka producer I/O thread: , e); } } //如果任何提交或中止未通过事务管理器的队列则中止事务 while (!forceClose transactionManager ! null transactionManager.hasOngoingTransaction()) { if (!transactionManager.isCompleting()) { log.info(Aborting incomplete transaction due to shutdown); transactionManager.beginAbort(); } try { runOnce(); } catch (Exception e) { log.error(Uncaught error in kafka producer I/O thread: , e); } } // 状态三 强制关闭状态 if (forceClose) { // 将所有未完成的事务请求和batchs置为失败并唤醒正在等待的线程 //我们需要使所有不完整的事务请求和批处理失败并唤醒等待未来的线程。 if (transactionManager ! null) { log.debug(Aborting incomplete transactional requests due to forced shutdown); transactionManager.close(); } log.debug(Aborting incomplete batches due to forced shutdown); this.accumulator.abortIncompleteBatches(); } try { // KafkaClient Kafka客户端关闭 this.client.close(); } catch (Exception e) { log.error(Failed to close network client, e); } log.debug(Shutdown of Kafka producer I/O thread has completed.); }3.2 Sender线程runOnce()方法逻辑从run()方法的逻辑可以看出runOnce()方法是消息发送的入口a、当事务存在的时候,进行事务处理逻辑详细的事务处理将在后续技术文章中进行讲解b、 调用sendProducerData()方法则是Kafka消息发送的核心处理逻辑void runOnce() { if (transactionManager ! null) { try { // 事务的发送处理逻辑 //尝试解析未解析的序列。如果所有正在执行的请求都已完成并且某些分区仍无法解析则如果可能请中断epoch或者转换为致命错误 transactionManager.maybeResolveSequences(); // 如果事务管理器处于失败状态则不要继续发送 if (transactionManager.hasFatalError()) { RuntimeException lastError transactionManager.lastError(); if (lastError ! null) maybeAbortBatches(lastError); client.poll(retryBackoffMs, time.milliseconds()); return; } //检查我们是否需要新的producerId。如果是这样我们将在下面发送InitProducerId请求 transactionManager.bumpIdempotentEpochAndResetIdIfNeeded(); //如果发送或轮询事务请求或者FindCoordinator请求已排队则返回true if (maybeSendAndPollTransactionalRequest()) { return; } } catch (AuthenticationException e) { // This is already logged as error, but propagated here to perform any clean ups. log.trace(Authentication exception while processing transactional request, e); transactionManager.authenticationFailed(e); } } long currentTimeMs time.milliseconds(); // 发送ProducerData核心逻辑 long pollTimeout sendProducerData(currentTimeMs); client.poll(pollTimeout, currentTimeMs); }3.3 Sender线程sendProducerData()的核心逻辑梳理虚线框内的逻辑正式sendProducerData()的实现逻辑private long sendProducerData(long now) { // 获取当前运行并且未阻塞的Kafka集群元数据信息 Cluster cluster metadata.fetch(); // get the list of partitions with data ready to send // 获取准备发送数据的分区node节点集合 RecordAccumulator.ReadyCheckResult result this.accumulator.ready(cluster, now); // if there are any partitions whose leaders are not known yet, force metadata update // 如果存在分区的leader未知则强制更新元数据 if (!result.unknownLeaderTopics.isEmpty()) { //未知leader的主题集包含leader选举待定的topic以及可能已过期的topic。 // 再次将topic添加到ProducerMetadata元数据中以确保包含该topic并请求元数据更新因为这些消息要需要发送到对应主题。 for (String topic : result.unknownLeaderTopics) this.metadata.add(topic, now); log.debug(Requesting metadata update due to unknown leader topics from the batched records: {}, result.unknownLeaderTopics); this.metadata.requestUpdate(); } // 移除还没有准备好的node节点 IteratorNode iter result.readyNodes.iterator(); long notReadyTimeout Long.MAX_VALUE; while (iter.hasNext()) { Node node iter.next(); if (!this.client.ready(node, now)) { //仅更新延迟统计的readyTimeMs以便每次批处理就绪时都向前移动那么readyTimeM和drainTimeMs之间的差异将表示数据等待节点的时间。 this.accumulator.updateNodeLatencyStats(node.id(), now, false); iter.remove(); notReadyTimeout Math.min(notReadyTimeout, this.client.pollDelayMs(node, now)); } else { //更新readyTimeMs和drainTimeMs这将“重置”节点延迟。 this.accumulator.updateNodeLatencyStats(node.id(), now, true); } } // 创建produce请求加入到MapTopicPartition, ListProducerBatch inFlightBatches中 MapInteger, ListProducerBatch batches this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now); addToInflightBatches(batches); if (guaranteeMessageOrder) { // Mute all the partitions drained // 将所有排水分区静音 for (ListProducerBatch batchList : batches.values()) { for (ProducerBatch batch : batchList) this.accumulator.mutePartition(batch.topicPartition); } } // 重置消息累加器中下一批次的过期时间 accumulator.resetNextBatchExpiryTime(); // 获取已达到超时待发送的in-flight batches 数据 ListProducerBatch expiredInflightBatches getExpiredInflightBatches(now); // 获取累积器中放置时间过长且需要过期的批次列表。 ListProducerBatch expiredBatches this.accumulator.expiredBatches(now); expiredBatches.addAll(expiredInflightBatches); //如果之前已将过期批次发送给代理请重置生产者id。同时更新过期批次的指标。请参阅TransactionState.resetIdempotentProducerId的文档以了解为什么需要在此处重置生产者id。 if (!expiredBatches.isEmpty()) log.trace(Expired {} batches in accumulator, expiredBatches.size()); for (ProducerBatch expiredBatch : expiredBatches) { String errorMessage Expiring expiredBatch.recordCount record(s) for expiredBatch.topicPartition : (now - expiredBatch.createdMs) ms has passed since batch creation; failBatch(expiredBatch, new TimeoutException(errorMessage), false); if (transactionManager ! null expiredBatch.inRetry()) { // This ensures that no new batches are drained until the current in flight batches are fully resolved. //这可确保在当前in-flight batches 完全解决之前不会排出新批次。 transactionManager.markSequenceUnresolved(expiredBatch); } } sensors.updateProduceRequestMetrics(batches); //如果我们有任何节点准备发送具有可发送数据则轮询0超时这样可以立即循环并尝试发送更多数据。 // 否则超时将是下一批到期时间与检查数据可用性的延迟时间之间的较小值。 // 请注意由于延迟、后退等原因节点可能有尚未发送的数据。这特别不包括具有尚未准备好发送的可发送数据的节点因为它们会导致繁忙的循环。 long pollTimeout Math.min(result.nextReadyCheckDelayMs, notReadyTimeout); pollTimeout Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now); pollTimeout Math.max(pollTimeout, 0); if (!result.readyNodes.isEmpty()) { log.trace(Nodes with data ready to send: {}, result.readyNodes); //如果某些分区已经准备好发送则选择时间将为0 // 否则如果某个分区已经积累了一些数据但还没有准备好则选择时间将是现在与其延迟到期时间之间的时间差 // 否则选择时间将是现在与元数据到期时间之间的时间差 pollTimeout 0; } // 消息请求发送 sendProduceRequests(batches, now); return pollTimeout; }3.3.1 获取当前运行并且未阻塞的Kafka集群元数据信息Cluster cluster metadata.fetch();3.3.2 从RecordAccumulator消息累加器中获取到准备发送数据的分区集合RecordAccumulator.ReadyCheckResult result this.accumulator.ready(cluster, now);3.3.3 如果准备发送的分区集合中有分区的leader出现未知情况。未知leader的主题集包含leader选举待定的topic以及可能已过期的topic。 再次将topic添加到ProducerMetadata元数据中以确保包含该topic并请求元数据更新因为这些消息要需要发送到对应主题。通过调用ProducerMetadata元数据的requestUpdate()方法打上needFullUpdate标识标识Kafka集群需要进行强制更新。// 如果存在分区的leader未知则强制更新元数据 if (!result.unknownLeaderTopics.isEmpty()) { // 再次将topic添加到ProducerMetadata元数据中以确保包含该topic并请求元数据更新因为这些消息要需要发送到对应主题。 for (String topic : result.unknownLeaderTopics) this.metadata.add(topic, now); log.debug(Requesting metadata update due to unknown leader topics from the batched records: {}, result.unknownLeaderTopics); this.metadata.requestUpdate(); }3.3.4 移除未准备好的node节点检查当前 KafkaProducer 是否与目标 Node 建立了网络连接如果没有建立则尝试初始化网络连接如果初始化失败则直接返回 false表示此时不适合向该 Node 发送请求。其次就是检查当前已发送但未响应的请求是否已经达到上限要是有很多这种请求存在可能是 broker 宕机了或是 broker 处理能力不足此时也不适合继续发送请求。除了进行网络方面的检查之外还会检查 kafka 元数据是否需要更新如果需要更新的话也不能发送请求。毕竟使用过期的或是错误的元数据来发送数据请求也不会发送成功不适合发送请求的 Node 节点会从 readyNodes 集合中删除。// RecordAccumulator.ReadyCheckResult 获取到所有待发送的节点 IteratorNode iter result.readyNodes.iterator(); long notReadyTimeout Long.MAX_VALUE; while (iter.hasNext()) { Node node iter.next(); // 通过KafkaClient进行检查node节点是否已经准备好 if (!this.client.ready(node, now)) { //仅更新延迟统计的readyTimeMs以便每次批处理就绪时都向前移动那么readyTimeM和drainTimeMs之间的差异将表示数据等待节点的时间。 this.accumulator.updateNodeLatencyStats(node.id(), now, false); iter.remove(); notReadyTimeout Math.min(notReadyTimeout, this.client.pollDelayMs(node, now)); } else { // Update both readyTimeMs and drainTimeMs, this would reset the node // latency. //更新readyTimeMs和drainTimeMs这将“重置”节点延迟。 this.accumulator.updateNodeLatencyStats(node.id(), now, true); } }3.3.5 创建produce请求将ProducerBatch加入到InFlightBatches中a、调用记录累加器RecordAccumlator.drain(),获取当前可以集群下的所有数据并将它们整理成一个批次列表b、将消息将入到MapTopicPartition, ListProducerBatch inFlightBatches的队列中记录正在发送的这一批数据c、如果需要进行顺序消息发送则将通过数据将ProducerBatch的主题信息记录到RecordAccumlator 的SetTopicPartition muted中//获当前可以集群下的所有数据并将它们整理成一个批次列表 MapInteger, ListProducerBatch batches this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now); // 加入到InFlightBatch队列中 addToInflightBatches(batches); if (guaranteeMessageOrder) { // 保证消息顺序发送 // 将所有批次的主题加入到SetTopicPartition muted中 for (ListProducerBatch batchList : batches.values()) { for (ProducerBatch batch : batchList) this.accumulator.mutePartition(batch.topicPartition); } }3.3.6 获取已经超时待发送的消息数据a、重置消息累加器中下一批次的过期时间b、获取已达到超时待发送的in-flight batches 批次数据c、获取累积器中放置时间过长且需要过期的批次数据。d、将超时待发送的批次数据进行汇总// 重置消息累加器中下一批次的过期时间 accumulator.resetNextBatchExpiryTime(); // 获取已达到超时待发送的in-flight batches 数据 ListProducerBatch expiredInflightBatches getExpiredInflightBatches(now); // 获取累积器中放置时间过长且需要过期的批次列表。 ListProducerBatch expiredBatches this.accumulator.expiredBatches(now); expiredBatches.addAll(expiredInflightBatches);3.3.7 处理已超时的消息批次通知该批消息发送失败//如果之前已将过期批次发送给代理请重置生产者id。同时更新过期批次的指标。请参阅TransactionState.resetIdempotentProducerId的文档以了解为什么需要在此处重置生产者id。 if (!expiredBatches.isEmpty()) log.trace(Expired {} batches in accumulator, expiredBatches.size()); for (ProducerBatch expiredBatch : expiredBatches) { String errorMessage Expiring expiredBatch.recordCount record(s) for expiredBatch.topicPartition : (now - expiredBatch.createdMs) ms has passed since batch creation; failBatch(expiredBatch, new TimeoutException(errorMessage), false); if (transactionManager ! null expiredBatch.inRetry()) { // This ensures that no new batches are drained until the current in flight batches are fully resolved. //这可确保在当前in-flight batches 完全解决之前不会排出新批次。 transactionManager.markSequenceUnresolved(expiredBatch); } } // 收集统计指标 sensors.updateProduceRequestMetrics(batches);3.3.8 构建请求发送数据//如果我们有任何节点准备发送具有可发送数据则轮询0超时这样可以立即循环并尝试发送更多数据。 // 否则超时将是下一批到期时间与检查数据可用性的延迟时间之间的较小值。 // 请注意由于延迟、后退等原因节点可能有尚未发送的数据。这特别不包括具有尚未准备好发送的可发送数据的节点因为它们会导致繁忙的循环。 // 设置下一次的发送延时 long pollTimeout Math.min(result.nextReadyCheckDelayMs, notReadyTimeout); pollTimeout Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now); pollTimeout Math.max(pollTimeout, 0); if (!result.readyNodes.isEmpty()) { log.trace(Nodes with data ready to send: {}, result.readyNodes); //如果某些分区已经准备好发送则选择时间将为0 // 否则如果某个分区已经积累了一些数据但还没有准备好则选择时间将是现在与其延迟到期时间之间的时间差 // 否则选择时间将是现在与元数据到期时间之间的时间差 pollTimeout 0; } sendProduceRequests(batches, now);3.4 Sender线程sendProduceRequest()的核心逻辑梳理/** * 从给定的记录批次创建生产请求 */ private void sendProduceRequest(long now, int destination, short acks, int timeout, ListProducerBatch batches) { if (batches.isEmpty()) return; final MapTopicPartition, ProducerBatch recordsByPartition new HashMap(batches.size()); // 找到创建记录集时使用的最小魔法版本 byte minUsedMagic apiVersions.maxUsableProduceMagic(); for (ProducerBatch batch : batches) { if (batch.magic() minUsedMagic) minUsedMagic batch.magic(); } // ProduceRequestData ProduceRequestData.TopicProduceDataCollection tpd new ProduceRequestData.TopicProduceDataCollection(); for (ProducerBatch batch : batches) { TopicPartition tp batch.topicPartition; MemoryRecords records batch.records(); //如有必要向下转换到使用的最小魔法的消息格式。 // 通常情况下生产者开始构建批次以及我们发送请求的时间选择了基于过时元数据的消息格式。 // 在最坏的情况下如果选择使用新的消息格式但发现代理不支持它所系需要在客户端发送之前进行消息格式的向下转换。 // 这旨在处理围绕集群升级的边缘情况其中代理可能并非所有消息都支持相同的消息格式版本。 // 例如如果分区从代理迁移它支持新的魔法版本但不支持就需要转换。 if (!records.hasMatchingMagic(minUsedMagic)) records batch.records().downConvert(minUsedMagic, 0, time).records(); ProduceRequestData.TopicProduceData tpData tpd.find(tp.topic()); if (tpData null) { tpData new ProduceRequestData.TopicProduceData().setName(tp.topic()); tpd.add(tpData); } tpData.partitionData().add(new ProduceRequestData.PartitionProduceData() .setIndex(tp.partition()) .setRecords(records)); recordsByPartition.put(tp, batch); } String transactionalId null; if (transactionManager ! null transactionManager.isTransactional()) { transactionalId transactionManager.transactionalId(); } ProduceRequest.Builder requestBuilder ProduceRequest.forMagic(minUsedMagic, new ProduceRequestData() .setAcks(acks) .setTimeoutMs(timeout) .setTransactionalId(transactionalId) .setTopicData(tpd)); RequestCompletionHandler callback response - handleProduceResponse(response, recordsByPartition, time.milliseconds()); // 创建clientRequest请求 String nodeId Integer.toString(destination); ClientRequest clientRequest client.newClientRequest(nodeId, requestBuilder, now, acks ! 0, requestTimeoutMs, callback); // 通过KafkaClient的实现类NetworkClient 调用发送 client.send(clientRequest, now); log.trace(Sent produce request to {}: {}, nodeId, requestBuilder); }说明源码分析基于Kafak 3.3版本

更多文章