Kafka 生产者(二)RecordAccumulate

2020年4月21日 | 作者 Siran | 9300字 | 阅读大约需要19分钟
归档于 消息队列 | 标签 #kafka

1.概述

Kafka 生产者(一)KafkaProducer 中分析了图中红色区域,也就是说每次调用 send 方法发送消息,都会经过InterceptorsSerializerPartitionor 。我们可以自定义拦截器、序列化器、分区器,实现我们自己的功能,比如消息路由、消息审计、消息轨迹等。

如下图: 这次分析的重点是 RecordAccumulator ,消息缓存器。Kafka 通过把消息放入缓存器,形成一个一个 ProducerBatch 进行批量的发送消息,这样的话可以有效的减少IO的开销。


2.源码分析

2.1 RecordAccumulator

在前面的 1.1 Kafka Producer 整体流程图 中阐述了主线程调用KafkaProducer.send( )方法并没有直接发送给Kafka Server,而是存入 RecordAccumulator 中,当达到一定条件之后,会唤醒 Sender 线程来获取消息并发送。

那么在 RecordAccumulator 中必然是有一个 main 线程不断的往其加数据,还有一个Sender 线程不断的取数据,所以要保证 RecordAccumulator 线程安全,可以看到使用了 原子类、并发集合、volatile 、锁来保证

public final class RecordAccumulator {
    //① 标记位,producer 是否 close,通过valatile 保证可见性
    private volatile boolean closed;
    //② 当前调用flush 方法的线程。KafkaProducer 可以调用flush 方法立即发送消息,会立马唤醒Sender线程
    private final AtomicInteger flushesInProgress;
    //③ 当前正在往 RecordAccumulator 中添加数据的线程数
    private final AtomicInteger appendsInProgress;
    //④ 指定每个Batch 的大小
    private final int batchSize;
    //⑤ 压缩的类型
    private final CompressionType compression;
    //⑥ 默认0,可以通过 linger.ms 进行设置
    private final int lingerMs;
    //⑦ 当发送消息失败时,重试的间隔时间,默认100,可以通过`retry.backoff.ms`配置
    private final long retryBackoffMs;
    //⑧ 延迟时间,默认120秒,可以通过 `delivery.timeout.ms` 进行配置
    private final int deliveryTimeoutMs;
    //⑨ ByteBuffer 对象池
    private final BufferPool free;
    private final Time time;
    // version 号
    private final ApiVersions apiVersions;
    //⑩ Topic-Partition 与 ProducerBatch 的映射关系 ,CopyOnWriteMap 线程安全
    private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
    //⑪ 未发送完成的ProducerBatch 集合
    private final IncompleteBatches incomplete;
    private final Map<TopicPartition, Long> muted;
    //⑫ 在使用 drain 方法批量导出 RecordBatch 时,为了防止饥饿,用此字段记录上次发送停止的位置,下次继续从此位置开始发送
    private int drainIndex;
    private final TransactionManager transactionManager;
    private long nextBatchExpiryTimeMs = Long.MAX_VALUE; // the earliest time (absolute) a batch will expire.  
}
  • close :标记位,producer 是否 close,通过valatile 保证可见性
  • flushesInProgress :当前调用flush 方法的线程。KafkaProducer 可以调用flush 方法立即发送消息,会立马唤醒Sender线程
    public void flush() {
          log.trace("Flushing accumulated records in producer.");
          this.accumulator.beginFlush();
          this.sender.wakeup();
          try {
              this.accumulator.awaitFlushCompletion();
          } catch (InterruptedException e) {
              throw new InterruptException("Flush interrupted.", e);
          }
      }
    
  • appendsInProgress :当前正在往 RecordAccumulator 中添加数据的线程数,自增
  • batchSize :指定每个 ProducerBatch 的大小,默认16kb,可以通过 batch.size 进行设置
  • compression :压缩的类型,默认为none,可以配置为gzip、snappy、lz4、zstd
  • lingerMs :用来指定生产者发送 ProducerBatch 之前等待更多消息加入ProducerBatch的时间,默认0,可以通过 linger.ms 进行设置。生产者客户端会在 ProducerBatch 被填满或等待时间超过linger.ms 值时发送出去。增大这个参数的值会增加消息的延迟,但是同时能提升一定的吞吐量
  • retryBackoffMs:当发送消息失败时,重试的间隔时间,默认100,可以通过retry.backoff.ms配置
  • deliveryTimeoutMs: 延迟时间,默认120秒,可以通过 delivery.timeout.ms 进行配置
  • free :ByteBuffer 对象复用池,在下面的 2.3 中 进行分析
  • batches Topic-Partition 与 ProducerBatch 的映射关系 ,CopyOnWriteMap 线程安全
  • incomplete 未发送完成的ProducerBatch 集合
  • drainIndex:在使用 drain 方法批量导出 RecordBatch 时,为了防止饥饿,用此字段记录上次发送停止的位置,下次继续从此位置开始发送

