Overview
When a Kafka producer sets acks to all, the broker does not respond until every replica in the ISR has received the message. This is where Kafka's delayed operations come in.
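As a quick client-side illustration, a minimal producer configured this way might look as follows (a sketch; the broker address and topic name are placeholders):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

// Sketch: a producer whose requests succeed only once every ISR replica has the message.
val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder address
props.put(ProducerConfig.ACKS_CONFIG, "all") // equivalent to acks = -1
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)

val producer = new KafkaProducer[String, String](props)
// get() blocks until the broker responds, i.e. until the full ISR has the record
producer.send(new ProducerRecord("demo-topic", "key", "value")).get()
producer.close()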
The overall flow is as follows:
- ① The producer sends a ProducerRequest to append messages to certain partitions.
- ② The ProducerRequest passes through the network layer and the API layer (KafkaApis) and arrives at ReplicaManager, which hands the messages to the log storage subsystem (Log) for appending. At the same time it checks the DelayedFetch operations registered under the relevant keys in delayedFetchPurgatory and completes any whose conditions are now satisfied.
- ③ The log storage subsystem returns the result of the append.
- ④ ReplicaManager creates a DelayedProduce object for the ProducerRequest and hands it to delayedProducePurgatory for management.
- ⑤ delayedProducePurgatory uses a SystemTimer to track whether the DelayedProduce has timed out.
- ⑥ Follower replicas in the ISR send FetchRequests to synchronize messages from the Leader replica. Each fetch also checks whether any DelayedProduce can now be completed (see the sketch after this list).
- ⑦ When a DelayedProduce executes, its callback builds a ProducerResponse and adds it to RequestChannels.
- ⑧ The network layer returns the ProducerResponse to the client.
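The check in step ⑥ goes through ReplicaManager, which asks the purgatory to re-evaluate every DelayedProduce watching the affected partition key. Paraphrased (the exact logging differs across versions):

// ReplicaManager (paraphrased): called after a follower fetch advances a
// partition's HW; re-evaluates any DelayedProduce watching that partition key.
private def tryCompleteDelayedProduce(key: DelayedOperationKey): Unit = {
  val completed = delayedProducePurgatory.checkAndComplete(key)
  debug(s"Request key ${key.keyLabel} unblocked $completed producer requests.")
}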
DelayedProduce Source Code Analysis
The ReplicaManager.appendRecords Method
This corresponds to step ② above. Replica management is itself an important part of Kafka and will be analyzed later; here we focus on DelayedProduce.
def appendRecords(timeout: Long,
                  requiredAcks: Short,
                  internalTopicsAllowed: Boolean,
                  origin: AppendOrigin,
                  entriesPerPartition: Map[TopicPartition, MemoryRecords], // entries to append: key is the topic-partition, value is its MemoryRecords
                  responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
                  delayedProduceLock: Option[Lock] = None,
                  recordConversionStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit = _ => ()): Unit = {
  if (isValidRequiredAcks(requiredAcks)) {
    val sTime = time.milliseconds
    // key step: append the records to the local log
    val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
      origin, entriesPerPartition, requiredAcks)
    debug("Produce to local log in %d ms".format(time.milliseconds - sTime))
    val produceStatus = localProduceResults.map { case (topicPartition, result) =>
      topicPartition ->
        ProducePartitionStatus(
          result.info.lastOffset + 1, // required offset
          new PartitionResponse(result.error, result.info.firstOffset.getOrElse(-1), result.info.logAppendTime,
            result.info.logStartOffset, result.info.recordErrors.asJava, result.info.errorMessage)) // response status
    }
    recordConversionStatsCallback(localProduceResults.map { case (k, v) => k -> v.info.recordConversionStats })
    // if acks == -1 (all) and at least one partition append succeeded locally,
    // the request must wait for ISR replication, so it becomes a delayed operation
    if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
      // create delayed produce operation
      val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
      // ① wrap the pending request in a DelayedProduce
      val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock)
      // create a list of (topic, partition) pairs to use as keys for this delayed produce operation
      val producerRequestKeys = entriesPerPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
      // try to complete the request immediately, otherwise put it into the purgatory
      // this is because while the delayed produce operation is being created, new
      // requests may arrive and hence make this operation completable.
      // ② try to complete the DelayedProduce now; otherwise hand it to delayedProducePurgatory
      delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
    } else {
      // we can respond immediately
      val produceResponseStatus = produceStatus.map { case (k, status) => k -> status.responseStatus }
      responseCallback(produceResponseStatus)
    }
  } else {
    // If required.acks is outside accepted range, something is wrong with the client
    // Just return an error and don't handle the request at all
    val responseStatus = entriesPerPartition.map { case (topicPartition, _) =>
      topicPartition -> new PartitionResponse(Errors.INVALID_REQUIRED_ACKS,
        LogAppendInfo.UnknownLogAppendInfo.firstOffset.getOrElse(-1), RecordBatch.NO_TIMESTAMP, LogAppendInfo.UnknownLogAppendInfo.logStartOffset)
    }
    responseCallback(responseStatus)
  }
}
- ① Wrap the pending request in a DelayedProduce.
- ② Try to complete the DelayedProduce immediately; otherwise hand it to delayedProducePurgatory for management (see the sketch below).
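The comment in the source explains why the operation is re-checked after being watched: between the first tryComplete and the watcher registration, a follower fetch may already have advanced the HW far enough. A toy model of this check-then-watch-then-recheck pattern (an assumed simplification, not Kafka's real DelayedOperationPurgatory, which adds locking, a timer, and watcher purging):

import java.util.concurrent.ConcurrentHashMap

// Toy purgatory illustrating the check-then-watch-then-recheck pattern.
trait ToyDelayedOp {
  // returns true once the operation has completed (cf. DelayedProduce.tryComplete)
  def tryComplete(): Boolean
}

class ToyPurgatory[K] {
  private val watchers = new ConcurrentHashMap[K, List[ToyDelayedOp]]()

  def tryCompleteElseWatch(op: ToyDelayedOp, keys: Seq[K]): Boolean = {
    if (op.tryComplete()) return true     // fast path: already satisfied
    keys.foreach { k =>                   // otherwise watch every partition key
      watchers.merge(k, List(op), (old, added) => added ++ old)
    }
    // re-check: a fetch that raced with the registration above may have made
    // the operation completable, and no further event would re-trigger it
    op.tryComplete()
  }

  // called when an event touches a key, e.g. a follower fetch advancing the HW
  def checkAndComplete(key: K): Unit =
    Option(watchers.get(key)).getOrElse(Nil).foreach(_.tryComplete())
}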
Next, let's analyze DelayedProduce.
DelayedProduce Properties
class DelayedProduce(delayMs: Long,
                     produceMetadata: ProduceMetadata,
                     replicaManager: ReplicaManager,
                     responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
                     lockOpt: Option[Lock] = None)
  extends DelayedOperation(delayMs, lockOpt) {
}
- delayMs: how long the DelayedProduce may stay pending before it expires.
- produceMetadata: a ProduceMetadata object that records the result of appending to each involved partition; it is what decides whether the DelayedProduce meets the conditions for responding (see the sketch after this list).
- replicaManager: manages replicas, much as LogManager manages logs.
- responseCallback: the callback invoked from onComplete when the operation is satisfied or expires.
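For reference, the two metadata classes live in the same source file as DelayedProduce; their shape is roughly the following (simplified, and it may differ slightly across Kafka versions):

// ProducePartitionStatus pairs the offset the ISR must reach (last appended
// offset + 1) with the response that will eventually be sent back; acksPending
// marks partitions that are still waiting for follower replication.
case class ProducePartitionStatus(requiredOffset: Long, responseStatus: PartitionResponse) {
  @volatile var acksPending = false
}

// ProduceMetadata bundles the required acks with the per-partition statuses.
case class ProduceMetadata(produceRequiredAcks: Short,
                           produceStatus: Map[TopicPartition, ProducePartitionStatus])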
The tryComplete Method
As an implementation of DelayedOperation, DelayedProduce overrides tryComplete. For each partition, one of three cases applies:
- The partition's Leader replica has migrated, so the Leader is no longer on this broker; the error code recorded in the corresponding ProducePartitionStatus is updated accordingly.
- In the normal case, once all replicas in the ISR have completed synchronization, the Leader replica's HW has advanced past the corresponding ProduceStatus.requiredOffset; the timeout error code set at initialization is then cleared.
- If an exception occurs, the error code recorded in the partition's ProducePartitionStatus is updated.
override def tryComplete(): Boolean = {
  // check for each partition if it still has pending acks
  // ① iterate over all partition statuses in produceMetadata
  produceMetadata.produceStatus.foreach { case (topicPartition, status) =>
    trace(s"Checking produce satisfaction for $topicPartition, current status $status")
    // skip those partitions that have already been satisfied
    // ② only partitions still waiting for acks need checking
    if (status.acksPending) {
      // look up the Partition object; this broker is expected to be the leader
      val (hasEnough, error) = replicaManager.getPartitionOrError(topicPartition, expectLeader = true) match {
        case Left(err) =>
          // Case A: the leader has moved off this broker
          (false, err)
        // ③ check whether this partition's HW has reached requiredOffset
        case Right(partition) =>
          partition.checkEnoughReplicasReachOffset(status.requiredOffset)
      }
      // Case B.1 || B.2
      // an error occurred, or enough replicas have acknowledged
      if (error != Errors.NONE || hasEnough) {
        status.acksPending = false
        status.responseStatus.error = error
      }
    }
  }
  // check if every partition has satisfied at least one of case A or B
  // ④ if no partition is still pending, force the DelayedProduce to complete
  if (!produceMetadata.produceStatus.values.exists(_.acksPending))
    forceComplete()
  else
    false
}
- ① Iterate over all partition statuses in produceMetadata.
- ② Skip partitions that have already satisfied the DelayedProduce completion condition.
- ③ Check whether the partition's HW has reached requiredOffset (a condensed view of this check follows the list).
- ④ If every partition now satisfies the completion condition, force the operation to complete.
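Step ③ delegates to Partition.checkEnoughReplicasReachOffset, which returns the (hasEnough, error) pair consumed above. Condensed to its core logic (a paraphrase; field and error names vary across Kafka versions):

// Paraphrase of Partition.checkEnoughReplicasReachOffset: the produce is satisfied
// once the leader's high watermark reaches requiredOffset; if the ISR has shrunk
// below min.insync.replicas since the append, it completes with an error instead.
def checkEnoughReplicasReachOffset(requiredOffset: Long): (Boolean, Errors) =
  leaderLogIfLocal match {
    case Some(leaderLog) =>
      val minIsr = leaderLog.config.minInSyncReplicas
      if (leaderLog.highWatermark >= requiredOffset) {
        if (inSyncReplicaIds.size >= minIsr) (true, Errors.NONE)
        else (true, Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND)
      } else
        (false, Errors.NONE) // keep waiting for followers to catch up
    case None =>
      (false, Errors.NOT_LEADER_FOR_PARTITION) // Case A: leadership moved away
  }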
The onComplete Method
When the operation is satisfied or expires, onComplete builds the response:
override def onComplete(): Unit = {
  // build a response status for each partition from the info recorded in ProduceMetadata
  val responseStatus = produceMetadata.produceStatus.map { case (k, status) => k -> status.responseStatus }
  // invoke the responseCallback
  responseCallback(responseStatus)
}
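Note that tryComplete never calls onComplete directly: it goes through forceComplete, inherited from DelayedOperation, whose compare-and-set guarantees onComplete runs exactly once, no matter whether a satisfied condition or a SystemTimer expiration fires first. Paraphrased:

// DelayedOperation.forceComplete (paraphrased): complete the operation at most
// once, whether triggered by a satisfied condition or by timeout expiration.
def forceComplete(): Boolean = {
  if (completed.compareAndSet(false, true)) { // completed is an AtomicBoolean
    cancel()      // cancel the pending timeout task in the SystemTimer
    onComplete()  // build and send the response
    true
  } else {
    false
  }
}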
The callback passed in as responseCallback is defined in KafkaApis, inside handleProduceRequest:
// kafka.server.KafkaApis
def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = {
  // merge the processed statuses with those for unauthorized, non-existing,
  // and invalid-request topics so the response covers every partition in the request
  val mergedResponseStatus = responseStatus ++ unauthorizedTopicResponses ++ nonExistingTopicResponses ++ invalidRequestResponses
  // flags whether any error occurred while handling the ProducerRequest
  var errorInResponse = false
  mergedResponseStatus.foreach { case (topicPartition, status) =>
    if (status.error != Errors.NONE) {
      errorInResponse = true
      debug("Produce request with correlation id %d from client %s on partition %s failed due to %s".format(
        request.header.correlationId,
        request.header.clientId,
        topicPartition,
        status.error.exceptionName))
    }
  }

  // When this callback is triggered, the remote API call has completed
  request.apiRemoteCompleteTimeNanos = time.nanoseconds

  // Record both bandwidth and request quota-specific values and throttle by muting the channel if any of the quotas
  // have been violated. If both quotas have been violated, use the max throttle time between the two quotas. Note
  // that the request quota is not enforced if acks == 0.
  val bandwidthThrottleTimeMs = quotas.produce.maybeRecordAndGetThrottleTimeMs(request, numBytesAppended, time.milliseconds)
  val requestThrottleTimeMs = if (produceRequest.acks == 0) 0 else quotas.request.maybeRecordAndGetThrottleTimeMs(request)
  val maxThrottleTimeMs = Math.max(bandwidthThrottleTimeMs, requestThrottleTimeMs)
  if (maxThrottleTimeMs > 0) {
    if (bandwidthThrottleTimeMs > requestThrottleTimeMs) {
      quotas.produce.throttle(request, bandwidthThrottleTimeMs, sendResponse)
    } else {
      quotas.request.throttle(request, requestThrottleTimeMs, sendResponse)
    }
  }

  // Send the response immediately. In case of throttling, the channel has already been muted.
  if (produceRequest.acks == 0) {
    // no operation needed if producer request.required.acks = 0; however, if there is any error in handling
    // the request, since no response is expected by the producer, the server will close socket server so that
    // the producer client will know that some error has happened and will refresh its metadata
    if (errorInResponse) {
      val exceptionsSummary = mergedResponseStatus.map { case (topicPartition, status) =>
        topicPartition -> status.error.exceptionName
      }.mkString(", ")
      info(
        s"Closing connection due to error during produce request with correlation id ${request.header.correlationId} " +
          s"from client id ${request.header.clientId} with ack=0\n" +
          s"Topic and partition to exceptions: $exceptionsSummary"
      )
      closeConnection(request, new ProduceResponse(mergedResponseStatus.asJava).errorCounts)
    } else {
      // Note that although request throttling is exempt for acks == 0, the channel may be throttled due to
      // bandwidth quota violation.
      sendNoOpResponseExemptThrottle(request)
    }
  } else {
    sendResponse(request, Some(new ProduceResponse(mergedResponseStatus.asJava, maxThrottleTimeMs)), None)
  }
}

def processingStatsCallback(processingStats: FetchResponseStats): Unit = {
  processingStats.foreach { case (tp, info) =>
    updateRecordConversionStats(request, tp, info)
  }
}

if (authorizedRequestInfo.isEmpty)
  sendResponseCallback(Map.empty)
else {
  val internalTopicsAllowed = request.header.clientId == AdminUtils.AdminClientId
  // call the replica manager to append messages to the replicas
  replicaManager.appendRecords(
    timeout = produceRequest.timeout.toLong,
    requiredAcks = produceRequest.acks,
    internalTopicsAllowed = internalTopicsAllowed,
    origin = AppendOrigin.Client,
    entriesPerPartition = authorizedRequestInfo,
    responseCallback = sendResponseCallback,
    recordConversionStatsCallback = processingStatsCallback)

  // if the request is put into the purgatory, it will have a held reference and hence cannot be garbage collected;
  // hence we clear its data here in order to let GC reclaim its memory since it is already appended to log
  produceRequest.clearPartitionRecords()
}