Overview
Kafka's replication mechanism exists to provide high availability: each partition can have multiple replicas, and one replica is elected from the partition's assigned replica set (AR) as the leader replica. All read and write requests are handled by the elected leader replica. The remaining replicas act as follower replicas, which fetch messages from the leader and append them to their own logs. Only when the leader replica goes down is a new leader elected from the followers to serve requests.
Kafka's replication mechanism is modeled on Microsoft's PacificA algorithm.
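As a concrete illustration of this model, the sketch below creates a topic whose three partitions each carry three replicas (one leader plus two followers per partition) using the Java AdminClient from Scala. The broker address and topic name are assumptions, not taken from the original text.

import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}

object CreateReplicatedTopic extends App {
  val props = new Properties()
  props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // assumed broker address
  val admin = AdminClient.create(props)
  try {
    // 3 partitions, replication factor 3: each partition gets one leader replica and two follower replicas
    val topic = new NewTopic("replicated-demo", 3, 3.toShort)
    admin.createTopics(Collections.singleton(topic)).all().get()
  } finally admin.close()
}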
Source Code Analysis
1. ReplicaManager
Replicas are managed by the ReplicaManager class.
class ReplicaManager(val config: KafkaConfig,
metrics: Metrics,
time: Time,
val zkClient: KafkaZkClient,
/*
Scheduler with four periodic tasks:
1. shutdown-idle-replica-alter-log-dirs-thread
2. highwatermark-checkpoint
3. isr-expiration
4. isr-change-propagation
*/
scheduler: Scheduler,
val logManager: LogManager,// reads and writes on a partition are delegated to the underlying log storage subsystem
val isShuttingDown: AtomicBoolean,
quotaManagers: QuotaManagers,
val brokerTopicStats: BrokerTopicStats,// broker/topic metrics
val metadataCache: MetadataCache,// metadata cache
logDirFailureChannel: LogDirFailureChannel,
// delayed operations
val delayedProducePurgatory: DelayedOperationPurgatory[DelayedProduce],
val delayedFetchPurgatory: DelayedOperationPurgatory[DelayedFetch],
val delayedDeleteRecordsPurgatory: DelayedOperationPurgatory[DelayedDeleteRecords],
val delayedElectLeaderPurgatory: DelayedOperationPurgatory[DelayedElectLeader],
threadNamePrefix: Option[String]) extends Logging with KafkaMetricsGroup {
}
- scheduler: timer with four periodic tasks, which are analyzed one by one later (a toy sketch of such periodic jobs follows this list).
- logManager: log management, analyzed earlier; reads and writes on a partition are delegated to it.
- brokerTopicStats: broker/topic metrics.
- metadataCache: metadata cache.
- DelayedOperationPurgatory: delayed operations, analyzed earlier.
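For a rough feel of what these four tasks look like when registered, here is a self-contained toy (not Kafka's code) that models them with a plain ScheduledExecutorService. The task bodies and periods are placeholders; the real periods come from broker configs such as replica.lag.time.max.ms and replica.high.watermark.checkpoint.interval.ms.

import java.util.concurrent.{Executors, TimeUnit}

object ReplicaManagerTimersSketch extends App {
  private val scheduler = Executors.newScheduledThreadPool(1)

  private def schedule(name: String, periodMs: Long)(task: => Unit): Unit =
    scheduler.scheduleAtFixedRate(
      new Runnable { def run(): Unit = { println(s"[$name] running"); task } },
      periodMs, periodMs, TimeUnit.MILLISECONDS)

  schedule("highwatermark-checkpoint", 5000L) { /* write each partition's high watermark to the checkpoint file */ }
  schedule("isr-expiration", 5000L) { /* shrink the ISR of partitions whose followers lag too far behind */ }
  schedule("isr-change-propagation", 2500L) { /* propagate recorded ISR changes to the rest of the cluster */ }
  schedule("shutdown-idle-replica-alter-log-dirs-thread", 10000L) { /* stop idle log-dir move fetchers */ }

  Thread.sleep(12000L)
  scheduler.shutdown()
}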
1.1 The becomeLeaderOrFollower() method
Makes the specified replicas become leaders or followers.
def becomeLeaderOrFollower(correlationId: Int,
leaderAndIsrRequest: LeaderAndIsrRequest,
onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): LeaderAndIsrResponse = {
if (stateChangeLogger.isTraceEnabled) {
leaderAndIsrRequest.partitionStates.asScala.foreach { partitionState =>
stateChangeLogger.trace(s"Received LeaderAndIsr request $partitionState " +
s"correlation id $correlationId from controller ${leaderAndIsrRequest.controllerId} " +
s"epoch ${leaderAndIsrRequest.controllerEpoch}")
}
}
replicaStateChangeLock synchronized {// acquire the lock
//① check the controller epoch; if the epoch in the request is stale, return an error code immediately
if (leaderAndIsrRequest.controllerEpoch < controllerEpoch) {
stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller ${leaderAndIsrRequest.controllerId} with " +
s"correlation id $correlationId since its controller epoch ${leaderAndIsrRequest.controllerEpoch} is old. " +
s"Latest known controller epoch is $controllerEpoch")
leaderAndIsrRequest.getErrorResponse(0, Errors.STALE_CONTROLLER_EPOCH.exception)
} else {
//collect the error codes to be returned
val responseMap = new mutable.HashMap[TopicPartition, Errors]
val controllerId = leaderAndIsrRequest.controllerId
controllerEpoch = leaderAndIsrRequest.controllerEpoch
// First check partition's leader epoch
// preparation: collect the information needed for the role switch
val partitionStates = new mutable.HashMap[Partition, LeaderAndIsrPartitionState]()
val newPartitions = new mutable.HashSet[Partition]
//② use the ReplicaManager cache to filter out the topicPartitions that have not been saved before
leaderAndIsrRequest.partitionStates.asScala.foreach { partitionState =>
val topicPartition = new TopicPartition(partitionState.topicName, partitionState.partitionIndex)
//look up the Partition object; create a new one if it cannot be found
val partitionOpt = getPartition(topicPartition) match {
case HostedPartition.Offline =>
stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +
s"controller $controllerId with correlation id $correlationId " +
s"epoch $controllerEpoch for partition $topicPartition as the local replica for the " +
"partition is in an offline log directory")
responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
None
case HostedPartition.Online(partition) => Some(partition)
case HostedPartition.None =>
val partition = Partition(topicPartition, time, this)
allPartitions.putIfNotExists(topicPartition, HostedPartition.Online(partition))
newPartitions.add(partition)
Some(partition)
}
// ③ process each topic-partition in the leaderAndIsrRequest one by one
partitionOpt.foreach { partition =>
val currentLeaderEpoch = partition.getLeaderEpoch
val requestLeaderEpoch = partitionState.leaderEpoch
//check the leaderEpoch
if (requestLeaderEpoch > currentLeaderEpoch) {
// If the leader epoch is valid record the epoch of the controller that made the leadership decision.
// This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
//check whether a replica of this partition is assigned to the current broker
if (partitionState.replicas.contains(localBrokerId))
partitionStates.put(partition, partitionState)
else {
stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller $controllerId with " +
s"correlation id $correlationId epoch $controllerEpoch for partition $topicPartition as itself is not " +
s"in assigned replica list ${partitionState.replicas.asScala.mkString(",")}")
responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
}
} else if (requestLeaderEpoch < currentLeaderEpoch) {
stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +
s"controller $controllerId with correlation id $correlationId " +
s"epoch $controllerEpoch for partition $topicPartition since its associated " +
s"leader epoch $requestLeaderEpoch is smaller than the current " +
s"leader epoch $currentLeaderEpoch")
responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
} else {
stateChangeLogger.debug(s"Ignoring LeaderAndIsr request from " +
s"controller $controllerId with correlation id $correlationId " +
s"epoch $controllerEpoch for partition $topicPartition since its associated " +
s"leader epoch $requestLeaderEpoch matches the current leader epoch")
responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
}
}
}
// ④ the topic-partitions in the leaderAndIsrRequest have now been filtered; classify them by the role specified in partitionState
val partitionsTobeLeader = partitionStates.filter { case (_, partitionState) =>
partitionState.leader == localBrokerId
}
val partitionsToBeFollower = partitionStates -- partitionsTobeLeader.keys
val highWatermarkCheckpoints = new LazyOffsetCheckpoints(this.highWatermarkCheckpoints)
//⑤ switch the replicas of the specified partitions to leader replicas
val partitionsBecomeLeader = if (partitionsTobeLeader.nonEmpty)
makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, correlationId, responseMap,
highWatermarkCheckpoints)
else
Set.empty[Partition]
//⑥ switch the replicas of the specified partitions to follower replicas
val partitionsBecomeFollower = if (partitionsToBeFollower.nonEmpty)
makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap,
highWatermarkCheckpoints)
else
Set.empty[Partition]
/*
* KAFKA-8392
* For topic partitions of which the broker is no longer a leader, delete metrics related to
* those topics. Note that this means the broker stops being either a replica or a leader of
* partitions of said topics
*/
val leaderTopicSet = leaderPartitionsIterator.map(_.topic).toSet
val followerTopicSet = partitionsBecomeFollower.map(_.topic).toSet
followerTopicSet.diff(leaderTopicSet).foreach(brokerTopicStats.removeOldLeaderMetrics)
// remove metrics for brokers which are not followers of a topic
leaderTopicSet.diff(followerTopicSet).foreach(brokerTopicStats.removeOldFollowerMetrics)
//⑦ handle offline partitions
leaderAndIsrRequest.partitionStates.asScala.foreach { partitionState =>
val topicPartition = new TopicPartition(partitionState.topicName, partitionState.partitionIndex)
/*
* If there is offline log directory, a Partition object may have been created by getOrCreatePartition()
* before getOrCreateReplica() failed to create local replica due to KafkaStorageException.
* In this case ReplicaManager.allPartitions will map this topic-partition to an empty Partition object.
* we need to map this topic-partition to OfflinePartition instead.
*/
if (localLog(topicPartition).isEmpty)
markPartitionOffline(topicPartition)
}
// we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions
// have been completely populated before starting the checkpointing there by avoiding weird race conditions
//⑧ initialize the high-watermark checkpoint thread when the first leaderAndIsrRequest is handled; it periodically writes each partition's high watermark to the high-watermark checkpoint file
startHighWatermarkCheckPointThread()
val futureReplicasAndInitialOffset = new mutable.HashMap[TopicPartition, InitialFetchState]
for (partition <- newPartitions) {
val topicPartition = partition.topicPartition
if (logManager.getLog(topicPartition, isFuture = true).isDefined) {
partition.log.foreach { log =>
val leader = BrokerEndPoint(config.brokerId, "localhost", -1)
// Add future replica to partition's map
partition.createLogIfNotExists(Request.FutureLocalReplicaId, isNew = false, isFutureReplica = true,
highWatermarkCheckpoints)
// pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move
// replica from source dir to destination dir
logManager.abortAndPauseCleaning(topicPartition)
futureReplicasAndInitialOffset.put(topicPartition, InitialFetchState(leader,
partition.getLeaderEpoch, log.highWatermark))
}
}
}
replicaAlterLogDirsManager.addFetcherForPartitions(futureReplicasAndInitialOffset)
//⑨ shut down idle fetcher threads
replicaFetcherManager.shutdownIdleFetcherThreads()
replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
//⑩ invoke the callback
onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)
val responsePartitions = responseMap.iterator.map { case (tp, error) =>
new LeaderAndIsrPartitionError()
.setTopicName(tp.topic)
.setPartitionIndex(tp.partition)
.setErrorCode(error.code)
}.toBuffer
new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
.setErrorCode(Errors.NONE.code)
.setPartitionErrors(responsePartitions.asJava))
}
}
}
- ① Check the controller epoch; if the epoch in the request is stale, return an error code immediately.
- ② Use the ReplicaManager cache to filter out the topicPartitions that have not been saved before.
- ③ Process each topic-partition in the leaderAndIsrRequest one by one (a minimal sketch of the epoch comparison appears after this list).
- ④ At this point the topic-partitions in the leaderAndIsrRequest have been filtered; classify them according to the role specified in partitionState.
- ⑤ Switch the replicas of the specified partitions to leader replicas:

private def makeLeaders(controllerId: Int,
                        controllerEpoch: Int,
                        partitionStates: Map[Partition, LeaderAndIsrPartitionState],
                        correlationId: Int,
                        responseMap: mutable.Map[TopicPartition, Errors],
                        highWatermarkCheckpoints: OffsetCheckpoints): Set[Partition] = {
  partitionStates.keys.foreach { partition =>
    stateChangeLogger.trace(s"Handling LeaderAndIsr request correlationId $correlationId from " +
      s"controller $controllerId epoch $controllerEpoch starting the become-leader transition for " +
      s"partition ${partition.topicPartition}")
  }
  for (partition <- partitionStates.keys)
    responseMap.put(partition.topicPartition, Errors.NONE)
  val partitionsToMakeLeaders = mutable.Set[Partition]()
  try {
    // First stop fetchers for all the partitions
    // ① stop the fetcher threads for the affected partitions
    replicaFetcherManager.removeFetcherForPartitions(partitionStates.keySet.map(_.topicPartition))
    // Update the partition information to be the leader
    partitionStates.foreach { case (partition, partitionState) =>
      try {
        /*
        ② Call partition.makeLeader to turn the local replica into the leader.
        Inside makeLeader the ISR is updated, broker ids of replicas removed by the controller are dropped
        from the allReplicasMap cache, and each follower replica's offset fields such as
        lastFetchLeaderLogEndOffset, lastFetchTimeMs and _lastCaughtUpTimeMs are reset.
        If this replica becomes a new leader, its highWatermarkMetadata is created and the last-fetch
        information is updated. makeLeader returns whether the replica became a new leader.
        */
        if (partition.makeLeader(controllerId, partitionState, correlationId, highWatermarkCheckpoints)) {
          partitionsToMakeLeaders += partition
          stateChangeLogger.trace(s"Stopped fetchers as part of become-leader request from " +
            s"controller $controllerId epoch $controllerEpoch with correlation id $correlationId for partition ${partition.topicPartition} " +
            s"(last update controller epoch ${partitionState.controllerEpoch})")
        } else
          stateChangeLogger.info(s"Skipped the become-leader state change after marking its " +
            s"partition as leader with correlation id $correlationId from controller $controllerId epoch $controllerEpoch for " +
            s"partition ${partition.topicPartition} (last update controller epoch ${partitionState.controllerEpoch}) " +
            s"since it is already the leader for the partition.")
      } catch {
        case e: KafkaStorageException =>
          stateChangeLogger.error(s"Skipped the become-leader state change with " +
            s"correlation id $correlationId from controller $controllerId epoch $controllerEpoch for partition ${partition.topicPartition} " +
            s"(last update controller epoch ${partitionState.controllerEpoch}) since " +
            s"the replica for the partition is offline due to disk error $e")
          val dirOpt = getLogDir(partition.topicPartition)
          error(s"Error while making broker the leader for partition $partition in dir $dirOpt", e)
          responseMap.put(partition.topicPartition, Errors.KAFKA_STORAGE_ERROR)
      }
    }
  } catch {
    case e: Throwable =>
      partitionStates.keys.foreach { partition =>
        stateChangeLogger.error(s"Error while processing LeaderAndIsr request correlationId $correlationId received " +
          s"from controller $controllerId epoch $controllerEpoch for partition ${partition.topicPartition}", e)
      }
      // Re-throw the exception for it to be caught in KafkaApis
      throw e
  }
  partitionStates.keys.foreach { partition =>
    stateChangeLogger.trace(s"Completed LeaderAndIsr request correlationId $correlationId from controller $controllerId " +
      s"epoch $controllerEpoch for the become-leader transition for partition ${partition.topicPartition}")
  }
  partitionsToMakeLeaders
}

  - ① Stop the fetcher threads for the affected partitions.
  - ② Call partition.makeLeader to make the local replica the leader. Inside makeLeader the ISR is updated, broker ids of replicas removed by the controller are dropped from the allReplicasMap cache, and each follower replica's offset fields (lastFetchLeaderLogEndOffset, lastFetchTimeMs, _lastCaughtUpTimeMs) are reset. If this replica is a new leader, its highWatermarkMetadata is created and the last-fetch information is updated. makeLeader returns whether the replica became a new leader.
- ⑥ Switch the replicas of the specified partitions to follower replicas:

private def makeFollowers(controllerId: Int,
                          controllerEpoch: Int,
                          partitionStates: Map[Partition, LeaderAndIsrPartitionState],
                          correlationId: Int,
                          responseMap: mutable.Map[TopicPartition, Errors],
                          highWatermarkCheckpoints: OffsetCheckpoints): Set[Partition] = {
  partitionStates.foreach { case (partition, partitionState) =>
    stateChangeLogger.trace(s"Handling LeaderAndIsr request correlationId $correlationId from controller $controllerId " +
      s"epoch $controllerEpoch starting the become-follower transition for partition ${partition.topicPartition} with leader " +
      s"${partitionState.leader}")
  }
  for (partition <- partitionStates.keys)
    responseMap.put(partition.topicPartition, Errors.NONE)
  val partitionsToMakeFollower: mutable.Set[Partition] = mutable.Set()
  try {
    // TODO: Delete leaders from LeaderAndIsrRequest
    partitionStates.foreach { case (partition, partitionState) =>
      val newLeaderBrokerId = partitionState.leader
      try {
        //① the replica can only become a follower when the new leader is alive
        metadataCache.getAliveBrokers.find(_.id == newLeaderBrokerId) match {
          // Only change partition state when the leader is available
          case Some(_) =>
            //② partition.makeFollower() records the new leader and clears the ISR; if the cached leaderReplicaIdOpt
            //   already matches the leader in this request it returns false, otherwise it updates leaderReplicaIdOpt and returns true
            if (partition.makeFollower(controllerId, partitionState, correlationId, highWatermarkCheckpoints))
              partitionsToMakeFollower += partition
            else
              stateChangeLogger.info(s"Skipped the become-follower state change after marking its partition as " +
                s"follower with correlation id $correlationId from controller $controllerId epoch $controllerEpoch " +
                s"for partition ${partition.topicPartition} (last update " +
                s"controller epoch ${partitionState.controllerEpoch}) " +
                s"since the new leader $newLeaderBrokerId is the same as the old leader")
          case None =>
            // The leader broker should always be present in the metadata cache.
            // If not, we should record the error message and abort the transition process for this partition
            stateChangeLogger.error(s"Received LeaderAndIsrRequest with correlation id $correlationId from " +
              s"controller $controllerId epoch $controllerEpoch for partition ${partition.topicPartition} " +
              s"(last update controller epoch ${partitionState.controllerEpoch}) " +
              s"but cannot become follower since the new leader $newLeaderBrokerId is unavailable.")
            // Create the local replica even if the leader is unavailable. This is required to ensure that we include
            // the partition's high watermark in the checkpoint file (see KAFKA-1647)
            partition.createLogIfNotExists(localBrokerId, isNew = partitionState.isNew, isFutureReplica = false,
              highWatermarkCheckpoints)
        }
      } catch {
        case e: KafkaStorageException =>
          stateChangeLogger.error(s"Skipped the become-follower state change with correlation id $correlationId from " +
            s"controller $controllerId epoch $controllerEpoch for partition ${partition.topicPartition} " +
            s"(last update controller epoch ${partitionState.controllerEpoch}) with leader " +
            s"$newLeaderBrokerId since the replica for the partition is offline due to disk error $e")
          val dirOpt = getLogDir(partition.topicPartition)
          error(s"Error while making broker the follower for partition $partition with leader " +
            s"$newLeaderBrokerId in dir $dirOpt", e)
          responseMap.put(partition.topicPartition, Errors.KAFKA_STORAGE_ERROR)
      }
    }
    //③ remove, via replicaFetcherManager, the fetcher threads of replicas whose leader changed; matching fetchers are re-added below
    replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(_.topicPartition))
    partitionsToMakeFollower.foreach { partition =>
      stateChangeLogger.trace(s"Stopped fetchers as part of become-follower request from controller $controllerId " +
        s"epoch $controllerEpoch with correlation id $correlationId for partition ${partition.topicPartition} with leader " +
        s"${partitionStates(partition).leader}")
    }
    //④ try to complete the delayed produce and fetch requests
    partitionsToMakeFollower.foreach { partition =>
      completeDelayedFetchOrProduceRequests(partition.topicPartition)
    }
    partitionsToMakeFollower.foreach { partition =>
      stateChangeLogger.trace(s"Truncated logs and checkpointed recovery boundaries for partition " +
        s"${partition.topicPartition} as part of become-follower request with correlation id $correlationId from " +
        s"controller $controllerId epoch $controllerEpoch with leader ${partitionStates(partition).leader}")
    }
    if (isShuttingDown.get()) {
      partitionsToMakeFollower.foreach { partition =>
        stateChangeLogger.trace(s"Skipped the adding-fetcher step of the become-follower state " +
          s"change with correlation id $correlationId from controller $controllerId epoch $controllerEpoch for " +
          s"partition ${partition.topicPartition} with leader ${partitionStates(partition).leader} " +
          "since it is shutting down")
      }
    } else {
      // we do not need to check if the leader exists again since this has been done at the beginning of this process
      val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map { partition =>
        val leader = metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get
          .brokerEndPoint(config.interBrokerListenerName)
        val fetchOffset = partition.localLogOrException.highWatermark
        partition.topicPartition -> InitialFetchState(leader, partition.getLeaderEpoch, fetchOffset)
      }.toMap
      //⑤ add fetcher threads that synchronize the follower replicas from their leader replicas
      replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
      partitionsToMakeFollowerWithLeaderAndOffset.foreach { case (partition, initialFetchState) =>
        stateChangeLogger.trace(s"Started fetcher to new leader as part of become-follower " +
          s"request from controller $controllerId epoch $controllerEpoch with correlation id $correlationId for " +
          s"partition $partition with leader ${initialFetchState.leader}")
      }
    }
  } catch {
    case e: Throwable =>
      stateChangeLogger.error(s"Error while processing LeaderAndIsr request with correlationId $correlationId " +
        s"received from controller $controllerId epoch $controllerEpoch", e)
      // Re-throw the exception for it to be caught in KafkaApis
      throw e
  }
  partitionStates.keys.foreach { partition =>
    stateChangeLogger.trace(s"Completed LeaderAndIsr request correlationId $correlationId from controller $controllerId " +
      s"epoch $controllerEpoch for the become-follower transition for partition ${partition.topicPartition} with leader " +
      s"${partitionStates(partition).leader}")
  }
  partitionsToMakeFollower
}

  - ① Only when the new leader is alive can the replica become a follower.
  - ② partition.makeFollower() records the new leader and clears the ISR; if the cached leaderReplicaIdOpt already matches the leader in this request, makeFollower returns false, otherwise it updates leaderReplicaIdOpt and returns true.
  - ③ Remove, via replicaFetcherManager, the fetcher threads of replicas whose leader changed; the matching fetchers are re-added later.
  - ④ Try to complete the delayed produce and fetch requests.
  - ⑤ Add fetcher threads that handle replica synchronization, with the follower replicas fetching from their leader replicas.
- ⑦ Handle offline partitions.
- ⑧ Initialize the high-watermark checkpoint thread when the first leaderAndIsrRequest is handled; it periodically writes each partition's high watermark to the high-watermark checkpoint file.
- ⑨ Shut down idle fetcher threads.
- ⑩ Invoke the onLeadershipChange callback.
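To make the epoch handling in step ③ concrete, here is a minimal self-contained sketch (not Kafka's code) of the three-way leader-epoch comparison the broker performs on each partition state in the request.

object LeaderEpochCheckSketch extends App {
  sealed trait Decision
  case object ApplyRequest extends Decision // higher epoch: apply the new leader/ISR state
  case object StaleRequest extends Decision // lower epoch: the request reflects an old decision
  case object AlreadySeen  extends Decision // same epoch: the state was already applied

  def check(requestLeaderEpoch: Int, currentLeaderEpoch: Int): Decision =
    if (requestLeaderEpoch > currentLeaderEpoch) ApplyRequest
    else if (requestLeaderEpoch < currentLeaderEpoch) StaleRequest
    else AlreadySeen

  println(check(requestLeaderEpoch = 5, currentLeaderEpoch = 4)) // ApplyRequest
  println(check(requestLeaderEpoch = 3, currentLeaderEpoch = 4)) // StaleRequest
  println(check(requestLeaderEpoch = 4, currentLeaderEpoch = 4)) // AlreadySeen
}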
1.2 The stopReplicas() method
Shuts down the replicas of the specified partitions.
def stopReplicas(stopReplicaRequest: StopReplicaRequest): (mutable.Map[TopicPartition, Errors], Errors) = {
replicaStateChangeLock synchronized {
val responseMap = new collection.mutable.HashMap[TopicPartition, Errors]
//① check the controllerEpoch in the request
if (stopReplicaRequest.controllerEpoch() < controllerEpoch) {
stateChangeLogger.warn("Received stop replica request from an old controller epoch " +
s"${stopReplicaRequest.controllerEpoch}. Latest known controller epoch is $controllerEpoch")
(responseMap, Errors.STALE_CONTROLLER_EPOCH)
} else {
val partitions = stopReplicaRequest.partitions.asScala.toSet
controllerEpoch = stopReplicaRequest.controllerEpoch
// First stop fetchers for all partitions, then stop the corresponding replicas
replicaFetcherManager.removeFetcherForPartitions(partitions)
replicaAlterLogDirsManager.removeFetcherForPartitions(partitions)
for (topicPartition <- partitions){
try {
// ② stop synchronization for the specified partition
stopReplica(topicPartition, stopReplicaRequest.deletePartitions)
responseMap.put(topicPartition, Errors.NONE)
} catch {
case e: KafkaStorageException =>
stateChangeLogger.error(s"Ignoring stop replica (delete=${stopReplicaRequest.deletePartitions}) for " +
s"partition $topicPartition due to storage exception", e)
responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
}
}
(responseMap, Errors.NONE)
}
}
}
def stopReplica(topicPartition: TopicPartition, deletePartition: Boolean) = {
stateChangeLogger.trace(s"Handling stop replica (delete=$deletePartition) for partition $topicPartition")
//③ decide from deletePartition whether the log should be deleted
if (deletePartition) {
getPartition(topicPartition) match {
case HostedPartition.Offline =>
throw new KafkaStorageException(s"Partition $topicPartition is on an offline disk")
case hostedPartition @ HostedPartition.Online(removedPartition) =>
if (allPartitions.remove(topicPartition, hostedPartition)) {
maybeRemoveTopicMetrics(topicPartition.topic)
// this will delete the local log. This call may throw exception if the log is on offline directory
removedPartition.delete() // delete the replica; its log is deleted as well
}
case HostedPartition.None =>
stateChangeLogger.trace(s"Ignoring stop replica (delete=$deletePartition) for partition " +
s"$topicPartition as replica doesn't exist on broker")
}
// Delete log and corresponding folders in case replica manager doesn't hold them anymore.
// This could happen when topic is being deleted while broker is down and recovers.
if (logManager.getLog(topicPartition).isDefined)
logManager.asyncDelete(topicPartition)
if (logManager.getLog(topicPartition, isFuture = true).isDefined)
logManager.asyncDelete(topicPartition, isFuture = true)
}
// If we were the leader, we may have some operations still waiting for completion.
// We force completion to prevent them from timing out.
completeDelayedFetchOrProduceRequests(topicPartition)
stateChangeLogger.trace(s"Finished handling stop replica (delete=$deletePartition) for partition $topicPartition")
}
- ① Check the controllerEpoch in the request.
- ② Stop synchronization for the specified partitions.
- ③ Decide from the value of deletePartition whether the log should be deleted (see the example after this list).
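For context, a topic deletion issued by a client is one common way the controller ends up sending StopReplica requests with deletePartitions = true, which drives the stopReplica() branch above. A minimal sketch with the Java AdminClient; the broker address and topic name are assumptions.

import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}

object DeleteTopicExample extends App {
  val props = new Properties()
  props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // assumed broker address
  val admin = AdminClient.create(props)
  try {
    // The controller reacts to the deletion by sending StopReplica(delete = true)
    // to the brokers hosting the topic's replicas.
    admin.deleteTopics(Collections.singleton("replicated-demo")).all().get()
  } finally admin.close()
}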
2. ReplicaFetcherThread
ReplicaFetcherThread is used by a follower replica to synchronize with the leader replica. It extends the abstract AbstractFetcherThread class, which in turn extends ShutdownableThread, a subclass of Thread.
//kafka.server.AbstractFetcherThread
override def doWork(): Unit = {
//fetch the epoch and end offset from the leader, update the local replica's fetch offset, truncate if needed, etc.
maybeTruncate()
//send a fetch request to the leader replica for synchronization and process the result
maybeFetch()
}
- maybeTruncate: fetch the epoch and end offset from the leader, update the local replica's fetch offset, and truncate the local log if needed (a toy sketch of this idea follows this list).
- maybeFetch: send a fetch request to the leader replica for synchronization and process the result.
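The following self-contained toy (not Kafka's code, and much simpler than the real OffsetsForLeaderEpoch protocol) shows the core idea behind maybeTruncate: cut the follower log back to the smaller of the leader's end offset for the epoch and the follower's own LEO before fetching.

object TruncationSketch extends App {
  // Each entry is (leaderEpoch, offset).
  final class ToyLog(initial: Vector[(Int, Long)]) {
    private var entries = initial
    def logEndOffset: Long = entries.lastOption.map(_._2 + 1).getOrElse(0L)
    def truncateTo(offset: Long): Unit = entries = entries.filter(_._2 < offset)
  }

  // The follower wrote offsets 2 and 3 under epoch 1 ...
  val followerLog = new ToyLog(Vector((0, 0L), (0, 1L), (1, 2L), (1, 3L)))
  // ... but the leader's log for epoch 1 only reaches end offset 3.
  val leaderEndOffsetForEpoch = 3L
  val truncationOffset = math.min(leaderEndOffsetForEpoch, followerLog.logEndOffset)
  followerLog.truncateTo(truncationOffset)
  println(s"truncated to $truncationOffset, follower LEO is now ${followerLog.logEndOffset}") // 3
}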
private def maybeFetch(): Unit = {
val fetchRequestOpt = inLock(partitionMapLock) {
val ResultWithPartitions(fetchRequestOpt, partitionsWithError) = buildFetch(partitionStates.partitionStateMap.asScala)
handlePartitionsWithErrors(partitionsWithError, "maybeFetch")
if (fetchRequestOpt.isEmpty) {
trace(s"There are no active partitions. Back off for $fetchBackOffMs ms before sending a fetch request")
partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
}
fetchRequestOpt
}
fetchRequestOpt.foreach { case ReplicaFetch(sessionPartitions, fetchRequest) =>
processFetchRequest(sessionPartitions, fetchRequest)
}
}
//sends the fetchRequest and processes the FetchResponse
private def processFetchRequest(sessionPartitions: util.Map[TopicPartition, FetchRequest.PartitionData],
fetchRequest: FetchRequest.Builder): Unit = {
val partitionsWithError = mutable.Set[TopicPartition]()
var responseData: Map[TopicPartition, FetchData] = Map.empty
try {
trace(s"Sending fetch request $fetchRequest")
//① send the fetch request to the leader
responseData = fetchFromLeader(fetchRequest)
} catch {
case t: Throwable =>
if (isRunning) {
warn(s"Error in response for fetch request $fetchRequest", t)
inLock(partitionMapLock) {
partitionsWithError ++= partitionStates.partitionSet.asScala
// there is an error occurred while fetching partitions, sleep a while
// note that `ReplicaFetcherThread.handlePartitionsWithError` will also introduce the same delay for every
// partition with error effectively doubling the delay. It would be good to improve this.
partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
}
}
}
fetcherStats.requestRate.mark()
//② start processing the FetchResponse
if (responseData.nonEmpty) {
// process fetched data
inLock(partitionMapLock) {
//iterate over the response data of each topicPartition
responseData.foreach { case (topicPartition, partitionData) =>
Option(partitionStates.stateValue(topicPartition)).foreach { currentFetchState =>
// It's possible that a partition is removed and re-added or truncated when there is a pending fetch request.
// In this case, we only want to process the fetch response if the partition state is ready for fetch and
// the current offset is the same as the offset requested.
val fetchPartitionData = sessionPartitions.get(topicPartition)
//the offset did not change between sending the fetchRequest and receiving the fetchResponse
if (fetchPartitionData != null && fetchPartitionData.fetchOffset == currentFetchState.fetchOffset && currentFetchState.isReadyForFetch) {
partitionData.error match {
case Errors.NONE =>
try {
// Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
// append the record set fetched from the leader replica to the local log
val logAppendInfoOpt = processPartitionData(topicPartition, currentFetchState.fetchOffset,
partitionData)
logAppendInfoOpt.foreach { logAppendInfo =>
val validBytes = logAppendInfo.validBytes
val nextOffset = if (validBytes > 0) logAppendInfo.lastOffset + 1 else currentFetchState.fetchOffset
val lag = Math.max(0L, partitionData.highWatermark - nextOffset)
fetcherLagStats.getAndMaybePut(topicPartition).lag = lag
// ReplicaDirAlterThread may have removed topicPartition from the partitionStates after processing the partition data
if (validBytes > 0 && partitionStates.contains(topicPartition)) {
// Update partitionStates only if there is no exception during processPartitionData
val newFetchState = PartitionFetchState(nextOffset, Some(lag), currentFetchState.currentLeaderEpoch, state = Fetching)
partitionStates.updateAndMoveToEnd(topicPartition, newFetchState)
fetcherStats.byteRate.mark(validBytes)
}
}
} catch {
case ime@( _: CorruptRecordException | _: InvalidRecordException) =>
// we log the error and continue. This ensures two things
// 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread
// down and cause other topic partition to also lag
// 2. If the message is corrupt due to a transient state in the log (truncation, partial writes
// can cause this), we simply continue and should get fixed in the subsequent fetches
error(s"Found invalid messages during fetch for partition $topicPartition " +
s"offset ${currentFetchState.fetchOffset}", ime)
partitionsWithError += topicPartition
case e: KafkaStorageException =>
error(s"Error while processing data for partition $topicPartition " +
s"at offset ${currentFetchState.fetchOffset}", e)
markPartitionFailed(topicPartition)
case t: Throwable =>
// stop monitoring this partition and add it to the set of failed partitions
error(s"Unexpected error occurred while processing data for partition $topicPartition " +
s"at offset ${currentFetchState.fetchOffset}", t)
markPartitionFailed(topicPartition)
}
/*
Handle the case where the offset requested by the follower is outside the leader's offset range:
it may be larger than the leader's LEO, or smaller than the leader's earliest offset (logStartOffset).
The first case can occur after an "unclean leader election", in which a follower replica that is not
in the ISR is elected leader. The sequence of events is roughly:
1. A follower replica crashes while the leader keeps appending messages from producers to its log,
   so the crashed follower stops synchronizing with the leader.
2. The follower comes back online; until it is fully caught up it cannot rejoin the ISR. If all the
   followers in the ISR crash at this point, this follower is the only candidate and becomes the new leader.
3. Later, the old leader comes back online as a follower, and its LEO can now be larger than the new leader's LEO.
*/
case Errors.OFFSET_OUT_OF_RANGE =>
if (!handleOutOfRangeError(topicPartition, currentFetchState))
partitionsWithError += topicPartition
case Errors.UNKNOWN_LEADER_EPOCH =>
debug(s"Remote broker has a smaller leader epoch for partition $topicPartition than " +
s"this replica's current leader epoch of ${currentFetchState.currentLeaderEpoch}.")
partitionsWithError += topicPartition
case Errors.FENCED_LEADER_EPOCH =>
onPartitionFenced(topicPartition)
case Errors.NOT_LEADER_FOR_PARTITION =>
debug(s"Remote broker is not the leader for partition $topicPartition, which could indicate " +
"that the partition is being moved")
partitionsWithError += topicPartition
case _ =>
error(s"Error for partition $topicPartition at offset ${currentFetchState.fetchOffset}",
partitionData.error.exception)
partitionsWithError += topicPartition
}
}
}
}
}
}
if (partitionsWithError.nonEmpty) {
handlePartitionsWithErrors(partitionsWithError, "processFetchRequest")
}
}
When the follower sends Fetch requests to the leader replica for synchronization, the following situation can arise: the offset requested by the follower replica falls outside the leader replica's offset range. It can be larger than the leader's LEO, or smaller than the leader's earliest offset (logStartOffset). The first case can occur after an "unclean leader election", in which, simply put, a follower replica that is not in the ISR is elected leader. The scenario unfolds as follows:
- A follower replica crashes while the leader keeps receiving messages from producers and appending them to its log, so the crashed follower no longer synchronizes with the leader.
- The follower comes back online; before it has fully caught up with the leader it is not eligible to rejoin the ISR. If all the followers in the ISR crash at this moment, this follower is the only candidate and is elected as the new leader.
- Later, the old leader comes back online as a follower, and at that point the follower replica's LEO can exceed the new leader replica's LEO.
This behavior is controlled by unclean.leader.election.enable:
- If set to true, a replica outside the ISR can be elected leader; in other words, some data may be lost or incorrect in exchange for keeping Kafka available.
- If set to false (the default since Kafka 0.11.0), the leader can only be elected from the ISR, and service resumes only after such an election, which preserves consistency.
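As a hedged illustration of toggling this setting for a single topic, the sketch below uses the Java AdminClient's incrementalAlterConfigs from Scala; the broker address and topic name are assumptions.

import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, AlterConfigOp, ConfigEntry}
import org.apache.kafka.common.config.ConfigResource

object UncleanElectionConfigExample extends App {
  val props = new Properties()
  props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // assumed broker address
  val admin = AdminClient.create(props)
  try {
    val topic = new ConfigResource(ConfigResource.Type.TOPIC, "replicated-demo") // assumed topic
    // Forbid electing an out-of-ISR replica as leader for this topic: prefer consistency over availability.
    val op = new AlterConfigOp(new ConfigEntry("unclean.leader.election.enable", "false"), AlterConfigOp.OpType.SET)
    val request: java.util.Map[ConfigResource, java.util.Collection[AlterConfigOp]] =
      Collections.singletonMap(topic, Collections.singletonList(op))
    admin.incrementalAlterConfigs(request).all().get()
  } finally admin.close()
}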
2.1 The processPartitionData() method
During leader-follower synchronization, when processFetchRequest() receives a normal (error-free) response it calls processPartitionData(), implemented in ReplicaFetcherThread.processPartitionData(), to handle the fetched data.
//kafka.server.ReplicaFetcherThread
override def processPartitionData(topicPartition: TopicPartition,
fetchOffset: Long,
partitionData: FetchData): Option[LogAppendInfo] = {
val partition = replicaMgr.nonOfflinePartition(topicPartition).get
val log = partition.localLogOrException
val records = toMemoryRecords(partitionData.records)
maybeWarnIfOversizedRecords(records, topicPartition)
//① only write the data if the starting fetchOffset matches the end offset of the partition's existing log
if (fetchOffset != log.logEndOffset)
throw new IllegalStateException("Offset mismatch for partition %s: fetched offset = %d, log end offset = %d.".format(
topicPartition, fetchOffset, log.logEndOffset))
if (isTraceEnabled)
trace("Follower has replica log end offset %d for partition %s. Received %d messages and leader hw %d"
.format(log.logEndOffset, topicPartition, records.sizeInBytes, partitionData.highWatermark))
// Append the leader's messages to the log
//② as a follower, append the records fetched from the leader to the local replica's log
val logAppendInfo = partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = false)
if (isTraceEnabled)
trace("Follower has replica log end offset %d after appending %d bytes of messages for partition %s"
.format(log.logEndOffset, records.sizeInBytes, topicPartition))
val leaderLogStartOffset = partitionData.logStartOffset
// For the follower replica, we do not need to keep its segment base offset and physical position.
// These values will be computed upon becoming leader or handling a preferred read replica fetch.
//③ take the smaller of the local replica's logEndOffset.messageOffset and the returned partitionData.highWatermark as followerHighWatermark, and use it to update the replica's in-memory high watermark
val followerHighWatermark = log.updateHighWatermark(partitionData.highWatermark)
//④ if leaderLogStartOffset < the local highWatermark.messageOffset, and newLogStartOffset > logStartOffset, advance the local log start offset
log.maybeIncrementLogStartOffset(leaderLogStartOffset)
if (isTraceEnabled)
trace(s"Follower set replica high watermark for partition $topicPartition to $followerHighWatermark")
// Traffic from both in-sync and out of sync replicas are accounted for in replication quota to ensure total replication
// traffic doesn't exceed quota.
if (quota.isThrottled(topicPartition))
quota.record(records.sizeInBytes)
if (partition.isReassigning && partition.isAddingLocalReplica)
brokerTopicStats.updateReassignmentBytesIn(records.sizeInBytes)
brokerTopicStats.updateReplicationBytesIn(records.sizeInBytes)
logAppendInfo
}
- ① Only write data when the starting fetchOffset matches the end offset of the partition's existing log.
- ② As a follower, append the records fetched from the leader to the local replica's log.
- ③ Take the smaller of the local replica's logEndOffset.messageOffset and the returned partitionData.highWatermark as followerHighWatermark, and use it to update the replica's in-memory high watermark.
- ④ If leaderLogStartOffset < the local highWatermark.messageOffset, and newLogStartOffset > logStartOffset, advance the local log start offset (a toy sketch of steps ③ and ④ follows this list).
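To make steps ③ and ④ concrete, here is a self-contained toy model (not Kafka's code) of how the follower's high watermark and log start offset move after a fetch.

object FollowerHighWatermarkSketch extends App {
  final case class FollowerLogState(logStartOffset: Long, logEndOffset: Long, highWatermark: Long)

  // After appending fetched records, the follower's HW is min(local LEO, leader HW);
  // the log start offset only moves forward, and never past the new high watermark.
  def afterFetch(state: FollowerLogState, leaderHighWatermark: Long, leaderLogStartOffset: Long): FollowerLogState = {
    val newHighWatermark = math.min(state.logEndOffset, leaderHighWatermark)
    val newLogStartOffset =
      if (leaderLogStartOffset > state.logStartOffset && leaderLogStartOffset <= newHighWatermark) leaderLogStartOffset
      else state.logStartOffset
    state.copy(logStartOffset = newLogStartOffset, highWatermark = newHighWatermark)
  }

  val before = FollowerLogState(logStartOffset = 0L, logEndOffset = 120L, highWatermark = 100L)
  println(afterFetch(before, leaderHighWatermark = 150L, leaderLogStartOffset = 10L))
  // FollowerLogState(10,120,120): the HW is capped by the local LEO, the start offset advances to the leader's
}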