1.概述
在分析了 KafkaProducer
如何把消息存入 RecordAccumulator
中,那么这些ProducerBatch 是如何发送给 Kafka Server 的呢?
如下图:
当每次调用 RecordAccumulator#append( )
追加消息的的时候,如果发现当前的 ProducerBatch 满了或者说创建了一个新的 ProducerBatch ,那么就会唤醒 Sender 线程来获取消息进行发送。
代码如下:
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
this.sender.wakeup();
}
那这一章,我们就来分析一下 Sender 线程
是如何拉取消息并且发送消息的。
2. 源码分析
2.1 Sender 线程
public class Sender implements Runnable {
//① kafka 网络通信客户端,主要封装与 broker 的网络通信。
private final KafkaClient client;
//② 消息记录累积器
private final RecordAccumulator accumulator;
//③ 元数据
private final ProducerMetadata metadata;
//④ 是否需要保证消息的顺序性。
private final boolean guaranteeMessageOrder;
//⑤ 调用 send 方法发送的最大请求大小,通过参数 max.request.size 来设置。
private final int maxRequestSize;
//⑥ ack,通过acks 设置
private final short acks;
//⑦ 重试次数
private final int retries;
//时间工具类
private final Time time;
//⑧ 该线程状态,为 true 表示运行中。
private volatile boolean running;
//⑨ 是否强制关闭,此时会忽略正在发送中的消息。
private volatile boolean forceClose;
//⑩ 消息发送相关的统计指标收集器。
private final SenderMetrics sensors;
//⑪ 请求的超时时间,
private final int requestTimeoutMs;
//⑫ 当发送消息失败时,重试的间隔时间,默认100,可以通过`retry.backoff.ms`配置
private final long retryBackoffMs;
//⑬ 版本信息
private final ApiVersions apiVersions;
//⑭ 事务处理器
private final TransactionManager transactionManager;
//⑮ 正在执行发送相关的消息批次。
private final Map<TopicPartition, List<ProducerBatch>> inFlightBatches;
}
- ①
client
kafka 网络通信客户端,主要封装与 broker 的网络通信。 - ②
accumulator
消息记录累积器 - ③
metadata
元数据 - ④
guaranteeMessageOrder
是否需要保证消息的顺序性。 - ⑤
maxRequestSize
调用 send 方法发送的最大请求大小,通过参数 max.request.size 来设置。 - ⑥
ack
通过acks 设置 - ⑦
retries
重试次数 - ⑧
running
该线程状态,为 true 表示运行中。 - ⑨
forceClose
是否强制关闭,此时会忽略正在发送中的消息。 - ⑩
sensors
消息发送相关的统计指标收集器。 - ⑪
requestTimeoutMs
请求的超时时间, - ⑫
retryBackoffMs
当发送消息失败时,重试的间隔时间,默认100,可以通过retry.backoff.ms
配置 - ⑬
apiVersions
版本信息 - ⑭
transactionManager
事务处理器 - ⑮
inFlightBatches
正在执行发送相关的消息批次。
2.1.1 Sender线程流程图
由于Sender 线程是实现 Runnable
接口的,所以当它唤醒的时候就会执行 run
方法
- ① 判断状态是否是运行中,根据
running
属性。 - ② 如果是
close
状态,那就通过forceClose
是否要强制关闭。 - ③ 如果
forceClose
是true的话 那就直接调用NetworkClient#close
方法关闭 - ④ 如果
running
属性是运行中,那么调用runOnce
进行发送消息 - ⑤ 调用
Sender#sendProducerData
方法准备要发送的数据封装成ClientRequest
,放入KafkaChannel 的 send字段中, - ⑥ 调用
NetworkClient#poll
方法,进行发送消息
2.1.2 run 方法
public void run() {
log.debug("Starting Kafka producer I/O thread.");
//① 判断状态
while (running) {
try {
runOnce();
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
//② forceClose
while (!forceClose && ((this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0) || hasPendingTransactionalRequests())) {
try {
runOnce();
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
while (!forceClose && transactionManager != null && transactionManager.hasOngoingTransaction()) {
if (!transactionManager.isCompleting()) {
log.info("Aborting incomplete transaction due to shutdown");
transactionManager.beginAbort();
}
try {
runOnce();
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
if (forceClose) {
if (transactionManager != null) {
log.debug("Aborting incomplete transactional requests due to forced shutdown");
transactionManager.close();
}
log.debug("Aborting incomplete batches due to forced shutdown");
this.accumulator.abortIncompleteBatches();
}
try {
this.client.close();
} catch (Exception e) {
log.error("Failed to close network client", e);
}
log.debug("Shutdown of Kafka producer I/O thread has completed.");
}
2.1.3 runOnce 方法
void runOnce() {
if (transactionManager != null) {
try {
transactionManager.maybeResolveSequences();
if (transactionManager.hasFatalError()) {
RuntimeException lastError = transactionManager.lastError();
if (lastError != null)
maybeAbortBatches(lastError);
client.poll(retryBackoffMs, time.milliseconds());
return;
}
transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();
if (maybeSendAndPollTransactionalRequest()) {
return;
}
} catch (AuthenticationException e) {
log.trace("Authentication exception while processing transactional request", e);
transactionManager.authenticationFailed(e);
}
}
long currentTimeMs = time.milliseconds();
//① sendProducerData 准备发送消息
long pollTimeout = sendProducerData(currentTimeMs);
//② 发送消息
client.poll(pollTimeout, currentTimeMs);
}
接下来分析比较重要的两个方法 sendProducerData
和 poll
2.1.4 sendProducerData 方法
与 2.1.1
中的流程图相对应
private long sendProducerData(long now) {
//5.1 拉取集群的元数据
Cluster cluster = metadata.fetch();
//5.2 调用 accumulator.ready 获取准备好发送给的ProducerBatch
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
//5.3 更新找不到Leader 节点的ProducerBatch 的元数据
if (!result.unknownLeaderTopics.isEmpty()) {
for (String topic : result.unknownLeaderTopics)
this.metadata.add(topic, now);
log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}",
result.unknownLeaderTopics);
this.metadata.requestUpdate();
}
Iterator<Node> iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
//5.4 调用NetworkClient.ready 把没有准备好的Node 从readyNodes 集合中剔除
while (iter.hasNext()) {
Node node = iter.next();
if (!this.client.ready(node, now)) {
iter.remove();
notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
}
}
//5.5 调用 accumulator.drain 方法把准备的ProducerBatch 进行转换
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
//5.6 添加到 InflightBatches,主要作用是缓存发送出去的消息但是还没有得到响应的请求
addToInflightBatches(batches);
if (guaranteeMessageOrder) {
for (List<ProducerBatch> batchList : batches.values()) {
for (ProducerBatch batch : batchList)
this.accumulator.mutePartition(batch.topicPartition);
}
}
accumulator.resetNextBatchExpiryTime();
List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
expiredBatches.addAll(expiredInflightBatches);
if (!expiredBatches.isEmpty())
log.trace("Expired {} batches in accumulator", expiredBatches.size());
//5.7 处理过期的Batch
for (ProducerBatch expiredBatch : expiredBatches) {
String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
+ ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
failBatch(expiredBatch, -1, NO_TIMESTAMP, new TimeoutException(errorMessage), false);
if (transactionManager != null && expiredBatch.inRetry()) {
// This ensures that no new batches are drained until the current in flight batches are fully resolved.
transactionManager.markSequenceUnresolved(expiredBatch);
}
}
sensors.updateProduceRequestMetrics(batches);
long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
pollTimeout = Math.max(pollTimeout, 0);
if (!result.readyNodes.isEmpty()) {
log.trace("Nodes with data ready to send: {}", result.readyNodes);
pollTimeout = 0;
}
//5.8 将ProducerBatch 放入 KafkaChannel 中的send 等待发送
sendProduceRequests(batches, now);
return pollTimeout;
}
-
5.1 拉取集群的元数据
-
5.2 调用
accumulator.ready
获取准备好发送给的ProducerBatch- 在Kafka 生产者(二)RecordAccumulate的
2.1.2
小节中已经分析过了
- 在Kafka 生产者(二)RecordAccumulate的
-
5.3 更新找不到Leader 节点的ProducerBatch 的元数据
-
5.4 调用
NetworkClient.ready
把没有准备好的Node 从readyNodes 集合中剔除。和accumulator.ready
方法的区别是:NetworkClient.ready
:与KafkaServer 建立Tcp连接 三次握手,如果建立不成功那么肯定无法发送这里就会剔除
-
5.5 调用
accumulator.drain
方法把准备的ProducerBatch 进行转换- 在Kafka 生产者(二)RecordAccumulate的
2.1.3
小节中已经分析过了
- 在Kafka 生产者(二)RecordAccumulate的
-
5.6 添加到
InflightBatches
,主要作用是缓存发送出去的消息但是还没有得到响应的请求 -
5.7 处理过期的Batch
-
5.8 将
ProducerBatch
放入KafkaChannel
中的 send 字段等待发送
2.1.5 NetworkClient.poll 方法 发送数据
//org.apache.kafka.clients.NetworkClient
public List<ClientResponse> poll(long timeout, long now) {
ensureActive();
if (!abortedSends.isEmpty()) {
List<ClientResponse> responses = new ArrayList<>();
handleAbortedSends(responses);
completeResponses(responses);
return responses;
}
long metadataTimeout = metadataUpdater.maybeUpdate(now);
try {
// 发送数据
this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));
} catch (IOException e) {
log.error("Unexpected error during I/O", e);
}
// process completed actions
long updatedNow = this.time.milliseconds();
List<ClientResponse> responses = new ArrayList<>();
handleCompletedSends(responses, updatedNow);
handleCompletedReceives(responses, updatedNow);
handleDisconnections(responses, updatedNow);
handleConnections();
handleInitiateApiVersionRequests(updatedNow);
handleTimedOutRequests(responses, updatedNow);
completeResponses(responses);
return responses;
}
NetworkClient
那一块没有进行分析,因为我觉得这一块放在 Network Layer
那分析更加容易理解。
3.总结
-
在主线程中由
KafkaProducer
创建消息,然后通过拦截器
、序列化器
和分区器
的之后缓存到消息累加器(RecordAccumulator)
中。 -
Sender
线程负责从RecordAccumulator中获取消息并将其发送到Kafka中。 -
RecordAccumulator
主要用来缓存消息以便 Sender 线程可以批量发送,进而减少网络传输的资源消耗以提升性能。 -
RecordAccumulator 缓存的大小可以通过生产者
客户端参数buffer.memory
配置,默认值为 33554432B,即32MB
。 -
如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候KafkaProducer的send()方法调用要么被阻塞,要么抛出异常,这个取决于参数max.block.ms的配置,此参数的默认值为60000,即60秒。
-
主线程中发送过来的消息都会被追加到
RecordAccumulator
的某个双端队列(Deque)中,在RecordAccumulator 的内部为每个分区都维护了一个双端队列
,队列中的内容就是ProducerBatch
,即Deque<ProducerBatch>
。消息写入缓存时,追加到双端队列的尾部; -
Sender读取消息时,从双端队列的头部读取。注意ProducerBatch不是ProducerRecord,ProducerBatch中可以包含一至多个
ProducerRecord
。-
ProducerRecord
:是生产者中创建的消息 -
ProducerBatch
:是指一个消息批次,ProducerRecord会被包含在ProducerBatch中,这样可以使字节的使用更加紧凑,减少网络请求的次数以提升整体的吞吐量 -
如果生产者客户端需要向很多分区发送消息,则可以将buffer.memory参数适当调大以增加整体的吞吐量
-
-
消息在网络上都是以
字节(Byte)
的形式传输的,在发送之前需要创建一块内存区域来保存对应的消息。在Kafka生产者客户端中,通过java.io.ByteBuffer
实现消息内存的创建和释放。- 不过频繁的创建和释放是比较耗费资源的,在
RecordAccumulator
的内部还有一个BufferPool
,它主要用来实现ByteBuffer的复用,以实现缓存的高效利用。 - BufferPool只针对特定大小的ByteBuffer进行管理,而其他大小的ByteBuffer不会缓存进BufferPool中。
- 这个特定的大小由
batch.size
参数来指定,默认值为16384B,即16KB
。我们可以适当地调大batch.size参数以便多缓存一些消息。
- 不过频繁的创建和释放是比较耗费资源的,在
-
Sender 从 RecordAccumulator 中获取缓存的消息之后,会进一步将原本
<分区,Deque<ProducerBatch>>
的保存形式转变成<Node,List< ProducerBatch>
的形式。Node
表示Kafka集群的broker节点。对于网络连接来说,生产者客户端是与具体的broker节点建立的连接,也就是向具体的 broker 节点发送消息,而并不关心消息属于哪一个分区- 而对于KafkaProducer的应用逻辑而言,我们只关注向哪个分区中发送哪些消息,所以在这里需要做一个应用逻辑层面到网络I/O层面的转换。
-
在转换成
<Node,List<ProducerBatch>>
的形式之后,Sender 还会进一步封装成<Node,Request>
的形式,这样就可以将Request请求发往各个Node了,这里的Request是指Kafka的各种协议请求,对于消息发送而言就是指具体的ProduceRequest
-
请求在从Sender线程发往Kafka之前还会保存到
InFlightRequests
中,InFlightRequests保存对象的具体形式为Map<NodeId,Deque<Request>>
,-
它的主要作用是缓存了已经发出去但还没有收到响应的请求(NodeId 是一个 String 类型,表示节点的 id 编号)。
-
InFlightRequests
还提供了许多管理类的方法,并且通过配置参数还可以限制每个连接(也就是客户端与Node之间的连接)最多缓存的请求数。这个配置参数为max.in.flight.requests.per.connection
,默认值为 5,即每个连接最多只能缓存 5 个未响应的请求,超过该数值之后就不能再向这个连接发送更多的请求了,除非有缓存的请求收到了响应(Response)。 有点像Java中 Semaphore 的意思 -
通过比较
Deque<Request>
的size与这个参数的大小来判断对应的Node中是否已经堆积了很多未响应的消息,如果真是如此,那么说明这个 Node 节点负载较大或网络连接有问题,再继续向其发送请求会增大请求超时的可能。
-
-
InFlightRequests
还可以获得leastLoadedNode
,即所有Node中负载最小的那一个。- 这里的负载最小是通过每个 Node 在 InFlightRequests 中还未确认的请求决定的,未确认的请求越多则认为负载越大。