Kafka Log Module (4): LogManager

May 5, 2020 | Author: Siran | 9,900 words | about 20 minutes to read
Archived under Message Queues | Tags: #kafka

1. Overview

The article Kafka Server (3): kafka-server-start.sh walked through the entry classes that initialize each Kafka module after the broker starts.

All of Kafka's messages are stored in logs. The log subsystem is initialized through LogManager: the Log class performs the actual log operations, while LogManager manages the Log instances.

LogManager provides facilities to create, recover, delete, and query the set of Logs; all reads and writes are delegated to Log. During its initialization, LogManager also starts five scheduled tasks and a background Cleaner thread.


2. Source Code Analysis

During Kafka startup, LogManager#startup is called to initialize the log module:

def startup(): Unit = {
    //... omitted
    /* start log manager */
    logManager = LogManager(config, initialOfflineDirs, zkClient, brokerState, kafkaScheduler, time, brokerTopicStats, logDirFailureChannel)
    logManager.startup()
    //... omitted
}

2.1 LogManager

Parameters and fields of the LogManager class:

class LogManager(logDirs: Seq[File],
                 initialOfflineDirs: Seq[File],
                 val topicConfigs: Map[String, LogConfig], // note that this doesn't get updated after creation
                 val initialDefaultConfig: LogConfig,
                 val cleanerConfig: CleanerConfig,
                 recoveryThreadsPerDataDir: Int,
                 val flushCheckMs: Long,
                 val flushRecoveryOffsetCheckpointMs: Long,
                 val flushStartOffsetCheckpointMs: Long,
                 val retentionCheckMs: Long,
                 val maxPidExpirationMs: Int,
                 scheduler: Scheduler,
                 val brokerState: BrokerState,
                 brokerTopicStats: BrokerTopicStats,
                 logDirFailureChannel: LogDirFailureChannel,
                 time: Time) extends Logging with KafkaMetricsGroup {

  val LockFile = ".lock"
  val InitialTaskDelayMs = 30 * 1000

  private val logCreationOrDeletionLock = new Object
  private val currentLogs = new Pool[TopicPartition, Log]()
  private val futureLogs = new Pool[TopicPartition, Log]()
  private val logsToBeDeleted = new LinkedBlockingQueue[(Log, Long)]()

  private val _liveLogDirs: ConcurrentLinkedQueue[File] = createAndValidateLogDirs(logDirs, initialOfflineDirs)
  @volatile private var _currentDefaultConfig = initialDefaultConfig
  @volatile private var numRecoveryThreadsPerDataDir = recoveryThreadsPerDataDir

  private[log] val partitionsInitializing = new ConcurrentHashMap[TopicPartition, Boolean]().asScala

  def reconfigureDefaultLogConfig(logConfig: LogConfig): Unit = {
    this._currentDefaultConfig = logConfig
  }

  def currentDefaultConfig: LogConfig = _currentDefaultConfig

  def liveLogDirs: Seq[File] = {
    if (_liveLogDirs.size == logDirs.size)
      logDirs
    else
      _liveLogDirs.asScala.toBuffer
  }

  // dirLocks: a collection of FileLocks used to lock each log directory at the filesystem level.
  // All log directories are locked when the LogManager object is initialized.
  private val dirLocks = lockLogDirs(liveLogDirs)
  // Map(File, OffsetCheckpointFile): maps each log directory to its RecoveryPointCheckpoint file. When LogManager is initialized,
  // a RecoveryPointCheckpoint file is created in each directory. The map value is an OffsetCheckpointFile object, which wraps that
  // directory's RecoveryPointCheckpoint file and provides read/write access to it; the file records the recoveryPoint of every Log in the directory.
  @volatile private var recoveryPointCheckpoints = liveLogDirs.map(dir =>
    (dir, new OffsetCheckpointFile(new File(dir, RecoveryPointCheckpointFile), logDirFailureChannel))).toMap
  @volatile private var logStartOffsetCheckpoints = liveLogDirs.map(dir =>
    (dir, new OffsetCheckpointFile(new File(dir, LogStartOffsetCheckpointFile), logDirFailureChannel))).toMap

  private val preferredLogDirs = new ConcurrentHashMap[TopicPartition, String]()

  private def offlineLogDirs: Iterable[File] = {
    val logDirsSet = mutable.Set[File]() ++= logDirs
    _liveLogDirs.asScala.foreach(logDirsSet -=)
    logDirsSet
  }

  loadLogs()

  private[kafka] val cleaner: LogCleaner =
    if (cleanerConfig.enableCleaner)
      new LogCleaner(cleanerConfig, liveLogDirs, currentLogs, logDirFailureChannel, time = time)
    else
      null

}
  • logDirs: the set of log directories specified by log.dirs in server.properties. Each directory can host many Logs, and each Log has its own subdirectory. When creating a new Log, LogManager picks the log directory that currently holds the fewest Logs (a conceptual sketch of this follows the list).

  • initialOfflineDirs: the log directories already found to be offline before LogManager is constructed; they are excluded from the live directory set.

  • topicConfigs: per-topic configuration read from ZooKeeper. Kafka currently keeps its metadata in ZooKeeper and uses it for KafkaController election, but there is a plan to remove the ZooKeeper dependency; see KIP-500 for details.

    val (topicConfigs, failed) = zkClient.getLogConfigs(
        zkClient.getAllTopicsInCluster,
        defaultProps
      )
    
  • initialDefaultConfig: Kafka's default Log configuration, covering things like segment size, flush interval, and log cleaning settings.

    val defaultProps = KafkaServer.copyKafkaConfigToLog(config)
    LogConfig.validateValues(defaultProps)
    val defaultLogConfig = LogConfig(defaultProps)
    
  • cleanerConfig: configuration of the cleaner threads.

    case class CleanerConfig(numThreads: Int = 1, // how many cleaner threads to start
                           dedupeBufferSize: Long = 4*1024*1024L, // total memory used for log deduplication, 4 MB
                           dedupeBufferLoadFactor: Double = 0.9d, // maximum fill ratio of the dedupe buffer
                           ioBufferSize: Int = 1024*1024,
                           maxMessageSize: Int = 32*1024*1024, // maximum size of a message that can appear in the log
                           maxIoBytesPerSecond: Double = Double.MaxValue, // maximum read/write I/O allowed across all cleaner threads
                           backOffMs: Long = 15 * 1000, // how long to wait when there is no log to clean
                           enableCleaner: Boolean = true, // whether to enable the cleaner threads
                           hashAlgorithm: String = "MD5") { // hash algorithm used for key comparison
    }
    
    • numThreads: how many cleaner threads to start
    • dedupeBufferSize: total memory used for log deduplication, 4 MB by default
    • dedupeBufferLoadFactor: maximum fill ratio of the dedupe buffer
    • maxMessageSize: maximum size of a message that can appear in the log
    • maxIoBytesPerSecond: maximum read/write I/O allowed across all cleaner threads
    • backOffMs: how long to wait when there is no log to clean
    • enableCleaner: whether to enable the cleaner threads
    • hashAlgorithm: the hash algorithm used for key comparison
  • recoveryThreadsPerDataDir: the number of threads per data directory used for log recovery at startup and for flushing logs at shutdown. Defaults to 1; configurable via num.recovery.threads.per.data.dir.

  • flushCheckMs: the interval at which logs are flushed from memory to disk, configured via log.flush.scheduler.interval.ms; defaults to Long.MaxValue.

  • flushRecoveryOffsetCheckpointMs: how often the log recovery points are checkpointed, configured via log.flush.offset.checkpoint.interval.ms; defaults to 60000 ms.

  • flushStartOffsetCheckpointMs: how often the log start offsets are checkpointed, configured via log.flush.start.offset.checkpoint.interval.ms; defaults to 60000 ms.

  • retentionCheckMs: how often logs are checked for expiration, configured via log.retention.check.interval.ms; defaults to 5 * 60 * 1000 ms (five minutes).

  • maxPidExpirationMs: how long before a transactional id expires, configured via transactional.id.expiration.ms; defaults to 7 days.

  • scheduler: the timer, backed by Java's ScheduledThreadPoolExecutor.

  • brokerState: the broker's state, one of NotRunning, Starting, RecoveringFromUncleanShutdown, RunningAsBroker, PendingControlledShutdown, and BrokerShuttingDown.

  • brokerTopicStats: per-topic metrics of this broker.

  • logDirFailureChannel: when a log directory cannot be accessed, it is recorded in this object's offlineLogDirs and offlineLogDirQueue.

    class LogDirFailureChannel(logDirNum: Int) extends Logging {
         private val offlineLogDirs = new ConcurrentHashMap[String, String]
         private val offlineLogDirQueue = new ArrayBlockingQueue[String](logDirNum)
    }   
    
  • InitialTaskDelayMs: the startup delay of the scheduled tasks, 30 * 1000 ms.

  • logCreationOrDeletionLock: the lock guarding log creation and deletion.

  • currentLogs: maps each TopicPartition to its Log. It uses Kafka's own Pool type, which is backed by the JDK's ConcurrentHashMap.

    private val currentLogs = new Pool[TopicPartition, Log]()
    
    class Pool[K,V](valueFactory: Option[K => V] = None) extends Iterable[(K, V)] {
      private val pool: ConcurrentMap[K, V] = new ConcurrentHashMap[K, V]
    }
    
  • futureLogs: holds logs with the -future suffix, created when a replica is being moved to another log directory on the same broker. Also a Kafka Pool backed by ConcurrentHashMap.

  • logsToBeDeleted: the set of logs waiting to be deleted, kept in a JDK LinkedBlockingQueue.

  • partitionsInitializing: used to fix the bug described in KAFKA-8813.

    The gist is:

    Initializing the log of a topic-partition takes the following steps:

    1. Fetch the log configuration from ZooKeeper.
    2. Call LogManager.getOrCreateLog to create the Log object.
    3. Register the Log object.

    If the user changes the configuration right after the Log object is created in step 2, that change would be lost.

    So when initialization begins, the topic-partition is inserted into the partitionsInitializing map with the value false.

    If anyone changes the configuration in the meantime, the value for that topic-partition is set to true.

    After initialization finishes, the value is checked: if it is true, the configuration is fetched from ZooKeeper again and the Log object is recreated.

  • _liveLogDirs: creates or loads the directories specified by log.dirs; they must be distinct and readable. Stored in a ConcurrentLinkedQueue.

  • dirLocks: a collection of FileLocks used to lock each log directory at the filesystem level. All log directories are locked when the LogManager object is initialized.

  • recoveryPointCheckpoints: a Map(File, OffsetCheckpointFile) that maps each log directory to its RecoveryPointCheckpoint file.

    When the LogManager object is initialized, a RecoveryPointCheckpoint file is created in each directory. The map value is an OffsetCheckpointFile object, which wraps that directory's RecoveryPointCheckpoint file and provides read/write access to it. The checkpoint file records the recoveryPoint of every Log in that directory.

  • logStartOffsetCheckpoints: a Map(File, OffsetCheckpointFile) that maps each log directory to its LogStartOffsetCheckpoint file.

  • preferredLogDirs: maps each TopicPartition to the log directory it should preferably be placed in (used when moving replicas between directories).

  • cleaner: the LogCleaner that manages the cleaner threads; null when log cleaning is disabled.
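
As noted for logDirs above, when a new Log is created the LogManager prefers the data directory that currently holds the fewest Logs. Below is a minimal sketch of that selection idea; it is not Kafka's actual implementation, and the helper and its parameters are made up for illustration:

import java.io.File

// Hypothetical helper: pick the data directory that currently hosts the fewest Logs.
// liveLogDirs is assumed non-empty; logCountPerDir counts the Logs in each directory.
def pickLeastLoadedDir(liveLogDirs: Seq[File], logCountPerDir: Map[File, Int]): File =
  liveLogDirs.minBy(dir => logCountPerDir.getOrElse(dir, 0))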


2.2 startup( )

def startup(): Unit = {
    /* Schedule the cleanup task to delete old logs */
    // the Scheduler created in KafkaServer is passed in; schedule the periodic log maintenance tasks
    if (scheduler != null) {
      info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
      scheduler.schedule("kafka-log-retention", // LogSegment cleanup task
                         cleanupLogs _, // core method
                         delay = InitialTaskDelayMs, // 30 * 1000, a 30-second startup delay
                         period = retentionCheckMs, // interval between expiration checks; configured via log.retention.check.interval.ms, default 300000 ms, i.e. every 5 minutes
                         TimeUnit.MILLISECONDS)
      info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
      scheduler.schedule("kafka-log-flusher", // periodically flush logs to disk
                         flushDirtyLogs _, // core method
                         delay = InitialTaskDelayMs,
                         period = flushCheckMs, // check interval; configured via log.flush.scheduler.interval.ms, default Long.MaxValue (2^63 - 1)
                         TimeUnit.MILLISECONDS)
      scheduler.schedule("kafka-recovery-point-checkpoint", // periodically write the recovery points to the recovery point checkpoint file
                         checkpointLogRecoveryOffsets _, // core method
                         delay = InitialTaskDelayMs,
                         period = flushRecoveryOffsetCheckpointMs, // interval; configured via log.flush.offset.checkpoint.interval.ms, default 60000 ms
                         TimeUnit.MILLISECONDS)
      scheduler.schedule("kafka-log-start-offset-checkpoint", // log start offset checkpoint task (logStartOffset marks the start of the log; as replicas move their LEO and HW, logStartOffset may move too, so this task writes every partition's logStartOffset to the log-start-offset-checkpoint file)
                         checkpointLogStartOffsets _, // core method
                         delay = InitialTaskDelayMs,
                         period = flushStartOffsetCheckpointMs, // interval; configured via log.flush.start.offset.checkpoint.interval.ms, default 60000 ms
                         TimeUnit.MILLISECONDS)
      scheduler.schedule("kafka-delete-logs", // will be rescheduled after each delete logs with a dynamic period
                         deleteLogs _, // core method
                         delay = InitialTaskDelayMs,
                         unit = TimeUnit.MILLISECONDS)
    }
    if (cleanerConfig.enableCleaner)
      cleaner.startup()  // start the cleaner threads
  }

LogManager's startup method therefore launches five scheduled tasks plus the cleaner threads:

  • kafka-log-retention: deletes expired LogSegments
  • kafka-log-flusher: periodically flushes logs to disk
  • kafka-recovery-point-checkpoint: periodically writes the recovery points to the recovery point checkpoint file
  • kafka-log-start-offset-checkpoint: checkpoints the log start offsets (logStartOffset marks the start of the log; as replicas move their LEO and HW, logStartOffset may move too, so this task writes every partition's logStartOffset to the checkpoint file)
  • kafka-delete-logs: periodically deletes logs with the .delete suffix
  • the cleaner thread: compacts logs
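
Since the scheduler is backed by a ScheduledThreadPoolExecutor (see the scheduler parameter in 2.1), each of these tasks boils down to fixed-rate scheduling with a 30-second initial delay. A minimal sketch of that pattern with the plain JDK API rather than Kafka's KafkaScheduler; the names and the task body are illustrative only:

import java.util.concurrent.{Executors, TimeUnit}

// Stripped-down version of the scheduling pattern in LogManager.startup():
// wait InitialTaskDelayMs (30 s) before the first run, then repeat every periodMs.
val executor = Executors.newScheduledThreadPool(1)

def schedule(name: String, periodMs: Long)(task: => Unit): Unit = {
  println(s"scheduling $name: first run after 30 s, then every $periodMs ms")
  executor.scheduleAtFixedRate(() => task, 30 * 1000L, periodMs, TimeUnit.MILLISECONDS)
}

// e.g. a retention-style check every 5 minutes
schedule("log-retention-demo", periodMs = 5 * 60 * 1000L) {
  println("checking for expired log segments...")
}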

2.3 Scheduled Task: kafka-log-retention

This task deletes expired LogSegments.

def cleanupLogs(): Unit = {
    debug("Beginning log cleanup...")
    var total = 0
    val startMs = time.milliseconds

    // clean current logs.
    val deletableLogs = {
      if (cleaner != null) {
        // prevent cleaner from working on same partitions when changing cleanup policy
        // ① first pause any cleaner currently running on these topic-partitions
        cleaner.pauseCleaningForNonCompactedPartitions()
      } else {
        // ② keep only the logs whose cleanup.policy is delete (filter out compacted logs)
        currentLogs.filter {
          case (_, log) => !log.config.compact
        }
      }
    }

    // at this point every log in deletableLogs has cleanup.policy = delete; start deleting
    try {
      deletableLogs.foreach {
        case (topicPartition, log) =>
          debug(s"Garbage collecting '${log.name}'")
          // ③ delegate to log.deleteOldSegments() to delete the expired LogSegments
          total += log.deleteOldSegments()

          val futureLog = futureLogs.get(topicPartition)
          if (futureLog != null) {
            // clean future logs
            debug(s"Garbage collecting future log '${futureLog.name}'")
            total += futureLog.deleteOldSegments()
          }
      }
    } finally {
      // ④ if cleaning was paused in step ①, resume it here; in other words, only one cleaner works on a given topic-partition at a time
      if (cleaner != null) {
        cleaner.resumeCleaning(deletableLogs.map(_._1))
      }
    }

    debug(s"Log cleanup completed. $total files deleted in " +
                  (time.milliseconds - startMs) / 1000 + " seconds")
  }
  • ① First pause the cleaner currently running on these topic-partitions.
  • ② Keep only the logs whose cleanup.policy is delete.
  • ③ Delegate to log.deleteOldSegments() to delete the expired LogSegments.
  • ④ If the cleaner was paused in step ①, resume it here; in other words, only one cleaner works on a given topic-partition at a time (a sketch of the time-based deletion rule follows).
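
As step ③ notes, the actual deletion happens in Log.deleteOldSegments(), which retires segments by retention time, total log size, and log start offset. Below is a minimal sketch of just the time-based rule, using a simplified stand-in type; SegmentInfo is made up for illustration and is not Kafka's LogSegment:

// Simplified stand-in for LogSegment, for illustration only.
final case class SegmentInfo(baseOffset: Long, lastModifiedMs: Long)

// Keep the active (last) segment and drop older segments whose last modification
// lies further back than retention.ms; retention.ms = -1 means "keep forever".
def expiredByTime(segments: Seq[SegmentInfo], nowMs: Long, retentionMs: Long): Seq[SegmentInfo] =
  if (retentionMs < 0) Seq.empty
  else segments.dropRight(1).filter(seg => nowMs - seg.lastModifiedMs > retentionMs)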

2.4 Scheduled Task: kafka-log-flusher

This task periodically flushes logs to disk.

private def flushDirtyLogs(): Unit = {
    debug("Checking for dirty logs to flush...")

    // ① iterate over the currentLogs and futureLogs collections
    for ((topicPartition, log) <- currentLogs.toList ++ futureLogs.toList) {
      try {
        // ② compute how long it has been since the last flush
        val timeSinceLastFlush = time.milliseconds - log.lastFlushTime
        debug(s"Checking if flush is needed on ${topicPartition.topic} flush interval ${log.config.flushMs}" +
              s" last flushed ${log.lastFlushTime} time since last flush: $timeSinceLastFlush")
        // ③ if the flush.ms interval has elapsed, call flush to write the data to disk;
        // this flushes the messages between recoveryPoint and the LEO and then advances recoveryPoint
        if(timeSinceLastFlush >= log.config.flushMs)
          log.flush
      } catch {
        case e: Throwable =>
          error(s"Error flushing topic ${topicPartition.topic}", e)
      }
    }
  }
  • ① Iterate over the currentLogs and futureLogs collections.
  • ② Compare the time of the last flush with the current time.
  • ③ If the flush.ms interval has elapsed, call flush, which writes the messages between recoveryPoint and the LEO to disk and advances recoveryPoint (a tiny conceptual model follows).
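
A minimal conceptual model of that bookkeeping, using a hypothetical ToyLog type rather than the real Log class:

// Conceptual model only: a "log" that tracks how much of its data is known to be on disk.
final class ToyLog(var logEndOffset: Long, var recoveryPoint: Long, var lastFlushMs: Long) {
  def flushIfDue(nowMs: Long, flushMs: Long): Unit =
    if (nowMs - lastFlushMs >= flushMs && recoveryPoint < logEndOffset) {
      // a real log would fsync the segments covering [recoveryPoint, logEndOffset) here
      recoveryPoint = logEndOffset // everything up to the LEO is now durable
      lastFlushMs = nowMs
    }
}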

2.5 Scheduled Task: kafka-recovery-point-checkpoint

This task periodically writes the recovery points to the recovery point checkpoint file.

def checkpointLogRecoveryOffsets(): Unit = {
    logsByDir.foreach { case (dir, partitionToLogMap) =>
      liveLogDirs.find(_.getAbsolutePath.equals(dir)).foreach { f =>
        checkpointRecoveryOffsetsAndCleanSnapshot(f, partitionToLogMap.values.toSeq)
      }
    }
  }

private[log] def checkpointRecoveryOffsetsAndCleanSnapshot(dir: File, logsToCleanSnapshot: Seq[Log]): Unit = {
    try {
      checkpointLogRecoveryOffsetsInDir(dir)
      logsToCleanSnapshot.foreach(_.deleteSnapshotsAfterRecoveryPointCheckpoint())
    } catch {
      case e: IOException =>
        logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Disk error while writing to recovery point " +
          s"file in directory $dir", e)
    }
  }  

The core of checkpointLogRecoveryOffsetsInDir is a call to kafka.server.checkpoints.CheckpointFile.write(), whose main logic is:

  1. Write all recoveryPoints under the log directory to a tmp file.
  2. Replace the existing RecoveryPointCheckpoint file with the tmp file. It is fairly simple; see the source for details (a sketch of the pattern follows).
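
The write-to-a-temp-file-then-swap trick keeps the checkpoint file intact even if the broker dies mid-write. A minimal sketch of the pattern using plain NIO; it is simplified, since the real CheckpointFile also writes a version header and an entry count and attempts an atomic move with a fallback:

import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths, StandardCopyOption}

// Write "topic partition offset" lines to the checkpoint file in <dir>:
// write a .tmp file first, then move it over the old checkpoint in one step.
def writeCheckpoint(dir: String, fileName: String, offsets: Map[(String, Int), Long]): Unit = {
  val target = Paths.get(dir, fileName)
  val tmp    = Paths.get(dir, fileName + ".tmp")
  val lines  = offsets.map { case ((topic, partition), offset) => s"$topic $partition $offset" }
  Files.write(tmp, lines.mkString("\n").getBytes(StandardCharsets.UTF_8))
  Files.move(tmp, target, StandardCopyOption.REPLACE_EXISTING)
}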

2.6 Scheduled Task: kafka-log-start-offset-checkpoint

This task checkpoints the log start offsets (logStartOffset marks the start of the log; as replicas move their LEO and HW, logStartOffset may move too, so this task writes every partition's logStartOffset to the log-start-offset-checkpoint file).

def checkpointLogStartOffsets(): Unit = {
    liveLogDirs.foreach(checkpointLogStartOffsetsInDir)
  }

private def checkpointLogStartOffsetsInDir(dir: File): Unit = {
    for {
      partitionToLog <- logsByDir.get(dir.getAbsolutePath)
      checkpoint <- logStartOffsetCheckpoints.get(dir)
    } {
      try {
        val logStartOffsets = partitionToLog.collect {
          case (k, log) if log.logStartOffset > log.logSegments.head.baseOffset => k -> log.logStartOffset
        }
        checkpoint.write(logStartOffsets)
      } catch {
        case e: IOException =>
          logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Disk error while writing to logStartOffset file in directory $dir", e)
      }
    }
  }  

The logic of this task is the same as that of kafka-recovery-point-checkpoint.


2.7 Scheduled Task: kafka-delete-logs

This task periodically deletes logs with the .delete suffix. Do not confuse it with kafka-log-retention:

  • kafka-log-retention deletes expired LogSegment objects.
  • kafka-delete-logs deletes files with the .delete suffix.
    • When LogManager initializes, loadLogs loads the logs; files that were scheduled for deletion but not yet removed before the last shutdown are added to the logsToBeDeleted queue and wait for this task.
    • When a user moves a log, the old files are marked with the .delete suffix and added to the logsToBeDeleted queue. In other words, Kafka does not really move the files: it creates them in the target directory and deletes the old ones.

The task itself is quite simple: it keeps taking entries off the logsToBeDeleted queue and deleting them.

private def deleteLogs(): Unit = {
    var nextDelayMs = 0L
    try {
      def nextDeleteDelayMs: Long = {
        if (!logsToBeDeleted.isEmpty) {
          // peek at the head of the queue
          val (_, scheduleTimeMs) = logsToBeDeleted.peek()
          scheduleTimeMs + currentDefaultConfig.fileDeleteDelayMs - time.milliseconds()
        } else
          currentDefaultConfig.fileDeleteDelayMs
      }

      while ({nextDelayMs = nextDeleteDelayMs; nextDelayMs <= 0}) {
        val (removedLog, _) = logsToBeDeleted.take()
        if (removedLog != null) {
          try {
            // delete the log
            removedLog.delete()
            info(s"Deleted log for partition ${removedLog.topicPartition} in ${removedLog.dir.getAbsolutePath}.")
          } catch {
            case e: KafkaStorageException =>
              error(s"Exception while deleting $removedLog in dir ${removedLog.dir.getParent}.", e)
          }
        }
      }
    } catch {
      case e: Throwable =>
        error(s"Exception in kafka-delete-logs thread.", e)
    } finally {
      try {
        // reschedule this task
        scheduler.schedule("kafka-delete-logs",
          deleteLogs _,
          delay = nextDelayMs,
          unit = TimeUnit.MILLISECONDS)
      } catch {
        case e: Throwable =>
          if (scheduler.isStarted) {
            // No errors should occur unless scheduler has been shutdown
            error(s"Failed to schedule next delete in kafka-delete-logs thread", e)
          }
      }
    }
  }

2.8 The Cleaner Thread

This is the thread that performs log compaction. It differs from the kafka-log-retention task:

  • kafka-log-retention deletes logs along three dimensions: time, log size, and log start offset.
  • The cleaner thread removes tombstones (messages with a non-null key and a null value) and compacts messages (keeping only the latest value for each key).

    If consumers only care about the latest value for each key, log compaction can be enabled. In the official example, of the three messages with key K1 at offsets 0, 2, and 3, only the latest one, at offset 3, is kept.

When a Log appends a message, it simply writes it to the end of the activeSegment's log file. To avoid turning the activeSegment into a hot spot, the activeSegment never takes part in compaction.

Multiple Cleaner threads can run during compaction; the number of threads is configured via log.cleaner.threads.

Each Log can be split by the cleaner checkpoint into two parts:

  • the Log Head, which, like an ordinary log, holds all messages;
  • the Log Tail, which is the part already cleaned by the Cleaner thread;
  • min.cleanable.dirty.ratio configures the required dirty ratio for a specific topic (logs with a higher dirty ratio are cleaned first), while log.cleaner.min.cleanable.ratio configures the default for all topics on the broker (a small sketch of this ratio appears at the end of the section);
  • after the Cleaner thread has used the dirty ratio to pick the Logs to clean, it first builds a mapping from each key in the dirty portion to that key's last offset, maintained by a SkimpyOffsetMap (a toy model of the resulting semantics follows this list).
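
To make these semantics concrete, here is a toy model of "keep only the latest value per key, then drop tombstones", which is what repeated compaction passes converge to. The types are made up for illustration; they have nothing to do with SkimpyOffsetMap's actual hashing, and real compaction keeps tombstones around for delete.retention.ms before removing them:

// Toy record: (offset, key, value); value == None models a tombstone.
final case class ToyRecord(offset: Long, key: String, value: Option[String])

// Keep only the highest-offset record per key, then drop tombstones.
def compact(records: Seq[ToyRecord]): Seq[ToyRecord] = {
  val latestPerKey = records.groupBy(_.key).map { case (_, rs) => rs.maxBy(_.offset) }
  latestPerKey.filter(_.value.isDefined).toSeq.sortBy(_.offset)
}

// Official example: key K1 at offsets 0, 2 and 3 -> only offset 3 survives.
// compact(Seq(ToyRecord(0, "K1", Some("v0")), ToyRecord(2, "K1", Some("v2")), ToyRecord(3, "K1", Some("v3"))))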

Classes involved in log compaction:

  • LogCleaner: manages the CleanerThread instances through its cleaners field and starts/stops them via the startup() and shutdown() methods
  • CleanerThread: where the real compaction logic lives; extends ShutdownableThread
  • ShutdownableThread: extends Thread and provides CleanerThread with basic facilities such as initiateShutdown, awaitShutdown, and run
  • Cleaner: CleanerThread delegates most of its work to Cleaner
  • LogCleanerManager: manages each log's compaction state and maintains the cleaner checkpoint information
  // start the CleanerThread instances
  def startup(): Unit = {
    info("Starting the log cleaner")
    (0 until config.numThreads).foreach { i =>
      val cleaner = new CleanerThread(i)
      cleaners += cleaner
      cleaner.start()
    }
  }
  • One CleanerThread is started by default; the count can be configured via log.cleaner.threads.

CleanerThread

// initialize the Cleaner object
val cleaner = new Cleaner(id = threadId,
                              offsetMap = new SkimpyOffsetMap(memory = math.min(config.dedupeBufferSize / config.numThreads, Int.MaxValue).toInt,
                                                              hashAlgorithm = config.hashAlgorithm),
                              ioBufferSize = config.ioBufferSize / config.numThreads / 2,
                              maxIoBufferSize = config.maxMessageSize,
                              dupBufferLoadFactor = config.dedupeBufferLoadFactor,
                              throttler = throttler,
                              time = time,
                              checkDone = checkDone)

// core method
override def doWork(): Unit = {
      val cleaned = tryCleanFilthiestLog()
      if (!cleaned)
        pause(config.backOffMs, TimeUnit.MILLISECONDS)
    }

private def tryCleanFilthiestLog(): Boolean = {
      try {
        cleanFilthiestLog()
      } catch {
        case e: LogCleaningException =>
          warn(s"Unexpected exception thrown when cleaning log ${e.log}. Marking its partition (${e.log.topicPartition}) as uncleanable", e)
          cleanerManager.markPartitionUncleanable(e.log.dir.getParent, e.log.topicPartition)

          false
      }
    }

@throws(classOf[LogCleaningException])
private def cleanFilthiestLog(): Boolean = {
      val preCleanStats = new PreCleanStats()
      // ① pick the log that needs compaction
      val cleaned = cleanerManager.grabFilthiestCompactedLog(time, preCleanStats) match {
        case None =>
          false
        case Some(cleanable) =>
          // there's a log, clean it
          this.lastPreCleanStats = preCleanStats
          try {
            // ② core step: perform the compaction
            cleanLog(cleanable)
            true
          } catch {
            case e @ (_: ThreadShutdownException | _: ControlThrowable) => throw e
            case e: Exception => throw new LogCleaningException(cleanable.log, e.getMessage, e)
          }
    }
      val deletable: Iterable[(TopicPartition, Log)] = cleanerManager.deletableLogs()
      try {
        deletable.foreach { case (_, log) =>
          try {
            log.deleteOldSegments()
          } catch {
            case e @ (_: ThreadShutdownException | _: ControlThrowable) => throw e
            case e: Exception => throw new LogCleaningException(log, e.getMessage, e)
          }
        }
      } finally  {
        cleanerManager.doneDeleting(deletable.map(_._1))
      }

      cleaned
    }

private def cleanLog(cleanable: LogToClean): Unit = {
      var endOffset = cleanable.firstDirtyOffset
      try {
        val (nextDirtyOffset, cleanerStats) = cleaner.clean(cleanable)
        recordStats(cleaner.id, cleanable.log.name, cleanable.firstDirtyOffset, endOffset, cleanerStats)
        endOffset = nextDirtyOffset
      } catch {
        case _: LogCleaningAbortedException => // task can be aborted, let it go.
        case _: KafkaStorageException => // partition is already offline. let it go.
        case e: IOException =>
          val logDirectory = cleanable.log.dir.getParent
          val msg = s"Failed to clean up log for ${cleanable.topicPartition} in dir ${logDirectory} due to IOException"
          logDirFailureChannel.maybeAddOfflineLogDir(logDirectory, msg, e)
      } finally {
        cleanerManager.doneCleaning(cleanable.topicPartition, cleanable.log.dir.getParentFile, endOffset)
      }
  }                                  
  • Important parameters when initializing the Cleaner object:

    • offsetMap: a SkimpyOffsetMap that maps each key in the dirty portion of the log to that key's last offset.
    • ioBufferSize: the size of the ByteBuffers used to read and write LogSegments.
    • maxIoBufferSize: the maximum message size.
    • dupBufferLoadFactor: the maximum fill ratio of the SkimpyOffsetMap.
    • throttler: limits the speed at which LogSegments are read and written.
    • checkDone: checks the Log's compaction state.
  • ① LogCleanerManager#grabFilthiestCompactedLog picks the log that needs compaction:

    def grabFilthiestCompactedLog(time: Time, preCleanStats: PreCleanStats = new PreCleanStats()): Option[LogToClean] = {
      inLock(lock) {
        // current time
        val now = time.milliseconds
        // metrics: record when the cleaner thread last ran
        this.timeOfLastRun = now
        val lastClean = allCleanerCheckpoints
        val dirtyLogs = logs.filter {
          // ① filter out logs whose cleanup.policy is not compact
          case (_, log) => log.config.compact  // match logs that are marked as compacted
        }.filterNot {
          case (topicPartition, log) =>
            //② skip any logs already in-progress and uncleanable partitions
            // filter out logs that are already in progress or uncleanable
            inProgress.contains(topicPartition) || isUncleanablePartition(log, topicPartition)
        }.map {
          case (topicPartition, log) => // create a LogToClean instance for each
            try {
              // compute firstDirtyOffset, which may be either the logStartOffset or the clean checkpoint
              val lastCleanOffset = lastClean.get(topicPartition)
              val (firstDirtyOffset, firstUncleanableDirtyOffset) = cleanableOffsets(log, lastCleanOffset, now)
              val compactionDelayMs = maxCompactionDelay(log, firstDirtyOffset, now)
              preCleanStats.updateMaxCompactionDelay(compactionDelayMs)
    
              // ③ create a LogToClean for each Log, tracking its clean bytes, dirty bytes, and cleanableRatio
              LogToClean(topicPartition, log, firstDirtyOffset, firstUncleanableDirtyOffset, compactionDelayMs > 0)
            } catch {
              case e: Throwable => throw new LogCleaningException(log,
                s"Failed to calculate log cleaning stats for partition $topicPartition", e)
            }
        }.filter(ltc => ltc.totalBytes > 0) // skip any empty logs
    
        // ④ take the maximum cleanableRatio among the dirty logs
        this.dirtiestLogCleanableRatio = if (dirtyLogs.nonEmpty) dirtyLogs.max.cleanableRatio else 0
        // and must meet the minimum threshold for dirty byte ratio or have some bytes required to be compacted
        // ⑤ drop logs whose cleanableRatio is below the configured minimum
        val cleanableLogs = dirtyLogs.filter { ltc =>
          (ltc.needCompactionNow && ltc.cleanableBytes > 0) || ltc.cleanableRatio > ltc.log.config.minCleanableRatio
        }
        if(cleanableLogs.isEmpty) {
          None
        } else {
          // ⑥ pick the log to compact and add it to the inProgress set
          preCleanStats.recordCleanablePartitions(cleanableLogs.size)
          val filthiest = cleanableLogs.max
          inProgress.put(filthiest.topicPartition, LogCleaningInProgress)
          Some(filthiest)
        }
      }
    }
    
  • ② The core step calls the clean method of the Cleaner object initialized earlier to perform the compaction:

    private[log] def clean(cleanable: LogToClean): (Long, CleanerStats) = {
      // figure out the timestamp below which it is safe to remove delete tombstones
      // this position is defined to be a configurable time beneath the last modified time of the last clean segment
      // compute the timestamp below which tombstones (messages with a null value) can be safely removed
      val deleteHorizonMs =
        cleanable.log.logSegments(0, cleanable.firstDirtyOffset).lastOption match {
          case None => 0L
          case Some(seg) => seg.lastModified - cleanable.log.config.deleteRetentionMs
      }
    
      doClean(cleanable, deleteHorizonMs)
    }
    
    private[log] def doClean(cleanable: LogToClean, deleteHorizonMs: Long): (Long, CleanerStats) = {
      info("Beginning cleaning of log %s.".format(cleanable.log.name))
    
      val log = cleanable.log
      val stats = new CleanerStats()
    
      // build the offset map
      info("Building offset map for %s...".format(cleanable.log.name))
      // ① determine the upper bound of compaction: the activeSegment never participates, so the maximum upper bound is activeSegment.baseOffset
      val upperBoundOffset = cleanable.firstUncleanableOffset
      // ② fill the offsetMap and determine the real upper bound of compaction
      buildOffsetMap(log, cleanable.firstDirtyOffset, upperBoundOffset, offsetMap, stats)
      val endOffset = offsetMap.latestOffset + 1
      stats.indexDone()
    
      // determine the timestamp up to which the log will be cleaned
      // this is the lower of the last active segment and the compaction lag
      // compute the timestamp up to which the log will be cleaned; analogous to upperBoundOffset, one is an offset and the other a timestamp
      val cleanableHorizonMs = log.logSegments(0, cleanable.firstUncleanableOffset).lastOption.map(_.lastModified).getOrElse(0L)
    
      // group the segments and clean the groups
      info("Cleaning log %s (cleaning prior to %s, discarding tombstones prior to %s)...".format(log.name, new Date(cleanableHorizonMs), new Date(deleteHorizonMs)))
      val transactionMetadata = new CleanedTransactionMetadata
    
      // ③ group the segments to be compacted and clean them group by group
      val groupedSegments = groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize,
        log.config.maxIndexSize, cleanable.firstUncleanableOffset)
      for (group <- groupedSegments)
        // ④ clean the group
        cleanSegments(log, group, offsetMap, deleteHorizonMs, stats, transactionMetadata)
    
      // record buffer utilization
      stats.bufferUtilization = offsetMap.utilization
    
      stats.allDone()
    
      (endOffset, stats)
    }
    
    • ① Determine the upper bound of compaction: the activeSegment never participates, so the maximum upper bound is activeSegment.baseOffset.
    • ② Fill the offsetMap and determine the real upper bound of compaction:
    private[log] def buildOffsetMap(log: Log,
                                    start: Long,
                                    end: Long,
                                    map: OffsetMap,
                                    stats: CleanerStats): Unit = {
      map.clear()
      // find all LogSegments from firstDirtyOffset to upperBoundOffset
      val dirty = log.logSegments(start, end).toBuffer
      val nextSegmentStartOffsets = new ListBuffer[Long]
      if (dirty.nonEmpty) {
        for (nextSegment <- dirty.tail) nextSegmentStartOffsets.append(nextSegment.baseOffset)
        nextSegmentStartOffsets.append(end)
      }
      info("Building offset map for log %s for %d segments in offset range [%d, %d).".format(log.name, dirty.size, start, end))
    
      // transaction-related bookkeeping
      val transactionMetadata = new CleanedTransactionMetadata
      val abortedTransactions = log.collectAbortedTransactions(start, end)
      transactionMetadata.addAbortedTransactions(abortedTransactions)
    
      // Add all the cleanable dirty segments. We must take at least map.slots * load_factor,
      // but we may be able to fit more (if there is lots of duplication in the dirty section of the log)
      var full = false
      // iterate over the dirty segments as long as the offsetMap is not full
      for ( (segment, nextSegmentStartOffset) <- dirty.zip(nextSegmentStartOffsets) if !full) {
        // check the compaction state LogCleanerManager records for this partition
        checkDone(log.topicPartition)
        // process a single LogSegment, adding each message's key and offset to the OffsetMap
        full = buildOffsetMapForSegment(log.topicPartition, segment, map, start, nextSegmentStartOffset, log.config.maxMessageSize,
          transactionMetadata, stats)
        if (full)
          debug("Offset map is full, %d segments fully mapped, segment with base offset %d is partially mapped".format(dirty.indexOf(segment), segment.baseOffset))
      }
      info("Offset map for log %s complete.".format(log.name))
    }
    
    private def buildOffsetMapForSegment(topicPartition: TopicPartition,
                                         segment: LogSegment,
                                         map: OffsetMap,
                                         startOffset: Long,
                                         nextSegmentStartOffset: Long,
                                         maxLogMessageSize: Int,
                                         transactionMetadata: CleanedTransactionMetadata,
                                         stats: CleanerStats): Boolean = {
      var position = segment.offsetIndex.lookup(startOffset).position
      val maxDesiredMapSize = (map.slots * this.dupBufferLoadFactor).toInt
      while (position < segment.log.sizeInBytes) { // iterate over the LogSegment
        checkDone(topicPartition) // check the compaction state
        readBuffer.clear()
        try {
          // read messages from the LogSegment
          segment.log.readInto(readBuffer, position)
        } catch {
          case e: Exception =>
            throw new KafkaException(s"Failed to read from segment $segment of partition $topicPartition " +
              "while loading offset map", e)
        }
        val records = MemoryRecords.readableRecords(readBuffer)
        throttler.maybeThrottle(records.sizeInBytes)
    
        val startPosition = position
        for (batch <- records.batches.asScala) {
          if (batch.isControlBatch) {
            transactionMetadata.onControlBatchRead(batch)
            stats.indexMessagesRead(1)
          } else {
            val isAborted = transactionMetadata.onBatchRead(batch)
            if (isAborted) {
              // If the batch is aborted, do not bother populating the offset map.
              // Note that abort markers are supported in v2 and above, which means count is defined.
              stats.indexMessagesRead(batch.countOrNull)
            } else {
              for (record <- batch.asScala) {
                // only messages with a key are processed
                if (record.hasKey && record.offset >= startOffset) {
                  if (map.size < maxDesiredMapSize)
                    // put the key and offset into the offsetMap
                    map.put(record.key, record.offset)
                  else
                    return true
                }
                stats.indexMessagesRead(1)
              }
            }
          }
    
          if (batch.lastOffset >= startOffset)
            map.updateLatestOffset(batch.lastOffset)
        }
        val bytesRead = records.validBytes
        // advance position for the next read
        position += bytesRead
        stats.indexBytesRead(bytesRead)

        // if we didn't read even one complete message, our read buffer may be too small
        // (if position did not move, not even one complete message was read, so grow the read/write buffers)
        if(position == startPosition)
          growBuffersOrFail(segment.log, position, maxLogMessageSize, records)
      }

      // In the case of offsets gap, fast forward to latest expected offset in this segment.
      map.updateLatestOffset(nextSegmentStartOffset - 1L)

      // restore the read/write buffers to their original size
      restoreBuffers()
      false
    }
    
    • ③ Group the segments to be compacted and clean them group by group.
    • ④ Clean each group:
    private[log] def cleanSegments(log: Log,
                                   segments: Seq[LogSegment],
                                   map: OffsetMap,
                                   deleteHorizonMs: Long,
                                   stats: CleanerStats,
                                   transactionMetadata: CleanedTransactionMetadata): Unit = {
      // create a new segment with a suffix appended to the name of the log and indexes
      // create log and index files with the ".clean" suffix, named after the baseOffset of the group's first LogSegment
      val cleaned = LogCleaner.createNewCleanedSegment(log, segments.head.baseOffset)
      transactionMetadata.cleanedIndex = Some(cleaned.txnIndex)
    
      try {
        // clean segments into the new destination segment
        val iter = segments.iterator
        var currentSegmentOpt: Option[LogSegment] = Some(iter.next())
        val lastOffsetOfActiveProducers = log.lastRecordsOfActiveProducers
    
        while (currentSegmentOpt.isDefined) {
          val currentSegment = currentSegmentOpt.get
          val nextSegmentOpt = if (iter.hasNext) Some(iter.next()) else None
    
          val startOffset = currentSegment.baseOffset
          val upperBoundOffset = nextSegmentOpt.map(_.baseOffset).getOrElse(map.latestOffset + 1)
          val abortedTransactions = log.collectAbortedTransactions(startOffset, upperBoundOffset)
          transactionMetadata.addAbortedTransactions(abortedTransactions)
    
          val retainDeletesAndTxnMarkers = currentSegment.lastModified > deleteHorizonMs
          info(s"Cleaning $currentSegment in log ${log.name} into ${cleaned.baseOffset} " +
            s"with deletion horizon $deleteHorizonMs, " +
            s"${if(retainDeletesAndTxnMarkers) "retaining" else "discarding"} deletes.")
    
          try {
            // perform the actual compaction
            cleanInto(log.topicPartition, currentSegment.log, cleaned, map, retainDeletesAndTxnMarkers, log.config.maxMessageSize,
              transactionMetadata, lastOffsetOfActiveProducers, stats)
          } catch {
            case e: LogSegmentOffsetOverflowException =>
              // Split the current segment. It's also safest to abort the current cleaning process, so that we retry from
              // scratch once the split is complete.
              info(s"Caught segment overflow error during cleaning: ${e.getMessage}")
              log.splitOverflowedSegment(currentSegment)
              throw new LogCleaningAbortedException()
          }
          currentSegmentOpt = nextSegmentOpt
        }
    
        cleaned.onBecomeInactiveSegment()
        // flush new segment to disk before swap
        // flush the new segment's data to disk
        cleaned.flush()
    
        // update the modification date to retain the last modified date of the original files
        // keep the last modified time of the original files
        val modified = segments.last.lastModified
        cleaned.lastModified = modified
    
        // swap in new segment
        info(s"Swapping in cleaned segment $cleaned for segment(s) $segments in log $log")
        // rename the .clean suffix to .swap,
        // add the cleaned segment to the log's segments,
        // remove the group's old LogSegments from the log,
        // and finally strip the .swap suffix from the file names
        log.replaceSegments(List(cleaned), segments)
      } catch {
        case e: LogCleaningAbortedException =>
          try cleaned.deleteIfExists()
          catch {
            case deleteException: Exception =>
              e.addSuppressed(deleteException)
          } finally throw e
      }
    }
    
    • The compaction itself (cleanInto):
    private[log] def cleanInto(topicPartition: TopicPartition,
                              sourceRecords: FileRecords,
                              dest: LogSegment,
                              map: OffsetMap,
                              retainDeletesAndTxnMarkers: Boolean,
                              maxLogMessageSize: Int,
                              transactionMetadata: CleanedTransactionMetadata,
                              lastRecordsOfActiveProducers: Map[Long, LastRecord],
                              stats: CleanerStats): Unit = {
     val logCleanerFilter: RecordFilter = new RecordFilter {
       var discardBatchRecords: Boolean = _
    
       override def checkBatchRetention(batch: RecordBatch): BatchRetention = {
         // we piggy-back on the tombstone retention logic to delay deletion of transaction markers.
         // note that we will never delete a marker until all the records from that transaction are removed.
         discardBatchRecords = shouldDiscardBatch(batch, transactionMetadata, retainTxnMarkers = retainDeletesAndTxnMarkers)
    
         def isBatchLastRecordOfProducer: Boolean = {
           // We retain the batch in order to preserve the state of active producers. There are three cases:
           // 1) The producer is no longer active, which means we can delete all records for that producer.
           // 2) The producer is still active and has a last data offset. We retain the batch that contains
           //    this offset since it also contains the last sequence number for this producer.
           // 3) The last entry in the log is a transaction marker. We retain this marker since it has the
           //    last producer epoch, which is needed to ensure fencing.
           lastRecordsOfActiveProducers.get(batch.producerId).exists { lastRecord =>
             lastRecord.lastDataOffset match {
               case Some(offset) => batch.lastOffset == offset
               case None => batch.isControlBatch && batch.producerEpoch == lastRecord.producerEpoch
             }
           }
         }
    
         if (batch.hasProducerId && isBatchLastRecordOfProducer)
           BatchRetention.RETAIN_EMPTY
         else if (discardBatchRecords)
           BatchRetention.DELETE
         else
           BatchRetention.DELETE_EMPTY
       }
    
       override def shouldRetainRecord(batch: RecordBatch, record: Record): Boolean = {
          // discardBatchRecords was set by checkBatchRetention above and decides whether this batch's records are discarded
         if (discardBatchRecords)
           // The batch is only retained to preserve producer sequence information; the records can be removed
           false
         else
            // whether to retain this record depends on three conditions:
            // 1. the record has a key
            // 2. the offsetMap does not hold a larger offset for the same key
            // 3. the value is non-null, or it is null (a tombstone) but cannot be removed yet
           Cleaner.this.shouldRetainRecord(map, retainDeletesAndTxnMarkers, batch, record, stats)
       }
     }
    
     var position = 0
      // iterate over the LogSegment being compacted
     while (position < sourceRecords.sizeInBytes) {
        // check the compaction state
       checkDone(topicPartition)
       // read a chunk of messages and copy any that are to be retained to the write buffer to be written out
       readBuffer.clear()
       writeBuffer.clear()
    
        // read messages
       sourceRecords.readInto(readBuffer, position)
       val records = MemoryRecords.readableRecords(readBuffer)
        // throttle the read rate if necessary
       throttler.maybeThrottle(records.sizeInBytes)
       val result = records.filterTo(topicPartition, logCleanerFilter, writeBuffer, maxLogMessageSize, decompressionBufferSupplier)
       stats.readMessages(result.messagesRead, result.bytesRead)
       stats.recopyMessages(result.messagesRetained, result.bytesRetained)
    
       position += result.bytesRead
    
       // if any messages are to be retained, write them out
       val outputBuffer = result.outputBuffer
       if (outputBuffer.position() > 0) {
         outputBuffer.flip()
         val retained = MemoryRecords.readableRecords(outputBuffer)
         // it's OK not to hold the Log's lock in this case, because this segment is only accessed by other threads
         // after `Log.replaceSegments` (which acquires the lock) is called
          // append the retained records to the destination LogSegment
         dest.append(largestOffset = result.maxOffset,
           largestTimestamp = result.maxTimestamp,
           shallowOffsetOfMaxTimestamp = result.shallowOffsetOfMaxTimestamp,
           records = retained)
         throttler.maybeThrottle(outputBuffer.limit())
       }
    
       // if we read bytes but didn't get even one complete batch, our I/O buffer is too small, grow it and try again
       // `result.bytesRead` contains bytes from `messagesRead` and any discarded batches.
        // not even one complete batch was read, so the readBuffer is too small and must grow
       if (readBuffer.limit() > 0 && result.bytesRead == 0)
         growBuffersOrFail(sourceRecords, position, maxLogMessageSize, records)
     }
      // restore readBuffer and writeBuffer
     restoreBuffers()
    }
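
As promised above, the "filthiest log" decision in grabFilthiestCompactedLog ultimately comes down to a byte ratio. A hedged sketch of that ratio and the threshold check follows; it is simplified, and the real LogToClean also accounts for uncleanable ranges and compaction lag:

// cleanBytes: bytes below firstDirtyOffset; dirtyBytes: bytes between firstDirtyOffset
// and the first uncleanable offset. A log is eligible for compaction when its dirty
// ratio exceeds min.cleanable.dirty.ratio (default 0.5); the log with the highest
// ratio is cleaned first.
final case class ToyLogToClean(cleanBytes: Long, dirtyBytes: Long) {
  def cleanableRatio: Double = dirtyBytes.toDouble / (cleanBytes + dirtyBytes)
}

def isCleanable(ltc: ToyLogToClean, minCleanableRatio: Double = 0.5): Boolean =
  ltc.dirtyBytes > 0 && ltc.cleanableRatio > minCleanableRatio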
    

References:

Kafka 2.5.0 source code

Kafka official website