2.1.1 append 方法 KafkaProdcer 追加到消息到 RecordAccumulator 中

public RecordAppendResult append(TopicPartition tp,
                                     long timestamp,
                                     byte[] key,
                                     byte[] value,
                                     Header[] headers,
                                     Callback callback,
                                     long maxTimeToBlock,
                                     boolean abortOnNewBatch,
                                     long nowMs) throws InterruptedException {
        //① 当前正在往 RecordAccumulator 中添加数据的线程数,cas 自增1
        appendsInProgress.incrementAndGet();
        ByteBuffer buffer = null;
        if (headers == null) headers = Record.EMPTY_HEADERS;
        try {
            //② 通过 topic-partition 去 batches 中获取是否有正在使用的,有的话直接获取,没有的创建一个新的, Deque 是一个ArrayList。
            Deque<ProducerBatch> dq = getOrCreateDeque(tp);
            // 通过synchronized 来保证线程安全
            synchronized (dq) {
                if (closed)
                    throw new KafkaException("Producer closed while send in progress");
                //③ 尝试向 Deque 中最后一个ProducerBatch 追加 record,
                //   如果这个ProducerBatch 已经满了那么返回null
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
                if (appendResult != null)
                    //④ 追加成功 直接返回
                    return appendResult;
            }

            //--------------------------到这里就是在 第③步 中追加失败了---------------------------

            // we don't have an in-progress record batch try to allocate a new batch
            //判断是否要丢弃这个batch ,是的话直接返回
            if (abortOnNewBatch) {
                // Return a result that will cause another call to append.
                return new RecordAppendResult(null, false, false, true);
            }
            //⑤ 开始创建 batch
            byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
            //⑥ 计算这个batch 需要多少空间
            int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
            log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
            //⑦ 创建空间
            buffer = free.allocate(size, maxTimeToBlock);

            // Update the current time in case the buffer allocation blocked above.
            nowMs = time.milliseconds();
            //加锁
            synchronized (dq) {
                // Need to check if producer is closed again after grabbing the dequeue lock.
                if (closed)
                    throw new KafkaException("Producer closed while send in progress");

                //⑧ 再次尝试追加记录
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
                //追加成功直接返回
                if (appendResult != null) {
                    // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                    return appendResult;
                }
                
                //⑨ 在新创建的ProducerBatch 中追加 Record ,并将其添加到 dp 集合中,和incomplete 集合中,详情见 `2.4` 小节
                MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
                ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, nowMs);
                FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,
                        callback, nowMs));

                dq.addLast(batch);
                incomplete.add(batch);

                // Don't deallocate this buffer in the finally block as it's being used in the record batch
                buffer = null;
                return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false);
            }
        } finally {
            if (buffer != null)
                //⑩ 释放内存
                free.deallocate(buffer);
            // appendsInProgress - 1
            appendsInProgress.decrementAndGet();
        }
    }
  • incrementAndGet() :当前正在往 RecordAccumulator 中添加数据的线程数,cas 自增1
  • getOrCreateDeque(tp):通过 topic-partition 去 batches 中获取是否有正在使用的,有的话直接获取,没有的创建一个新的, Deque 是一个 ArrayList。通过 synchronized 来保证线程安全
  • tryAppend( ):尝试向 Deque 中最后一个ProducerBatch 追加 record,如果这个ProducerBatch 已经满了那么返回null,详情见 2.2 小节
  • ④ 如果追加成功直接返回
  • ⑤ 追加不成功,开始创建 batch,再次尝试
  • ⑥ 计算这个 batch 需要多少空间
  • allocate:使用BufferPool 创建空间,详情见 2.3 小节
  • tryAppend( ):再次尝试追加记录
  • ⑨ 在新创建的ProducerBatch 中追加 Record ,并将其添加到 dp 集合中,和incomplete 集合中,详情见 2.4 小节
  • deallocate(buffer) :释放内存

RecordAccumulate 的组成图:

