Kafka Replica Module: ReplicaManager

June 1, 2020 | Author: Siran | 6,700 words | about a 14-minute read
Filed under Message Queues | Tags: #kafka

Overview

Kafka's replication mechanism exists to keep Kafka highly available: each partition can have multiple replicas, and one replica from the assigned replica set (AR) is elected as the leader, which handles all read and write requests. The remaining replicas act as followers, which fetch messages from the leader and append them to their own logs. Only when the leader replica fails is a new leader elected from among the followers to serve requests.

Kafka's replication mechanism is based on Microsoft's PacificA algorithm.


Source Code Analysis

1. ReplicaManager

Replicas are managed by the ReplicaManager class.

class ReplicaManager(val config: KafkaConfig,
                     metrics: Metrics,
                     time: Time,
                     val zkClient: KafkaZkClient,
                     /*
                       Scheduler with four periodic tasks:
                        1. shutdown-idle-replica-alter-log-dirs-thread
                        2. highwatermark-checkpoint
                        3. isr-expiration
                        4. isr-change-propagation
                      */
                     scheduler: Scheduler,
                     val logManager: LogManager, // reads and writes to partitions are delegated to the underlying log storage subsystem
                     val isShuttingDown: AtomicBoolean,
                     quotaManagers: QuotaManagers,
                     val brokerTopicStats: BrokerTopicStats, // broker-level topic metrics
                     val metadataCache: MetadataCache, // metadata cache
                     logDirFailureChannel: LogDirFailureChannel,
                     // purgatories for delayed operations
                     val delayedProducePurgatory: DelayedOperationPurgatory[DelayedProduce],
                     val delayedFetchPurgatory: DelayedOperationPurgatory[DelayedFetch],
                     val delayedDeleteRecordsPurgatory: DelayedOperationPurgatory[DelayedDeleteRecords],
                     val delayedElectLeaderPurgatory: DelayedOperationPurgatory[DelayedElectLeader],
                     threadNamePrefix: Option[String]) extends Logging with KafkaMetricsGroup {
                     }
  • scheduler: the timer with four periodic tasks, each analyzed below (a registration sketch follows this list).
  • logManager: log management, analyzed earlier.
  • brokerTopicStats: broker-level topic metrics.
  • metadataCache: the metadata cache.
  • DelayedOperationPurgatory: delayed operations, analyzed earlier.
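
For context, a hedged sketch of how three of these tasks are registered, modeled on the Kafka 2.4-era ReplicaManager.startup(); treat the exact method names and periods as belonging to that version. The fourth task, highwatermark-checkpoint, is registered separately by startHighWatermarkCheckPointThread(), which step ⑧ below touches on:

//kafka.server.ReplicaManager (2.4-era sketch)
def startup(): Unit = {
  // Shrink the ISR of partitions whose followers have fallen too far behind.
  scheduler.schedule("isr-expiration", maybeShrinkIsr _,
    period = config.replicaLagTimeMaxMs / 2, unit = TimeUnit.MILLISECONDS)
  // Propagate ISR changes to ZooKeeper so the controller can pick them up.
  scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges _,
    period = 2500L, unit = TimeUnit.MILLISECONDS)
  // Retire the replica-alter-log-dirs thread once it has no work left.
  scheduler.schedule("shutdown-idle-replica-alter-log-dirs-thread",
    shutdownIdleReplicaAlterLogDirsThread _, period = 10000L, unit = TimeUnit.MILLISECONDS)
}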

1.1 The becomeLeaderOrFollower() Method

Transitions the specified replicas to leader or follower.

def becomeLeaderOrFollower(correlationId: Int,
                             leaderAndIsrRequest: LeaderAndIsrRequest,
                             onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): LeaderAndIsrResponse = {
    if (stateChangeLogger.isTraceEnabled) {
      leaderAndIsrRequest.partitionStates.asScala.foreach { partitionState =>
        stateChangeLogger.trace(s"Received LeaderAndIsr request $partitionState " +
          s"correlation id $correlationId from controller ${leaderAndIsrRequest.controllerId} " +
          s"epoch ${leaderAndIsrRequest.controllerEpoch}")
      }
    }
    replicaStateChangeLock synchronized { // acquire the lock
      //① Check the controller epoch; if it is stale, return an error code immediately
      if (leaderAndIsrRequest.controllerEpoch < controllerEpoch) {
        stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller ${leaderAndIsrRequest.controllerId} with " +
          s"correlation id $correlationId since its controller epoch ${leaderAndIsrRequest.controllerEpoch} is old. " +
          s"Latest known controller epoch is $controllerEpoch")
        leaderAndIsrRequest.getErrorResponse(0, Errors.STALE_CONTROLLER_EPOCH.exception)
      } else {
        //collect the error codes to return
        val responseMap = new mutable.HashMap[TopicPartition, Errors]
        val controllerId = leaderAndIsrRequest.controllerId
        controllerEpoch = leaderAndIsrRequest.controllerEpoch

        // First check partition's leader epoch
        // Preparation: gather the information the transitions will need
        val partitionStates = new mutable.HashMap[Partition, LeaderAndIsrPartitionState]()
        val newPartitions = new mutable.HashSet[Partition]

        //② Using the ReplicaManager cache, filter out the topicPartitions not seen before
        leaderAndIsrRequest.partitionStates.asScala.foreach { partitionState =>
          val topicPartition = new TopicPartition(partitionState.topicName, partitionState.partitionIndex)
          //look up the Partition object, creating a new one if it does not exist
          val partitionOpt = getPartition(topicPartition) match {
            case HostedPartition.Offline =>
              stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +
                s"controller $controllerId with correlation id $correlationId " +
                s"epoch $controllerEpoch for partition $topicPartition as the local replica for the " +
                "partition is in an offline log directory")
              responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
              None

            case HostedPartition.Online(partition) => Some(partition)

            case HostedPartition.None =>
              val partition = Partition(topicPartition, time, this)
              allPartitions.putIfNotExists(topicPartition, HostedPartition.Online(partition))
              newPartitions.add(partition)
              Some(partition)
          }

          // ③ Process each tp in the leaderAndIsrRequest
          partitionOpt.foreach { partition =>
            val currentLeaderEpoch = partition.getLeaderEpoch
            val requestLeaderEpoch = partitionState.leaderEpoch
            //check the leaderEpoch
            if (requestLeaderEpoch > currentLeaderEpoch) {
              // If the leader epoch is valid record the epoch of the controller that made the leadership decision.
              // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
              //check whether a replica of this partition is assigned to the current broker
              if (partitionState.replicas.contains(localBrokerId))
                partitionStates.put(partition, partitionState)
              else {
                stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller $controllerId with " +
                  s"correlation id $correlationId epoch $controllerEpoch for partition $topicPartition as itself is not " +
                  s"in assigned replica list ${partitionState.replicas.asScala.mkString(",")}")
                responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
              }
            } else if (requestLeaderEpoch < currentLeaderEpoch) {
              stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +
                s"controller $controllerId with correlation id $correlationId " +
                s"epoch $controllerEpoch for partition $topicPartition since its associated " +
                s"leader epoch $requestLeaderEpoch is smaller than the current " +
                s"leader epoch $currentLeaderEpoch")
              responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
            } else {
              stateChangeLogger.debug(s"Ignoring LeaderAndIsr request from " +
                s"controller $controllerId with correlation id $correlationId " +
                s"epoch $controllerEpoch for partition $topicPartition since its associated " +
                s"leader epoch $requestLeaderEpoch matches the current leader epoch")
              responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
            }
          }
        }

        // ④ The request's tps are now filtered; classify them by the role specified in partitionState
        val partitionsTobeLeader = partitionStates.filter { case (_, partitionState) =>
          partitionState.leader == localBrokerId
        }
        val partitionsToBeFollower = partitionStates -- partitionsTobeLeader.keys

        val highWatermarkCheckpoints = new LazyOffsetCheckpoints(this.highWatermarkCheckpoints)
        //⑤ Transition the specified partitions' replicas to leader replicas
        val partitionsBecomeLeader = if (partitionsTobeLeader.nonEmpty)
          makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, correlationId, responseMap,
            highWatermarkCheckpoints)
        else
          Set.empty[Partition]
        //⑥ Transition the specified partitions' replicas to follower replicas
        val partitionsBecomeFollower = if (partitionsToBeFollower.nonEmpty)
          makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap,
            highWatermarkCheckpoints)
        else
          Set.empty[Partition]

        /*
         * KAFKA-8392
         * For topic partitions of which the broker is no longer a leader, delete metrics related to
         * those topics. Note that this means the broker stops being either a replica or a leader of
         * partitions of said topics
         */
        val leaderTopicSet = leaderPartitionsIterator.map(_.topic).toSet
        val followerTopicSet = partitionsBecomeFollower.map(_.topic).toSet
        followerTopicSet.diff(leaderTopicSet).foreach(brokerTopicStats.removeOldLeaderMetrics)

        // remove metrics for brokers which are not followers of a topic
        leaderTopicSet.diff(followerTopicSet).foreach(brokerTopicStats.removeOldFollowerMetrics)

        //⑦ Handle offline partitions
        leaderAndIsrRequest.partitionStates.asScala.foreach { partitionState =>
          val topicPartition = new TopicPartition(partitionState.topicName, partitionState.partitionIndex)
          /*
           * If there is offline log directory, a Partition object may have been created by getOrCreatePartition()
           * before getOrCreateReplica() failed to create local replica due to KafkaStorageException.
           * In this case ReplicaManager.allPartitions will map this topic-partition to an empty Partition object.
           * we need to map this topic-partition to OfflinePartition instead.
           */
          if (localLog(topicPartition).isEmpty)
            markPartitionOffline(topicPartition)
        }

        // we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions
        // have been completely populated before starting the checkpointing there by avoiding weird race conditions
        //⑧ On the first leaderAndIsrRequest, start the highwatermark thread, which periodically checkpoints each partition's high watermark to the highwatermark file
        startHighWatermarkCheckPointThread()

        val futureReplicasAndInitialOffset = new mutable.HashMap[TopicPartition, InitialFetchState]
        for (partition <- newPartitions) {
          val topicPartition = partition.topicPartition
          if (logManager.getLog(topicPartition, isFuture = true).isDefined) {
            partition.log.foreach { log =>
              val leader = BrokerEndPoint(config.brokerId, "localhost", -1)

              // Add future replica to partition's map
              partition.createLogIfNotExists(Request.FutureLocalReplicaId, isNew = false, isFutureReplica = true,
                highWatermarkCheckpoints)

              // pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move
              // replica from source dir to destination dir
              logManager.abortAndPauseCleaning(topicPartition)

              futureReplicasAndInitialOffset.put(topicPartition, InitialFetchState(leader,
                partition.getLeaderEpoch, log.highWatermark))
            }
          }
        }
        replicaAlterLogDirsManager.addFetcherForPartitions(futureReplicasAndInitialOffset)

        //⑨ Shut down idle fetcher threads
        replicaFetcherManager.shutdownIdleFetcherThreads()
        replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
        //⑩ Invoke the callback
        onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)
        val responsePartitions = responseMap.iterator.map { case (tp, error) =>
          new LeaderAndIsrPartitionError()
            .setTopicName(tp.topic)
            .setPartitionIndex(tp.partition)
            .setErrorCode(error.code)
        }.toBuffer
        new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
          .setErrorCode(Errors.NONE.code)
          .setPartitionErrors(responsePartitions.asJava))
      }
    }
  }
  • ① Check the controller epoch; if it is stale, return an error code immediately.

  • ② Using the ReplicaManager cache, filter out the topicPartitions not seen before.

  • ③ Process each tp in the leaderAndIsrRequest.

  • ④ With the request's tps filtered, classify them by the role specified in partitionState.

  • ⑤ Transition the specified partitions' replicas to leader replicas:

    private def makeLeaders(controllerId: Int,
                            controllerEpoch: Int,
                            partitionStates: Map[Partition, LeaderAndIsrPartitionState],
                            correlationId: Int,
                            responseMap: mutable.Map[TopicPartition, Errors],
                            highWatermarkCheckpoints: OffsetCheckpoints): Set[Partition] = {
      partitionStates.keys.foreach { partition =>
        stateChangeLogger.trace(s"Handling LeaderAndIsr request correlationId $correlationId from " +
          s"controller $controllerId epoch $controllerEpoch starting the become-leader transition for " +
          s"partition ${partition.topicPartition}")
      }
    
      for (partition <- partitionStates.keys)
        responseMap.put(partition.topicPartition, Errors.NONE)
    
      val partitionsToMakeLeaders = mutable.Set[Partition]()
    
      try {
        // First stop fetchers for all the partitions
        // ① Stop the fetcher threads for these partitions
        replicaFetcherManager.removeFetcherForPartitions(partitionStates.keySet.map(_.topicPartition))
        // Update the partition information to be the leader
        partitionStates.foreach { case (partition, partitionState) =>
          try {
            /* ② Call partition.makeLeader to make the local replica the leader.
                 makeLeader updates the ISR and removes from the allReplicasMap cache the broker ids
                 of replicas deleted by the controller. For each follower it resets offset state such as
                 lastFetchLeaderLogEndOffset, lastFetchTimeMs and _lastCaughtUpTimeMs. If this replica is
                 newly becoming the leader, it creates the leader's highWatermarkMetadata and refreshes
                 the last-fetch bookkeeping. makeLeader returns whether the replica became a new leader.
             */
            if (partition.makeLeader(controllerId, partitionState, correlationId, highWatermarkCheckpoints)) {
              partitionsToMakeLeaders += partition
              stateChangeLogger.trace(s"Stopped fetchers as part of become-leader request from " +
                s"controller $controllerId epoch $controllerEpoch with correlation id $correlationId for partition ${partition.topicPartition} " +
                s"(last update controller epoch ${partitionState.controllerEpoch})")
            } else
              stateChangeLogger.info(s"Skipped the become-leader state change after marking its " +
                s"partition as leader with correlation id $correlationId from controller $controllerId epoch $controllerEpoch for " +
                s"partition ${partition.topicPartition} (last update controller epoch ${partitionState.controllerEpoch}) " +
                s"since it is already the leader for the partition.")
          } catch {
            case e: KafkaStorageException =>
              stateChangeLogger.error(s"Skipped the become-leader state change with " +
                s"correlation id $correlationId from controller $controllerId epoch $controllerEpoch for partition ${partition.topicPartition} " +
                s"(last update controller epoch ${partitionState.controllerEpoch}) since " +
                s"the replica for the partition is offline due to disk error $e")
              val dirOpt = getLogDir(partition.topicPartition)
              error(s"Error while making broker the leader for partition $partition in dir $dirOpt", e)
              responseMap.put(partition.topicPartition, Errors.KAFKA_STORAGE_ERROR)
          }
        }
    
      } catch {
        case e: Throwable =>
          partitionStates.keys.foreach { partition =>
            stateChangeLogger.error(s"Error while processing LeaderAndIsr request correlationId $correlationId received " +
              s"from controller $controllerId epoch $controllerEpoch for partition ${partition.topicPartition}", e)
          }
          // Re-throw the exception for it to be caught in KafkaApis
          throw e
      }
    
      partitionStates.keys.foreach { partition =>
        stateChangeLogger.trace(s"Completed LeaderAndIsr request correlationId $correlationId from controller $controllerId " +
          s"epoch $controllerEpoch for the become-leader transition for partition ${partition.topicPartition}")
      }
    
      partitionsToMakeLeaders
    }
    
    • ① Stop the fetcher threads for these partitions.
    • ② Call partition.makeLeader to make the local replica the leader.

      makeLeader updates the ISR and removes from the allReplicasMap cache the broker ids of replicas deleted by the controller. For each follower it resets offset state such as lastFetchLeaderLogEndOffset, lastFetchTimeMs and _lastCaughtUpTimeMs. If this replica is newly becoming the leader, it creates the leader's highWatermarkMetadata and refreshes the last-fetch bookkeeping. makeLeader returns whether the replica became a new leader.

  • ⑥ Transition the specified partitions' replicas to follower replicas:

    private def makeFollowers(controllerId: Int,
                              controllerEpoch: Int,
                              partitionStates: Map[Partition, LeaderAndIsrPartitionState],
                              correlationId: Int,
                              responseMap: mutable.Map[TopicPartition, Errors],
                              highWatermarkCheckpoints: OffsetCheckpoints) : Set[Partition] = {
      partitionStates.foreach { case (partition, partitionState) =>
        stateChangeLogger.trace(s"Handling LeaderAndIsr request correlationId $correlationId from controller $controllerId " +
          s"epoch $controllerEpoch starting the become-follower transition for partition ${partition.topicPartition} with leader " +
          s"${partitionState.leader}")
      }
    
      for (partition <- partitionStates.keys)
        responseMap.put(partition.topicPartition, Errors.NONE)
    
      val partitionsToMakeFollower: mutable.Set[Partition] = mutable.Set()
      try {
        // TODO: Delete leaders from LeaderAndIsrRequest
        partitionStates.foreach { case (partition, partitionState) =>
          val newLeaderBrokerId = partitionState.leader
          try {
            //① Only transition to follower when the new leader is alive
            metadataCache.getAliveBrokers.find(_.id == newLeaderBrokerId) match {
              // Only change partition state when the leader is available
              case Some(_) =>
                //② partition.makeFollower sets the new leader and clears the ISR. If the cached leaderReplicaIdOpt already matches the leader in this request, makeFollower returns false; otherwise it updates leaderReplicaIdOpt and returns true
                if (partition.makeFollower(controllerId, partitionState, correlationId, highWatermarkCheckpoints))
                  partitionsToMakeFollower += partition
                else
                  stateChangeLogger.info(s"Skipped the become-follower state change after marking its partition as " +
                    s"follower with correlation id $correlationId from controller $controllerId epoch $controllerEpoch " +
                    s"for partition ${partition.topicPartition} (last update " +
                    s"controller epoch ${partitionState.controllerEpoch}) " +
                    s"since the new leader $newLeaderBrokerId is the same as the old leader")
              case None =>
                // The leader broker should always be present in the metadata cache.
                // If not, we should record the error message and abort the transition process for this partition
                stateChangeLogger.error(s"Received LeaderAndIsrRequest with correlation id $correlationId from " +
                  s"controller $controllerId epoch $controllerEpoch for partition ${partition.topicPartition} " +
                  s"(last update controller epoch ${partitionState.controllerEpoch}) " +
                  s"but cannot become follower since the new leader $newLeaderBrokerId is unavailable.")
                // Create the local replica even if the leader is unavailable. This is required to ensure that we include
                // the partition's high watermark in the checkpoint file (see KAFKA-1647)
                partition.createLogIfNotExists(localBrokerId, isNew = partitionState.isNew, isFutureReplica = false,
                  highWatermarkCheckpoints)
            }
          } catch {
            case e: KafkaStorageException =>
              stateChangeLogger.error(s"Skipped the become-follower state change with correlation id $correlationId from " +
                s"controller $controllerId epoch $controllerEpoch for partition ${partition.topicPartition} " +
                s"(last update controller epoch ${partitionState.controllerEpoch}) with leader " +
                s"$newLeaderBrokerId since the replica for the partition is offline due to disk error $e")
              val dirOpt = getLogDir(partition.topicPartition)
              error(s"Error while making broker the follower for partition $partition with leader " +
                s"$newLeaderBrokerId in dir $dirOpt", e)
              responseMap.put(partition.topicPartition, Errors.KAFKA_STORAGE_ERROR)
          }
        }
    
        //③ Remove, via replicaFetcherManager, the fetcher threads of replicas whose leader changed; the matching fetchers are re-added below
        replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(_.topicPartition))
        partitionsToMakeFollower.foreach { partition =>
          stateChangeLogger.trace(s"Stopped fetchers as part of become-follower request from controller $controllerId " +
            s"epoch $controllerEpoch with correlation id $correlationId for partition ${partition.topicPartition} with leader " +
            s"${partitionStates(partition).leader}")
        }
          
        //④ Try to complete pending delayed produce and fetch requests
        partitionsToMakeFollower.foreach { partition =>
          completeDelayedFetchOrProduceRequests(partition.topicPartition)
        }
    
        partitionsToMakeFollower.foreach { partition =>
          stateChangeLogger.trace(s"Truncated logs and checkpointed recovery boundaries for partition " +
            s"${partition.topicPartition} as part of become-follower request with correlation id $correlationId from " +
            s"controller $controllerId epoch $controllerEpoch with leader ${partitionStates(partition).leader}")
        }
    
        if (isShuttingDown.get()) {
          partitionsToMakeFollower.foreach { partition =>
            stateChangeLogger.trace(s"Skipped the adding-fetcher step of the become-follower state " +
              s"change with correlation id $correlationId from controller $controllerId epoch $controllerEpoch for " +
              s"partition ${partition.topicPartition} with leader ${partitionStates(partition).leader} " +
              "since it is shutting down")
          }
        } else {
          // we do not need to check if the leader exists again since this has been done at the beginning of this process
          val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map { partition =>
            val leader = metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get
              .brokerEndPoint(config.interBrokerListenerName)
            val fetchOffset = partition.localLogOrException.highWatermark
            partition.topicPartition -> InitialFetchState(leader, partition.getLeaderEpoch, fetchOffset)
         }.toMap
    
          //⑤ Add fetcher threads so that follower replicas synchronize from their leader replicas
          replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
          partitionsToMakeFollowerWithLeaderAndOffset.foreach { case (partition, initialFetchState) =>
            stateChangeLogger.trace(s"Started fetcher to new leader as part of become-follower " +
              s"request from controller $controllerId epoch $controllerEpoch with correlation id $correlationId for " +
              s"partition $partition with leader ${initialFetchState.leader}")
          }
        }
      } catch {
        case e: Throwable =>
          stateChangeLogger.error(s"Error while processing LeaderAndIsr request with correlationId $correlationId " +
            s"received from controller $controllerId epoch $controllerEpoch", e)
          // Re-throw the exception for it to be caught in KafkaApis
          throw e
      }
    
      partitionStates.keys.foreach { partition =>
        stateChangeLogger.trace(s"Completed LeaderAndIsr request correlationId $correlationId from controller $controllerId " +
          s"epoch $controllerEpoch for the become-follower transition for partition ${partition.topicPartition} with leader " +
          s"${partitionStates(partition).leader}")
      }
    
      partitionsToMakeFollower
    }
    
    • ① Only transition to follower when the new leader is alive.
    • ② partition.makeFollower sets the new leader and clears the ISR. If the cached leaderReplicaIdOpt already matches the leader in this request, makeFollower returns false; otherwise it updates leaderReplicaIdOpt and returns true.
    • ③ Remove, via replicaFetcherManager, the fetcher threads of replicas whose leader changed; the matching fetchers are re-added afterwards.
    • ④ Try to complete pending delayed produce and fetch requests.
    • ⑤ Add fetcher threads so that follower replicas synchronize from their leader replicas.
  • ⑦ Handle offline partitions.

  • ⑧ On the first leaderAndIsrRequest, start the highwatermark thread, which periodically checkpoints each partition's high watermark to the highwatermark file.

  • ⑨ Shut down idle fetcher threads.

  • ⑩ Invoke the onLeadershipChange callback, illustrated below.
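
For context, becomeLeaderOrFollower is invoked from KafkaApis when a LeaderAndIsr request arrives, and the onLeadershipChange callback is what lets the group and transaction coordinators load or unload state for their internal topics. A hedged sketch of the callback, modeled on the Kafka 2.4-era KafkaApis.handleLeaderAndIsrRequest (authorization and broker-epoch checks omitted; coordinator method names follow that version):

//kafka.server.KafkaApis (2.4-era sketch)
def onLeadershipChange(updatedLeaders: Iterable[Partition],
                       updatedFollowers: Iterable[Partition]): Unit = {
  // When this broker becomes leader of a coordinator partition, load its state.
  updatedLeaders.foreach { partition =>
    if (partition.topic == GROUP_METADATA_TOPIC_NAME)
      groupCoordinator.handleGroupImmigration(partition.partitionId)
    else if (partition.topic == TRANSACTION_STATE_TOPIC_NAME)
      txnCoordinator.handleTxnImmigration(partition.partitionId, partition.getLeaderEpoch)
  }
  // When it becomes a follower, unload the corresponding state.
  updatedFollowers.foreach { partition =>
    if (partition.topic == GROUP_METADATA_TOPIC_NAME)
      groupCoordinator.handleGroupEmigration(partition.partitionId)
    else if (partition.topic == TRANSACTION_STATE_TOPIC_NAME)
      txnCoordinator.handleTxnEmigration(partition.partitionId, partition.getLeaderEpoch)
  }
}

val response = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, onLeadershipChange)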


1.2 The stopReplicas() Method

Shuts down the replicas of the specified partitions.

def stopReplicas(stopReplicaRequest: StopReplicaRequest): (mutable.Map[TopicPartition, Errors], Errors) = {
    replicaStateChangeLock synchronized {
      val responseMap = new collection.mutable.HashMap[TopicPartition, Errors]
      //① Check the controllerEpoch in the request
      if (stopReplicaRequest.controllerEpoch() < controllerEpoch) {
        stateChangeLogger.warn("Received stop replica request from an old controller epoch " +
          s"${stopReplicaRequest.controllerEpoch}. Latest known controller epoch is $controllerEpoch")
        (responseMap, Errors.STALE_CONTROLLER_EPOCH)
      } else {
        val partitions = stopReplicaRequest.partitions.asScala.toSet
        controllerEpoch = stopReplicaRequest.controllerEpoch
        // First stop fetchers for all partitions, then stop the corresponding replicas
        replicaFetcherManager.removeFetcherForPartitions(partitions)
        replicaAlterLogDirsManager.removeFetcherForPartitions(partitions)
        for (topicPartition <- partitions){
          try {
            // ② Stop replication for the specified partition
            stopReplica(topicPartition, stopReplicaRequest.deletePartitions)
            responseMap.put(topicPartition, Errors.NONE)
          } catch {
            case e: KafkaStorageException =>
              stateChangeLogger.error(s"Ignoring stop replica (delete=${stopReplicaRequest.deletePartitions}) for " +
                s"partition $topicPartition due to storage exception", e)
              responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
          }
        }
        (responseMap, Errors.NONE)
      }
    }
  }

def stopReplica(topicPartition: TopicPartition, deletePartition: Boolean)  = {
    stateChangeLogger.trace(s"Handling stop replica (delete=$deletePartition) for partition $topicPartition")

    //③ Decide whether to delete the log based on deletePartition
    if (deletePartition) {
      getPartition(topicPartition) match {
        case HostedPartition.Offline =>
          throw new KafkaStorageException(s"Partition $topicPartition is on an offline disk")

        case hostedPartition @ HostedPartition.Online(removedPartition) =>
          if (allPartitions.remove(topicPartition, hostedPartition)) {
            maybeRemoveTopicMetrics(topicPartition.topic)
            // this will delete the local log. This call may throw exception if the log is on offline directory
            removedPartition.delete() // delete the replica; its log is removed
          }

        case HostedPartition.None =>
          stateChangeLogger.trace(s"Ignoring stop replica (delete=$deletePartition) for partition " +
            s"$topicPartition as replica doesn't exist on broker")
      }

      // Delete log and corresponding folders in case replica manager doesn't hold them anymore.
      // This could happen when topic is being deleted while broker is down and recovers.
      if (logManager.getLog(topicPartition).isDefined)
        logManager.asyncDelete(topicPartition)
      if (logManager.getLog(topicPartition, isFuture = true).isDefined)
        logManager.asyncDelete(topicPartition, isFuture = true)
    }

    // If we were the leader, we may have some operations still waiting for completion.
    // We force completion to prevent them from timing out.
    completeDelayedFetchOrProduceRequests(topicPartition)

    stateChangeLogger.trace(s"Finished handling stop replica (delete=$deletePartition) for partition $topicPartition")
  }  
  • ① Check the controllerEpoch in the request (see the fencing sketch below).
  • ② Stop replication for the specified partitions.
  • ③ Decide whether to delete the log based on the deletePartition flag.
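
The controllerEpoch check in step ① is the same fencing pattern used at the top of becomeLeaderOrFollower: requests from a deposed controller carry a stale epoch and must not mutate replica state. A minimal standalone sketch of the pattern (names are illustrative, not Kafka API):

// Illustrative fencing helper; localEpoch stands in for ReplicaManager.controllerEpoch.
def fenceStaleController[T](requestEpoch: Int, localEpoch: Int)(body: => T): Either[String, T] =
  if (requestEpoch < localEpoch)
    Left(s"stale controller epoch $requestEpoch, latest known is $localEpoch") // reject the old controller
  else
    Right(body) // the caller also records requestEpoch as the latest known epoch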

ReplicaFetcherThread

Used by a follower replica to synchronize with the leader replica. ReplicaFetcherThread extends the abstract AbstractFetcherThread class, which is ultimately a subclass of Thread (via ShutdownableThread).

//kafka.server.AbstractFetcherThread
override def doWork(): Unit = {
    //fetch epochs and end offsets from the leader, update the local replica's fetchOffset, and truncate the log if needed
    maybeTruncate()
    //send a fetch request to the leader replica for synchronization and process the result
    maybeFetch()
  }

  • maybeTruncate: fetches the leader's epoch end offsets, updates the local replica's fetchOffset, and truncates the local log if needed (a sketch follows below).
  • maybeFetch: sends a fetch request to the leader replica for synchronization and processes the response.
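The article only walks through maybeFetch; for completeness, here is a hedged sketch of maybeTruncate based on the Kafka 2.4-era AbstractFetcherThread (the helper names may differ in other versions):

//kafka.server.AbstractFetcherThread (2.4-era sketch)
private def maybeTruncate(): Unit = {
  // Split the truncating partitions by whether a leader epoch is available for fencing.
  val (partitionsWithEpochs, partitionsWithoutEpochs) = fetchTruncatingPartitions()
  if (partitionsWithEpochs.nonEmpty) {
    // Ask the leader for the end offset of our epoch and truncate the local log to it.
    truncateToEpochEndOffsets(partitionsWithEpochs)
  }
  if (partitionsWithoutEpochs.nonEmpty) {
    // Without epoch information, fall back to truncating to the high watermark.
    truncateToHighWatermark(partitionsWithoutEpochs)
  }
}
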
private def maybeFetch(): Unit = {
    val fetchRequestOpt = inLock(partitionMapLock) {
      val ResultWithPartitions(fetchRequestOpt, partitionsWithError) = buildFetch(partitionStates.partitionStateMap.asScala)

      handlePartitionsWithErrors(partitionsWithError, "maybeFetch")

      if (fetchRequestOpt.isEmpty) {
        trace(s"There are no active partitions. Back off for $fetchBackOffMs ms before sending a fetch request")
        partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
      }

      fetchRequestOpt
    }

    fetchRequestOpt.foreach { case ReplicaFetch(sessionPartitions, fetchRequest) =>
      processFetchRequest(sessionPartitions, fetchRequest)
    }
  }
// Sends the fetchRequest and processes the FetchResponse
private def processFetchRequest(sessionPartitions: util.Map[TopicPartition, FetchRequest.PartitionData],
                                  fetchRequest: FetchRequest.Builder): Unit = {
    val partitionsWithError = mutable.Set[TopicPartition]()
    var responseData: Map[TopicPartition, FetchData] = Map.empty

    try {
      trace(s"Sending fetch request $fetchRequest")
      //① Send the fetch request to the leader
      responseData = fetchFromLeader(fetchRequest)
    } catch {
      case t: Throwable =>
        if (isRunning) {
          warn(s"Error in response for fetch request $fetchRequest", t)
          inLock(partitionMapLock) {
            partitionsWithError ++= partitionStates.partitionSet.asScala
            // there is an error occurred while fetching partitions, sleep a while
            // note that `ReplicaFetcherThread.handlePartitionsWithError` will also introduce the same delay for every
            // partition with error effectively doubling the delay. It would be good to improve this.
            partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
          }
        }
    }
    fetcherStats.requestRate.mark()

    //② Process the FetchResponse
    if (responseData.nonEmpty) {
      // process fetched data
      inLock(partitionMapLock) {
        //iterate over the response data for each topicPartition
        responseData.foreach { case (topicPartition, partitionData) =>
          Option(partitionStates.stateValue(topicPartition)).foreach { currentFetchState =>
            // It's possible that a partition is removed and re-added or truncated when there is a pending fetch request.
            // In this case, we only want to process the fetch response if the partition state is ready for fetch and
            // the current offset is the same as the offset requested.
            val fetchPartitionData = sessionPartitions.get(topicPartition)
            //the offset has not changed between sending the fetchRequest and receiving the fetchResponse
            if (fetchPartitionData != null && fetchPartitionData.fetchOffset == currentFetchState.fetchOffset && currentFetchState.isReadyForFetch) {
              partitionData.error match {
                case Errors.NONE =>
                  try {
                    // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
                    // append the message set fetched from the leader replica to the log
                    val logAppendInfoOpt = processPartitionData(topicPartition, currentFetchState.fetchOffset,
                      partitionData)

                    logAppendInfoOpt.foreach { logAppendInfo =>
                      val validBytes = logAppendInfo.validBytes
                      val nextOffset = if (validBytes > 0) logAppendInfo.lastOffset + 1 else currentFetchState.fetchOffset
                      val lag = Math.max(0L, partitionData.highWatermark - nextOffset)
                      fetcherLagStats.getAndMaybePut(topicPartition).lag = lag

                      // ReplicaDirAlterThread may have removed topicPartition from the partitionStates after processing the partition data
                      if (validBytes > 0 && partitionStates.contains(topicPartition)) {
                        // Update partitionStates only if there is no exception during processPartitionData
                        val newFetchState = PartitionFetchState(nextOffset, Some(lag), currentFetchState.currentLeaderEpoch, state = Fetching)
                        partitionStates.updateAndMoveToEnd(topicPartition, newFetchState)
                        fetcherStats.byteRate.mark(validBytes)
                      }
                    }
                  } catch {
                    case ime@( _: CorruptRecordException | _: InvalidRecordException) =>
                      // we log the error and continue. This ensures two things
                      // 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread
                      //    down and cause other topic partition to also lag
                      // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes
                      //    can cause this), we simply continue and should get fixed in the subsequent fetches
                      error(s"Found invalid messages during fetch for partition $topicPartition " +
                        s"offset ${currentFetchState.fetchOffset}", ime)
                      partitionsWithError += topicPartition
                    case e: KafkaStorageException =>
                      error(s"Error while processing data for partition $topicPartition " +
                        s"at offset ${currentFetchState.fetchOffset}", e)
                      markPartitionFailed(topicPartition)
                    case t: Throwable =>
                      // stop monitoring this partition and add it to the set of failed partitions
                      error(s"Unexpected error occurred while processing data for partition $topicPartition " +
                        s"at offset ${currentFetchState.fetchOffset}", t)
                      markPartitionFailed(topicPartition)
                  }
                  /*
                    Handle the case where the follower's requested offset falls outside the leader's
                    offset range: it may exceed the leader's LEO, or fall below the leader's log start
                    offset (startOffset). The first case can occur after an "unclean leader election",
                    in which a follower replica outside the ISR is elected leader:
                    1. A follower crashes while the leader keeps appending producer messages to its log,
                       so the crashed follower falls behind.
                    2. The follower comes back online. Before fully catching up it cannot rejoin the ISR;
                       if every replica in the ISR crashes at this point, it is the only leader candidate.
                    3. The old leader later comes back online as a follower, and its LEO now exceeds the
                       new leader's LEO.
                   */
                case Errors.OFFSET_OUT_OF_RANGE =>
                  if (!handleOutOfRangeError(topicPartition, currentFetchState))
                    partitionsWithError += topicPartition

                case Errors.UNKNOWN_LEADER_EPOCH =>
                  debug(s"Remote broker has a smaller leader epoch for partition $topicPartition than " +
                    s"this replica's current leader epoch of ${currentFetchState.currentLeaderEpoch}.")
                  partitionsWithError += topicPartition

                case Errors.FENCED_LEADER_EPOCH =>
                  onPartitionFenced(topicPartition)

                case Errors.NOT_LEADER_FOR_PARTITION =>
                  debug(s"Remote broker is not the leader for partition $topicPartition, which could indicate " +
                    "that the partition is being moved")
                  partitionsWithError += topicPartition

                case _ =>
                  error(s"Error for partition $topicPartition at offset ${currentFetchState.fetchOffset}",
                    partitionData.error.exception)
                  partitionsWithError += topicPartition
              }
            }
          }
        }
      }
    }

    if (partitionsWithError.nonEmpty) {
      handlePartitionsWithErrors(partitionsWithError, "processFetchRequest")
    }
  }      

In the course of synchronization, the fetch request sent to the leader replica can fail because the follower's requested offset falls outside the leader's offset range: it may exceed the leader's LEO, or fall below the leader's log start offset (startOffset). The first case can occur after an "unclean leader election", in which a follower replica outside the ISR is elected leader. The scenario unfolds as follows:

  1. A follower replica crashes while the leader keeps appending producer messages to its log, so the crashed follower falls behind.
  2. The follower comes back online. Before it has fully caught up with the leader it cannot rejoin the ISR; if every replica in the ISR crashes at this point, this follower is the only candidate for new leader.
  3. The old leader later comes back online as a follower, at which point its LEO exceeds the new leader's LEO.
This behavior is controlled by unclean.leader.election.enable (a runtime-configuration sketch follows the list):
  • If true, a leader may be elected from replicas outside the ISR, accepting some incorrect data to keep Kafka available.
  • If false (the default since Kafka 0.11), the leader can only be elected from the ISR, preserving consistency at the cost of availability.
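
As a hedged usage sketch (this is the standard AdminClient API rather than anything from this article's source; the topic name and bootstrap address are placeholders), the flag can be changed per topic at runtime:

import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{AdminClient, AlterConfigOp, ConfigEntry}
import org.apache.kafka.common.config.ConfigResource

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
val admin = AdminClient.create(props)

// Allow out-of-ISR replicas of "my-topic" to be elected leader (availability over consistency).
val resource = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic")
val op = new AlterConfigOp(new ConfigEntry("unclean.leader.election.enable", "true"),
  AlterConfigOp.OpType.SET)
val ops: java.util.Collection[AlterConfigOp] = Collections.singletonList(op)
admin.incrementalAlterConfigs(Collections.singletonMap(resource, ops)).all().get() // block until acknowledged
admin.close()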

The processPartitionData() Method

During leader-follower synchronization, when processFetchRequest() receives a normal (error-free) partition result, it calls ReplicaFetcherThread.processPartitionData() to process the fetched data.

//kafka.server.ReplicaFetcherThread
override def processPartitionData(topicPartition: TopicPartition,
                                    fetchOffset: Long,
                                    partitionData: FetchData): Option[LogAppendInfo] = {
    val partition = replicaMgr.nonOfflinePartition(topicPartition).get
    val log = partition.localLogOrException
    val records = toMemoryRecords(partitionData.records)

    maybeWarnIfOversizedRecords(records, topicPartition)

    //① Only append when the starting fetchOffset matches the local partition log's end offset
    if (fetchOffset != log.logEndOffset)
      throw new IllegalStateException("Offset mismatch for partition %s: fetched offset = %d, log end offset = %d.".format(
        topicPartition, fetchOffset, log.logEndOffset))

    if (isTraceEnabled)
      trace("Follower has replica log end offset %d for partition %s. Received %d messages and leader hw %d"
        .format(log.logEndOffset, topicPartition, records.sizeInBytes, partitionData.highWatermark))

    // Append the leader's messages to the log  
    //② As a follower, append the records fetched from the leader to the local replica's log
    val logAppendInfo = partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = false)

    if (isTraceEnabled)
      trace("Follower has replica log end offset %d after appending %d bytes of messages for partition %s"
        .format(log.logEndOffset, records.sizeInBytes, topicPartition))
    val leaderLogStartOffset = partitionData.logStartOffset

    // For the follower replica, we do not need to keep its segment base offset and physical position.
    // These values will be computed upon becoming leader or handling a preferred read replica fetch.
    //③ Take the smaller of the local replica's log end offset and partitionData.highWatermark as followerHighWatermark, and update the replica's in-memory high watermark
    val followerHighWatermark = log.updateHighWatermark(partitionData.highWatermark)
    //④ If the leader's logStartOffset is below the local high watermark and the new log start offset exceeds the current logStartOffset, advance the local log start offset
    log.maybeIncrementLogStartOffset(leaderLogStartOffset)
    if (isTraceEnabled)
      trace(s"Follower set replica high watermark for partition $topicPartition to $followerHighWatermark")

    // Traffic from both in-sync and out of sync replicas are accounted for in replication quota to ensure total replication
    // traffic doesn't exceed quota.
    if (quota.isThrottled(topicPartition))
      quota.record(records.sizeInBytes)

    if (partition.isReassigning && partition.isAddingLocalReplica)
      brokerTopicStats.updateReassignmentBytesIn(records.sizeInBytes)

    brokerTopicStats.updateReplicationBytesIn(records.sizeInBytes)

    logAppendInfo
  }
  • ① Only append when the starting fetchOffset matches the local partition log's end offset.
  • ② As a follower, append the records fetched from the leader to the local replica's log.
  • ③ Take the smaller of the local replica's log end offset and partitionData.highWatermark as followerHighWatermark, and use it to update the replica's in-memory high watermark (see the sketch below).
  • ④ If the leader's logStartOffset is below the local high watermark and the new log start offset exceeds the current one, advance the local log start offset.
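
To make step ③ concrete: a follower's high watermark can never run ahead of its own log, so it is capped by the local log end offset. A minimal standalone model of the rule (illustrative, not the actual Log.updateHighWatermark implementation):

// Illustrative model of the follower high-watermark update in step ③:
// the follower's HW is min(leader's advertised HW, local log end offset),
// since it cannot expose offsets it has not yet replicated.
def followerHighWatermark(leaderHighWatermark: Long, localLogEndOffset: Long): Long =
  math.min(leaderHighWatermark, localLogEndOffset)

// Example: the leader advertises HW = 120 but the follower has only appended up to
// offset 100, so the follower's HW stays at 100 until it catches up.
assert(followerHighWatermark(120L, 100L) == 100L)
assert(followerHighWatermark(80L, 100L) == 80L)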