Kafka Delayed Operations (Part 2): DelayedProduce

May 7, 2020 | Author: Siran | 2100 characters | About 5 min read
Filed under Message Queues | Tags #kafka


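When a Kafka producer sets acks to all, the broker responds only after every replica in the ISR has received the message. This waiting is implemented with a delayed operation, and the overall flow is as follows: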


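  • ① The producer sends a ProducerRequest to append messages to some specific partitions.
  • ② After passing through the network layer and the API layer (KafkaApis), the ProducerRequest reaches ReplicaManager, which hands the messages to the log storage subsystem to be appended to the Log. It also checks the DelayedFetch operations stored under the related keys in delayedFetchPurgatory and completes any whose conditions are now met.
  • ③ The log storage subsystem returns the result of the append.
  • ④ ReplicaManager creates a DelayedProduce object for the ProducerRequest and hands it to delayedProducePurgatory.
  • ⑤ delayedProducePurgatory uses SystemTimer to track whether each DelayedProduce has timed out.
  • ⑥ Follower replicas in the ISR send FetchRequests to synchronize messages from the Leader replica. While handling these requests, the broker also checks whether the DelayedProduce now meets its completion condition.
  • ⑦ When the DelayedProduce executes, it calls the callback function, which builds the ProducerResponse and adds it to the RequestChannel.
  • ⑧ The network layer returns the ProducerResponse to the client.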
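To make steps ④ to ⑥ concrete, here is a minimal single-threaded sketch of the "try now, otherwise watch by key" pattern. `PendingProduce` and `MiniPurgatory` are hypothetical names, not Kafka classes. The sketch assumes one partition key per operation, ignores timeouts (the SystemTimer part), and skips the second completion attempt that Kafka's purgatory makes right after registering a watcher to avoid missing a concurrent update.

```scala
import scala.collection.mutable

// Hypothetical, simplified model (not Kafka's classes): a produce with acks=all
// is parked under its partition key until the leader's high watermark (HW)
// reaches the offset the producer is waiting for.
final class PendingProduce(val partition: String, val requiredOffset: Long) {
  var completed = false

  // Completion condition: HW has reached requiredOffset.
  def tryComplete(hw: Long): Boolean = {
    if (!completed && hw >= requiredOffset) completed = true
    completed
  }
}

// Hypothetical stand-in for delayedProducePurgatory: pending operations per key.
final class MiniPurgatory {
  private val watchers = mutable.Map.empty[String, mutable.ListBuffer[PendingProduce]]

  // Step ④: try once right away; if that fails, watch the partition key.
  def tryCompleteElseWatch(op: PendingProduce, hw: Long): Boolean =
    if (op.tryComplete(hw)) true
    else {
      watchers.getOrElseUpdate(op.partition, mutable.ListBuffer.empty[PendingProduce]) += op
      false
    }

  // Step ⑥: a follower fetch moved the HW, so re-check everything under that key.
  def checkAndComplete(partition: String, hw: Long): Int = watchers.get(partition) match {
    case Some(ops) =>
      val done = ops.filter(_.tryComplete(hw))
      ops --= done
      done.size
    case None => 0
  }

  def watched(partition: String): Int = watchers.get(partition).map(_.size).getOrElse(0)
}

// The leader appended up to offset 9, so the producer needs HW >= 10.
val purgatory = new MiniPurgatory
val op = new PendingProduce("topic-0", 10L)
val immediate = purgatory.tryCompleteElseWatch(op, 5L) // followers still behind
val completedAfterFetch = purgatory.checkAndComplete("topic-0", 10L)
println(s"immediate=$immediate completedAfterFetch=$completedAfterFetch completed=${op.completed}")
```

Keying watchers by partition is what lets a FetchRequest for one partition re-check only the produce requests that depend on it, instead of scanning every pending operation.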

DelayedProduce Source Code Analysis

The ReplicaManager.appendRecords method

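This method covers the ReplicaManager part of the flow above: appending to the local log and creating the DelayedProduce. Replica management is another very important part of Kafka and will be analyzed in a later post; here we focus on DelayedProduce.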

def appendRecords(timeout: Long,
                    requiredAcks: Short,
                    internalTopicsAllowed: Boolean,
                    origin: AppendOrigin,
                    entriesPerPartition: Map[TopicPartition, MemoryRecords], // records to append: key is the TopicPartition, value is its MemoryRecords
                    responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
                    delayedProduceLock: Option[Lock] = None,
                    recordConversionStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit = _ => ()): Unit = {
    if (isValidRequiredAcks(requiredAcks)) {
      val sTime = time.milliseconds
      val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
        origin, entriesPerPartition, requiredAcks)
      debug("Produce to local log in %d ms".format(time.milliseconds - sTime))

      val produceStatus = localProduceResults.map { case (topicPartition, result) =>
        topicPartition ->
                ProducePartitionStatus(
                  result.info.lastOffset + 1, // required offset
                  new PartitionResponse(result.error, result.info.firstOffset.getOrElse(-1), result.info.logAppendTime,
                    result.info.logStartOffset, result.info.recordErrors.asJava, result.info.errorMessage)) // response status
      }

      recordConversionStatsCallback(localProduceResults.map { case (k, v) => k -> v.info.recordConversionStats })

      // A DelayedProduce is needed only if acks = -1 (all), there is data to append,
      // and at least one partition was appended locally without error.
      if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
        // create delayed produce operation
        val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
        //① Wrap the request in a DelayedProduce
        val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock)

        // create a list of (topic, partition) pairs to use as keys for this delayed produce operation
        val producerRequestKeys = entriesPerPartition.keys.map(TopicPartitionOperationKey(_)).toSeq

        // try to complete the request immediately, otherwise put it into the purgatory
        // this is because while the delayed produce operation is being created, new
        // requests may arrive and hence make this operation completable.
        //② Try to complete the DelayedProduce now; otherwise let delayedProducePurgatory watch it
        delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)

      } else {
        // we can respond immediately
        val produceResponseStatus = produceStatus.map { case (k, status) => k -> status.responseStatus }
        responseCallback(produceResponseStatus)
      }
    } else {
      // If required.acks is outside accepted range, something is wrong with the client
      // Just return an error and don't handle the request at all
      val responseStatus = entriesPerPartition.map { case (topicPartition, _) =>
        topicPartition -> new PartitionResponse(Errors.INVALID_REQUIRED_ACKS,
          LogAppendInfo.UnknownLogAppendInfo.firstOffset.getOrElse(-1), RecordBatch.NO_TIMESTAMP, LogAppendInfo.UnknownLogAppendInfo.logStartOffset)
      }
      responseCallback(responseStatus)
    }
  }
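The branching in appendRecords can be restated as two small predicates. This sketch mirrors the acks validity check and the three conditions behind `delayedProduceRequestRequired` as they appear in Kafka 2.x, but with simplified, hypothetical types: partitions are Strings and each local append result is an `Option[String]` holding an error name. The producer's `acks=all` is sent to the broker as -1, so "all" and -1 are the same case here.

```scala
// Simplified restatement of the two acks checks in appendRecords. Partitions are
// plain Strings and each local append result is an Option[String] holding an
// error name (None = appended OK); these types are hypothetical.

// Only acks = -1 ("all"), 0 and 1 are valid; anything else gets INVALID_REQUIRED_ACKS.
def isValidRequiredAcks(requiredAcks: Short): Boolean =
  requiredAcks == -1 || requiredAcks == 1 || requiredAcks == 0

// A DelayedProduce is needed only if all three hold:
//   1. acks = -1, i.e. the producer waits for the whole ISR;
//   2. there is something to append;
//   3. at least one partition was appended locally without error
//      (if every partition failed, there is nothing to wait for).
def delayedProduceRequired(requiredAcks: Short, localResults: Map[String, Option[String]]): Boolean =
  requiredAcks == -1 &&
    localResults.nonEmpty &&
    localResults.values.count(_.isDefined) < localResults.size

val allOk: Map[String, Option[String]] = Map("t-0" -> None, "t-1" -> None)
val allFailed: Map[String, Option[String]] = Map("t-0" -> Some("NOT_LEADER_FOR_PARTITION"))

println(delayedProduceRequired((-1).toShort, allOk))     // must wait for the ISR
println(delayedProduceRequired(1.toShort, allOk))        // acks=1: respond immediately
println(delayedProduceRequired((-1).toShort, allFailed)) // every append failed: respond immediately
```

With acks=0 or acks=1 the leader's local append is already enough, which is why only acks=-1 ever reaches delayedProducePurgatory.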
Now let's analyze DelayedProduce itself.

DelayedProduce fields

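class DelayedProduce(delayMs: Long,
                     produceMetadata: ProduceMetadata,
                     replicaManager: ReplicaManager,
                     responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
                     lockOpt: Option[Lock] = None)
  extends DelayedOperation(delayMs, lockOpt) {

  // first update the acks pending variable according to the error code
  produceMetadata.produceStatus.foreach { case (topicPartition, status) =>
    if (status.responseStatus.error == Errors.NONE) {
      // Timeout error state will be cleared when required acks are received
      status.acksPending = true
      status.responseStatus.error = Errors.REQUEST_TIMED_OUT
    } else {
      status.acksPending = false
    }

    trace(s"Initial partition status for $topicPartition is $status")
  }

  // tryComplete, onComplete and onExpiration follow (analyzed below)
}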
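  • delayMs: how long the DelayedProduce may be delayed, i.e. its timeout.
  • produceMetadata: a ProduceMetadata object that records the append result of every involved partition; it is mainly used to decide whether the DelayedProduce is ready to return.
  • replicaManager: the ReplicaManager, which manages replicas much as LogManager manages logs; used here to look up each partition.
  • responseCallback: the callback invoked from onComplete, either when the operation's condition is satisfied or when it expires.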
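DelayedProduce inherits its life cycle from DelayedOperation. The sketch below is a simplified, hypothetical model of that contract: the class names and `expire()` are assumptions, whereas in Kafka the operation is a TimerTask whose `run()` plays the timeout role and `forceComplete()` also cancels the timer. Whichever path wins the compare-and-set, condition met or timeout, is the only one that triggers `onComplete()`, so the response callback fires exactly once.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical, simplified model of the DelayedOperation contract.
abstract class SimpleDelayedOperation(val delayMs: Long) {
  private val completed = new AtomicBoolean(false)

  def isCompleted: Boolean = completed.get()

  // Check the completion condition; call forceComplete() when it holds.
  def tryComplete(): Boolean
  // Build and send the response; runs at most once.
  def onComplete(): Unit
  // Extra bookkeeping when the timeout won the race (e.g. metrics).
  def onExpiration(): Unit

  // Only the first caller wins the compare-and-set, so onComplete() runs once
  // whether completion comes from tryComplete() or from the timer.
  def forceComplete(): Boolean =
    if (completed.compareAndSet(false, true)) { onComplete(); true }
    else false

  // Stands in for the timer firing after delayMs.
  def expire(): Unit = if (forceComplete()) onExpiration()
}

// Toy subclass: "enough acks" means a counter reached its target.
final class CountingOperation(timeoutMs: Long, target: Int) extends SimpleDelayedOperation(timeoutMs) {
  var count = 0
  var responses = 0
  var expired = false

  def tryComplete(): Boolean = if (count >= target) forceComplete() else false
  def onComplete(): Unit = responses += 1
  def onExpiration(): Unit = expired = true
}

val countingOp = new CountingOperation(30000L, 2)
countingOp.count = 1
val firstTry = countingOp.tryComplete()  // condition not met yet
countingOp.count = 2
val secondTry = countingOp.tryComplete() // condition met: onComplete() runs
countingOp.expire()                      // timer fires afterwards: ignored

val lateOp = new CountingOperation(10L, 5)
lateOp.expire()                          // timeout first: onComplete(), then onExpiration()
println(s"first=$firstTry second=$secondTry responses=${countingOp.responses} lateExpired=${lateOp.expired}")
```

In the timeout path, DelayedProduce still answers the client through onComplete; partitions that never got enough acks simply keep the REQUEST_TIMED_OUT error set in the constructor.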

The tryComplete method

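As an implementation of DelayedOperation, DelayedProduce's tryComplete checks every partition and considers it settled in any of the following cases:

  1. The partition's Leader replica has migrated and is no longer on this broker. The error code in the corresponding ProducePartitionStatus is updated (Case A).
  2. In the normal case, once all replicas in the ISR have synchronized, the partition Leader's HW has reached (is at least) ProducePartitionStatus.requiredOffset. The timeout error code set during construction is then cleared.
  3. If an error occurs during the check, the error code in the partition's ProducePartitionStatus is updated.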
override def tryComplete(): Boolean = {
    // check for each partition if it still has pending acks
    // ① Iterate over every partition status in produceMetadata.
    produceMetadata.produceStatus.foreach { case (topicPartition, status) =>
      trace(s"Checking produce satisfaction for $topicPartition, current status $status")
      // skip those partitions that have already been satisfied
      // ② Only partitions still waiting for acks need to be checked.
      if (status.acksPending) {
        // Look up the Partition; expectLeader = true fails if this broker is no longer its leader
        val (hasEnough, error) = replicaManager.getPartitionOrError(topicPartition, expectLeader = true) match {
          case Left(err) =>
            // Case A
            (false, err)

          //③ Check whether the partition's HW has reached requiredOffset.
          case Right(partition) =>
            partition.checkEnoughReplicasReachOffset(status.requiredOffset)
        }

        // Case B.1 || B.2
        // B.1: an error occurred; B.2: enough replicas have reached requiredOffset
        if (error != Errors.NONE || hasEnough) {
          status.acksPending = false
          status.responseStatus.error = error
        }
      }
    }

    // check if every partition has satisfied at least one of case A or B
    //④ If no partition is still pending, complete the DelayedProduce.
    if (!produceMetadata.produceStatus.values.exists(_.acksPending))
      forceComplete()
    else
      false
  }
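The per-partition bookkeeping can be sketched without Kafka's types. This assumes string error names and a hypothetical `PartitionView` in place of the Partition returned by `getPartitionOrError`, where `None` stands for "this broker is no longer the leader". The HW and min.insync.replicas check roughly follows `Partition.checkEnoughReplicasReachOffset` in Kafka 2.x, and the status constructor reproduces the REQUEST_TIMED_OUT placeholder described above.

```scala
// Hypothetical, simplified model of DelayedProduce's per-partition bookkeeping.
// Error codes are plain strings; PartitionView replaces Kafka's Partition, and
// a lookup returning None means "this broker is no longer the leader".
final case class PartitionView(highWatermark: Long, isrSize: Int, minIsr: Int)

final class PartitionStatus(val requiredOffset: Long, initialError: String) {
  // Mirrors the DelayedProduce constructor: a successful local append starts
  // pending, with REQUEST_TIMED_OUT as a placeholder until enough acks arrive.
  var acksPending: Boolean = initialError == "NONE"
  var error: String = if (acksPending) "REQUEST_TIMED_OUT" else initialError
}

// Returns (hasEnough, error), roughly like checkEnoughReplicasReachOffset.
def checkPartition(view: Option[PartitionView], requiredOffset: Long): (Boolean, String) =
  view match {
    case None => (false, "NOT_LEADER_FOR_PARTITION") // Case A
    case Some(p) if p.highWatermark >= requiredOffset =>
      // The ISR may have shrunk below min.insync.replicas after the append.
      if (p.isrSize >= p.minIsr) (true, "NONE") else (true, "NOT_ENOUGH_REPLICAS_AFTER_APPEND")
    case Some(_) => (false, "NONE") // HW still behind: keep waiting
  }

// One tryComplete pass; true means every partition is settled (call forceComplete()).
def tryCompleteAll(statuses: Map[String, PartitionStatus],
                   lookup: String => Option[PartitionView]): Boolean = {
  statuses.foreach { case (tp, status) =>
    if (status.acksPending) {
      val (hasEnough, error) = checkPartition(lookup(tp), status.requiredOffset)
      if (error != "NONE" || hasEnough) { // Case B.1 || B.2
        status.acksPending = false
        status.error = error
      }
    }
  }
  !statuses.values.exists(_.acksPending)
}

val s0 = new PartitionStatus(10L, "NONE")
val s1 = new PartitionStatus(5L, "NONE")
val statuses = Map("t-0" -> s0, "t-1" -> s1)

// Round 1: t-0's HW is still behind; t-1 reached its offset but its ISR shrank.
val round1Views = Map("t-0" -> PartitionView(8L, 3, 2), "t-1" -> PartitionView(5L, 1, 2))
val round1 = tryCompleteAll(statuses, tp => round1Views.get(tp))
// Round 2: a follower fetch moved t-0's HW to 10.
val round2Views = Map("t-0" -> PartitionView(10L, 3, 2))
val round2 = tryCompleteAll(statuses, tp => round2Views.get(tp))
println(s"round1=$round1 round2=$round2 t-0=${s0.error} t-1=${s1.error}")
```

Between the two rounds t-0 still carries REQUEST_TIMED_OUT, so if the timer had fired at that moment the client would have seen a timeout for t-0 and NOT_ENOUGH_REPLICAS_AFTER_APPEND for t-1.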
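The onComplete method

onComplete runs exactly once, either after tryComplete succeeds or when the operation expires. It builds the per-partition response and hands it to the callback: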


 override def onComplete(): Unit = {
    // Build each partition's response status from the information recorded in ProduceMetadata
    val responseStatus = produceMetadata.produceStatus.map { case (k, status) => k -> status.responseStatus }
    // Invoke the responseCallback
    responseCallback(responseStatus)
  }

The callback invoked here is sendResponseCallback, defined in the KafkaApis class (inside handleProduceRequest):

def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = {
      val mergedResponseStatus = responseStatus ++ unauthorizedTopicResponses ++ nonExistingTopicResponses ++ invalidRequestResponses
      // Tracks whether any error occurred while handling the ProduceRequest.
      var errorInResponse = false

      mergedResponseStatus.foreach { case (topicPartition, status) =>
        if (status.error != Errors.NONE) {
          errorInResponse = true
          debug("Produce request with correlation id %d from client %s on partition %s failed due to %s".format(
            request.header.correlationId,
            request.header.clientId,
            topicPartition,
            status.error.exceptionName))
        }
      }

      // When this callback is triggered, the remote API call has completed
      request.apiRemoteCompleteTimeNanos = time.nanoseconds

      // Record both bandwidth and request quota-specific values and throttle by muting the channel if any of the quotas
      // have been violated. If both quotas have been violated, use the max throttle time between the two quotas. Note
      // that the request quota is not enforced if acks == 0.
      val bandwidthThrottleTimeMs = quotas.produce.maybeRecordAndGetThrottleTimeMs(request, numBytesAppended, time.milliseconds)
      val requestThrottleTimeMs = if (produceRequest.acks == 0) 0 else quotas.request.maybeRecordAndGetThrottleTimeMs(request)
      val maxThrottleTimeMs = Math.max(bandwidthThrottleTimeMs, requestThrottleTimeMs)
      if (maxThrottleTimeMs > 0) {
        if (bandwidthThrottleTimeMs > requestThrottleTimeMs) {
          quotas.produce.throttle(request, bandwidthThrottleTimeMs, sendResponse)
        } else {
          quotas.request.throttle(request, requestThrottleTimeMs, sendResponse)
        }
      }

      // Send the response immediately. In case of throttling, the channel has already been muted.
      if (produceRequest.acks == 0) {
        // no operation needed if producer request.required.acks = 0; however, if there is any error in handling
        // the request, since no response is expected by the producer, the server will close socket server so that
        // the producer client will know that some error has happened and will refresh its metadata
        if (errorInResponse) {
          val exceptionsSummary = mergedResponseStatus.map { case (topicPartition, status) =>
            topicPartition -> status.error.exceptionName
          }.mkString(", ")
          info(
            s"Closing connection due to error during produce request with correlation id ${request.header.correlationId} " +
              s"from client id ${request.header.clientId} with ack=0\n" +
              s"Topic and partition to exceptions: $exceptionsSummary"
          )
          closeConnection(request, new ProduceResponse(mergedResponseStatus.asJava).errorCounts)
        } else {
          // Note that although request throttling is exempt for acks == 0, the channel may be throttled due to
          // bandwidth quota violation.
          sendNoOpResponseExemptThrottle(request)
        }
      } else {
        sendResponse(request, Some(new ProduceResponse(mergedResponseStatus.asJava, maxThrottleTimeMs)), None)
      }
    }

    def processingStatsCallback(processingStats: FetchResponseStats): Unit = {
      processingStats.foreach { case (tp, info) =>
        updateRecordConversionStats(request, tp, info)
      }
    }

    if (authorizedRequestInfo.isEmpty)
      sendResponseCallback(Map.empty)
    else {
      val internalTopicsAllowed = request.header.clientId == AdminUtils.AdminClientId

      // call the replica manager to append messages to the replicas
      replicaManager.appendRecords(
        timeout = produceRequest.timeout.toLong,
        requiredAcks = produceRequest.acks,
        internalTopicsAllowed = internalTopicsAllowed,
        origin = AppendOrigin.Client,
        entriesPerPartition = authorizedRequestInfo,
        responseCallback = sendResponseCallback,
        recordConversionStatsCallback = processingStatsCallback)

      // if the request is put into the purgatory, it will have a held reference and hence cannot be garbage collected;
      // hence we clear its data here in order to let GC reclaim its memory since it is already appended to log
      produceRequest.clearPartitionRecords()
    }
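The tail of sendResponseCallback boils down to a small decision table. With acks=0 there is no response unless something failed, in which case the connection is closed. Otherwise a full response is sent carrying the larger of the two throttle times, and the request quota is ignored for acks=0. The following is a hypothetical, simplified model of that logic (the type and function names are assumptions); it only computes the decision and does not model the channel muting that the real quota managers perform.

```scala
// Hypothetical, simplified model of the decision at the end of sendResponseCallback.
sealed trait ProduceAction
case object CloseConnection extends ProduceAction // acks=0 with an error
case object NoOpResponse extends ProduceAction    // acks=0, nothing to report
final case class FullResponse(throttleTimeMs: Int) extends ProduceAction

// The request quota is not enforced for acks=0; the larger throttle time wins.
def throttleTimeMs(acks: Short, bandwidthThrottleMs: Int, requestThrottleMs: Int): Int = {
  val effectiveRequestThrottleMs = if (acks == 0) 0 else requestThrottleMs
  math.max(bandwidthThrottleMs, effectiveRequestThrottleMs)
}

def chooseAction(acks: Short, errorInResponse: Boolean,
                 bandwidthThrottleMs: Int, requestThrottleMs: Int): ProduceAction =
  if (acks == 0) {
    if (errorInResponse) CloseConnection else NoOpResponse
  } else FullResponse(throttleTimeMs(acks, bandwidthThrottleMs, requestThrottleMs))

println(chooseAction((-1).toShort, false, 100, 500))
println(chooseAction(0.toShort, true, 0, 0))
```

With acks=0 the producer never reads a response, so closing the socket is the only way to signal an error and make the client refresh its metadata.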