对应上面的参数 ⑩ :ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches

  • 每一条由 KafkaProducer#send 发送到 RecordAccumulate 中的消息,都会被加入到 ProducerBatch中,即ProducerBatch 是消息的集合,可以对其进行压缩。
  • Deque<ProducerBatch>> 又是Batch的集合,通过 Topic-Partition 进行分类

因为ProducerBatch 是非线程安全的,所以往ProducerBatch 追加消息的时候需要加锁。而这边又使用了多个 synchronized 块而不是一个完整的 synchronized 块

这样做的原因是在第 ⑦ 步中通过BufferPool 申请空间的时候,会造成堵塞,会降低性能,所以就减少了锁的持有时间,做一定的优化。

第二次加锁重试的原因是:为了防止多个线程并发的向 BufferPool 申请空间后,造成内部碎片。

如下图中:线程1 发现 Deque 尾部的ProducerBatch 不够用创建了一个新的添加到Deque尾部,线程2也是,这样就导致ProducerBatch3 不可用了,这就出现了内存碎片。

然后在第二个synchronized 块中,再一次尝试追加record,发现已经创建了新的了,就会唤醒Sender 线程,发送消息然后释放内存。

在RecordAccumulator 中还有两个比较重要的方法 readydrain

2.1.2 ready 方法 在 RecordAccumulator 中追加消息如果所在 Deque 队列的最后一个 ProducerBatch 满了或者 创建了一个新的ProducerBatch那么就会唤醒Sender线程。那么Sender 线程就会调用 ready 方法获取RecordAccumulator 符合发送的节点集合。

public ReadyCheckResult ready(Cluster cluster, long nowMs) {
        //① 通过Set 来保存满足发送要求的节点,最后包装到ReadyCheckResult中返回
        Set<Node> readyNodes = new HashSet<>();
        // 记录下一次调用ready方法的时间间隔
        long nextReadyCheckDelayMs = Long.MAX_VALUE;
        //② 根据元数据中找不到Leader 节点的分区
        Set<String> unknownLeaderTopics = new HashSet<>();

        //条件4:是有有其他线程在等待BufferPool释放空间
        boolean exhausted = this.free.queued() > 0;
        //③ 从batches 中获取 tp 对应的Deque<ProducerBatch>列表,判断是否有满足的Batch可以发送
        for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
            Deque<ProducerBatch> deque = entry.getValue();
            //④加锁保证线程安全
            synchronized (deque) {
                //⑤ 从Deque头部获取 ProducerBatch 并且去元数据中查找Leader节点
                ProducerBatch batch = deque.peekFirst();
                if (batch != null) {
                    TopicPartition part = entry.getKey();
                    Node leader = cluster.leaderFor(part);
                    //找不到放入 unknownLeaderTopics 队列,之后会出发元数据更新,因为Kafka 是主读主写的模式。找不到Leader节点肯定无法发送的。
                    if (leader == null) {
                        unknownLeaderTopics.add(part.topic());
                        //能找到 Leader 节点 并且 不在 readyNodes 集合中,则需判断是否满足以下要求:
                    } else if (!readyNodes.contains(leader) && !isMuted(part, nowMs)) {
                        long waitedTimeMs = batch.waitedTimeMs(nowMs);
                        // 条件1:这个条件涉及到两个参数 retry.backoff.ms 重试的间隔时间以及 linger.ms
                        // 如果当前尝试次数>0说明之前发送失败了,所以根据 retry.backoff.ms 重试间隔时间 判断是否需要堵塞等待
                        // 如果需要,timeToWaitMs 为true 需要堵塞,不会添加到 readyNodes 集合中
                        // 如果不需要,那么要根据 linger.ms 判断是否需要等待,这个参数的意思就是,当Sender 线程准备来获取可发送的ProducerBatch的时候,是否需要等待默认为0 ,如果增加这个值,那么会给消息带来一定的延迟,但是会增加吞吐量
                        boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
                        long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                        //条件2:是否第一个ProducerBatch 满了或者有多个ProducerBatch
                        boolean full = deque.size() > 1 || batch.isFull();
                        //条件3:是否超时
                        boolean expired = waitedTimeMs >= timeToWaitMs;
                        //条件5: close :Sender 准备关闭
                        //条件6: flushInProgress 是否有线程正在等待flush 操作完成
                        boolean sendable = full || expired || exhausted || closed || flushInProgress();
                        if (sendable && !backingOff) {
                            readyNodes.add(leader);
                        } else {
                            long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                            nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                        }
                    }
                }
            }
        }
        return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
    }
  • readyNodes:通过Set 来保存满足发送要求的节点,最后包装到ReadyCheckResult中返回
  • unknownLeaderTopics
  • ③ 从 batches 中获取 tp 对应的Deque<ProducerBatch>列表,判断是否有满足的Batch可以发送
  • ④ 加锁保证线程安全
  • ⑤ 从Deque头部获取 ProducerBatch 并且去元数据中查找Leader节点
    • 找不到放入 unknownLeaderTopics 队列,之后会出发元数据更新,因为Kafka 是主读主写的模式。找不到Leader节点肯定无法发送的。
  • ⑥ 能找到 Leader 节点 并且 不在 readyNodes 集合中,则需判断是否满足以下要求:
    • 条件1 backingOff:这个条件涉及到两个参数 retry.backoff.ms 重试的间隔时间以及 linger.ms如果当前尝试次数>0说明之前发送失败了,所以根据 retry.backoff.ms 重试间隔时间 判断是否需要堵塞等待,如果需要,timeToWaitMs 为true 需要堵塞,不会添加到 readyNodes 集合中,如果不需要,那么要根据 linger.ms 判断是否需要等待,这个参数的意思就是,当Sender 线程准备来获取可发送的ProducerBatch的时候,是否需要等待默认为0 ,如果增加这个值,那么会给消息带来一定的延迟,但是会增加吞吐量
    • 条件2 full:是否第一个ProducerBatch 满了或者有多个ProducerBatch
    • 条件3 expired:是否超时
    • 条件4 exhausted:是有有其他线程在等待BufferPool释放空间
    • 条件5 close :Sender 准备关闭
    • 条件6 flushInProgress() :是否有线程正在等待flush 操作完成

