1.概述
在 Kafka 生产者(一)KafkaProducer 中分析了图中红色区域,也就是说每次调用 send
方法发送消息,都会经过Interceptors
、 Serializer
、 Partitionor
。我们可以自定义拦截器、序列化器、分区器,实现我们自己的功能,比如消息路由、消息审计、消息轨迹等。
如下图:
这次分析的重点是 RecordAccumulator
,消息缓存器。Kafka 通过把消息放入缓存器,形成一个一个 ProducerBatch
进行批量的发送消息,这样的话可以有效的减少IO的开销。
2.源码分析
2.1 RecordAccumulator
在前面的 1.1 Kafka Producer 整体流程图
中阐述了主线程调用KafkaProducer.send( )
方法并没有直接发送给Kafka Server,而是存入 RecordAccumulator
中,当达到一定条件之后,会唤醒 Sender 线程
来获取消息并发送。
那么在 RecordAccumulator
中必然是有一个 main 线程
不断的往其加数据,还有一个Sender 线程
不断的取数据,所以要保证 RecordAccumulator 线程安全
,可以看到使用了 原子类、并发集合、volatile 、锁来保证。
public final class RecordAccumulator {
//① 标记位,producer 是否 close,通过valatile 保证可见性
private volatile boolean closed;
//② 当前调用flush 方法的线程。KafkaProducer 可以调用flush 方法立即发送消息,会立马唤醒Sender线程
private final AtomicInteger flushesInProgress;
//③ 当前正在往 RecordAccumulator 中添加数据的线程数
private final AtomicInteger appendsInProgress;
//④ 指定每个Batch 的大小
private final int batchSize;
//⑤ 压缩的类型
private final CompressionType compression;
//⑥ 默认0,可以通过 linger.ms 进行设置
private final int lingerMs;
//⑦ 当发送消息失败时,重试的间隔时间,默认100,可以通过`retry.backoff.ms`配置
private final long retryBackoffMs;
//⑧ 延迟时间,默认120秒,可以通过 `delivery.timeout.ms` 进行配置
private final int deliveryTimeoutMs;
//⑨ ByteBuffer 对象池
private final BufferPool free;
private final Time time;
// version 号
private final ApiVersions apiVersions;
//⑩ Topic-Partition 与 ProducerBatch 的映射关系 ,CopyOnWriteMap 线程安全
private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
//⑪ 未发送完成的ProducerBatch 集合
private final IncompleteBatches incomplete;
private final Map<TopicPartition, Long> muted;
//⑫ 在使用 drain 方法批量导出 RecordBatch 时,为了防止饥饿,用此字段记录上次发送停止的位置,下次继续从此位置开始发送
private int drainIndex;
private final TransactionManager transactionManager;
private long nextBatchExpiryTimeMs = Long.MAX_VALUE; // the earliest time (absolute) a batch will expire.
}
- ①
close
:标记位,producer 是否 close,通过valatile 保证可见性 - ②
flushesInProgress
:当前调用flush 方法的线程。KafkaProducer 可以调用flush 方法立即发送消息,会立马唤醒Sender线程public void flush() { log.trace("Flushing accumulated records in producer."); this.accumulator.beginFlush(); this.sender.wakeup(); try { this.accumulator.awaitFlushCompletion(); } catch (InterruptedException e) { throw new InterruptException("Flush interrupted.", e); } }
- ③
appendsInProgress
:当前正在往 RecordAccumulator 中添加数据的线程数,自增 - ④
batchSize
:指定每个 ProducerBatch 的大小,默认16kb,可以通过batch.size
进行设置 - ⑤
compression
:压缩的类型,默认为none,可以配置为gzip、snappy、lz4、zstd - ⑥
lingerMs
:用来指定生产者发送 ProducerBatch 之前等待更多消息加入ProducerBatch的时间,默认0,可以通过linger.ms
进行设置。生产者客户端会在 ProducerBatch 被填满或等待时间超过linger.ms 值时发送出去。增大这个参数的值会增加消息的延迟,但是同时能提升一定的吞吐量 - ⑦
retryBackoffMs
:当发送消息失败时,重试的间隔时间,默认100,可以通过retry.backoff.ms
配置 - ⑧
deliveryTimeoutMs
: 延迟时间,默认120秒,可以通过delivery.timeout.ms
进行配置 - ⑨
free
:ByteBuffer 对象复用池,在下面的2.3
中 进行分析 - ⑩
batches
Topic-Partition 与 ProducerBatch 的映射关系 ,CopyOnWriteMap 线程安全 - ⑪
incomplete
未发送完成的ProducerBatch 集合 - ⑫
drainIndex
:在使用 drain 方法批量导出 RecordBatch 时,为了防止饥饿,用此字段记录上次发送停止的位置,下次继续从此位置开始发送
2.1.1 append 方法 KafkaProdcer 追加到消息到 RecordAccumulator 中
public RecordAppendResult append(TopicPartition tp,
long timestamp,
byte[] key,
byte[] value,
Header[] headers,
Callback callback,
long maxTimeToBlock,
boolean abortOnNewBatch,
long nowMs) throws InterruptedException {
//① 当前正在往 RecordAccumulator 中添加数据的线程数,cas 自增1
appendsInProgress.incrementAndGet();
ByteBuffer buffer = null;
if (headers == null) headers = Record.EMPTY_HEADERS;
try {
//② 通过 topic-partition 去 batches 中获取是否有正在使用的,有的话直接获取,没有的创建一个新的, Deque 是一个ArrayList。
Deque<ProducerBatch> dq = getOrCreateDeque(tp);
// 通过synchronized 来保证线程安全
synchronized (dq) {
if (closed)
throw new KafkaException("Producer closed while send in progress");
//③ 尝试向 Deque 中最后一个ProducerBatch 追加 record,
// 如果这个ProducerBatch 已经满了那么返回null
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
if (appendResult != null)
//④ 追加成功 直接返回
return appendResult;
}
//--------------------------到这里就是在 第③步 中追加失败了---------------------------
// we don't have an in-progress record batch try to allocate a new batch
//判断是否要丢弃这个batch ,是的话直接返回
if (abortOnNewBatch) {
// Return a result that will cause another call to append.
return new RecordAppendResult(null, false, false, true);
}
//⑤ 开始创建 batch
byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
//⑥ 计算这个batch 需要多少空间
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
//⑦ 创建空间
buffer = free.allocate(size, maxTimeToBlock);
// Update the current time in case the buffer allocation blocked above.
nowMs = time.milliseconds();
//加锁
synchronized (dq) {
// Need to check if producer is closed again after grabbing the dequeue lock.
if (closed)
throw new KafkaException("Producer closed while send in progress");
//⑧ 再次尝试追加记录
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
//追加成功直接返回
if (appendResult != null) {
// Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
return appendResult;
}
//⑨ 在新创建的ProducerBatch 中追加 Record ,并将其添加到 dp 集合中,和incomplete 集合中,详情见 `2.4` 小节
MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, nowMs);
FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,
callback, nowMs));
dq.addLast(batch);
incomplete.add(batch);
// Don't deallocate this buffer in the finally block as it's being used in the record batch
buffer = null;
return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false);
}
} finally {
if (buffer != null)
//⑩ 释放内存
free.deallocate(buffer);
// appendsInProgress - 1
appendsInProgress.decrementAndGet();
}
}
- ①
incrementAndGet()
:当前正在往 RecordAccumulator 中添加数据的线程数,cas 自增1 - ②
getOrCreateDeque(tp)
:通过 topic-partition 去 batches 中获取是否有正在使用的,有的话直接获取,没有的创建一个新的, Deque 是一个ArrayList。通过
synchronized
来保证线程安全 - ③
tryAppend( )
:尝试向 Deque 中最后一个ProducerBatch 追加 record,如果这个ProducerBatch 已经满了那么返回null,详情见2.2
小节 - ④ 如果追加成功直接返回
- ⑤ 追加不成功,开始创建 batch,再次尝试
- ⑥ 计算这个 batch 需要多少空间
- ⑦
allocate
:使用BufferPool 创建空间,详情见2.3
小节 - ⑧
tryAppend( )
:再次尝试追加记录 - ⑨ 在新创建的ProducerBatch 中追加 Record ,并将其添加到 dp 集合中,和incomplete 集合中,详情见
2.4
小节 - ⑩
deallocate(buffer)
:释放内存
RecordAccumulate 的组成图:
对应上面的参数 ⑩ :ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches
- 每一条由
KafkaProducer#send
发送到 RecordAccumulate 中的消息,都会被加入到 ProducerBatch中,即ProducerBatch 是消息的集合,可以对其进行压缩。 Deque<ProducerBatch>>
又是Batch的集合,通过Topic-Partition
进行分类
因为ProducerBatch 是非线程安全的,所以往ProducerBatch 追加消息的时候需要加锁。而这边又使用了多个 synchronized 块而不是一个完整的 synchronized 块
这样做的原因是在
第 ⑦ 步
中通过BufferPool 申请空间的时候,会造成堵塞,会降低性能,所以就减少了锁的持有时间,做一定的优化。第二次加锁重试的原因是:为了防止多个线程并发的向 BufferPool 申请空间后,造成内部碎片。
如下图中:线程1 发现 Deque 尾部的ProducerBatch 不够用创建了一个新的添加到Deque尾部,线程2也是,这样就导致ProducerBatch3 不可用了,这就出现了内存碎片。
然后在第二个synchronized 块中,再一次尝试追加record,发现已经创建了新的了,就会唤醒Sender 线程,发送消息然后释放内存。
在RecordAccumulator 中还有两个比较重要的方法 ready
和 drain
2.1.2 ready 方法
在 RecordAccumulator 中追加消息如果所在 Deque 队列的最后一个 ProducerBatch 满了或者
创建了一个新的ProducerBatch那么就会唤醒Sender线程。那么Sender 线程就会调用 ready
方法获取RecordAccumulator 符合发送的节点集合。
public ReadyCheckResult ready(Cluster cluster, long nowMs) {
//① 通过Set 来保存满足发送要求的节点,最后包装到ReadyCheckResult中返回
Set<Node> readyNodes = new HashSet<>();
// 记录下一次调用ready方法的时间间隔
long nextReadyCheckDelayMs = Long.MAX_VALUE;
//② 根据元数据中找不到Leader 节点的分区
Set<String> unknownLeaderTopics = new HashSet<>();
//条件4:是有有其他线程在等待BufferPool释放空间
boolean exhausted = this.free.queued() > 0;
//③ 从batches 中获取 tp 对应的Deque<ProducerBatch>列表,判断是否有满足的Batch可以发送
for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
Deque<ProducerBatch> deque = entry.getValue();
//④加锁保证线程安全
synchronized (deque) {
//⑤ 从Deque头部获取 ProducerBatch 并且去元数据中查找Leader节点
ProducerBatch batch = deque.peekFirst();
if (batch != null) {
TopicPartition part = entry.getKey();
Node leader = cluster.leaderFor(part);
//找不到放入 unknownLeaderTopics 队列,之后会出发元数据更新,因为Kafka 是主读主写的模式。找不到Leader节点肯定无法发送的。
if (leader == null) {
unknownLeaderTopics.add(part.topic());
//能找到 Leader 节点 并且 不在 readyNodes 集合中,则需判断是否满足以下要求:
} else if (!readyNodes.contains(leader) && !isMuted(part, nowMs)) {
long waitedTimeMs = batch.waitedTimeMs(nowMs);
// 条件1:这个条件涉及到两个参数 retry.backoff.ms 重试的间隔时间以及 linger.ms
// 如果当前尝试次数>0说明之前发送失败了,所以根据 retry.backoff.ms 重试间隔时间 判断是否需要堵塞等待
// 如果需要,timeToWaitMs 为true 需要堵塞,不会添加到 readyNodes 集合中
// 如果不需要,那么要根据 linger.ms 判断是否需要等待,这个参数的意思就是,当Sender 线程准备来获取可发送的ProducerBatch的时候,是否需要等待默认为0 ,如果增加这个值,那么会给消息带来一定的延迟,但是会增加吞吐量
boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
//条件2:是否第一个ProducerBatch 满了或者有多个ProducerBatch
boolean full = deque.size() > 1 || batch.isFull();
//条件3:是否超时
boolean expired = waitedTimeMs >= timeToWaitMs;
//条件5: close :Sender 准备关闭
//条件6: flushInProgress 是否有线程正在等待flush 操作完成
boolean sendable = full || expired || exhausted || closed || flushInProgress();
if (sendable && !backingOff) {
readyNodes.add(leader);
} else {
long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
}
}
}
}
}
return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
}
- ①
readyNodes
:通过Set 来保存满足发送要求的节点,最后包装到ReadyCheckResult中返回 - ②
unknownLeaderTopics
- ③ 从
batches
中获取 tp 对应的Deque<ProducerBatch>
列表,判断是否有满足的Batch可以发送 - ④ 加锁保证线程安全
- ⑤ 从Deque头部获取 ProducerBatch 并且去元数据中查找Leader节点
- 找不到放入
unknownLeaderTopics
队列,之后会出发元数据更新,因为Kafka 是主读主写的模式。找不到Leader节点肯定无法发送的。
- 找不到放入
- ⑥ 能找到 Leader 节点 并且 不在 readyNodes 集合中,则需判断是否满足以下要求:
条件1 backingOff
:这个条件涉及到两个参数retry.backoff.ms
重试的间隔时间以及linger.ms
如果当前尝试次数>0说明之前发送失败了,所以根据 retry.backoff.ms 重试间隔时间 判断是否需要堵塞等待,如果需要,timeToWaitMs 为true 需要堵塞,不会添加到 readyNodes 集合中,如果不需要,那么要根据 linger.ms 判断是否需要等待,这个参数的意思就是,当Sender 线程准备来获取可发送的ProducerBatch的时候,是否需要等待默认为0 ,如果增加这个值,那么会给消息带来一定的延迟,但是会增加吞吐量条件2 full
:是否第一个ProducerBatch 满了或者有多个ProducerBatch条件3 expired
:是否超时条件4 exhausted
:是有有其他线程在等待BufferPool释放空间条件5 close
:Sender 准备关闭条件6 flushInProgress()
:是否有线程正在等待flush 操作完成
2.1.3 drain 方法
通过上面的 ready( )
方法后,Sender 线程就获得了可发送的 ProducerBatch 集合了,但是在 IO 层,生产者不管数据属于哪个TopicPartition 的,它只关心我需要发往哪个Node,所以Sender 线程会调用 drain 方法通过获得的 Set<Node> nodes
集合。重新映射成一个Map, key
是要发往的 NodeId,
value
是 ProducerBatch
//① 其中这个maxSize 对应的是 max.request.size 生产者发送消息最大值,默认 1MB ,可以通过
public Map<Integer, List<ProducerBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {
if (nodes.isEmpty())
return Collections.emptyMap();
Map<Integer, List<ProducerBatch>> batches = new HashMap<>();
for (Node node : nodes) {
//调用 drainBatchesForOneNode 方法
List<ProducerBatch> ready = drainBatchesForOneNode(cluster, node, maxSize, now);
batches.put(node.id(), ready);
}
return batches;
}
private List<ProducerBatch> drainBatchesForOneNode(Cluster cluster, Node node, int maxSize, long now) {
int size = 0;
//② 获取分区的详细信息
List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
List<ProducerBatch> ready = new ArrayList<>();
//③ 从上一次drain的位置开始 取模 算法这次的开始位置
int start = drainIndex = drainIndex % parts.size();
do {
//④ 获取这个位置的详细信息
PartitionInfo part = parts.get(drainIndex);
//⑤ 通过这个 PartitionInfo 封装成新的 topic-partition映射关系
TopicPartition tp = new TopicPartition(part.topic(), part.partition());
//⑥ 更新drainIndex
this.drainIndex = (this.drainIndex + 1) % parts.size();
// Only proceed if the partition has no in-flight batches.
if (isMuted(tp, now))
continue;
Deque<ProducerBatch> deque = getDeque(tp);
if (deque == null)
continue;
synchronized (deque) {
// invariant: !isMuted(tp,now) && deque != null
//⑦ 获取ProducerBatch
ProducerBatch first = deque.peekFirst();
if (first == null)
continue;
// first != null
//⑧ 这个判断在ready方法中的条件一分析过。
boolean backoff = first.attempts() > 0 && first.waitedTimeMs(now) < retryBackoffMs;
// Only drain the batch if it is not during backoff period.
if (backoff)
continue;
//⑨ 判断是否超过 maxSize(在①中有解释这个参数的含义),满了就直接结束循环
if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) {
break;
} else {
//⑩ 调用 shouldStopDrainBatchesForPartition 方法判断是否要停止 跟事务有关系,不是本次的重点
if (shouldStopDrainBatchesForPartition(first, tp))
break;
boolean isTransactional = transactionManager != null && transactionManager.isTransactional();
ProducerIdAndEpoch producerIdAndEpoch =
transactionManager != null ? transactionManager.producerIdAndEpoch() : null;
ProducerBatch batch = deque.pollFirst();
if (producerIdAndEpoch != null && !batch.hasSequence()) {
batch.setProducerState(producerIdAndEpoch, transactionManager.sequenceNumber(batch.topicPartition), isTransactional);
transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount);
log.debug("Assigned producerId {} and producerEpoch {} to batch with base sequence " +
"{} being sent to partition {}", producerIdAndEpoch.producerId,
producerIdAndEpoch.epoch, batch.baseSequence(), tp);
transactionManager.addInFlightBatch(batch);
}
//⑪ 调用 batch.close 方法关闭底层输出流,将MemoryRecords 设置为只读
batch.close();
//⑫ 将 batch的字节流添加到 size中
size += batch.records().sizeInBytes();
//⑬ 添加到ready集合中发送。
ready.add(batch);
//⑭ 调用 ProducerBatch.drain 更新 drainedMs 字段
batch.drained(now);
}
}
} while (start != drainIndex);
return ready;
}
- ①
maxSize 参数
:对应的是max.request.size
生产者发送消息最大值,默认 1MB ,可以通过 - ② 获取分区的详细信息
- ③ 从上一次drain的位置开始 取模 算法这次的开始位置
- ④
PartitionInfo
:获取这个位置的详细信息public class PartitionInfo { //topic private final String topic; //分区 private final int partition; //leader 节点 private final Node leader; //topic对应的replica 集合 private final Node[] replicas; //ISR可用Node 集合 private final Node[] inSyncReplicas; //OSR不可用Node 集合 private final Node[] offlineReplicas; }
- ⑤ 通过这个 PartitionInfo 封装成新的
topic-partition
映射关系 - ⑥ 更新
drainIndex
- ⑦ 获取
ProducerBatch
- ⑧ 这个判断在
ready( )
方法中的条件一
分析过。 - ⑨ 判断是否超过
maxSize(在①中有解释这个参数的含义)
,满了就直接结束循环 - ⑩ 调用
shouldStopDrainBatchesForPartition
方法判断是否要停止,跟事务有关系,不是本次的重点 - ⑪ 调用
batch#close
方法关闭底层输出流,将 MemoryRecords 设置为只读 - ⑫ 将 batch 的字节流添加到 size中
- ⑬ 添加到ready集合中发送。
- ⑭ 调用
ProducerBatch#drain
方法,更新 drainedMs 字段
2.2 ProducerBatch
public final class ProducerBatch {
//追加记录最后的状态:Aborted、Failed、Successed
private enum FinalState { ABORTED, FAILED, SUCCEEDED }
final long createdMs;
//① topic 与 partition 的映射关系
final TopicPartition topicPartition;
//② 标识ProducerBatch 的状态的future, 没有实现Future 接口,而是使用了 CountDownLatch 来实现类似Future 的功能
final ProduceRequestResult produceFuture;
//③ Thunk 对象集合 ,ProducerBatch 的内部类
private final List<Thunk> thunks = new ArrayList<>();
//④ MemoryRecords 对象 通过 builder 模式构建
private final MemoryRecordsBuilder recordsBuilder;
//⑤ 尝试发送当前ProducerBatch的次数
private final AtomicInteger attempts = new AtomicInteger(0);
private final boolean isSplitBatch;
//状态
private final AtomicReference<FinalState> finalState = new AtomicReference<>(null);
//⑥ 记录了当前保存的Record 个数
int recordCount;
//⑦ 最大record的字节数
int maxRecordSize;
//⑧ 最后一次尝试发送ProducerBatch的时间
private long lastAttemptMs;
//⑨ 最后一次追加record的时间
private long lastAppendTime;
//⑩ 在使用 drain 方法批量导出 RecordBatch 的时间
private long drainedMs;
//⑪是否正在重试
private boolean retry;
private boolean reopened;
}
- ①
topicPartition
:topic 与 partition 的映射关系 - ②
produceFuture
:标识ProducerBatch 的状态的future, 没有实现Future 接口,而是使用了 - ③
thunks
: Thunk 对象集合 ,ProducerBatch 的内部类,这个thunk里面包装了每个消息的回调。 - ④
recordsBuilder
:MemoryRecords 对象 通过 builder 模式构建 - ⑤
attempts
:尝试发送当前ProducerBatch的次数 - ⑥
recordCount
:记录了当前保存的Record 个数 - ⑦
maxRecordSize
:最大record的字节数 - ⑧
lastAttemptMs
:最后一次尝试发送ProducerBatch的时间 - ⑨
lastAppendTime
:最后一次追加record的时间 - ⑩
drainedMs
:在使用 drain 方法批量导出 RecordBatch 的时间 - ⑪
retry
: 是否正在重试
2.2.1 tryAppend 方法
public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) {
//① 估算一下剩余的值是否足够,不够不直接return
if (!recordsBuilder.hasRoomFor(timestamp, key, value, headers)) {
return null;
} else {
//② 向MemoryRecords 中添加数据
Long checksum = this.recordsBuilder.append(timestamp, key, value, headers);
this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(),
recordsBuilder.compressionType(), key, value, headers));
this.lastAppendTime = now;
//创建FutureRecordMetadata对象
FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
timestamp, checksum,
key == null ? -1 : key.length,
value == null ? -1 : value.length,
Time.SYSTEM);
//③ 每一次调用send 方法,都会有一个CallBack 对象,用于该条消息的回调,thunk就是封装这个消息的回调,thunks就是消息回调对象的集合
thunks.add(new Thunk(callback, future));
this.recordCount++;
return future;
}
}
- ① 估算一下剩余的值是否足够,不够不直接return,详情见
2.4
小节 - ② 向MemoryRecords 中添加数据,详情见
2.4
小节 - ③ 每一次调用send 方法,都会有一个CallBack 对象,用于该条消息的回调,thunk就是封装这个消息的回调,thunks就是消息回调对象的集合,通过Sender 线程进行回调。
2.3 BufferPool
在 2.1.1
小节中的 第⑦步
,第⑩步
类似这样的创建和释放 ByteBuffer 是比较耗费资源的,所以通过BufferPool 实现 ByteBuffer
对象池,这种技术在 Netty
中也有体现。
public class BufferPool {
static final String WAIT_TIME_SENSOR_NAME = "bufferpool-wait-time";
//①
private final long totalMemory;
//②
private final int poolableSize;
//③ ReentrantLock 锁,因为会有多线程并发的分配和回收,所以要用锁进行控制
private final ReentrantLock lock;
//④ 是 ArrayDeque<ByteBuffer> 队列,也就是说对象池是通过队列来实现的
private final Deque<ByteBuffer> free;
//⑤ 条件队列:记录因申请不到足够空间而堵塞的线程,此队列中实际记录的是堵塞线程对应的Condition对象
private final Deque<Condition> waiters;
//⑥
private long nonPooledAvailableMemory;
private final Metrics metrics;
private final Time time;
private final Sensor waitTime;
private boolean closed;
}
- ①
totalMemory
:总大小,默认32MB, 可以通过buffer.memory
进行配置 - ②
poolableSize
:每个ByteBuffer 的大小,也就是ProducerBatch 默认16kb,可以通过batch.size
进行设置 - ③
lock
:ReentrantLock 锁,因为会有多线程并发的分配和回收,所以要用锁进行控制 - ④
free
:是 ArrayDeque 队列,也就是说对象池是通过队列来实现的 - ⑤
waiters
:条件队列:记录因申请不到足够空间而堵塞的线程,此队列中实际记录的是堵塞线程对应的Condition对象 - ⑥
nonPooledAvailableMemory
:可用的空间大小,totalMemory - free队列中全部的ByteBuffer大小
注意
:通过 poolableSize 字段可以知道,BufferPool 只能对指定大小的ByteBuffer 进行复用,可以通过增大 batch.size
属性增加这个的复用对象的大小
2.3.1 allocate 方法
申请ByteBuffer 也就是申请ProducerBatch,如果没有足够的内存分配会把当前线程塞入 waiters
条件队列堵塞,等待有线程释放了内存,然后唤醒。
public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
if (size > this.totalMemory)
throw new IllegalArgumentException("Attempt to allocate " + size
+ " bytes, but there is a hard limit of "
+ this.totalMemory
+ " on memory allocations.");
ByteBuffer buffer = null;
//① 加锁同步
this.lock.lock();
if (this.closed) {
this.lock.unlock();
throw new KafkaException("Producer closed while allocating memory");
}
try {
// check if we have a free buffer of the right size pooled
//② 申请分配的是否是 poolableSize 指定大小的,并且 如果 free 中有空闲的ByteBuffer 直接获取返回
if (size == poolableSize && !this.free.isEmpty())
return this.free.pollFirst();
// now check if the request is immediately satisfiable with the
// memory on hand or if we need to block
//③ 不满足② 计算当前 free 队列中的空间
int freeListSize = freeSize() * this.poolableSize;
//④ 未分配的可用空间 > 所要分配的大小,也就是说有空间能进行分配,那么会进行分配
if (this.nonPooledAvailableMemory + freeListSize >= size) {
// we have enough unallocated or pooled memory to immediately
// satisfy the request, but need to allocate the buffer
freeUp(size);
this.nonPooledAvailableMemory -= size;
} else {
// we are out of memory and will have to block
//⑤ 不够则加入条件队列进行堵塞
int accumulated = 0;
Condition moreMemory = this.lock.newCondition();
try {
long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
this.waiters.addLast(moreMemory);
// loop over and over until we have a buffer or have reserved
// enough memory to allocate one
while (accumulated < size) {
long startWaitNs = time.nanoseconds();
long timeNs;
boolean waitingTimeElapsed;
try {
//⑥ 堵塞 超时时间为 remainingTimeToBlockNs
waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
} finally {
long endWaitNs = time.nanoseconds();
timeNs = Math.max(0L, endWaitNs - startWaitNs);
recordWaitTime(timeNs);
}
if (this.closed)
throw new KafkaException("Producer closed while allocating memory");
if (waitingTimeElapsed) {
throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
}
remainingTimeToBlockNs -= timeNs;
// check if we can satisfy this request from the free list,
// otherwise allocate memory
//⑦ 有足够的空间分配了,则退出循环
if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
// just grab a buffer from the free list
buffer = this.free.pollFirst();
accumulated = size;
//⑧ 否则先分配一部分,继续等待其他线程释放空间。
} else {
// we'll need to allocate memory, but we may only get
// part of what we need on this iteration
freeUp(size - accumulated);
int got = (int) Math.min(size - accumulated, this.nonPooledAvailableMemory);
this.nonPooledAvailableMemory -= got;
accumulated += got;
}
}
// Don't reclaim memory on throwable since nothing was thrown
accumulated = 0;
} finally {
// When this loop was not able to successfully terminate don't loose available memory
this.nonPooledAvailableMemory += accumulated;
//⑨ 已经分配了空间 从条件队列中删除
this.waiters.remove(moreMemory);
}
}
} finally {
// signal any additional waiters if there is more memory left
// over for them
try {
//⑩ 如果还有空间的 唤醒下一个线程
if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && !this.waiters.isEmpty())
this.waiters.peekFirst().signal();
} finally {
// Another finally... otherwise find bugs complains
//⑪ 释放锁,整个过程都是同步的。
lock.unlock();
}
}
if (buffer == null)
return safeAllocateByteBuffer(size);
else
return buffer;
}
- ①
lock( )
:加锁同步 - ② 申请分配的是否是
poolableSize
指定大小的,并且 如果 free 中有空闲的 ByteBuffer 直接获取返回 - ③ 不满足
②
计算当前 free 队列中的空间 - ④ 未分配的可用空间 > 所要分配的大小,也就是说有空间能进行分配,那么会进行分配
- ⑤ 不够则加入条件队列进行堵塞
- ⑥ 堵塞 超时时间为
remainingTimeToBlockNs
- ⑦ 有足够的空间分配了,则退出循环
- ⑧ 否则先分配一部分,继续等待其他线程释放空间。
- ⑨ 已经分配了空间 从条件队列中删除
- ⑩
signal( )
:如果还有空间的 唤醒下一个线程 - ⑪
unlock( )
:释放锁,整个过程都是同步的。
2.3.2 deallocate 方法 相对的就是释放。
public void deallocate(ByteBuffer buffer, int size) {
//① 加锁
lock.lock();
try {
//② 释放是的 poolableSize 大小的,直接加入 free 队列中
if (size == this.poolableSize && size == buffer.capacity()) {
buffer.clear();
this.free.add(buffer);
} else {
//③ 不是的 poolableSize 大小的,不会进行复用,往 nonPooledAvailableMemory 添加
this.nonPooledAvailableMemory += size;
}
//④ 线程间的通讯,唤醒条件队列中因空间不足而堵塞的线程
Condition moreMem = this.waiters.peekFirst();
if (moreMem != null)
moreMem.signal();
} finally {
lock.unlock();
}
}
- ① 加锁
- ② 释放是的 poolableSize 大小的,直接加入 free 队列中
- ③ 不是的 poolableSize 大小的,不会进行复用,往 nonPooledAvailableMemory 添加
- ④ 线程间的通讯,唤醒条件队列中因空间不足而堵塞的线程
2.4 MemoryRecords
来开始填 2.1.1append
方法中第 ③
⑧
⑨
步留下的坑,可以看到调用的是ProducerBatch 类中的 tryAppend
方法具体看 2.2.1
中的第 ②
步,而ProducerBatch 又是调用的 MemoryReocrdsBuilder
类中的 append 方法,而 MemoryReocrdsBuilder 类实际是操作 MemoryRecords
,而MemoryRecords 底层又是使用的 BufferPool
创建的 ByteBuffer ,整体流程就串起来了。
整个tryAppend( )方法流程如下:
- ①
KafkaProducer#send
:发送一条消息 假设发往Topic 是2 Partition是1,会先调用RecordAccumulator#tryAppend
缓存起来 对应2.1.1
- ②
RecordAccumulator
:去获取是否是有对应的Deque,有直接获取,没有创建。 - ③
ProducerBatch#append
:通过 MemoryReocrdsBuilder,往ByteBuffer 中添加记录 - ④
BufferPool
: 如果第③步
添加成功直接返回,但是如果失败,则会 allocate 空间继续走第③步
所以呢这个 MemoryReocrdsBuilder
实际就是对 MemoryRecords 的包装进而操控 BufferPool 创建出来的 ByteBuffer
往里追加 record。
public class MemoryRecords extends AbstractRecords {
//① java nio ByteBuffer
private final ByteBuffer buffer;
//② 私有构造器
private MemoryRecords(ByteBuffer buffer) {
Objects.requireNonNull(buffer, "buffer should not be null");
this.buffer = buffer;
}
//③ MemoryRecordsBuilder builder 模式
public static MemoryRecordsBuilder builder(ByteBuffer buffer,
byte magic,
CompressionType compressionType,
TimestampType timestampType,
long baseOffset) {
long logAppendTime = RecordBatch.NO_TIMESTAMP;
if (timestampType == TimestampType.LOG_APPEND_TIME)
logAppendTime = System.currentTimeMillis();
return builder(buffer, magic, compressionType, timestampType, baseOffset, logAppendTime,
RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false,
RecordBatch.NO_PARTITION_LEADER_EPOCH);
}
}
- ① 可以看到MemoryRecords 就是对Java中的ByteBuffer 进行操作。
- ② 私有构造器不允许外部使用
- ③ 通过builder 模式创建
MemoryRecordsBuilder
,通过 MemoryRecordsBuilder 对MemoryRecords 进行操作。
MemoryRecordsBuilder
在MemoryRecordsBuilder 中两个输出类型的字段 bufferStream
和 appendStream
public class MemoryRecordsBuilder implements AutoCloseable {
//① 对ByteBuffer的封装
private final ByteBufferOutputStream bufferStream;
//② 对bufferStream 的包装带有压缩功能
private DataOutputStream appendStream;
private MemoryRecords builtRecords;
}
- ①
bufferStream
: 继承java.io.OutputStream,当写入数据超出ByteBuffer 的时候自动扩容。 - ②
appendStream
: 通过 timestampType 字段选择的压缩算法,对 bufferStream的包装。
this.appendStream = new DataOutputStream(compressionType.wrapForOutput(this.bufferStream, magic));
Kafka 四种压缩算法性能结论: ZStandard有着最高的压缩比,相同的消息量占用最少的磁盘容量,因此带宽的占用也是比较少的,但是在TPS方面的表现并不抢眼,因此对于那些在乎磁盘和带宽资源的用户而言,配置ZStandard算法似乎是个不错的选择,但如果追求应用TPS,就目前的Kafka而言LZ4依然是最好的选择。 参考自:Kafka 2.1.0压缩算法性能测试
大致流程就分析完了,接下来会分析Sender线程是如何来RecordAccumulator中获取消息并且发送。