Kafka 生产者(三)Sender 线程

2020年4月22日 | 作者 Siran | 4200字 | 阅读大约需要9分钟
归档于 消息队列 | 标签 #kafka


在分析了 KafkaProducer 如何把消息存入 RecordAccumulator 中,那么这些ProducerBatch 是如何发送给 Kafka Server 的呢?

如下图: 当每次调用 RecordAccumulator#append( ) 追加消息的的时候,如果发现当前的 ProducerBatch 满了或者说创建了一个新的 ProducerBatch ,那么就会唤醒 Sender 线程来获取消息进行发送。


if (result.batchIsFull || result.newBatchCreated) {
                log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);

那这一章,我们就来分析一下 Sender 线程是如何拉取消息并且发送消息的。

2. 源码分析

2.1 Sender 线程

public class Sender implements Runnable {
    //① kafka 网络通信客户端,主要封装与 broker 的网络通信。
    private final KafkaClient client;
    //② 消息记录累积器
    private final RecordAccumulator accumulator;
    //③ 元数据
    private final ProducerMetadata metadata;
    //④ 是否需要保证消息的顺序性。
    private final boolean guaranteeMessageOrder;
    //⑤ 调用 send 方法发送的最大请求大小,通过参数 max.request.size 来设置。
    private final int maxRequestSize;
    //⑥ ack,通过acks 设置
    private final short acks;
    //⑦ 重试次数
    private final int retries;
    private final Time time;
    //⑧ 该线程状态,为 true 表示运行中。
    private volatile boolean running;
    //⑨ 是否强制关闭,此时会忽略正在发送中的消息。
    private volatile boolean forceClose;
    //⑩ 消息发送相关的统计指标收集器。
    private final SenderMetrics sensors;
    //⑪ 请求的超时时间,
    private final int requestTimeoutMs;
    //⑫ 当发送消息失败时,重试的间隔时间,默认100,可以通过`retry.backoff.ms`配置
    private final long retryBackoffMs;
    //⑬ 版本信息
    private final ApiVersions apiVersions;
    //⑭ 事务处理器
    private final TransactionManager transactionManager;
    //⑮ 正在执行发送相关的消息批次。
    private final Map<TopicPartition, List<ProducerBatch>> inFlightBatches;
2.1.1 Sender线程流程图

由于Sender 线程是实现 Runnable 接口的,所以当它唤醒的时候就会执行 run 方法

  • ① 判断状态是否是运行中,根据 running 属性。
  • ② 如果是 close 状态,那就通过 forceClose 是否要强制关闭。
  • ③ 如果 forceClose 是true的话 那就直接调用NetworkClient#close方法关闭
  • ④ 如果 running 属性是运行中,那么调用 runOnce 进行发送消息
  • ⑤ 调用 Sender#sendProducerData 方法准备要发送的数据封装成 ClientRequest,放入KafkaChannel 的 send字段中,
  • ⑥ 调用 NetworkClient#poll 方法,进行发送消息

2.1.2 run 方法