2.1.3 drain 方法 通过上面的 ready( ) 方法后,Sender 线程就获得了可发送的 ProducerBatch 集合了,但是在 IO 层,生产者不管数据属于哪个TopicPartition 的,它只关心我需要发往哪个Node,所以Sender 线程会调用 drain 方法通过获得的 Set<Node> nodes 集合。重新映射成一个Map, key 是要发往的 NodeId, valueProducerBatch

//① 其中这个maxSize 对应的是 max.request.size 生产者发送消息最大值,默认 1MB ,可以通过 
public Map<Integer, List<ProducerBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {
        if (nodes.isEmpty())
            return Collections.emptyMap();

        Map<Integer, List<ProducerBatch>> batches = new HashMap<>();
        for (Node node : nodes) {
            //调用 drainBatchesForOneNode 方法
            List<ProducerBatch> ready = drainBatchesForOneNode(cluster, node, maxSize, now);
            batches.put(node.id(), ready);
        }
        return batches;
    }

private List<ProducerBatch> drainBatchesForOneNode(Cluster cluster, Node node, int maxSize, long now) {
        int size = 0;
        //② 获取分区的详细信息
        List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
        List<ProducerBatch> ready = new ArrayList<>();
        //③ 从上一次drain的位置开始 取模 算法这次的开始位置
        int start = drainIndex = drainIndex % parts.size();
        do {
            //④ 获取这个位置的详细信息
            PartitionInfo part = parts.get(drainIndex);
            //⑤ 通过这个 PartitionInfo 封装成新的 topic-partition映射关系
            TopicPartition tp = new TopicPartition(part.topic(), part.partition());
            //⑥ 更新drainIndex
            this.drainIndex = (this.drainIndex + 1) % parts.size();

            // Only proceed if the partition has no in-flight batches.
            if (isMuted(tp, now))
                continue;
            Deque<ProducerBatch> deque = getDeque(tp);
            if (deque == null)
                continue;

            synchronized (deque) {
                // invariant: !isMuted(tp,now) && deque != null
                //⑦ 获取ProducerBatch
                ProducerBatch first = deque.peekFirst();
                if (first == null)
                    continue;

                // first != null
                //⑧ 这个判断在ready方法中的条件一分析过。
                boolean backoff = first.attempts() > 0 && first.waitedTimeMs(now) < retryBackoffMs;
                // Only drain the batch if it is not during backoff period.
                if (backoff)
                    continue;

                //⑨ 判断是否超过 maxSize(在①中有解释这个参数的含义),满了就直接结束循环
                if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) {
                    break;
                } else {
                    //⑩ 调用 shouldStopDrainBatchesForPartition 方法判断是否要停止 跟事务有关系,不是本次的重点
                    if (shouldStopDrainBatchesForPartition(first, tp))
                        break;

                    boolean isTransactional = transactionManager != null && transactionManager.isTransactional();
                    ProducerIdAndEpoch producerIdAndEpoch =
                        transactionManager != null ? transactionManager.producerIdAndEpoch() : null;
                    ProducerBatch batch = deque.pollFirst();
                    if (producerIdAndEpoch != null && !batch.hasSequence()) {
                        batch.setProducerState(producerIdAndEpoch, transactionManager.sequenceNumber(batch.topicPartition), isTransactional);
                        transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount);
                        log.debug("Assigned producerId {} and producerEpoch {} to batch with base sequence " +
                                "{} being sent to partition {}", producerIdAndEpoch.producerId,
                            producerIdAndEpoch.epoch, batch.baseSequence(), tp);

                        transactionManager.addInFlightBatch(batch);
                    }
                    //⑪ 调用 batch.close 方法关闭底层输出流,将MemoryRecords 设置为只读
                    batch.close();
                    //⑫ 将 batch的字节流添加到 size中
                    size += batch.records().sizeInBytes();
                    //⑬ 添加到ready集合中发送。
                    ready.add(batch);
                    //⑭ 调用 ProducerBatch.drain 更新 drainedMs 字段
                    batch.drained(now);
                }
            }
        } while (start != drainIndex);
        return ready;
    }    
  • maxSize 参数 :对应的是 max.request.size 生产者发送消息最大值,默认 1MB ,可以通过
  • ② 获取分区的详细信息
  • ③ 从上一次drain的位置开始 取模 算法这次的开始位置
  • PartitionInfo:获取这个位置的详细信息
    public class PartitionInfo {
      //topic
      private final String topic;
      //分区
      private final int partition;
      //leader 节点
      private final Node leader;
      //topic对应的replica 集合
      private final Node[] replicas;
      //ISR可用Node 集合
      private final Node[] inSyncReplicas;
      //OSR不可用Node 集合
      private final Node[] offlineReplicas;
    } 
    
  • ⑤ 通过这个 PartitionInfo 封装成新的 topic-partition 映射关系
  • ⑥ 更新 drainIndex
  • ⑦ 获取 ProducerBatch
  • ⑧ 这个判断在 ready( ) 方法中的 条件一 分析过。
  • ⑨ 判断是否超过 maxSize(在①中有解释这个参数的含义),满了就直接结束循环
  • ⑩ 调用 shouldStopDrainBatchesForPartition 方法判断是否要停止,跟事务有关系,不是本次的重点
  • ⑪ 调用 batch#close 方法关闭底层输出流,将 MemoryRecords 设置为只读
  • ⑫ 将 batch 的字节流添加到 size中
  • ⑬ 添加到ready集合中发送。
  • ⑭ 调用 ProducerBatch#drain 方法,更新 drainedMs 字段

