1. Overview
The earlier article "Kafka 服务端(三) kafka-server-start.sh" described the entry classes that initialize each Kafka module after the broker starts.
All Kafka messages are stored in logs, and the log subsystem is initialized through LogManager: the Log class is what actually operates on a log, while LogManager manages the Log instances. LogManager provides creation, recovery, deletion and lookup of the Log collection, delegates all read and write operations to Log, and, as part of its own initialization, starts five scheduled tasks plus a background Cleaner thread.
2. Source Code Analysis
During Kafka startup, LogManager#startup is called to initialize the log module:
def startup(): Unit = {
//... omitted
/* start log manager */
logManager = LogManager(config, initialOfflineDirs, zkClient, brokerState, kafkaScheduler, time, brokerTopicStats, logDirFailureChannel)
logManager.startup()
//... omitted
}
2.1 LogManager
Parameters and fields of the LogManager class:
class LogManager(logDirs: Seq[File],
initialOfflineDirs: Seq[File],
val topicConfigs: Map[String, LogConfig], // note that this doesn't get updated after creation
val initialDefaultConfig: LogConfig,
val cleanerConfig: CleanerConfig,
recoveryThreadsPerDataDir: Int,
val flushCheckMs: Long,
val flushRecoveryOffsetCheckpointMs: Long,
val flushStartOffsetCheckpointMs: Long,
val retentionCheckMs: Long,
val maxPidExpirationMs: Int,
scheduler: Scheduler,
val brokerState: BrokerState,
brokerTopicStats: BrokerTopicStats,
logDirFailureChannel: LogDirFailureChannel,
time: Time) extends Logging with KafkaMetricsGroup {
val LockFile = ".lock"
val InitialTaskDelayMs = 30 * 1000
private val logCreationOrDeletionLock = new Object
private val currentLogs = new Pool[TopicPartition, Log]()
private val futureLogs = new Pool[TopicPartition, Log]()
private val logsToBeDeleted = new LinkedBlockingQueue[(Log, Long)]()
private val _liveLogDirs: ConcurrentLinkedQueue[File] = createAndValidateLogDirs(logDirs, initialOfflineDirs)
@volatile private var _currentDefaultConfig = initialDefaultConfig
@volatile private var numRecoveryThreadsPerDataDir = recoveryThreadsPerDataDir
private[log] val partitionsInitializing = new ConcurrentHashMap[TopicPartition, Boolean]().asScala
def reconfigureDefaultLogConfig(logConfig: LogConfig): Unit = {
this._currentDefaultConfig = logConfig
}
def currentDefaultConfig: LogConfig = _currentDefaultConfig
def liveLogDirs: Seq[File] = {
if (_liveLogDirs.size == logDirs.size)
logDirs
else
_liveLogDirs.asScala.toBuffer
}
// dirLocks: a collection of FileLocks used to lock each log directory at the file-system level.
// All log directories are locked when the LogManager object is initialized.
private val dirLocks = lockLogDirs(liveLogDirs)
// recoveryPointCheckpoints: Map[File, OffsetCheckpointFile] mapping each log directory to its RecoveryPointCheckpoint file.
// When LogManager is initialized, a RecoveryPointCheckpoint file is created in each directory. The map value is an
// OffsetCheckpointFile, which wraps that file and provides read/write access to it. The checkpoint file records the
// recoveryPoint of every Log in that directory.
@volatile private var recoveryPointCheckpoints = liveLogDirs.map(dir =>
(dir, new OffsetCheckpointFile(new File(dir, RecoveryPointCheckpointFile), logDirFailureChannel))).toMap
@volatile private var logStartOffsetCheckpoints = liveLogDirs.map(dir =>
(dir, new OffsetCheckpointFile(new File(dir, LogStartOffsetCheckpointFile), logDirFailureChannel))).toMap
private val preferredLogDirs = new ConcurrentHashMap[TopicPartition, String]()
private def offlineLogDirs: Iterable[File] = {
val logDirsSet = mutable.Set[File]() ++= logDirs
_liveLogDirs.asScala.foreach(logDirsSet -=)
logDirsSet
}
loadLogs()
private[kafka] val cleaner: LogCleaner =
if (cleanerConfig.enableCleaner)
new LogCleaner(cleanerConfig, liveLogDirs, currentLogs, logDirFailureChannel, time = time)
else
null
}
- logDirs: the set of log directories configured via log.dirs in server.properties. Each directory can contain multiple Logs, and each Log has its own sub-directory; when creating a new Log, LogManager picks the log directory that currently holds the fewest Logs (see the sketch after this list).
- initialOfflineDirs: the log directories that were already detected as offline (inaccessible) when the broker started.
- topicConfigs: per-topic configuration read from ZooKeeper. Kafka currently keeps its metadata in ZooKeeper and elects the KafkaController through it, but there is a plan to remove the ZooKeeper dependency; see KIP-500 for details.
  val (topicConfigs, failed) = zkClient.getLogConfigs(
    zkClient.getAllTopicsInCluster,
    defaultProps
  )
- initialDefaultConfig: the default Log configuration, e.g. segment size, flush interval, log cleanup settings and so on.
  val defaultProps = KafkaServer.copyKafkaConfigToLog(config)
  LogConfig.validateValues(defaultProps)
  val defaultLogConfig = LogConfig(defaultProps)
- cleanerConfig: the configuration of the cleaner threads.
  case class CleanerConfig(numThreads: Int = 1,                      // how many cleaner threads to start
                           dedupeBufferSize: Long = 4*1024*1024L,    // total memory used for log deduplication (4 MB here)
                           dedupeBufferLoadFactor: Double = 0.9d,    // maximum fill ratio of the dedupe buffer
                           ioBufferSize: Int = 1024*1024,
                           maxMessageSize: Int = 32*1024*1024,       // maximum size of a message in the log
                           maxIoBytesPerSecond: Double = Double.MaxValue, // maximum read/write I/O allowed across all cleaner threads
                           backOffMs: Long = 15 * 1000,              // how long to wait when there is nothing to clean
                           enableCleaner: Boolean = true,            // whether to start the cleaner threads
                           hashAlgorithm: String = "MD5") {          // hash algorithm used to compare keys
  }
- recoveryThreadsPerDataDir: the number of threads per data directory used for log recovery at startup and for flushing at shutdown. Defaults to 1 and can be changed via num.recovery.threads.per.data.dir.
- flushCheckMs: the interval between flushes from memory to disk, configured via log.flush.scheduler.interval.ms; defaults to Long.MaxValue.
- flushRecoveryOffsetCheckpointMs: how often the log recovery points are checkpointed, configured via log.flush.offset.checkpoint.interval.ms; defaults to 60000 ms.
- flushStartOffsetCheckpointMs: how often the Log Start Offset is checkpointed, configured via log.flush.start.offset.checkpoint.interval.ms; defaults to 60000 ms.
- retentionCheckMs: how often logs are checked for expiration, configured via log.retention.check.interval.ms; defaults to 5 * 60 * 1000 ms.
- maxPidExpirationMs: how long producer IDs / transactional ids are retained before expiring, configured via transactional.id.expiration.ms; defaults to 7 days.
- scheduler: the scheduler, backed by a Java ScheduledThreadPoolExecutor.
- brokerState: the broker state; one of NotRunning, Starting, RecoveringFromUncleanShutdown, RunningAsBroker, PendingControlledShutdown and BrokerShuttingDown.
- brokerTopicStats: per-topic statistics on the broker.
- logDirFailureChannel: a log directory that fails on access is added to the offlineLogDirs and offlineLogDirQueue of this object.
  class LogDirFailureChannel(logDirNum: Int) extends Logging {
    private val offlineLogDirs = new ConcurrentHashMap[String, String]
    private val offlineLogDirQueue = new ArrayBlockingQueue[String](logDirNum)
  }
- InitialTaskDelayMs: the start-up delay of the scheduled tasks, 30 * 1000 ms.
- logCreationOrDeletionLock: the lock that guards log creation and deletion.
- currentLogs: maps each TopicPartition to its Log. It uses Kafka's own Pool type, which is backed by the JDK ConcurrentHashMap.
  private val currentLogs = new Pool[TopicPartition, Log]()

  class Pool[K,V](valueFactory: Option[K => V] = None) extends Iterable[(K, V)] {
    private val pool: ConcurrentMap[K, V] = new ConcurrentHashMap[K, V]
  }
- futureLogs: holds the logs with the -future suffix, created when a replica is being moved to another log directory on the same broker. Like currentLogs it is a Kafka Pool backed by the JDK ConcurrentHashMap.
- logsToBeDeleted: the collection of logs waiting to be deleted, backed by the JDK LinkedBlockingQueue.
- partitionsInitializing: added to fix KAFKA-8813. Roughly, initializing the log of a topic-partition takes these steps:
  - fetch the log configuration from zk
  - call LogManager.getOrCreateLog to create the Log object
  - register the Log object
  If a user changes the configuration right after the Log object is created in the second step, that change would be lost. The fix: when initialization starts, (Topic-Partition, false) is inserted into the partitionsInitializing map; if someone changes the configuration, the entry for that Topic-Partition is set to true; once initialization finishes, the flag is checked, and if it is true the configuration is fetched from zk again and the Log object is recreated.
- _liveLogDirs: creates or validates the directories listed in log.dirs, requiring that they contain no duplicates and are readable; stored in a ConcurrentLinkedQueue.
- dirLocks: a collection of FileLocks that lock every log directory at the file-system level; all log directories are locked when the LogManager object is initialized.
- recoveryPointCheckpoints: a Map[File, OffsetCheckpointFile] mapping each log directory to its RecoveryPointCheckpoint file. When LogManager is initialized, a RecoveryPointCheckpoint file is created in each directory; the map value is an OffsetCheckpointFile that wraps this file and provides read/write access to it. The checkpoint file records the recoveryPoint of every Log in that directory.
- logStartOffsetCheckpoints: a Map[File, OffsetCheckpointFile] mapping each log directory to its LogStartOffsetCheckpoint file.
- preferredLogDirs: maps a TopicPartition to its preferred log directory (used when replicas are moved between directories).
- cleaner: the cleaner (LogCleaner); null when log cleaning is disabled.
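The logDirs bullet above says that LogManager prefers the data directory holding the fewest logs when it creates a new Log. Below is a minimal sketch of that selection policy only; it is not Kafka's actual nextLogDirs logic, and the directory paths are made up.

import java.io.File

object LeastLoadedDirExample {
  // Count how many partition directories each data dir currently holds
  // and pick the one holding the fewest; ties are broken by path name.
  def pickLeastLoadedDir(dataDirs: Seq[File]): Option[File] =
    dataDirs
      .map(dir => dir -> Option(dir.listFiles()).map(_.count(_.isDirectory)).getOrElse(0))
      .sortBy { case (dir, count) => (count, dir.getPath) }
      .headOption
      .map(_._1)

  def main(args: Array[String]): Unit = {
    // hypothetical data directories, as they might appear in log.dirs
    val dirs = Seq(new File("/tmp/kafka-logs-0"), new File("/tmp/kafka-logs-1"))
    println(pickLeastLoadedDir(dirs))
  }
}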
2.2 startup()
def startup(): Unit = {
  /* Schedule the cleanup task to delete old logs */
  // the Scheduler created in KafkaServer is passed in and used to run the periodic log cleanup tasks
  if (scheduler != null) {
    info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
    scheduler.schedule("kafka-log-retention", // LogSegment retention task
      cleanupLogs _, // core method
      delay = InitialTaskDelayMs, // 30 * 1000: start after a 30 s delay
      period = retentionCheckMs, // interval between retention checks, log.retention.check.interval.ms, default 300000 ms, i.e. every 5 minutes
      TimeUnit.MILLISECONDS)
    info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
    scheduler.schedule("kafka-log-flusher", // periodically flush logs to disk
      flushDirtyLogs _, // core method
      delay = InitialTaskDelayMs,
      period = flushCheckMs, // check interval, log.flush.scheduler.interval.ms, default Long.MaxValue (2^63 - 1)
      TimeUnit.MILLISECONDS)
    scheduler.schedule("kafka-recovery-point-checkpoint", // periodically write the recovery points to the recovery-point-offset-checkpoint file
      checkpointLogRecoveryOffsets _, // core method
      delay = InitialTaskDelayMs,
      period = flushRecoveryOffsetCheckpointMs, // interval, log.flush.offset.checkpoint.interval.ms, default 60000 ms
      TimeUnit.MILLISECONDS)
    scheduler.schedule("kafka-log-start-offset-checkpoint", // log start offset checkpoint task (logStartOffset marks the beginning of the log; as replicas move their LEO and HW it can change too, so this task writes every partition's logStartOffset to the log-start-offset-checkpoint file)
      checkpointLogStartOffsets _, // core method
      delay = InitialTaskDelayMs,
      period = flushStartOffsetCheckpointMs, // interval, log.flush.start.offset.checkpoint.interval.ms, default 60000 ms
      TimeUnit.MILLISECONDS)
    scheduler.schedule("kafka-delete-logs", // will be rescheduled after each delete logs with a dynamic period
      deleteLogs _, // core method
      delay = InitialTaskDelayMs,
      unit = TimeUnit.MILLISECONDS)
  }
  if (cleanerConfig.enableCleaner)
    cleaner.startup() // start the cleaner threads
}
So LogManager.startup() schedules five periodic tasks and starts the cleaner thread (a JDK-level sketch of this scheduling pattern follows the list):
- kafka-log-retention: deletes expired LogSegments.
- kafka-log-flusher: periodically flushes logs to disk.
- kafka-recovery-point-checkpoint: periodically writes the recovery points to the recovery-point-offset-checkpoint file.
- kafka-log-start-offset-checkpoint: the log start offset checkpoint task (the logStartOffset marks the beginning of the log; as replicas move their LEO and HW, the logStartOffset can move too, so a periodic task writes the logStartOffset of every partition to the log-start-offset-checkpoint file).
- kafka-delete-logs: periodically deletes logs carrying the .delete suffix.
- cleaner thread: compacts logs.
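KafkaScheduler is essentially a thin wrapper around a ScheduledThreadPoolExecutor. As a rough, self-contained illustration of the same pattern with the plain JDK executor (the task body is a stand-in for cleanupLogs; the delay and period mirror the defaults described above):

import java.util.concurrent.{Executors, TimeUnit}

object LogRetentionSchedulingExample {
  def main(args: Array[String]): Unit = {
    val scheduler = Executors.newScheduledThreadPool(1)
    // stand-in for LogManager.cleanupLogs()
    val retentionTask: Runnable = () => println("checking for expired log segments...")
    // 30 s initial delay (InitialTaskDelayMs), 5 min period (log.retention.check.interval.ms default)
    scheduler.scheduleAtFixedRate(retentionTask, 30 * 1000L, 5 * 60 * 1000L, TimeUnit.MILLISECONDS)
  }
}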
2.3 Scheduled task: kafka-log-retention
The task that deletes expired LogSegments.
def cleanupLogs(): Unit = {
debug("Beginning log cleanup...")
var total = 0
val startMs = time.milliseconds
// clean current logs.
val deletableLogs = {
if (cleaner != null) {
// prevent cleaner from working on same partitions when changing cleanup policy
// ① first pause any cleaner currently working on these topic-partitions
cleaner.pauseCleaningForNonCompactedPartitions()
} else {
// ② keep only the logs whose cleanup.policy is delete (filter out compacted logs)
currentLogs.filter {
case (_, log) => !log.config.compact
}
}
}
// at this point every log in deletableLogs has cleanup.policy=delete, so start deleting
try {
deletableLogs.foreach {
case (topicPartition, log) =>
debug(s"Garbage collecting '${log.name}'")
// ③ delegate to log.deleteOldSegments() to delete the expired LogSegments
total += log.deleteOldSegments()
val futureLog = futureLogs.get(topicPartition)
if (futureLog != null) {
// clean future logs
debug(s"Garbage collecting future log '${futureLog.name}'")
total += futureLog.deleteOldSegments()
}
}
} finally {
// ④ if the cleaner was paused in step ①, resume it here; i.e. at any moment only one cleaner works on a given topic-partition
if (cleaner != null) {
cleaner.resumeCleaning(deletableLogs.map(_._1))
}
}
debug(s"Log cleanup completed. $total files deleted in " +
(time.milliseconds - startMs) / 1000 + " seconds")
}
- ① First pause any cleaner currently working on these topic-partitions.
- ② Keep only the logs whose cleanup.policy is delete.
- ③ Delegate to log.deleteOldSegments() to delete the expired LogSegments (the sketch after this list illustrates the time/size checks it applies).
  - This method was already analyzed in section 2.7.3 (LogSegment) of the "Kafka 日志模块(一) Log" article, so it is not repeated here.
- ④ If the cleaner was paused in step ①, resume it here; in other words, at any given moment only one cleaner works on a topic-partition.
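deleteOldSegments applies per-segment retention predicates. The following is a simplified sketch of the time- and size-based checks (driven by retention.ms and retention.bytes); SegmentInfo and its fields are illustrative stand-ins, not Kafka's LogSegment API, and the log-start-offset check is omitted.

object RetentionCheckExample {
  // illustrative stand-in for a log segment: only the fields the checks need
  case class SegmentInfo(largestTimestampMs: Long, sizeBytes: Long)

  // time-based retention: a segment breaches retention.ms when its newest record is older than the limit
  def breachedByTime(seg: SegmentInfo, nowMs: Long, retentionMs: Long): Boolean =
    nowMs - seg.largestTimestampMs > retentionMs

  // size-based retention: starting from the oldest segment, drop segments only while
  // the log would still be at or above retention.bytes after the deletion
  def sizeBreachedSegments(segments: Seq[SegmentInfo], retentionBytes: Long): Seq[SegmentInfo] = {
    var excess = segments.map(_.sizeBytes).sum - retentionBytes
    segments.takeWhile { seg =>
      val drop = excess - seg.sizeBytes >= 0
      if (drop) excess -= seg.sizeBytes
      drop
    }
  }

  def main(args: Array[String]): Unit = {
    val now = System.currentTimeMillis()
    val segs = Seq(SegmentInfo(now - 10 * 86400000L, 100), SegmentInfo(now - 1000L, 100))
    println(segs.map(breachedByTime(_, now, 7 * 86400000L))) // List(true, false)
    println(sizeBreachedSegments(segs, retentionBytes = 90)) // only the oldest segment qualifies for deletion
  }
}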
2.4 Scheduled task: kafka-log-flusher
The task that periodically flushes logs to disk.
private def flushDirtyLogs(): Unit = {
debug("Checking for dirty logs to flush...")
// ① iterate over the currentLogs and futureLogs collections
for ((topicPartition, log) <- currentLogs.toList ++ futureLogs.toList) {
try {
// ② compute how long it has been since the last flush
val timeSinceLastFlush = time.milliseconds - log.lastFlushTime
debug(s"Checking if flush is needed on ${topicPartition.topic} flush interval ${log.config.flushMs}" +
s" last flushed ${log.lastFlushTime} time since last flush: $timeSinceLastFlush")
// ③ if the flush.ms interval has elapsed, call flush to write the data to disk;
// this flushes the records between the recoveryPoint and the LEO to disk and then advances the recoveryPoint
if(timeSinceLastFlush >= log.config.flushMs)
log.flush
} catch {
case e: Throwable =>
error(s"Error flushing topic ${topicPartition.topic}", e)
}
}
}
- ① Iterate over the currentLogs and futureLogs collections.
- ② Compute how long it has been since the last flush.
- ③ If the flush.ms interval has elapsed, call flush, which writes the records between the recoveryPoint and the LEO to disk and then advances the recoveryPoint (see the flush sketch below).
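At the bottom of the flush path the written bytes are forced onto the storage device. Here is a minimal standalone sketch of that OS-level step using plain NIO; the path is made up, and Kafka performs this through its FileRecords and index classes rather than a direct call like this.

import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.{Paths, StandardOpenOption}

object FlushExample {
  def main(args: Array[String]): Unit = {
    val channel = FileChannel.open(
      Paths.get("/tmp/flush-example.log"),
      StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND)
    try {
      channel.write(ByteBuffer.wrap("some records".getBytes("UTF-8")))
      // force(true) flushes file content and metadata to the device,
      // which is what lets the data survive a crash after the flush point
      channel.force(true)
    } finally channel.close()
  }
}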
2.5 Scheduled task: kafka-recovery-point-checkpoint
Periodically writes each log's recoveryPoint to the recovery-point-offset-checkpoint file.
def checkpointLogRecoveryOffsets(): Unit = {
logsByDir.foreach { case (dir, partitionToLogMap) =>
liveLogDirs.find(_.getAbsolutePath.equals(dir)).foreach { f =>
checkpointRecoveryOffsetsAndCleanSnapshot(f, partitionToLogMap.values.toSeq)
}
}
}
private[log] def checkpointRecoveryOffsetsAndCleanSnapshot(dir: File, logsToCleanSnapshot: Seq[Log]): Unit = {
try {
checkpointLogRecoveryOffsetsInDir(dir)
logsToCleanSnapshot.foreach(_.deleteSnapshotsAfterRecoveryPointCheckpoint())
} catch {
case e: IOException =>
logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Disk error while writing to recovery point " +
s"file in directory $dir", e)
}
}
The core of checkpointLogRecoveryOffsetsInDir is a call to kafka.server.checkpoints.CheckpointFile.write(), whose main logic is:
- first write the recoveryPoints of all Logs in the log directory to a tmp file;
- then replace the original RecoveryPointCheckpoint file with the tmp file.
It is fairly simple and you can read the source yourself; the sketch below shows the same write-then-swap pattern.
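A minimal sketch of this write-to-temp-then-rename pattern using java.nio.file. The checkpoint line format here is made up; the real CheckpointFile also writes a version header and an entry count, and falls back to a non-atomic replace when the file system does not support atomic moves.

import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, Paths, StandardCopyOption}

object CheckpointWriteExample {
  // write the offsets to <file>.tmp first, then atomically swap it in
  def writeCheckpoint(file: Path, offsets: Map[(String, Int), Long]): Unit = {
    val tmp = file.resolveSibling(file.getFileName.toString + ".tmp")
    val content = offsets
      .map { case ((topic, partition), offset) => s"$topic $partition $offset" }
      .mkString("", "\n", "\n")
    // 1. write the full content to the temporary file
    Files.write(tmp, content.getBytes(StandardCharsets.UTF_8))
    // 2. replace the real checkpoint file in one atomic rename, so a crash
    //    in the middle of a write never leaves a half-written checkpoint behind
    Files.move(tmp, file, StandardCopyOption.ATOMIC_MOVE)
  }

  def main(args: Array[String]): Unit =
    writeCheckpoint(Paths.get("/tmp/recovery-point-offset-checkpoint"), Map(("demo-topic", 0) -> 42L))
}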
2.6 Scheduled task: kafka-log-start-offset-checkpoint
The log start offset checkpoint task. The logStartOffset marks the beginning of the log; as replicas move their LEO and HW, the logStartOffset can move too, so this task writes the logStartOffset of every partition to the log-start-offset-checkpoint file.
def checkpointLogStartOffsets(): Unit = {
liveLogDirs.foreach(checkpointLogStartOffsetsInDir)
}
private def checkpointLogStartOffsetsInDir(dir: File): Unit = {
for {
partitionToLog <- logsByDir.get(dir.getAbsolutePath)
checkpoint <- logStartOffsetCheckpoints.get(dir)
} {
try {
val logStartOffsets = partitionToLog.collect {
case (k, log) if log.logStartOffset > log.logSegments.head.baseOffset => k -> log.logStartOffset
}
checkpoint.write(logStartOffsets)
} catch {
case e: IOException =>
logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Disk error while writing to logStartOffset file in directory $dir", e)
}
}
}
This task follows the same logic as the kafka-recovery-point-checkpoint task.
2.7 Scheduled task: kafka-delete-logs
Periodically deletes logs carrying the .delete suffix. This task must not be confused with kafka-log-retention:
- kafka-log-retention deletes expired LogSegment objects.
- kafka-delete-logs deletes the files carrying the .delete suffix.
  - When LogManager initializes, it calls loadLogs to load the logs; any files that had not been deleted before the previous shutdown are added to the logsToBeDeleted queue, waiting for this task to delete them.
  - When a user moves a log, the old log is renamed with the .delete suffix and added to the logsToBeDeleted queue; in other words, Kafka does not really move files, it creates them in the target directory and deletes the old ones.
This task is simple: it keeps taking entries off the logsToBeDeleted queue and deleting them (a standalone sketch of this pattern follows the code).
private def deleteLogs(): Unit = {
var nextDelayMs = 0L
try {
def nextDeleteDelayMs: Long = {
if (!logsToBeDeleted.isEmpty) {
// peek at the head of the queue
val (_, scheduleTimeMs) = logsToBeDeleted.peek()
scheduleTimeMs + currentDefaultConfig.fileDeleteDelayMs - time.milliseconds()
} else
currentDefaultConfig.fileDeleteDelayMs
}
while ({nextDelayMs = nextDeleteDelayMs; nextDelayMs <= 0}) {
val (removedLog, _) = logsToBeDeleted.take()
if (removedLog != null) {
try {
// delete the log
removedLog.delete()
info(s"Deleted log for partition ${removedLog.topicPartition} in ${removedLog.dir.getAbsolutePath}.")
} catch {
case e: KafkaStorageException =>
error(s"Exception while deleting $removedLog in dir ${removedLog.dir.getParent}.", e)
}
}
}
} catch {
case e: Throwable =>
error(s"Exception in kafka-delete-logs thread.", e)
} finally {
try {
// schedule the next run of this task
scheduler.schedule("kafka-delete-logs",
deleteLogs _,
delay = nextDelayMs,
unit = TimeUnit.MILLISECONDS)
} catch {
case e: Throwable =>
if (scheduler.isStarted) {
// No errors should occur unless scheduler has been shutdown
error(s"Failed to schedule next delete in kafka-delete-logs thread", e)
}
}
}
}
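The same rename, enqueue, delete-later, reschedule pattern can be sketched with plain JDK classes. Everything below (the queue element type, the helper names, the non-recursive dir.delete() call) is illustrative rather than Kafka's implementation, and file.delete.delay.ms is hard-coded to its 60 s default.

import java.io.File
import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}

object DeleteLogsExample {
  private val fileDeleteDelayMs = 60 * 1000L                      // file.delete.delay.ms default
  private val toDelete  = new LinkedBlockingQueue[(File, Long)]() // (renamed log dir, time it was enqueued)
  private val scheduler = Executors.newScheduledThreadPool(1)

  // rename the log directory with a ".delete"-style suffix and queue it for later removal
  def markForDeletion(dir: File): Unit = {
    val renamed = new File(dir.getParent, dir.getName + ".delete")
    if (dir.renameTo(renamed))
      toDelete.put(renamed -> System.currentTimeMillis())
  }

  private def nextDeleteDelayMs: Long =
    if (!toDelete.isEmpty) toDelete.peek()._2 + fileDeleteDelayMs - System.currentTimeMillis()
    else fileDeleteDelayMs

  // drain every entry whose delay has elapsed, then reschedule itself with a dynamic delay
  def deleteLogs(): Unit = {
    var nextDelayMs = 0L
    while ({ nextDelayMs = nextDeleteDelayMs; nextDelayMs <= 0 }) {
      val (dir, _) = toDelete.take()
      dir.delete() // the real code removes the whole directory tree
    }
    val task: Runnable = () => deleteLogs()
    scheduler.schedule(task, nextDelayMs, TimeUnit.MILLISECONDS)
  }

  def main(args: Array[String]): Unit = deleteLogs() // kicks off the self-rescheduling loop
}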
2.8 The Cleaner thread
The thread that performs log compaction. It differs from the kafka-log-retention task:
- kafka-log-retention deletes log data along three dimensions: time, log size and log start offset.
- the cleaner thread removes tombstones (records with a non-null key and a null value) and compacts records, keeping only the latest value for each key (see the toy example below). If consumers only care about the latest value of each key, log compaction can be enabled. Using the official example: of the three records with key K1 at offsets 0, 2 and 3, only the latest one, offset 3, is kept.
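To make these semantics concrete, here is a toy, purely in-memory version of the retention rule (keep only the record with the highest offset per key, drop tombstones) using the K1 example above. It shows only the rule, not the segment-rewriting machinery, and real Kafka removes tombstones only after delete.retention.ms.

object CompactionSemanticsExample {
  case class Record(offset: Long, key: String, value: Option[String]) // value = None is a tombstone

  // keep, per key, only the record with the highest offset; drop tombstones entirely
  def compact(records: Seq[Record]): Seq[Record] = {
    val latestOffsetPerKey: Map[String, Long] =
      records.groupBy(_.key).map { case (k, rs) => k -> rs.map(_.offset).max }
    records
      .filter(r => latestOffsetPerKey(r.key) == r.offset) // only the latest record of each key survives
      .filter(_.value.isDefined)                          // tombstones are dropped once they may be removed
  }

  def main(args: Array[String]): Unit = {
    val log = Seq(
      Record(0, "K1", Some("V1")), Record(1, "K2", Some("V3")),
      Record(2, "K1", Some("V4")), Record(3, "K1", Some("V7")))
    println(compact(log)) // K1 keeps only offset 3 (V7), matching the example above
  }
}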
When a message is written, it is simply appended to the end of the activeSegment's log file; to avoid turning the activeSegment into a hotspot, the activeSegment never takes part in log compaction.
Several Cleaner threads can run compaction in parallel; the number is configurable (log.cleaner.threads).
Every Log can be split by the cleaner checkpoint (CleanerPoint) into two parts:
- The Log Head keeps all messages, just like a normal log.
- The Log Tail is the part that has already been cleaned by the Cleaner thread.
- min.cleanable.dirty.ratio configures, per topic, how large the dirty portion of a log must be before it is cleaned; the higher a log's ratio, the sooner it is cleaned. log.cleaner.min.cleanable.ratio sets the broker-wide default for all topics (see the ratio sketch after this list).
- After the Cleaner thread has picked the Log to clean based on its dirty ratio, it first builds a mapping from each key in the dirty section to its last_offset, maintained in a SkimpyOffsetMap.
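A minimal sketch of the ratio check described above, with illustrative byte counts. The real LogToClean computes cleanableBytes a bit more carefully (it excludes the uncleanable tail of the log), but the comparison against min.cleanable.dirty.ratio is the same idea.

object DirtyRatioExample {
  // dirty ratio = bytes after the cleaner checkpoint / total bytes (clean + dirty)
  def cleanableRatio(cleanBytes: Long, dirtyBytes: Long): Double =
    dirtyBytes.toDouble / (cleanBytes + dirtyBytes)

  def needsCompaction(cleanBytes: Long, dirtyBytes: Long, minCleanableRatio: Double): Boolean =
    cleanableRatio(cleanBytes, dirtyBytes) > minCleanableRatio

  def main(args: Array[String]): Unit = {
    // 600 MB already cleaned, 500 MB dirty, default min.cleanable.dirty.ratio = 0.5
    println(needsCompaction(600L << 20, 500L << 20, 0.5)) // false: 500/1100 is about 0.45
    println(needsCompaction(400L << 20, 500L << 20, 0.5)) // true:  500/900  is about 0.56
  }
}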
Classes involved in log compaction:
- LogCleaner: manages the CleanerThreads via its cleaners collection and starts/stops them through the startup() and shutdown() methods.
- CleanerThread: where the real compaction logic lives; extends ShutdownableThread.
- ShutdownableThread: extends Thread and provides CleanerThread with basic methods such as initiateShutdown, awaitShutdown and run.
- Cleaner: CleanerThread delegates much of its work to a Cleaner instance.
- LogCleanerManager: manages the compaction state of every log and maintains the cleaner checkpoint information.
// start the CleanerThread threads
def startup(): Unit = {
info("Starting the log cleaner")
(0 until config.numThreads).foreach { i =>
val cleaner = new CleanerThread(i)
cleaners += cleaner
cleaner.start()
}
}
- By default a single CleanerThread is started; the count can be changed via log.cleaner.threads.
CleanerThread
// initialize the Cleaner object
val cleaner = new Cleaner(id = threadId,
offsetMap = new SkimpyOffsetMap(memory = math.min(config.dedupeBufferSize / config.numThreads, Int.MaxValue).toInt,
hashAlgorithm = config.hashAlgorithm),
ioBufferSize = config.ioBufferSize / config.numThreads / 2,
maxIoBufferSize = config.maxMessageSize,
dupBufferLoadFactor = config.dedupeBufferLoadFactor,
throttler = throttler,
time = time,
checkDone = checkDone)
// core method
override def doWork(): Unit = {
val cleaned = tryCleanFilthiestLog()
if (!cleaned)
pause(config.backOffMs, TimeUnit.MILLISECONDS)
}
private def tryCleanFilthiestLog(): Boolean = {
try {
cleanFilthiestLog()
} catch {
case e: LogCleaningException =>
warn(s"Unexpected exception thrown when cleaning log ${e.log}. Marking its partition (${e.log.topicPartition}) as uncleanable", e)
cleanerManager.markPartitionUncleanable(e.log.dir.getParent, e.log.topicPartition)
false
}
}
@throws(classOf[LogCleaningException])
private def cleanFilthiestLog(): Boolean = {
val preCleanStats = new PreCleanStats()
// ① pick the log that most needs compaction
val cleaned = cleanerManager.grabFilthiestCompactedLog(time, preCleanStats) match {
case None =>
false
case Some(cleanable) =>
// there's a log, clean it
this.lastPreCleanStats = preCleanStats
try {
// ② core method: perform the compaction
cleanLog(cleanable)
true
} catch {
case e @ (_: ThreadShutdownException | _: ControlThrowable) => throw e
case e: Exception => throw new LogCleaningException(cleanable.log, e.getMessage, e)
}
}
val deletable: Iterable[(TopicPartition, Log)] = cleanerManager.deletableLogs()
try {
deletable.foreach { case (_, log) =>
try {
log.deleteOldSegments()
} catch {
case e @ (_: ThreadShutdownException | _: ControlThrowable) => throw e
case e: Exception => throw new LogCleaningException(log, e.getMessage, e)
}
}
} finally {
cleanerManager.doneDeleting(deletable.map(_._1))
}
cleaned
}
private def cleanLog(cleanable: LogToClean): Unit = {
var endOffset = cleanable.firstDirtyOffset
try {
val (nextDirtyOffset, cleanerStats) = cleaner.clean(cleanable)
recordStats(cleaner.id, cleanable.log.name, cleanable.firstDirtyOffset, endOffset, cleanerStats)
endOffset = nextDirtyOffset
} catch {
case _: LogCleaningAbortedException => // task can be aborted, let it go.
case _: KafkaStorageException => // partition is already offline. let it go.
case e: IOException =>
val logDirectory = cleanable.log.dir.getParent
val msg = s"Failed to clean up log for ${cleanable.topicPartition} in dir ${logDirectory} due to IOException"
logDirFailureChannel.maybeAddOfflineLogDir(logDirectory, msg, e)
} finally {
cleanerManager.doneCleaning(cleanable.topicPartition, cleanable.log.dir.getParentFile, endOffset)
}
}
- Important parameters when initializing the Cleaner object (a sizing sketch follows this list):
  - offsetMap: a SkimpyOffsetMap that maps each key in the dirty section to its last_offset.
  - ioBufferSize: the size of the ByteBuffers used to read and write LogSegments.
  - maxIoBufferSize: the maximum message size.
  - dupBufferLoadFactor: the maximum fill ratio of the SkimpyOffsetMap.
  - throttler: limits the LogSegment read/write rate.
  - checkDone: checks the compaction state of the Log.
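To make the sizing concrete: each SkimpyOffsetMap slot roughly holds one key hash plus an 8-byte offset, and only a dedupeBufferLoadFactor share of the slots is actually used. A small arithmetic sketch with the CleanerConfig defaults shown earlier (the 16-byte hash length assumes the default MD5 algorithm):

object OffsetMapSizingExample {
  def main(args: Array[String]): Unit = {
    val dedupeBufferSize       = 4 * 1024 * 1024L // default from the CleanerConfig case class shown earlier (4 MB)
    val numThreads             = 1
    val dedupeBufferLoadFactor = 0.9d
    val hashSize               = 16               // MD5 digest length in bytes
    val bytesPerEntry          = hashSize + 8     // hash + 8-byte offset

    val memoryPerThread   = math.min(dedupeBufferSize / numThreads, Int.MaxValue).toInt
    val slots             = memoryPerThread / bytesPerEntry
    val maxDesiredMapSize = (slots * dedupeBufferLoadFactor).toInt

    println(s"slots=$slots, usable entries=$maxDesiredMapSize")
    // roughly 174,762 slots and 157,286 usable entries: the number of distinct dirty keys
    // one cleaning pass can index before the offset map is considered full
  }
}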
- ① LogCleanerManager#grabFilthiestCompactedLog picks the log that most needs compaction:

def grabFilthiestCompactedLog(time: Time, preCleanStats: PreCleanStats = new PreCleanStats()): Option[LogToClean] = {
  inLock(lock) {
    // current time
    val now = time.milliseconds
    // metrics: record when the cleaner last ran
    this.timeOfLastRun = now
    val lastClean = allCleanerCheckpoints
    val dirtyLogs = logs.filter {
      // ① keep only the logs whose cleanup.policy includes compact
      case (_, log) => log.config.compact // match logs that are marked as compacted
    }.filterNot {
      case (topicPartition, log) =>
        // ② skip any logs already in-progress and uncleanable partitions
        inProgress.contains(topicPartition) || isUncleanablePartition(log, topicPartition)
    }.map {
      case (topicPartition, log) => // create a LogToClean instance for each
        try {
          // compute firstDirtyOffset; it may be the logStartOffset or the clean checkpoint
          val lastCleanOffset = lastClean.get(topicPartition)
          val (firstDirtyOffset, firstUncleanableDirtyOffset) = cleanableOffsets(log, lastCleanOffset, now)
          val compactionDelayMs = maxCompactionDelay(log, firstDirtyOffset, now)
          preCleanStats.updateMaxCompactionDelay(compactionDelayMs)
          // ③ build a LogToClean for each Log; it tracks the Log's clean bytes, dirty bytes and cleanableRatio
          LogToClean(topicPartition, log, firstDirtyOffset, firstUncleanableDirtyOffset, compactionDelayMs > 0)
        } catch {
          case e: Throwable => throw new LogCleaningException(log,
            s"Failed to calculate log cleaning stats for partition $topicPartition", e)
        }
    }.filter(ltc => ltc.totalBytes > 0) // skip any empty logs

    // ④ record the largest cleanableRatio among the dirty logs
    this.dirtiestLogCleanableRatio = if (dirtyLogs.nonEmpty) dirtyLogs.max.cleanableRatio else 0
    // and must meet the minimum threshold for dirty byte ratio or have some bytes required to be compacted
    // ⑤ drop logs whose cleanableRatio is below the configured minimum
    val cleanableLogs = dirtyLogs.filter { ltc =>
      (ltc.needCompactionNow && ltc.cleanableBytes > 0) || ltc.cleanableRatio > ltc.log.config.minCleanableRatio
    }
    if (cleanableLogs.isEmpty) {
      None
    } else {
      // ⑥ pick the filthiest log and add it to the inProgress collection
      preCleanStats.recordCleanablePartitions(cleanableLogs.size)
      val filthiest = cleanableLogs.max
      inProgress.put(filthiest.topicPartition, LogCleaningInProgress)
      Some(filthiest)
    }
  }
}
- ② The core step calls the clean method of the Cleaner object initialized earlier to perform the compaction:

private[log] def clean(cleanable: LogToClean): (Long, CleanerStats) = {
  // figure out the timestamp below which it is safe to remove delete tombstones
  // this position is defined to be a configurable time beneath the last modified time of the last clean segment
  // i.e. compute the point below which tombstones (records with a null value) can be safely removed
  val deleteHorizonMs = cleanable.log.logSegments(0, cleanable.firstDirtyOffset).lastOption match {
    case None => 0L
    case Some(seg) => seg.lastModified - cleanable.log.config.deleteRetentionMs
  }
  doClean(cleanable, deleteHorizonMs)
}

private[log] def doClean(cleanable: LogToClean, deleteHorizonMs: Long): (Long, CleanerStats) = {
  info("Beginning cleaning of log %s.".format(cleanable.log.name))
  val log = cleanable.log
  val stats = new CleanerStats()

  // build the offset map
  info("Building offset map for %s...".format(cleanable.log.name))
  // ① determine the upper bound of the compaction: the activeSegment never takes part,
  //   so the compaction can reach at most activeSegment.baseOffset
  val upperBoundOffset = cleanable.firstUncleanableOffset
  // ② fill the offsetMap, which determines the real upper bound of this round of compaction
  buildOffsetMap(log, cleanable.firstDirtyOffset, upperBoundOffset, offsetMap, stats)
  val endOffset = offsetMap.latestOffset + 1
  stats.indexDone()

  // determine the timestamp up to which the log will be cleaned
  // this is the lower of the last active segment and the compaction lag
  // (the counterpart of upperBoundOffset: one is an offset, the other a timestamp)
  val cleanableHorizonMs = log.logSegments(0, cleanable.firstUncleanableOffset).lastOption.map(_.lastModified).getOrElse(0L)

  // group the segments and clean the groups
  info("Cleaning log %s (cleaning prior to %s, discarding tombstones prior to %s)...".format(log.name, new Date(cleanableHorizonMs), new Date(deleteHorizonMs)))
  val transactionMetadata = new CleanedTransactionMetadata

  // ③ group the segments to be compacted and clean them group by group
  val groupedSegments = groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize,
    log.config.maxIndexSize, cleanable.firstUncleanableOffset)
  for (group <- groupedSegments)
    // ④ clean one group
    cleanSegments(log, group, offsetMap, deleteHorizonMs, stats, transactionMetadata)

  // record buffer utilization
  stats.bufferUtilization = offsetMap.utilization
  stats.allDone()

  (endOffset, stats)
}
- ① Determine the upper bound of the compaction: since the activeSegment never participates, the maximum upper bound is activeSegment.baseOffset.
- ② Fill the offsetMap, which determines the real upper bound of the compaction:
private[log] def buildOffsetMap(log: Log, start: Long, end: Long, map: OffsetMap, stats: CleanerStats): Unit = {
  map.clear()
  // collect all the LogSegments from firstDirtyOffset to upperBoundOffset
  val dirty = log.logSegments(start, end).toBuffer
  val nextSegmentStartOffsets = new ListBuffer[Long]
  if (dirty.nonEmpty) {
    for (nextSegment <- dirty.tail) nextSegmentStartOffsets.append(nextSegment.baseOffset)
    nextSegmentStartOffsets.append(end)
  }
  info("Building offset map for log %s for %d segments in offset range [%d, %d).".format(log.name, dirty.size, start, end))

  // transaction related
  val transactionMetadata = new CleanedTransactionMetadata
  val abortedTransactions = log.collectAbortedTransactions(start, end)
  transactionMetadata.addAbortedTransactions(abortedTransactions)

  // Add all the cleanable dirty segments. We must take at least map.slots * load_factor,
  // but we may be able to fit more (if there is lots of duplication in the dirty section of the log)
  var full = false
  // iterate over the dirty segments while the offsetMap is not yet full
  for ((segment, nextSegmentStartOffset) <- dirty.zip(nextSegmentStartOffsets) if !full) {
    // check the compaction state that LogCleanerManager records for this partition
    checkDone(log.topicPartition)
    // process a single LogSegment, adding each record's key and offset to the OffsetMap
    full = buildOffsetMapForSegment(log.topicPartition, segment, map, start, nextSegmentStartOffset, log.config.maxMessageSize,
      transactionMetadata, stats)
    if (full)
      debug("Offset map is full, %d segments fully mapped, segment with base offset %d is partially mapped".format(dirty.indexOf(segment), segment.baseOffset))
  }
  info("Offset map for log %s complete.".format(log.name))
}

private def buildOffsetMapForSegment(topicPartition: TopicPartition,
                                     segment: LogSegment,
                                     map: OffsetMap,
                                     startOffset: Long,
                                     nextSegmentStartOffset: Long,
                                     maxLogMessageSize: Int,
                                     transactionMetadata: CleanedTransactionMetadata,
                                     stats: CleanerStats): Boolean = {
  var position = segment.offsetIndex.lookup(startOffset).position
  val maxDesiredMapSize = (map.slots * this.dupBufferLoadFactor).toInt
  while (position < segment.log.sizeInBytes) { // walk through the LogSegment
    checkDone(topicPartition) // check the compaction state
    readBuffer.clear()
    try {
      // read records from the LogSegment
      segment.log.readInto(readBuffer, position)
    } catch {
      case e: Exception =>
        throw new KafkaException(s"Failed to read from segment $segment of partition $topicPartition " +
          "while loading offset map", e)
    }
    val records = MemoryRecords.readableRecords(readBuffer)
    throttler.maybeThrottle(records.sizeInBytes)

    val startPosition = position
    for (batch <- records.batches.asScala) {
      if (batch.isControlBatch) {
        transactionMetadata.onControlBatchRead(batch)
        stats.indexMessagesRead(1)
      } else {
        val isAborted = transactionMetadata.onBatchRead(batch)
        if (isAborted) {
          // If the batch is aborted, do not bother populating the offset map.
          // Note that abort markers are supported in v2 and above, which means count is defined.
          stats.indexMessagesRead(batch.countOrNull)
        } else {
          for (record <- batch.asScala) {
            // only records with a key are indexed
            if (record.hasKey && record.offset >= startOffset) {
              if (map.size < maxDesiredMapSize)
                // put the key and offset into the offsetMap
                map.put(record.key, record.offset)
              else
                return true
            }
            stats.indexMessagesRead(1)
          }
        }
      }

      if (batch.lastOffset >= startOffset)
        map.updateLatestOffset(batch.lastOffset)
    }
    val bytesRead = records.validBytes
    // advance the position for the next read
    position += bytesRead
    stats.indexBytesRead(bytesRead)

    // if we didn't read even one complete message, our read buffer may be too small
    // (position did not move, so grow the read/write buffers)
    if (position == startPosition)
      growBuffersOrFail(segment.log, position, maxLogMessageSize, records)
  }

  // In the case of offsets gap, fast forward to latest expected offset in this segment.
  map.updateLatestOffset(nextSegmentStartOffset - 1L)

  // restore the read/write buffers to their original size
  restoreBuffers()
  false
}
- ③ Group the segments to be compacted and clean them group by group.
- ④ Clean each group:

private[log] def cleanSegments(log: Log,
                               segments: Seq[LogSegment],
                               map: OffsetMap,
                               deleteHorizonMs: Long,
                               stats: CleanerStats,
                               transactionMetadata: CleanedTransactionMetadata): Unit = {
  // create a new segment with a suffix appended to the name of the log and indexes
  // (a ".cleaned" log file and index files, named after the baseOffset of the first LogSegment in the group)
  val cleaned = LogCleaner.createNewCleanedSegment(log, segments.head.baseOffset)
  transactionMetadata.cleanedIndex = Some(cleaned.txnIndex)

  try {
    // clean segments into the new destination segment
    val iter = segments.iterator
    var currentSegmentOpt: Option[LogSegment] = Some(iter.next())
    val lastOffsetOfActiveProducers = log.lastRecordsOfActiveProducers

    while (currentSegmentOpt.isDefined) {
      val currentSegment = currentSegmentOpt.get
      val nextSegmentOpt = if (iter.hasNext) Some(iter.next()) else None

      val startOffset = currentSegment.baseOffset
      val upperBoundOffset = nextSegmentOpt.map(_.baseOffset).getOrElse(map.latestOffset + 1)
      val abortedTransactions = log.collectAbortedTransactions(startOffset, upperBoundOffset)
      transactionMetadata.addAbortedTransactions(abortedTransactions)

      val retainDeletesAndTxnMarkers = currentSegment.lastModified > deleteHorizonMs
      info(s"Cleaning $currentSegment in log ${log.name} into ${cleaned.baseOffset} " +
        s"with deletion horizon $deleteHorizonMs, " +
        s"${if(retainDeletesAndTxnMarkers) "retaining" else "discarding"} deletes.")

      try {
        // perform the actual compaction of this segment
        cleanInto(log.topicPartition, currentSegment.log, cleaned, map, retainDeletesAndTxnMarkers, log.config.maxMessageSize,
          transactionMetadata, lastOffsetOfActiveProducers, stats)
      } catch {
        case e: LogSegmentOffsetOverflowException =>
          // Split the current segment. It's also safest to abort the current cleaning process, so that we retry from
          // scratch once the split is complete.
          info(s"Caught segment overflow error during cleaning: ${e.getMessage}")
          log.splitOverflowedSegment(currentSegment)
          throw new LogCleaningAbortedException()
      }
      currentSegmentOpt = nextSegmentOpt
    }

    cleaned.onBecomeInactiveSegment()
    // flush new segment to disk before swap
    cleaned.flush()

    // update the modification date to retain the last modified date of the original files
    val modified = segments.last.lastModified
    cleaned.lastModified = modified

    // swap in new segment
    info(s"Swapping in cleaned segment $cleaned for segment(s) $segments in log $log")
    // rename the .cleaned suffix to .swap,
    // add the cleaned segment to segments,
    // remove the group's old LogSegments from segments,
    // and finally strip the .swap suffix from the file
    log.replaceSegments(List(cleaned), segments)
  } catch {
    case e: LogCleaningAbortedException =>
      try cleaned.deleteIfExists()
      catch {
        case deleteException: Exception =>
          e.addSuppressed(deleteException)
      } finally throw e
  }
}
- The compaction itself is done by cleanInto:

private[log] def cleanInto(topicPartition: TopicPartition,
                           sourceRecords: FileRecords,
                           dest: LogSegment,
                           map: OffsetMap,
                           retainDeletesAndTxnMarkers: Boolean,
                           maxLogMessageSize: Int,
                           transactionMetadata: CleanedTransactionMetadata,
                           lastRecordsOfActiveProducers: Map[Long, LastRecord],
                           stats: CleanerStats): Unit = {
  val logCleanerFilter: RecordFilter = new RecordFilter {
    var discardBatchRecords: Boolean = _

    override def checkBatchRetention(batch: RecordBatch): BatchRetention = {
      // we piggy-back on the tombstone retention logic to delay deletion of transaction markers.
      // note that we will never delete a marker until all the records from that transaction are removed.
      discardBatchRecords = shouldDiscardBatch(batch, transactionMetadata, retainTxnMarkers = retainDeletesAndTxnMarkers)

      def isBatchLastRecordOfProducer: Boolean = {
        // We retain the batch in order to preserve the state of active producers. There are three cases:
        // 1) The producer is no longer active, which means we can delete all records for that producer.
        // 2) The producer is still active and has a last data offset. We retain the batch that contains
        //    this offset since it also contains the last sequence number for this producer.
        // 3) The last entry in the log is a transaction marker. We retain this marker since it has the
        //    last producer epoch, which is needed to ensure fencing.
        lastRecordsOfActiveProducers.get(batch.producerId).exists { lastRecord =>
          lastRecord.lastDataOffset match {
            case Some(offset) => batch.lastOffset == offset
            case None => batch.isControlBatch && batch.producerEpoch == lastRecord.producerEpoch
          }
        }
      }

      if (batch.hasProducerId && isBatchLastRecordOfProducer)
        BatchRetention.RETAIN_EMPTY
      else if (discardBatchRecords)
        BatchRetention.DELETE
      else
        BatchRetention.DELETE_EMPTY
    }

    override def shouldRetainRecord(batch: RecordBatch, record: Record): Boolean = {
      // uses the result of checkBatchRetention above to decide whether to discard
      if (discardBatchRecords)
        // The batch is only retained to preserve producer sequence information; the records can be removed
        false
      else
        // whether to keep this record depends on three conditions:
        // 1. the record has a key
        // 2. the offsetMap does not hold a newer offset for the same key
        // 3. the value is non-null, or it is a tombstone that cannot be removed yet
        Cleaner.this.shouldRetainRecord(map, retainDeletesAndTxnMarkers, batch, record, stats)
    }
  }

  var position = 0
  // iterate over the LogSegment being compacted
  while (position < sourceRecords.sizeInBytes) {
    // check the compaction state
    checkDone(topicPartition)
    // read a chunk of messages and copy any that are to be retained to the write buffer to be written out
    readBuffer.clear()
    writeBuffer.clear()

    // read the records
    sourceRecords.readInto(readBuffer, position)
    val records = MemoryRecords.readableRecords(readBuffer)
    // throttle the read rate if necessary
    throttler.maybeThrottle(records.sizeInBytes)
    val result = records.filterTo(topicPartition, logCleanerFilter, writeBuffer, maxLogMessageSize, decompressionBufferSupplier)

    stats.readMessages(result.messagesRead, result.bytesRead)
    stats.recopyMessages(result.messagesRetained, result.bytesRetained)

    position += result.bytesRead

    // if any messages are to be retained, write them out
    val outputBuffer = result.outputBuffer
    if (outputBuffer.position() > 0) {
      outputBuffer.flip()
      val retained = MemoryRecords.readableRecords(outputBuffer)
      // it's OK not to hold the Log's lock in this case, because this segment is only accessed by other threads
      // after `Log.replaceSegments` (which acquires the lock) is called
      // append the retained records to the destination LogSegment
      dest.append(largestOffset = result.maxOffset,
        largestTimestamp = result.maxTimestamp,
        shallowOffsetOfMaxTimestamp = result.shallowOffsetOfMaxTimestamp,
        records = retained)
      throttler.maybeThrottle(outputBuffer.limit())
    }

    // if we read bytes but didn't get even one complete batch, our I/O buffer is too small, grow it and try again
    // `result.bytesRead` contains bytes from `messagesRead` and any discarded batches.
    // (not reading a single complete batch means the readBuffer is too small and needs to grow)
    if (readBuffer.limit() > 0 && result.bytesRead == 0)
      growBuffersOrFail(sourceRecords, position, maxLogMessageSize, records)
  }
  // reset the readBuffer and writeBuffer
  restoreBuffers()
}
Reference:
Kafka 2.5.0 source code