Kafka Delayed Operations (Part 2): DelayedProduce

May 7, 2020 | By Siran | 2,100 words | ~5-minute read

Archived under Message Queues | Tags #kafka

Overview

When a Kafka producer sets acks to all, the broker returns a response only after every replica in the ISR has received the message. This is where delayed operations come in.
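On the client side, this path is triggered simply by the acks setting. A minimal sketch (the topic name and bootstrap address are placeholders):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
// "all" is sent as acks = -1 on the wire; the broker replies only after the full ISR has the record
props.put("acks", "all")

val producer = new KafkaProducer[String, String](props)
producer.send(new ProducerRecord[String, String]("test-topic", "key", "value"))
producer.close()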

The overall flow is as follows:

  • ① The producer sends a ProducerRequest to append messages to the specified partitions.
  • ② The ProducerRequest passes through the network layer and the API layer (KafkaApis) and reaches ReplicaManager, which hands the messages to the log storage subsystem to be appended to the Log; at the same time, it checks the DelayedFetch operations registered under the relevant keys in delayedFetchPurgatory and completes any whose conditions are now satisfied.
  • ③ The log storage subsystem returns the result of the append.
  • ④ ReplicaManager creates a DelayedProduce object for the ProducerRequest and hands it to delayedProducePurgatory for management.
  • ⑤ delayedProducePurgatory uses SystemTimer to track whether the DelayedProduce has timed out.
  • ⑥ Follower replicas in the ISR send FetchRequests to synchronize messages from the leader replica; each fetch also checks whether any DelayedProduce is now eligible to complete.
  • ⑦ When a DelayedProduce executes, its callback produces a ProducerResponse and adds it to the RequestChannel.
  • ⑧ The network layer returns the ProducerResponse to the client.
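For step ⑥, the hook the fetch path uses to nudge pending produce operations lives in ReplicaManager; paraphrased from the source:

// ReplicaManager (paraphrased): called when a follower fetch may have advanced
// the leader's HW, giving DelayedProduce operations watching this partition's
// key another chance to complete.
def tryCompleteDelayedProduce(key: DelayedOperationKey): Unit = {
  val completed = delayedProducePurgatory.checkAndComplete(key)
  debug("Request key %s unblocked %d producer requests.".format(key, completed))
}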

DelayedProduce Source Code Analysis

The ReplicaManager.appendRecords Method

This corresponds to steps ② through ④ above. Replica management is itself a very important part of Kafka and will be analyzed later; here we focus on DelayedProduce.

def appendRecords(timeout: Long,
                    requiredAcks: Short,
                    internalTopicsAllowed: Boolean,
                    origin: AppendOrigin,
                    entriesPerPartition: Map[TopicPartition, MemoryRecords], // entries to append; the key is the topic-partition, the value is its MemoryRecords
                    responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
                    delayedProduceLock: Option[Lock] = None,
                    recordConversionStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit = _ => ()): Unit = {
    if (isValidRequiredAcks(requiredAcks)) {
      val sTime = time.milliseconds
      // key step: append the records to the local log
      val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
        origin, entriesPerPartition, requiredAcks)
      debug("Produce to local log in %d ms".format(time.milliseconds - sTime))

      val produceStatus = localProduceResults.map { case (topicPartition, result) =>
        topicPartition ->
                ProducePartitionStatus(
                  result.info.lastOffset + 1, // required offset
                  new PartitionResponse(result.error, result.info.firstOffset.getOrElse(-1), result.info.logAppendTime,
                    result.info.logStartOffset, result.info.recordErrors.asJava, result.info.errorMessage)) // response status
      }

      recordConversionStatsCallback(localProduceResults.map { case (k, v) => k -> v.info.recordConversionStats })

      // if acks is set to -1 (all), put the request into the delayed-operation purgatory
      if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
        // create delayed produce operation
        val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
        // ① wrap the pending request into a DelayedProduce
        val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock)

        // create a list of (topic, partition) pairs to use as keys for this delayed produce operation
        val producerRequestKeys = entriesPerPartition.keys.map(TopicPartitionOperationKey(_)).toSeq

        // try to complete the request immediately, otherwise put it into the purgatory
        // this is because while the delayed produce operation is being created, new
        // requests may arrive and hence make this operation completable.
        // ② try to complete the DelayedProduce immediately; otherwise add it to delayedProducePurgatory for management
        delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)

      } else {
        // we can respond immediately
        val produceResponseStatus = produceStatus.map { case (k, status) => k -> status.responseStatus }
        responseCallback(produceResponseStatus)
      }
    } else {
      // If required.acks is outside accepted range, something is wrong with the client
      // Just return an error and don't handle the request at all
      val responseStatus = entriesPerPartition.map { case (topicPartition, _) =>
        topicPartition -> new PartitionResponse(Errors.INVALID_REQUIRED_ACKS,
          LogAppendInfo.UnknownLogAppendInfo.firstOffset.getOrElse(-1), RecordBatch.NO_TIMESTAMP, LogAppendInfo.UnknownLogAppendInfo.logStartOffset)
      }
      responseCallback(responseStatus)
    }
  }
  • ① Wrap the pending request into a DelayedProduce.
  • ② Try to complete the DelayedProduce immediately; otherwise hand it to delayedProducePurgatory for management.
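The gatekeeper for step ② is delayedProduceRequestRequired: a delayed operation is created only when acks is -1 and at least one partition appended successfully (paraphrased from the same class):

// A delayed produce is required only if:
// 1. required acks is -1 (i.e. acks = all)
// 2. there is data to append
// 3. at least one partition appended successfully (otherwise the error
//    can be returned immediately)
private def delayedProduceRequestRequired(requiredAcks: Short,
                                          entriesPerPartition: Map[TopicPartition, MemoryRecords],
                                          localProduceResults: Map[TopicPartition, LogAppendResult]): Boolean = {
  requiredAcks == -1 &&
  entriesPerPartition.nonEmpty &&
  localProduceResults.values.count(_.exception.isDefined) < entriesPerPartition.size
}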

Now let's analyze DelayedProduce itself.


DelayedProduce Fields

class DelayedProduce(delayMs: Long,
                     produceMetadata: ProduceMetadata,
                     replicaManager: ReplicaManager,
                     responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
                     lockOpt: Option[Lock] = None)
  extends DelayedOperation(delayMs, lockOpt) {
  }
  • delayMs: how long the DelayedProduce operation may stay pending before it expires.
  • produceMetadata: a ProduceMetadata object recording the per-partition results of the local append; it is what decides whether the DelayedProduce has satisfied its completion condition.
  • replicaManager: manages the replicas on this broker, much as LogManager manages the logs.
  • responseCallback: the callback invoked from onComplete when the operation is satisfied or expires.
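For context, the metadata types referenced above, plus the initialization DelayedProduce performs on them, look roughly like this (paraphrased from DelayedProduce.scala):

// Per-partition status: the offset the leader's HW must reach, the pending
// response, and a flag telling tryComplete the partition is still waiting.
case class ProducePartitionStatus(requiredOffset: Long, responseStatus: PartitionResponse) {
  @volatile var acksPending = false
}

case class ProduceMetadata(produceRequiredAcks: Short,
                           produceStatus: Map[TopicPartition, ProducePartitionStatus])

// In DelayedProduce's constructor body: partitions that appended successfully
// start out pending, with REQUEST_TIMED_OUT pre-set so that an expired
// operation reports a timeout unless tryComplete clears the error first.
produceMetadata.produceStatus.foreach { case (topicPartition, status) =>
  if (status.responseStatus.error == Errors.NONE) {
    status.acksPending = true
    status.responseStatus.error = Errors.REQUEST_TIMED_OUT
  } else {
    status.acksPending = false
  }
}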

The tryComplete Method

As an implementation of DelayedOperation, DelayedProduce overrides tryComplete. For each partition, one of three cases applies:

  1. The partition's leader replica has migrated away and is no longer on this broker; the error code recorded in the corresponding ProducePartitionStatus is updated.
  2. In the normal case, all replicas in the ISR have completed synchronization, so the leader replica's HW has passed the corresponding ProduceStatus.requiredOffset; the timeout error code pre-set at initialization is then cleared.
  3. If an exception occurs, the error code recorded in the partition's ProducePartitionStatus is updated.
override def tryComplete(): Boolean = {
    // check for each partition if it still has pending acks
    // ① iterate over the status of every partition in produceMetadata
    produceMetadata.produceStatus.foreach { case (topicPartition, status) =>
      trace(s"Checking produce satisfaction for $topicPartition, current status $status")
      // skip those partitions that have already been satisfied
      // ② check whether this partition has already satisfied the DelayedProduce completion condition
      if (status.acksPending) {
        // look up the Partition object
        val (hasEnough, error) = replicaManager.getPartitionOrError(topicPartition, expectLeader = true) match {
          case Left(err) =>
            // Case A
            (false, err)

          // ③ check whether this partition's HW has passed requiredOffset
          case Right(partition) =>
            partition.checkEnoughReplicasReachOffset(status.requiredOffset)
        }

        // Case B.1 || B.2
        // an error occurred, or enough replicas have reached requiredOffset
        if (error != Errors.NONE || hasEnough) {
          status.acksPending = false
          status.responseStatus.error = error
        }
      }
    }

    // check if every partition has satisfied at least one of case A or B
    // ④ check whether every partition now satisfies the DelayedProduce completion condition
    if (!produceMetadata.produceStatus.values.exists(_.acksPending))
      forceComplete()
    else
      false
  }
  • ① Iterate over the status of every partition in produceMetadata.
  • ② Check whether this partition has already satisfied the DelayedProduce completion condition.
  • ③ Check whether this partition's HW has passed requiredOffset.
  • ④ Check whether every partition now satisfies the completion condition; if so, force completion.
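Step ③ delegates to Partition.checkEnoughReplicasReachOffset. A simplified paraphrase (the real method also keeps a stable snapshot of the ISR while checking):

// Partition (simplified paraphrase): returns (hasEnough, error).
// Case A in tryComplete is handled earlier, when the leader is no longer
// local; here the leader log exists and its HW is compared to requiredOffset.
def checkEnoughReplicasReachOffset(requiredOffset: Long): (Boolean, Errors) = {
  leaderLogIfLocal match {
    case Some(leaderLog) =>
      val minIsr = leaderLog.config.minInSyncReplicas
      if (leaderLog.highWatermark >= requiredOffset) {
        // the HW has passed requiredOffset, so the produce is satisfied,
        // but report an error if the ISR shrank below min.insync.replicas
        if (minIsr <= inSyncReplicaIds.size)
          (true, Errors.NONE)
        else
          (true, Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND)
      } else
        (false, Errors.NONE)
    case None =>
      (false, Errors.NOT_LEADER_FOR_PARTITION)
  }
}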

The onComplete Method

When the operation completes, whether satisfied or expired, onComplete builds and sends the response:

 override def onComplete(): Unit = {
    // build a response status for every partition from the information recorded in ProduceMetadata
    val responseStatus = produceMetadata.produceStatus.map { case (k, status) => k -> status.responseStatus }
    // invoke the responseCallback
    responseCallback(responseStatus)
  }
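For completeness, DelayedProduce's third override, onExpiration, only records a metric for each partition that was still waiting for acks when the timeout fired; the REQUEST_TIMED_OUT error was already pre-set at construction time (paraphrased):

override def onExpiration(): Unit = {
  produceMetadata.produceStatus.foreach { case (topicPartition, status) =>
    if (status.acksPending) {
      debug(s"Expiring produce request for partition $topicPartition with status $status")
      DelayedProduceMetrics.recordExpiration(topicPartition)
    }
  }
}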

The responseCallback invoked in onComplete is the sendResponseCallback defined inside KafkaApis.handleProduceRequest:

//kafka.server.KafkaApis
def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = {
      // merge the response statuses: results that were authorized and processed, plus those for unauthorized topics, non-existing topics, and invalid requests
      val mergedResponseStatus = responseStatus ++ unauthorizedTopicResponses ++ nonExistingTopicResponses ++ invalidRequestResponses
      // flags whether any error occurred while handling the ProduceRequest
      var errorInResponse = false

      mergedResponseStatus.foreach { case (topicPartition, status) =>
        if (status.error != Errors.NONE) {
          errorInResponse = true
          debug("Produce request with correlation id %d from client %s on partition %s failed due to %s".format(
            request.header.correlationId,
            request.header.clientId,
            topicPartition,
            status.error.exceptionName))
        }
      }

      // When this callback is triggered, the remote API call has completed
      request.apiRemoteCompleteTimeNanos = time.nanoseconds

      // Record both bandwidth and request quota-specific values and throttle by muting the channel if any of the quotas
      // have been violated. If both quotas have been violated, use the max throttle time between the two quotas. Note
      // that the request quota is not enforced if acks == 0.
      val bandwidthThrottleTimeMs = quotas.produce.maybeRecordAndGetThrottleTimeMs(request, numBytesAppended, time.milliseconds)
      val requestThrottleTimeMs = if (produceRequest.acks == 0) 0 else quotas.request.maybeRecordAndGetThrottleTimeMs(request)
      val maxThrottleTimeMs = Math.max(bandwidthThrottleTimeMs, requestThrottleTimeMs)
      if (maxThrottleTimeMs > 0) {
        if (bandwidthThrottleTimeMs > requestThrottleTimeMs) {
          quotas.produce.throttle(request, bandwidthThrottleTimeMs, sendResponse)
        } else {
          quotas.request.throttle(request, requestThrottleTimeMs, sendResponse)
        }
      }

      // Send the response immediately. In case of throttling, the channel has already been muted.
      if (produceRequest.acks == 0) {
        // no operation needed if producer request.required.acks = 0; however, if there is any error in handling
        // the request, since no response is expected by the producer, the server will close socket server so that
        // the producer client will know that some error has happened and will refresh its metadata
        if (errorInResponse) {
          val exceptionsSummary = mergedResponseStatus.map { case (topicPartition, status) =>
            topicPartition -> status.error.exceptionName
          }.mkString(", ")
          info(
            s"Closing connection due to error during produce request with correlation id ${request.header.correlationId} " +
              s"from client id ${request.header.clientId} with ack=0\n" +
              s"Topic and partition to exceptions: $exceptionsSummary"
          )
          closeConnection(request, new ProduceResponse(mergedResponseStatus.asJava).errorCounts)
        } else {
          // Note that although request throttling is exempt for acks == 0, the channel may be throttled due to
          // bandwidth quota violation.
          sendNoOpResponseExemptThrottle(request)
        }
      } else {
        sendResponse(request, Some(new ProduceResponse(mergedResponseStatus.asJava, maxThrottleTimeMs)), None)
      }
    }

    def processingStatsCallback(processingStats: FetchResponseStats): Unit = {
      processingStats.foreach { case (tp, info) =>
        updateRecordConversionStats(request, tp, info)
      }
    }

    if (authorizedRequestInfo.isEmpty)
      sendResponseCallback(Map.empty)
    else {
      val internalTopicsAllowed = request.header.clientId == AdminUtils.AdminClientId

      // call the replica manager to append messages to the replicas
      replicaManager.appendRecords(
        timeout = produceRequest.timeout.toLong,
        requiredAcks = produceRequest.acks,
        internalTopicsAllowed = internalTopicsAllowed,
        origin = AppendOrigin.Client,
        entriesPerPartition = authorizedRequestInfo,
        responseCallback = sendResponseCallback,
        recordConversionStatsCallback = processingStatsCallback)

      // if the request is put into the purgatory, it will have a held reference and hence cannot be garbage collected;
      // hence we clear its data here in order to let GC reclaim its memory since it is already appended to log
      produceRequest.clearPartitionRecords()
    }
  }