2.2 ProducerBatch

public final class ProducerBatch {
    //追加记录最后的状态:Aborted、Failed、Successed
    private enum FinalState { ABORTED, FAILED, SUCCEEDED }

    final long createdMs;
    //① topic 与 partition 的映射关系
    final TopicPartition topicPartition;
    //② 标识ProducerBatch 的状态的future, 没有实现Future 接口,而是使用了 CountDownLatch 来实现类似Future 的功能
    final ProduceRequestResult produceFuture;
    //③ Thunk 对象集合 ,ProducerBatch 的内部类
    private final List<Thunk> thunks = new ArrayList<>();
    //④ MemoryRecords 对象 通过 builder 模式构建
    private final MemoryRecordsBuilder recordsBuilder;
    //⑤ 尝试发送当前ProducerBatch的次数
    private final AtomicInteger attempts = new AtomicInteger(0);

    private final boolean isSplitBatch;
    //状态
    private final AtomicReference<FinalState> finalState = new AtomicReference<>(null);
    //⑥ 记录了当前保存的Record 个数
    int recordCount;
    //⑦ 最大record的字节数
    int maxRecordSize;
    //⑧ 最后一次尝试发送ProducerBatch的时间
    private long lastAttemptMs;
    //⑨ 最后一次追加record的时间
    private long lastAppendTime;
    //⑩ 在使用 drain 方法批量导出 RecordBatch 的时间
    private long drainedMs;
    //⑪是否正在重试
    private boolean retry;
    private boolean reopened;
}
  • topicPartition :topic 与 partition 的映射关系
  • produceFuture :标识ProducerBatch 的状态的future, 没有实现Future 接口,而是使用了
  • thunks : Thunk 对象集合 ,ProducerBatch 的内部类,这个thunk里面包装了每个消息的回调。
  • recordsBuilder:MemoryRecords 对象 通过 builder 模式构建
  • attempts:尝试发送当前ProducerBatch的次数
  • recordCount:记录了当前保存的Record 个数
  • maxRecordSize:最大record的字节数
  • lastAttemptMs:最后一次尝试发送ProducerBatch的时间
  • lastAppendTime:最后一次追加record的时间
  • drainedMs :在使用 drain 方法批量导出 RecordBatch 的时间
  • retry : 是否正在重试