    public void run() {
        log.debug("Starting Kafka producer I/O thread.");

        //① 判断状态
        while (running) {
            try {
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);

        log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");

        //② forceClose 
        while (!forceClose && ((this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0) || hasPendingTransactionalRequests())) {
            try {
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);

        while (!forceClose && transactionManager != null && transactionManager.hasOngoingTransaction()) {
            if (!transactionManager.isCompleting()) {
                log.info("Aborting incomplete transaction due to shutdown");
            try {
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);

        if (forceClose) {
            if (transactionManager != null) {
                log.debug("Aborting incomplete transactional requests due to forced shutdown");
            log.debug("Aborting incomplete batches due to forced shutdown");
        try {
        } catch (Exception e) {
            log.error("Failed to close network client", e);

        log.debug("Shutdown of Kafka producer I/O thread has completed.");

2.1.3 runOnce 方法

    void runOnce() {
        if (transactionManager != null) {
            try {

                if (transactionManager.hasFatalError()) {
                    RuntimeException lastError = transactionManager.lastError();
                    if (lastError != null)
                    client.poll(retryBackoffMs, time.milliseconds());


                if (maybeSendAndPollTransactionalRequest()) {
            } catch (AuthenticationException e) {
                log.trace("Authentication exception while processing transactional request", e);

        long currentTimeMs = time.milliseconds();
        //① sendProducerData 准备发送消息
        long pollTimeout = sendProducerData(currentTimeMs);
        //② 发送消息
        client.poll(pollTimeout, currentTimeMs);

接下来分析比较重要的两个方法 sendProducerDatapoll

2.1.4 sendProducerData 方法2.1.1 中的流程图相对应

private long sendProducerData(long now) {
        //5.1 拉取集群的元数据
        Cluster cluster = metadata.fetch();
        //5.2 调用 accumulator.ready 获取准备好发送给的ProducerBatch
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

        //5.3 更新找不到Leader 节点的ProducerBatch 的元数据
        if (!result.unknownLeaderTopics.isEmpty()) {
            for (String topic : result.unknownLeaderTopics)
                this.metadata.add(topic, now);

            log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}",

        Iterator<Node> iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;
        //5.4 调用NetworkClient.ready 把没有准备好的Node 从readyNodes 集合中剔除
        while (iter.hasNext()) {
            Node node = iter.next();
            if (!this.client.ready(node, now)) {
                notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));

        //5.5 调用 accumulator.drain 方法把准备的ProducerBatch 进行转换
        Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
        //5.6 添加到 InflightBatches,主要作用是缓存发送出去的消息但是还没有得到响应的请求
        if (guaranteeMessageOrder) {
            for (List<ProducerBatch> batchList : batches.values()) {
                for (ProducerBatch batch : batchList)

        List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
        List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);

        if (!expiredBatches.isEmpty())
            log.trace("Expired {} batches in accumulator", expiredBatches.size());
        //5.7 处理过期的Batch
        for (ProducerBatch expiredBatch : expiredBatches) {
            String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
                + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
            failBatch(expiredBatch, -1, NO_TIMESTAMP, new TimeoutException(errorMessage), false);
            if (transactionManager != null && expiredBatch.inRetry()) {
                // This ensures that no new batches are drained until the current in flight batches are fully resolved.

        long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
        pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
        pollTimeout = Math.max(pollTimeout, 0);
        if (!result.readyNodes.isEmpty()) {
            log.trace("Nodes with data ready to send: {}", result.readyNodes);
            pollTimeout = 0;
        //5.8 将ProducerBatch 放入 KafkaChannel 中的send 等待发送
        sendProduceRequests(batches, now);
        return pollTimeout;
2.1.5 NetworkClient.poll 方法 发送数据

public List<ClientResponse> poll(long timeout, long now) {

        if (!abortedSends.isEmpty()) {
            List<ClientResponse> responses = new ArrayList<>();
            return responses;

        long metadataTimeout = metadataUpdater.maybeUpdate(now);
        try {
            // 发送数据
            this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));
        } catch (IOException e) {
            log.error("Unexpected error during I/O", e);

        // process completed actions
        long updatedNow = this.time.milliseconds();
        List<ClientResponse> responses = new ArrayList<>();
        handleCompletedSends(responses, updatedNow);
        handleCompletedReceives(responses, updatedNow);
        handleDisconnections(responses, updatedNow);
        handleTimedOutRequests(responses, updatedNow);

        return responses;

NetworkClient 那一块没有进行分析,因为我觉得这一块放在 Network Layer 那分析更加容易理解。

Kafka 服务端(一)网络层


  • 在主线程中由KafkaProducer创建消息,然后通过拦截器序列化器分区器的之后缓存到消息累加器(RecordAccumulator)中。

  • Sender 线程负责从RecordAccumulator中获取消息并将其发送到Kafka中。

  • RecordAccumulator 主要用来缓存消息以便 Sender 线程可以批量发送,进而减少网络传输的资源消耗以提升性能。

  • RecordAccumulator 缓存的大小可以通过生产者客户端参数buffer.memory 配置,默认值为 33554432B,即 32MB

  • 如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候KafkaProducer的send()方法调用要么被阻塞,要么抛出异常,这个取决于参数max.block.ms的配置,此参数的默认值为60000,即60秒。

  • 主线程中发送过来的消息都会被追加到RecordAccumulator的某个双端队列(Deque)中,在RecordAccumulator 的内部为每个分区都维护了一个双端队列,队列中的内容就是ProducerBatch,即 Deque<ProducerBatch>。消息写入缓存时,追加到双端队列的尾部;

  • Sender读取消息时,从双端队列的头部读取。注意ProducerBatch不是ProducerRecord,ProducerBatch中可以包含一至多个 ProducerRecord

    • ProducerRecord :是生产者中创建的消息

    • ProducerBatch:是指一个消息批次,ProducerRecord会被包含在ProducerBatch中,这样可以使字节的使用更加紧凑,减少网络请求的次数以提升整体的吞吐量

    • 如果生产者客户端需要向很多分区发送消息,则可以将buffer.memory参数适当调大以增加整体的吞吐量

  • 消息在网络上都是以字节(Byte)的形式传输的,在发送之前需要创建一块内存区域来保存对应的消息。在Kafka生产者客户端中,通过java.io.ByteBuffer实现消息内存的创建和释放。

    • 不过频繁的创建和释放是比较耗费资源的,在RecordAccumulator的内部还有一个BufferPool,它主要用来实现ByteBuffer的复用,以实现缓存的高效利用。
    • BufferPool只针对特定大小的ByteBuffer进行管理,而其他大小的ByteBuffer不会缓存进BufferPool中。
    • 这个特定的大小由batch.size参数来指定,默认值为16384B,即16KB。我们可以适当地调大batch.size参数以便多缓存一些消息。
  • Sender 从 RecordAccumulator 中获取缓存的消息之后,会进一步将原本<分区,Deque<ProducerBatch>>的保存形式转变成<Node,List< ProducerBatch>的形式。

    • Node表示Kafka集群的broker节点。对于网络连接来说,生产者客户端是与具体的broker节点建立的连接,也就是向具体的 broker 节点发送消息,而并不关心消息属于哪一个分区
    • 而对于KafkaProducer的应用逻辑而言,我们只关注向哪个分区中发送哪些消息,所以在这里需要做一个应用逻辑层面到网络I/O层面的转换。
  • 在转换成<Node,List<ProducerBatch>>的形式之后,Sender 还会进一步封装成<Node,Request>的形式,这样就可以将Request请求发往各个Node了,这里的Request是指Kafka的各种协议请求,对于消息发送而言就是指具体的 ProduceRequest

  • 请求在从Sender线程发往Kafka之前还会保存到InFlightRequests中,InFlightRequests保存对象的具体形式为 Map<NodeId,Deque<Request>>

    • 它的主要作用是缓存了已经发出去但还没有收到响应的请求(NodeId 是一个 String 类型,表示节点的 id 编号)。

    • InFlightRequests还提供了许多管理类的方法,并且通过配置参数还可以限制每个连接(也就是客户端与Node之间的连接)最多缓存的请求数。这个配置参数为max.in.flight.requests.per.connection,默认值为 5,即每个连接最多只能缓存 5 个未响应的请求,超过该数值之后就不能再向这个连接发送更多的请求了,除非有缓存的请求收到了响应(Response)。 有点像Java中 Semaphore 的意思

    • 通过比较Deque<Request>的size与这个参数的大小来判断对应的Node中是否已经堆积了很多未响应的消息,如果真是如此,那么说明这个 Node 节点负载较大或网络连接有问题,再继续向其发送请求会增大请求超时的可能。

  • InFlightRequests 还可以获得leastLoadedNode,即所有Node中负载最小的那一个。

    • 这里的负载最小是通过每个 Node 在 InFlightRequests 中还未确认的请求决定的,未确认的请求越多则认为负载越大。