2.2.1 tryAppend 方法

public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) {
        //① 估算一下剩余的值是否足够,不够不直接return
        if (!recordsBuilder.hasRoomFor(timestamp, key, value, headers)) {
            return null;
        } else {
            //② 向MemoryRecords 中添加数据
            Long checksum = this.recordsBuilder.append(timestamp, key, value, headers);
            this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(),
                    recordsBuilder.compressionType(), key, value, headers));
            this.lastAppendTime = now;
            //创建FutureRecordMetadata对象
            FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                                                                   timestamp, checksum,
                                                                   key == null ? -1 : key.length,
                                                                   value == null ? -1 : value.length,
                                                                   Time.SYSTEM);
            //③ 每一次调用send 方法,都会有一个CallBack 对象,用于该条消息的回调,thunk就是封装这个消息的回调,thunks就是消息回调对象的集合
            thunks.add(new Thunk(callback, future));
            this.recordCount++;
            return future;
        }
    }
  • ① 估算一下剩余的值是否足够,不够不直接return,详情见 2.4 小节
  • ② 向MemoryRecords 中添加数据,详情见 2.4 小节
  • ③ 每一次调用send 方法,都会有一个CallBack 对象,用于该条消息的回调,thunk就是封装这个消息的回调,thunks就是消息回调对象的集合,通过Sender 线程进行回调。

2.3 BufferPool

2.1.1 小节中的 第⑦步第⑩步 类似这样的创建和释放 ByteBuffer 是比较耗费资源的,所以通过BufferPool 实现 ByteBuffer 对象池,这种技术在 Netty 中也有体现。

public class BufferPool {

    static final String WAIT_TIME_SENSOR_NAME = "bufferpool-wait-time";
    //①
    private final long totalMemory;
    //②
    private final int poolableSize;
    //③ ReentrantLock 锁,因为会有多线程并发的分配和回收,所以要用锁进行控制
    private final ReentrantLock lock;
    //④ 是 ArrayDeque<ByteBuffer> 队列,也就是说对象池是通过队列来实现的
    private final Deque<ByteBuffer> free;
    //⑤ 条件队列:记录因申请不到足够空间而堵塞的线程,此队列中实际记录的是堵塞线程对应的Condition对象
    private final Deque<Condition> waiters;
    //⑥
    private long nonPooledAvailableMemory;
    private final Metrics metrics;
    private final Time time;
    private final Sensor waitTime;
    private boolean closed;
}    
  • totalMemory:总大小,默认32MB, 可以通过buffer.memory 进行配置
  • poolableSize:每个ByteBuffer 的大小,也就是ProducerBatch 默认16kb,可以通过 batch.size 进行设置
  • lock:ReentrantLock 锁,因为会有多线程并发的分配和回收,所以要用锁进行控制
  • free:是 ArrayDeque 队列,也就是说对象池是通过队列来实现的
  • waiters:条件队列:记录因申请不到足够空间而堵塞的线程,此队列中实际记录的是堵塞线程对应的Condition对象
  • nonPooledAvailableMemory:可用的空间大小,totalMemory - free队列中全部的ByteBuffer大小

注意:通过 poolableSize 字段可以知道,BufferPool 只能对指定大小的ByteBuffer 进行复用,可以通过增大 batch.size 属性增加这个的复用对象的大小

2.3.1 allocate 方法 申请ByteBuffer 也就是申请ProducerBatch,如果没有足够的内存分配会把当前线程塞入 waiters 条件队列堵塞,等待有线程释放了内存,然后唤醒。

public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
        if (size > this.totalMemory)
            throw new IllegalArgumentException("Attempt to allocate " + size
                                               + " bytes, but there is a hard limit of "
                                               + this.totalMemory
                                               + " on memory allocations.");

        ByteBuffer buffer = null;
        //① 加锁同步
        this.lock.lock();

        if (this.closed) {
            this.lock.unlock();
            throw new KafkaException("Producer closed while allocating memory");
        }

        try {
            // check if we have a free buffer of the right size pooled
            //② 申请分配的是否是 poolableSize 指定大小的,并且 如果 free 中有空闲的ByteBuffer 直接获取返回
            if (size == poolableSize && !this.free.isEmpty())
                return this.free.pollFirst();

            // now check if the request is immediately satisfiable with the
            // memory on hand or if we need to block
            //③ 不满足② 计算当前 free 队列中的空间
            int freeListSize = freeSize() * this.poolableSize;

            //④ 未分配的可用空间 > 所要分配的大小,也就是说有空间能进行分配,那么会进行分配
            if (this.nonPooledAvailableMemory + freeListSize >= size) {
                // we have enough unallocated or pooled memory to immediately
                // satisfy the request, but need to allocate the buffer
                freeUp(size);
                this.nonPooledAvailableMemory -= size;
            } else {
                // we are out of memory and will have to block
                //⑤ 不够则加入条件队列进行堵塞
                int accumulated = 0;
                Condition moreMemory = this.lock.newCondition();
                try {
                    long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
                    this.waiters.addLast(moreMemory);
                    // loop over and over until we have a buffer or have reserved
                    // enough memory to allocate one
                    while (accumulated < size) {
                        long startWaitNs = time.nanoseconds();
                        long timeNs;
                        boolean waitingTimeElapsed;
                        try {
                            //⑥ 堵塞 超时时间为 remainingTimeToBlockNs
                            waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
                        } finally {
                            long endWaitNs = time.nanoseconds();
                            timeNs = Math.max(0L, endWaitNs - startWaitNs);
                            recordWaitTime(timeNs);
                        }

                        if (this.closed)
                            throw new KafkaException("Producer closed while allocating memory");

                        if (waitingTimeElapsed) {
                            throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
                        }

                        remainingTimeToBlockNs -= timeNs;

                        // check if we can satisfy this request from the free list,
                        // otherwise allocate memory
                        //⑦ 有足够的空间分配了,则退出循环
                        if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
                            // just grab a buffer from the free list
                            buffer = this.free.pollFirst();
                            accumulated = size;
                        //⑧ 否则先分配一部分,继续等待其他线程释放空间。
                        } else {
                            // we'll need to allocate memory, but we may only get
                            // part of what we need on this iteration
                            freeUp(size - accumulated);
                            int got = (int) Math.min(size - accumulated, this.nonPooledAvailableMemory);
                            this.nonPooledAvailableMemory -= got;
                            accumulated += got;
                        }
                    }
                    // Don't reclaim memory on throwable since nothing was thrown
                    accumulated = 0;
                } finally {
                    // When this loop was not able to successfully terminate don't loose available memory
                    this.nonPooledAvailableMemory += accumulated;
                    //⑨ 已经分配了空间 从条件队列中删除
                    this.waiters.remove(moreMemory);
                }
            }
        } finally {
            // signal any additional waiters if there is more memory left
            // over for them
            try {
                //⑩ 如果还有空间的 唤醒下一个线程
                if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && !this.waiters.isEmpty())
                    this.waiters.peekFirst().signal();
            } finally {
                // Another finally... otherwise find bugs complains
                //⑪ 释放锁,整个过程都是同步的。
                lock.unlock();
            }
        }

        if (buffer == null)
            return safeAllocateByteBuffer(size);
        else
            return buffer;
    }
  • lock( ) :加锁同步
  • ② 申请分配的是否是 poolableSize 指定大小的,并且 如果 free 中有空闲的 ByteBuffer 直接获取返回
  • ③ 不满足 计算当前 free 队列中的空间
  • ④ 未分配的可用空间 > 所要分配的大小,也就是说有空间能进行分配,那么会进行分配
  • ⑤ 不够则加入条件队列进行堵塞
  • ⑥ 堵塞 超时时间为 remainingTimeToBlockNs
  • ⑦ 有足够的空间分配了,则退出循环
  • ⑧ 否则先分配一部分,继续等待其他线程释放空间。
  • ⑨ 已经分配了空间 从条件队列中删除
  • signal( ):如果还有空间的 唤醒下一个线程
  • unlock( ):释放锁,整个过程都是同步的。

2.3.2 deallocate 方法 相对的就是释放。

public void deallocate(ByteBuffer buffer, int size) {
        //① 加锁
        lock.lock();
        try {
            //② 释放是的 poolableSize 大小的,直接加入 free 队列中
            if (size == this.poolableSize && size == buffer.capacity()) {
                buffer.clear();
                this.free.add(buffer);
            } else {
                //③ 不是的 poolableSize 大小的,不会进行复用,往 nonPooledAvailableMemory 添加
                this.nonPooledAvailableMemory += size;
            }
            //④ 线程间的通讯,唤醒条件队列中因空间不足而堵塞的线程
            Condition moreMem = this.waiters.peekFirst();
            if (moreMem != null)
                moreMem.signal();
        } finally {
            lock.unlock();
        }
}
  • ① 加锁
  • ② 释放是的 poolableSize 大小的,直接加入 free 队列中
  • ③ 不是的 poolableSize 大小的,不会进行复用,往 nonPooledAvailableMemory 添加
  • ④ 线程间的通讯,唤醒条件队列中因空间不足而堵塞的线程

2.4 MemoryRecords

来开始填 2.1.1append 方法中第 步留下的坑,可以看到调用的是ProducerBatch 类中的 tryAppend 方法具体看 2.2.1中的第 步,而ProducerBatch 又是调用的 MemoryReocrdsBuilder 类中的 append 方法,而 MemoryReocrdsBuilder 类实际是操作 MemoryRecords,而MemoryRecords 底层又是使用的 BufferPool 创建的 ByteBuffer ,整体流程就串起来了。

整个tryAppend( )方法流程如下:

  • KafkaProducer#send:发送一条消息 假设发往Topic 是2 Partition是1,会先调用 RecordAccumulator#tryAppend 缓存起来 对应 2.1.1
  • RecordAccumulator :去获取是否是有对应的Deque,有直接获取,没有创建。
  • ProducerBatch#append :通过 MemoryReocrdsBuilder,往ByteBuffer 中添加记录
  • BufferPool: 如果第③步添加成功直接返回,但是如果失败,则会 allocate 空间继续走第③步

所以呢这个 MemoryReocrdsBuilder 实际就是对 MemoryRecords 的包装进而操控 BufferPool 创建出来的 ByteBuffer 往里追加 record。

public class MemoryRecords extends AbstractRecords {
    //① java nio ByteBuffer
    private final ByteBuffer buffer;
    //② 私有构造器
    private MemoryRecords(ByteBuffer buffer) {
        Objects.requireNonNull(buffer, "buffer should not be null");
        this.buffer = buffer;
    }
    //③ MemoryRecordsBuilder builder 模式
    public static MemoryRecordsBuilder builder(ByteBuffer buffer,
                                               byte magic,
                                               CompressionType compressionType,
                                               TimestampType timestampType,
                                               long baseOffset) {
        long logAppendTime = RecordBatch.NO_TIMESTAMP;
        if (timestampType == TimestampType.LOG_APPEND_TIME)
            logAppendTime = System.currentTimeMillis();
        return builder(buffer, magic, compressionType, timestampType, baseOffset, logAppendTime,
                RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false,
                RecordBatch.NO_PARTITION_LEADER_EPOCH);
    }
}   
  • ① 可以看到MemoryRecords 就是对Java中的ByteBuffer 进行操作。
  • ② 私有构造器不允许外部使用
  • ③ 通过builder 模式创建 MemoryRecordsBuilder,通过 MemoryRecordsBuilder 对MemoryRecords 进行操作。

MemoryRecordsBuilder 在MemoryRecordsBuilder 中两个输出类型的字段 bufferStreamappendStream

public class MemoryRecordsBuilder implements AutoCloseable {
    //① 对ByteBuffer的封装
    private final ByteBufferOutputStream bufferStream;
    //② 对bufferStream 的包装带有压缩功能
    private DataOutputStream appendStream;
    private MemoryRecords builtRecords;
}    
  • bufferStream : 继承java.io.OutputStream,当写入数据超出ByteBuffer 的时候自动扩容。
  • appendStream : 通过 timestampType 字段选择的压缩算法,对 bufferStream的包装。
this.appendStream = new DataOutputStream(compressionType.wrapForOutput(this.bufferStream, magic));

Kafka 四种压缩算法性能结论: ZStandard有着最高的压缩比,相同的消息量占用最少的磁盘容量,因此带宽的占用也是比较少的,但是在TPS方面的表现并不抢眼,因此对于那些在乎磁盘和带宽资源的用户而言,配置ZStandard算法似乎是个不错的选择,但如果追求应用TPS,就目前的Kafka而言LZ4依然是最好的选择。 参考自:Kafka 2.1.0压缩算法性能测试

大致流程就分析完了,接下来会分析Sender线程是如何来RecordAccumulator中获取消息并且发送。