1. 概述
Kafka 使用日志文件
的方式保存生产者发送的消息。每条消息都有一个 offset
值来标识它在分区中的偏移量,这个offset 是逻辑值
,并不是消息实际存放的物理地址。
offset
值类似与数据库表中的主键,主键唯一确定了数据库表中的一条记录offset
唯一确定了分区的一条消息。
如下图所示:
-
Topic
:Kafka 消息以Topic
为基本单位进行归类,各个主题在逻辑上相互独立。 -
Partition
:每个主题有可以分为一个或者多个partition
,可以在创建主题的时候指定,也可以之后修改 -
Offset
:每条消息在发送的时候会根据分区规则被追加到指定的分区中,分区中的每条消息都会被分配一个唯一的序列号
Kafka 为了避免存储的日志文件太大,并不是直接对应磁盘上的一个日志文件,而是对应磁盘上的一个目录,如下图所示:
- 通过
topic_name
-partition_id
来命名,则对应topic 和对应的partition 的log的存储在此目录下的文件中。
Kafka 还引入了 LogSegment
的概念,把Log分隔成多个分段也就是多个 LogSegment,如下如所示:
-
.log(日志文件)
-
.index(偏移量索引文件)
-
.timeindex(时间戳索引文件)
-
其他文件
-
activeSegment
:向Log 中追加消息时是顺序写入的,只有最后一个LogSegment 才能执行写入操作,之前的都不行。最后一个LogSegment 称之为ActiveSegment
即表示活跃的日志分段,- 当activeSegment满足一定的条件时,就需要创建新的activeSegment,之后追加的消息将写入新的activeSegment
-
每个
LogSegment
都有一个基准偏移量baseOffset
,用来表示当前LogSegment
中第一条消息的offset。偏移量是一个64位的长整型数,日志文件和两个索引文件都是根据基准偏移量(baseOffset)命名的,名称固定为20位数字,没有达到的位数则用0填充。比如第一个LogSegment的基准偏移量为0,对应的日志文件为 00000000000000000000.log。
在 Kafka 服务端(三)kafka-server-start.sh 一文中讲述了我们启动Kafka之后,Kafka 每个模块初始化的入口类。
Kafka
的所有的消息都是通过日志
来存储的,它是通过 LogManager
来进行初始化的,Log
类是真正操作日志的,LogManager 是用来管理 Log 的。
在LogManager 初始化的时候会通过 log.dirs
配置的位置来生成我们的Log,并且会生成5个文件,如下图:
-
cleaner-offset-checkPoint
:清理检查点文件,用来记录每个主题的每个分区中已清理的偏移量 -
log-start-offset-checkpoint
:它用来标识日志的起始偏移量。各个副本在变动LEO
和HW
的过程中,logStartOffset
也有可能随之而动,Kafka 也有一个定时任务来负责将所有分区的logStartOffset书写到起始点文件log-start-offset-checkpoint中,定时周期由broker端参数log.flush.start.offset.checkpoint.interval.ms
来配置,默认值为60000
。 -
meta.properties
:记录一些元数据cluster.id=7YpDHjejRY2QU_U5Wk_CDQ version=0 broker.id=0
-
recovery-point-offset-checkpoint
:Kafka 中会有一个定时任务负责将所有分区的 LEO 刷写到恢复点文件 recovery-point-offset-checkpoint 中,定时周期由 broker 端参数log.flush.offset.checkpoint.interval.ms
来配置,默认值为60000
-
replication-offset-checkpoint
:定时任务负责将所有分区的HW
刷写到复制点文件replication-offset-checkpoint中,定时周期由broker端参数replica.high.watermark.checkpoint.interval.ms
来配置,默认值为5000
这些定时任务在 []() 文中进行分析。这篇文章主要来分析 Kafka 是如何操作日志的。
2. 源码解析
Log 类结构如下图:
LogAppendInfo
:一组待写入消息的各种信息。Log
:核心类,对日志进行一些操作比如添加RollParams
:用于日志是否切分LogOffsetSnapshot
:分区内所有位移元数据的容器类LogMetricNames
:Log的监控指标LogReadInfo
:读取日志返回的数据及其元数据CompletedTxn
:记录已完成事务的元数据,主要用于构建事务
2.1 LogAppendInfo
保存了一组待写入消息的各种信息,比如这组消息中最后一条消息的offset,最大的时间戳是什么等信息。
case class LogAppendInfo(var firstOffset: Option[Long],
var lastOffset: Long, // 消息集合最后一条消息的位移值
var maxTimestamp: Long, // 消息集合最大消息时间戳
var offsetOfMaxTimestamp: Long, // 消息集合最大消息时间戳所属消息的位移值
var logAppendTime: Long, // 写入消息时间戳
var logStartOffset: Long,
var recordConversionStats: RecordConversionStats, // 消息转换统计类,里面记录了执行了格式转换的消息数等数据
sourceCodec: CompressionCodec, // 消息集合中消息使用的压缩器(Compressor)类型,比如是Snappy还是LZ4
targetCodec: CompressionCodec, // 写入消息时需要使用的压缩器类型
shallowCount: Int, // 消息批次数,每个消息批次下可能包含多条消息
validBytes: Int, // 写入消息总字节数
offsetsMonotonic: Boolean, // 消息位移值是否是顺序增加的
lastOffsetOfFirstBatch: Long, // 首个消息批次中最后一条消息的位移
recordErrors: Seq[RecordError] = List(), // 写入消息时出现的异常列表
errorMessage: String = null) { // 错误码
def firstOrLastOffsetOfFirstBatch: Long = firstOffset.getOrElse(lastOffsetOfFirstBatch)
def numMessages: Long = {
firstOffset match {
case Some(firstOffsetVal) if (firstOffsetVal >= 0 && lastOffset >= 0) => (lastOffset - firstOffsetVal + 1)
case _ => 0
}
}
}
firstOffset
:消息集合中第一条消息的位移值lastOffset
:消息集合最后一条消息的位移值maxTimestamp
:消息集合最大消息时间戳offsetOfMaxTimestamp
:消息集合最大消息时间戳所属消息的位移值logAppendTime
:写入消息时间戳logStartOffset
:日志文件的起始偏移量等于第一个日志分段的 baseOffset,但这并不是绝对的,logStartOffset 的值可以通过 DeleteRecordsRequest 请求(比如使用KafkaAdminClient的deleteRecords()方法、使用kafka-delete-records.sh脚本)、日志的清理和截断等操作进行修改。recordConversionStats
:消息转换统计类,里面记录了执行了格式转换的消息数等数据sourceCodec
:消息集合中消息使用的压缩器(Compressor)类型,比如是Snappy还是LZ4targetCodec
:写入消息时需要使用的压缩器类型shallowCount
:消息批次数,每个消息批次下可能包含多条消息validBytes
:写入消息总字节数offsetsMonotonic
:消息位移值是否是顺序增加的lastOffsetOfFirstBatch
:首个消息批次中最后一条消息的位移recordErrors
:写入消息时出现的异常列表errorMessage
:错误码
2.2 RollParams
定义用于控制日志是否要切分的数据结构
case class RollParams(maxSegmentMs: Long,
maxSegmentBytes: Int,
maxTimestampInMessages: Long,
maxOffsetInMessages: Long,
messagesSize: Int,
now: Long)
maxSegmentMs
:这个Segment
持续多久,当到了这个时间如果这个Segment
文件没有满,那么会强制创建新的Segment
,确保可以删除或者压缩日志。maxSegmentBytes
:当前Segment
能存储最大的字节数maxTimestampInMessages
:当前Segment
中最大的时间戳maxOffsetInMessages
:当前Segment
中最大的位移量messagesSize
:当前Segment
中消息的数量now
:切分Segment
的时间
2.3 LogReadInfo
case class LogReadInfo(fetchedData: FetchDataInfo,
highWatermark: Long,
logStartOffset: Long,
logEndOffset: Long,
lastStableOffset: Long)
2.4 LogOffsetSnapshot
分区内所有位移元数据的容器类
case class LogOffsetSnapshot(logStartOffset: Long,
logEndOffset: LogOffsetMetadata,
highWatermark: LogOffsetMetadata,
lastStableOffset: LogOffsetMetadata)
logStartOffset
:日志文件的起始偏移量等于第一个日志分段的baseOffset
,但这并不是绝对的,logStartOffset 的值可以通过 DeleteRecordsRequest 请求(比如使用KafkaAdminClient的deleteRecords()方法、使用kafka-delete-records.sh脚本)、日志的清理和截断等操作进行修改。logEndOffset
:下一条插入日志的的位移值highWatermark
:高水位,消费者只能消费<=
此值的消息,当一条消息发送给Server 端,只有当所有的Replica
同步后才会增加此值lastStableOffset
:这个值和高水位一个意思,但是这个值只有在开启事务才有用,也就是说当开启事务后,消费者只能消费<=
lastStableOffset 的消息。它不会超过 HW
2.5 LogMetricNames
Log的监控指标
object LogMetricNames {
val NumLogSegments: String = "NumLogSegments"
val LogStartOffset: String = "LogStartOffset"
val LogEndOffset: String = "LogEndOffset"
val Size: String = "Size"
}
2.6 CompletedTxn
记录已完成事务的元数据,主要用于构建事务
case class CompletedTxn(producerId: Long, firstOffset: Long, lastOffset: Long, isAborted: Boolean) {
override def toString: String = {
"CompletedTxn(" +
s"producerId=$producerId, " +
s"firstOffset=$firstOffset, " +
s"lastOffset=$lastOffset, " +
s"isAborted=$isAborted)"
}
}
2.7 Log
核心类,对日志进行一些操作比如 添加,Log 是对多个 LogSegment
对象顺序的组合,形成一个逻辑的日志,使用跳表
来对 LogSegment 进行管理。
class Log(@volatile var dir: File, // dir 就是这个日志所在的文件夹路径,也就是主题分区的路径
@volatile var config: LogConfig,
@volatile var logStartOffset: Long, //日志的当前最早位移
@volatile var recoveryPoint: Long,
scheduler: Scheduler,
brokerTopicStats: BrokerTopicStats,
val time: Time,
val maxProducerIdExpirationMs: Int,
val producerIdExpirationCheckIntervalMs: Int,
val topicPartition: TopicPartition,
val producerStateManager: ProducerStateManager,
logDirFailureChannel: LogDirFailureChannel) extends Logging with KafkaMetricsGroup{
private val lastFlushedTime = new AtomicLong(time.milliseconds)
@volatile private var nextOffsetMetadata: LogOffsetMetadata = _
@volatile private var highWatermarkMetadata: LogOffsetMetadata = LogOffsetMetadata(logStartOffset)
private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
@volatile var leaderEpochCache: Option[LeaderEpochFileCache] = None
}
dir
:日志所在的文件夹路径,也就是主题分区的路径config
:配置logStartOffset
:当前日志最早的位移值recoveryPoint
:日志的恢复点scheduler
:定时器brokerTopicStats
:broker中topic的状态maxProducerIdExpirationMs
:在ProducerId 被视为过期之后最长等待时间producerIdExpirationCheckIntervalMs
:检查 Producer Id 是否过期的间隔时间topicPartition
:topic - partition 的映射关系producerStateManager
:logDirFailureChannel
:日志失败的channellastFlushedTime
:最后一次flush的时间nextOffsetMetadata
:下一个offset的一些元数据highWatermarkMetadata
:hw的元数据segments
:日志分段通过跳表来存储leaderEpochCache
:leader epoch 的缓存,用于一致性。
2.7.1 locally
Log 类的初始化方法
流程图如下:
locally {
val startMs = time.milliseconds
// create the log directory if it doesn't exist
//① 创建log文件,如果不存在就创建。通过dir属性
Files.createDirectories(dir.toPath)
//② 初始化 Leader Epoch Cache
//创建Leader Epoch 检查点文件
//生成Leader Epoch Cache 对象
initializeLeaderEpochCache()
//③ 加载所有日志分段
val nextOffset = loadSegments()
/* Calculate the offset of the next message */
//④ 更新nextOffsetMetadata 和 logStartOffset
nextOffsetMetadata = LogOffsetMetadata(nextOffset, activeSegment.baseOffset, activeSegment.size)
leaderEpochCache.foreach(_.truncateFromEnd(nextOffsetMetadata.messageOffset))
updateLogStartOffset(math.max(logStartOffset, segments.firstEntry.getValue.baseOffset))
// The earliest leader epoch may not be flushed during a hard failure. Recover it here.
//⑤ 更新Leader Epoch Cache ,清除无效处理
leaderEpochCache.foreach(_.truncateFromStart(logStartOffset))
// Any segment loading or recovery code must not use producerStateManager, so that we can build the full state here
// from scratch.
if (!producerStateManager.isEmpty)
throw new IllegalStateException("Producer state must be empty during log initialization")
loadProducerState(logEndOffset, reloadFromCleanShutdown = hasCleanShutdownFile)
info(s"Completed load of log with ${segments.size} segments, log start offset $logStartOffset and " +
s"log end offset $logEndOffset in ${time.milliseconds() - startMs} ms")
}
-
①
createDirectories( )
:创建 log 文件夹,如果不存在就创建。 -
②
initializeLeaderEpochCache( )
:初始化 Leader Epoch Cacheprivate def initializeLeaderEpochCache(): Unit = lock synchronized { //2.1 创建 leader-epoch-checkpoint 文件 val leaderEpochFile = LeaderEpochCheckpointFile.newFile(dir) def newLeaderEpochFileCache(): LeaderEpochFileCache = { val checkpointFile = new LeaderEpochCheckpointFile(leaderEpochFile, logDirFailureChannel) new LeaderEpochFileCache(topicPartition, logEndOffset _, checkpointFile) } //2.2 实例化 LeaderEpochFileCache 对象,此对象用来缓存 tp 和 leo的关系 if (recordVersion.precedes(RecordVersion.V2)) { val currentCache = if (leaderEpochFile.exists()) Some(newLeaderEpochFileCache()) else None if (currentCache.exists(_.nonEmpty)) warn(s"Deleting non-empty leader epoch cache due to incompatible message format $recordVersion") Files.deleteIfExists(leaderEpochFile.toPath) leaderEpochCache = None } else { leaderEpochCache = Some(newLeaderEpochFileCache()) } }
- 2.1 创建
leader-epoch-checkpoint
文件,此文件是用于记录每个当前Kafka Leader 的epoch,用于一致性。 - 2.2 实例化
LeaderEpochFileCache
对象,此对象用来缓存tp
和leo
的关系
- 2.1 创建
-
③
loadSegments( )
:加载所有日志分段 -
④ 更新
nextOffsetMetadata
和logStartOffset
-
⑤ 更新 Leader Epoch Cache ,清除无效处理
2.7.1.1 loadSegments
在 Log
类的初始化 locally
方法中的第 ③
步 通过loadSegments( )
方法加载当前所有的 Segments
。
大致逻辑:
- 移除上次
Failure
遗留下来的各种临时文件(包括.cleaned、.swap、.deleted
文件等) - 清空所有日志段对象,并且再次遍历分区路径,重建日志段
segments Map
以及索引文件。 - 待执行完这两次遍历之后,它会完成未完成的 swap 操作,即调用
completeSwapOperations
方法。 - 等这些都做完之后,再调用
recoverLog
方法恢复日志段对象,然后返回恢复之后的分区日志 LEO 值。
private def loadSegments(): Long = {
//① 移除上次 Failure 遗留下来的各种临时文件(包括.cleaned、.swap、.deleted 文件等)
val swapFiles = removeTempFilesAndCollectSwapFiles()
retryOnOffsetOverflow {
logSegments.foreach(_.close())
segments.clear()
//② 重新加载日志段文件
loadSegmentFiles()
}
//③ 处理第①步返回的有效.swap 文件集合
completeSwapOperations(swapFiles)
if (!dir.getAbsolutePath.endsWith(Log.DeleteDirSuffix)) {
val nextOffset = retryOnOffsetOverflow {
//④ 恢复日志
recoverLog()
}
activeSegment.resizeIndexes(config.maxIndexSize)
nextOffset
} else {
if (logSegments.isEmpty) {
addSegment(LogSegment.open(dir = dir,
baseOffset = 0,
config,
time = time,
fileAlreadyExists = false,
initFileSize = this.initFileSize,
preallocate = false))
}
0
}
}
- ①
removeTempFilesAndCollectSwapFiles( )
:移除上次 Failure 遗留下来的各种临时文件(包括.cleaned、.swap、.deleted 文件等)private def removeTempFilesAndCollectSwapFiles(): Set[File] = { //在方法内部定义一个名为deleteIndicesIfExist的方法,用于删除日志文件对应的索引文件 def deleteIndicesIfExist(baseFile: File, suffix: String = ""): Unit = { info(s"Deleting index files with suffix $suffix for baseFile $baseFile") val offset = offsetFromFile(baseFile) Files.deleteIfExists(Log.offsetIndexFile(dir, offset, suffix).toPath) Files.deleteIfExists(Log.timeIndexFile(dir, offset, suffix).toPath) Files.deleteIfExists(Log.transactionIndexFile(dir, offset, suffix).toPath) } var swapFiles = Set[File]() var cleanFiles = Set[File]() var minCleanedFileOffset = Long.MaxValue //① 遍历分区日志路径下的所有文件 for (file <- dir.listFiles if file.isFile) { if (!file.canRead) //如果不可读,直接抛出IOException throw new IOException(s"Could not read file $file") val filename = file.getName //② 如果文件是以.deleted结尾,说明是上次Failure遗留下来的文件,直接删除 if (filename.endsWith(DeletedFileSuffix)) { debug(s"Deleting stray temporary file ${file.getAbsolutePath}") Files.deleteIfExists(file.toPath) //③ 如果是以.cleaned结尾 } else if (filename.endsWith(CleanedFileSuffix)) { //选取文件名中位移值最小的.cleaned文件,获取其位移值,并将该文件加入待删除文件集合中 minCleanedFileOffset = Math.min(offsetFromFileName(filename), minCleanedFileOffset) cleanFiles += file //④ 如果以.swap结尾 } else if (filename.endsWith(SwapFileSuffix)) { // we crashed in the middle of a swap operation, to recover: // if a log, delete the index files, complete the swap operation later // if an index just delete the index files, they will be rebuilt val baseFile = new File(CoreUtils.replaceSuffix(file.getPath, SwapFileSuffix, "")) info(s"Found file ${file.getAbsolutePath} from interrupted swap operation.") if (isIndexFile(baseFile)) { // 如果该.swap文件原来是索引文件 deleteIndicesIfExist(baseFile) // 删除原来的索引文件 } else if (isLogFile(baseFile)) { // 如果该.swap文件原来是日志文件 deleteIndicesIfExist(baseFile) // 删除掉原来的索引文件 swapFiles += file // 加入待恢复的.swap文件集合中 } } } // KAFKA-6264: Delete all .swap files whose base offset is greater than the minimum .cleaned segment offset. Such .swap // files could be part of an incomplete split operation that could not complete. See Log#splitOverflowedSegment // for more details about the split operation. // 从待恢复swap集合中找出那些起始位移值大于minCleanedFileOffset值的文件,直接删掉这些无效的.swap文件 val (invalidSwapFiles, validSwapFiles) = swapFiles.partition(file => offsetFromFile(file) >= minCleanedFileOffset) invalidSwapFiles.foreach { file => debug(s"Deleting invalid swap file ${file.getAbsoluteFile} minCleanedFileOffset: $minCleanedFileOffset") val baseFile = new File(CoreUtils.replaceSuffix(file.getPath, SwapFileSuffix, "")) deleteIndicesIfExist(baseFile, SwapFileSuffix) Files.deleteIfExists(file.toPath) } // Now that we have deleted all .swap files that constitute an incomplete split operation, let's delete all .clean files //清除所有待删除文件集合中的文件 cleanFiles.foreach { file => debug(s"Deleting stray .clean file ${file.getAbsolutePath}") Files.deleteIfExists(file.toPath) } //最后返回当前有效的.swap文件集合 validSwapFiles }
- ②
loadSegmentFiles( )
:重新加载日志段文件private def loadSegmentFiles(): Unit = { // load segments in ascending order because transactional data from one segment may depend on the // segments that come before it //按照日志段文件名中的位移值正序排列,然后遍历每个文件 for (file <- dir.listFiles.sortBy(_.getName) if file.isFile) { if (isIndexFile(file)) { // 如果是索引文件 // if it is an index file, make sure it has a corresponding .log file val offset = offsetFromFile(file) val logFile = Log.logFile(dir, offset) // 确保存在对应的日志文件,否则记录一个警告,并删除该索引文件 if (!logFile.exists) { warn(s"Found an orphaned index file ${file.getAbsolutePath}, with no corresponding log file.") Files.deleteIfExists(file.toPath) } } else if (isLogFile(file)) { // 如果是日志文件 // if it's a log file, load the corresponding log segment val baseOffset = offsetFromFile(file) val timeIndexFileNewlyCreated = !Log.timeIndexFile(dir, baseOffset).exists() // 创建对应的LogSegment对象实例,并加入segments中 val segment = LogSegment.open(dir = dir, baseOffset = baseOffset, config, time = time, fileAlreadyExists = true) try segment.sanityCheck(timeIndexFileNewlyCreated) catch { case _: NoSuchFileException => error(s"Could not find offset index file corresponding to log file ${segment.log.file.getAbsolutePath}, " + "recovering segment and rebuilding index files...") recoverSegment(segment) case e: CorruptIndexException => warn(s"Found a corrupted index file corresponding to log file ${segment.log.file.getAbsolutePath} due " + s"to ${e.getMessage}}, recovering segment and rebuilding index files...") recoverSegment(segment) } addSegment(segment) } } }
- ③
completeSwapOperations( )
:处理第①步返回的有效.swap 文件集合private def completeSwapOperations(swapFiles: Set[File]): Unit = { //① 遍历所有有效.swap文件 for (swapFile <- swapFiles) { val logFile = new File(CoreUtils.replaceSuffix(swapFile.getPath, SwapFileSuffix, "")) //拿到日志文件的起始位移值 val baseOffset = offsetFromFile(logFile) //② 创建对应的LogSegment实例 val swapSegment = LogSegment.open(swapFile.getParentFile, baseOffset = baseOffset, config, time = time, fileSuffix = SwapFileSuffix) info(s"Found log file ${swapFile.getPath} from interrupted swap operation, repairing.") //③ 执行日志段恢复操作 recoverSegment(swapSegment) // We create swap files for two cases: // (1) Log cleaning where multiple segments are merged into one, and // (2) Log splitting where one segment is split into multiple. // // Both of these mean that the resultant swap segments be composed of the original set, i.e. the swap segment // must fall within the range of existing segment(s). If we cannot find such a segment, it means the deletion // of that segment was successful. In such an event, we should simply rename the .swap to .log without having to // do a replace with an existing segment. //④确认之前删除日志段是否成功,是否还存在老的日志段文件 val oldSegments = logSegments(swapSegment.baseOffset, swapSegment.readNextOffset).filter { segment => segment.readNextOffset > swapSegment.baseOffset } // ⑤ 如果存在,直接把.swap文件重命名成.log replaceSegments(Seq(swapSegment), oldSegments.toSeq, isRecoveredSwapFile = true) } }
- ④
recoverLog( )
:恢复日志private def recoverLog(): Long = { // if we have the clean shutdown marker, skip recovery //① 如果不存在以.kafka_cleanshutdown结尾的文件。通常都不存在 if (!hasCleanShutdownFile) { // okay we need to actually recover this log //获取到上次恢复点以外的所有unflushed日志段对象 val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).toIterator var truncated = false //遍历这些unflushed日志段 while (unflushed.hasNext && !truncated) { val segment = unflushed.next info(s"Recovering unflushed segment ${segment.baseOffset}") val truncatedBytes = try { //执行恢复日志段操作 recoverSegment(segment, leaderEpochCache) } catch { case _: InvalidOffsetException => val startOffset = segment.baseOffset warn("Found invalid offset during recovery. Deleting the corrupt segment and " + s"creating an empty one with starting offset $startOffset") segment.truncateTo(startOffset) } // 如果有无效的消息导致被截断的字节数不为0,直接删除剩余的日志段对象 if (truncatedBytes > 0) { // we had an invalid message, delete all remaining log warn(s"Corruption found in segment ${segment.baseOffset}, truncating to offset ${segment.readNextOffset}") removeAndDeleteSegments(unflushed.toList, asyncDelete = true) truncated = true } } } // 这些都做完之后,如果日志段集合不为空 if (logSegments.nonEmpty) { val logEndOffset = activeSegment.readNextOffset // 验证分区日志的LEO值不能小于Log Start Offset值,否则删除这些日志段对象 if (logEndOffset < logStartOffset) { warn(s"Deleting all segments because logEndOffset ($logEndOffset) is smaller than logStartOffset ($logStartOffset). " + "This could happen if segment files were deleted from the file system.") removeAndDeleteSegments(logSegments, asyncDelete = true) } } // 这些都做完之后,如果日志段集合为空了 if (logSegments.isEmpty) { // no existing segments, create a new mutable segment beginning at logStartOffset // 至少创建一个新的日志段,以logStartOffset为日志段的起始位移,并加入日志段集合中 addSegment(LogSegment.open(dir = dir, baseOffset = logStartOffset, config, time = time, fileAlreadyExists = false, initFileSize = this.initFileSize, preallocate = config.preallocate)) } // 更新上次恢复点属性,并返回 recoveryPoint = activeSegment.readNextOffset recoveryPoint }
recoverSegment
:循环恢复日志分段,在Kafka 日志模块(二)LogSegment一文中进行分析。
Log 初始化完成之后接下来就是对
Log
的一些操作,比如重要的就是 HW、LogSegment、日志位移、读写日志。
2.7.2 HW 高水位
高水位值的初始值是 Log Start Offset
值。每个 Log 对象都会维护一个 Log Start Offset
值。当首次构建高水位时,它会被赋值成 Log Start Offset
值。它是表示消费者只能看到 <= HW
的值。
@volatile private var highWatermarkMetadata: LogOffsetMetadata = LogOffsetMetadata(logStartOffset)
可以看到它是一个 LogOffsetMetadata
对象
case class LogOffsetMetadata(messageOffset: Long,
segmentBaseOffset: Long = Log.UnknownOffset,
relativePositionInSegment: Int = LogOffsetMetadata.UnknownFilePosition) {
def onOlderSegment(that: LogOffsetMetadata): Boolean = {
if (messageOffsetOnly)
throw new KafkaException(s"$this cannot compare its segment info with $that since it only has message offset info")
this.segmentBaseOffset < that.segmentBaseOffset
}
def onSameSegment(that: LogOffsetMetadata): Boolean = {
if (messageOffsetOnly)
throw new KafkaException(s"$this cannot compare its segment info with $that since it only has message offset info")
this.segmentBaseOffset == that.segmentBaseOffset
}
def offsetDiff(that: LogOffsetMetadata): Long = {
this.messageOffset - that.messageOffset
}
def positionDiff(that: LogOffsetMetadata): Int = {
if(!onSameSegment(that))
throw new KafkaException(s"$this cannot compare its segment position with $that since they are not on the same segment")
if(messageOffsetOnly)
throw new KafkaException(s"$this cannot compare its segment position with $that since it only has message offset info")
this.relativePositionInSegment - that.relativePositionInSegment
}
def messageOffsetOnly: Boolean = {
segmentBaseOffset == Log.UnknownOffset && relativePositionInSegment == LogOffsetMetadata.UnknownFilePosition
}
-
messageOffset 参数
:消息位移值,这是最重要的信息。我们总说高水位值,其实指的就是这个变量的值。 -
segmentBaseOffset 参数
:保存该位移值所在日志段的起始位移。日志段起始位移值辅助计算两条消息在物理磁盘文件中位置的差值,即两条消息彼此隔了多少字节。- 这个计算有个前提条件,即两条消息必须处在同一个日志段对象上,不能跨日志段对象。否则它们就位于不同的物理文件上,计算这个值就没有意义了。
- 这里的 segmentBaseOffset,就是用来判断两条消息是否处于同一个日志段的。
-
relativePositionInSegment 参数
:保存该位移值所在日志段的物理磁盘位置。这个字段在计算两个位移值之间的物理磁盘位置差值时非常有用。 -
onOlderSegment( )方法
:判断给定的LogOffsetMetadata
对象是否是在新的日志段 -
onSameSegment( )方法
:来判断给定的两个LogOffsetMetadata
对象是否处于同一个日志段的 -
offsetDiff( )方法
:计算两个LogOffsetMetadata
对象中隔了多少消息(offset 差值) -
positionDiff( )方法
:计算两个LogOffsetMetadata
对象的物理位置差值。
2.7.2.1 获取和设置高水位
// getter method:读取高水位的位移值
def highWatermark: Long = highWatermarkMetadata.messageOffset
// setter method:设置高水位值
private def updateHighWatermarkMetadata(newHighWatermark: LogOffsetMetadata): Unit = {
if (newHighWatermark.messageOffset < 0) // 高水位值不能是负数
throw new IllegalArgumentException("High watermark offset should be non-negative")
lock synchronized { // 保护Log对象修改的Monitor锁
highWatermarkMetadata = newHighWatermark // 赋值新的高水位值
producerStateManager.onHighWatermarkUpdated(newHighWatermark.messageOffset) // 处理事务状态管理器的高水位值更新逻辑,忽略它……
maybeIncrementFirstUnstableOffset() // First Unstable Offset是Kafka事务机制的一部分
}
trace(s"Setting high watermark $newHighWatermark")
}
2.7.2.2 更新高水位
// updateHighWatermark method
def updateHighWatermark(hw: Long): Long = {
// 新高水位值一定介于[Log Start Offset,Log End Offset]之间
val newHighWatermark = if (hw < logStartOffset)
logStartOffset
else if (hw > logEndOffset)
logEndOffset
else
hw
// 调用Setter方法来更新高水位值
updateHighWatermarkMetadata(LogOffsetMetadata(newHighWatermark))
newHighWatermark // 最后返回新高水位值
}
// maybeIncrementHighWatermark method
def maybeIncrementHighWatermark(newHighWatermark: LogOffsetMetadata): Option[LogOffsetMetadata] = {
// 新高水位值不能越过Log End Offset
if (newHighWatermark.messageOffset > logEndOffset)
throw new IllegalArgumentException(s"High watermark $newHighWatermark update exceeds current " +
s"log end offset $logEndOffsetMetadata")
lock.synchronized {
val oldHighWatermark = fetchHighWatermarkMetadata // 获取老的高水位值
// 新高水位值要比老高水位值大以维持单调增加特性,否则就不做更新!
// 另外,如果新高水位值在新日志段上,也可执行更新高水位操作
if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset ||
(oldHighWatermark.messageOffset == newHighWatermark.messageOffset && oldHighWatermark.onOlderSegment(newHighWatermark))) {
updateHighWatermarkMetadata(newHighWatermark)
Some(oldHighWatermark) // 返回老的高水位值
} else {
None
}
}
}
更新高水位有两个方法一个是 updateHighWatermark
和 maybeIncrementHighWatermark
updateHighWatermark
:主要是Follower 副本从 Leader 副本获取到消息后更新高水位值。一旦拿到立即更新maybeIncrementHighWatermark
:主要是Leader 副本接收Producer 消息,它不会立即更新,而是要根据一定的条件来会更新。- 新高水位值要比老高水位值大以维持单调增加特性,否则就不做更新。
- 如果新高水位值在新日志段上,也可执行更新高水位操作。
2.7.2.3 读取高水位
private def fetchHighWatermarkMetadata: LogOffsetMetadata = {
checkIfMemoryMappedBufferClosed() // 读取时确保日志不能被关闭
val offsetMetadata = highWatermarkMetadata // 保存当前高水位值到本地变量,避免多线程访问干扰
if (offsetMetadata.messageOffsetOnly) { //没有获得到完整的高水位元数据
lock.synchronized {
val fullOffset = convertToOffsetMetadataOrThrow(highWatermark) // 通过读日志文件的方式把完整的高水位元数据信息拉出来
updateHighWatermarkMetadata(fullOffset) // 然后再更新一下高水位对象
fullOffset
}
} else { // 否则,直接返回即可
offsetMetadata
}
}
2.7.3 LogSegment 日志分段
通过Java中的 ConcurrentSkipListMap 进行存储,线程安全。
private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
//主要方法
def deleteOldSegments(): Int = {
if (config.delete) {
deleteRetentionMsBreachedSegments() + deleteRetentionSizeBreachedSegments() + deleteLogStartOffsetBreachedSegments()
} else {
deleteLogStartOffsetBreachedSegments()
}
}
// deleteRetentionMsBreachedSegments 方法
private def deleteRetentionMsBreachedSegments(): Int = {
if (config.retentionMs < 0) return 0
val startMs = time.milliseconds
deleteOldSegments((segment, _) => startMs - segment.largestTimestamp > config.retentionMs,
reason = s"retention time ${config.retentionMs}ms breach")
}
// deleteRetentionSizeBreachedSegments 方法
private def deleteRetentionSizeBreachedSegments(): Int = {
if (config.retentionSize < 0 || size < config.retentionSize) return 0
var diff = size - config.retentionSize
def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]) = {
if (diff - segment.size >= 0) {
diff -= segment.size
true
} else {
false
}
}
deleteOldSegments(shouldDelete, reason = s"retention size in bytes ${config.retentionSize} breach")
}
// deleteLogStartOffsetBreachedSegments 方法
private def deleteLogStartOffsetBreachedSegments(): Int = {
def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]) =
nextSegmentOpt.exists(_.baseOffset <= logStartOffset)
deleteOldSegments(shouldDelete, reason = s"log start offset $logStartOffset breach")
}
这三个方法都调用了 deleteOldSegments
方法
private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean, reason: String): Int = {
lock synchronized {
//① 使用传入的函数计算哪些日志段对象能够被删除
val deletable = deletableSegments(predicate)
if (deletable.nonEmpty)
info(s"Found deletable segments with base offsets [${deletable.map(_.baseOffset).mkString(",")}] due to $reason")
//② 删除这些日志段
deleteSegments(deletable)
}
}
- ① 使用传入的函数计算哪些日志段对象能够被删除
private def deletableSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean): Iterable[LogSegment] = { // 如果当前压根就没有任何日志段对象,直接返回 if (segments.isEmpty) { Seq.empty } else { val deletable = ArrayBuffer.empty[LogSegment] var segmentEntry = segments.firstEntry // 从具有最小起始位移值的日志段对象开始遍历,直到满足以下条件之一便停止遍历: //1. 测定条件函数predicate = false //2. 扫描到包含Log对象高水位值所在的日志段对象 //3. 最新的日志段对象不包含任何消息 //最新日志段对象是segments中Key值最大对应的那个日志段,也就是我们常说的Active Segment。 //完全为空的Active Segment如果被允许删除,后面还要重建它,故代码这里不允许删除大小为空的Active Segment。 //在遍历过程中,同时不满足以上3个条件的所有日志段都是可以被删除的! while (segmentEntry != null) { val segment = segmentEntry.getValue val nextSegmentEntry = segments.higherEntry(segmentEntry.getKey) val (nextSegment, upperBoundOffset, isLastSegmentAndEmpty) = if (nextSegmentEntry != null) (nextSegmentEntry.getValue, nextSegmentEntry.getValue.baseOffset, false) else (null, logEndOffset, segment.size == 0) if (highWatermark >= upperBoundOffset && predicate(segment, Option(nextSegment)) && !isLastSegmentAndEmpty) { deletable += segment segmentEntry = nextSegmentEntry } else { segmentEntry = null } } deletable } }
- ② 删除这些日志段
private def deleteSegments(deletable: Iterable[LogSegment]): Int = { maybeHandleIOException(s"Error while deleting segments for $topicPartition in dir ${dir.getParent}") { val numToDelete = deletable.size if (numToDelete > 0) { // we must always have at least one segment, so if we are going to delete all the segments, create a new one first // 不允许删除所有日志段对象。如果一定要做,先创建出一个新的来,然后再把前面N个删掉 if (segments.size == numToDelete) roll() lock synchronized { checkIfMemoryMappedBufferClosed()// 确保Log对象没有被关闭 // remove the segments for lookups // 删除给定的日志段对象以及底层的物理文件 removeAndDeleteSegments(deletable, asyncDelete = true) // 尝试更新日志的Log Start Offset值 maybeIncrementLogStartOffset(segments.firstEntry.getValue.baseOffset) } } numToDelete } }
2.7.4 日志位移
Log 对象维护了一些关键位移值数据,比如 Log Start Offset
、LEO
等。
//LEO的定义
@volatile private var nextOffsetMetadata: LogOffsetMetadata = _
//更新 LEO 的方法
private def updateLogEndOffset(offset: Long): Unit = {
nextOffsetMetadata = LogOffsetMetadata(offset, activeSegment.baseOffset, activeSegment.size)
if (highWatermark >= offset) {
updateHighWatermarkMetadata(nextOffsetMetadata)
}
if (this.recoveryPoint > offset) {
this.recoveryPoint = offset
}
}
-
LEO 会被更新的时机:
Log 对象初始化时
:当 Log 对象初始化时,我们必须要创建一个 LEO 对象,并对其进行初始化。写入新消息时
:以上面的图为例,当不断向 Log 对象插入新消息时,LEO 值就像一个指针一样,需要不停地向右移动,也就是不断地增加。Log 对象发生日志切分(Log Roll)时
:在创建一个新的LogSegment
的时候,并且关闭当前写入的日志段对象。这通常发生在当前日志段对象已满的时候。一旦发生日志切分,说明 Log 对象切换了 Active Segment,那么,LEO 中的起始位移值和段大小数据都要被更新,因此,在进行这一步操作时,我们必须要更新 LEO 对象。日志截断(Log Truncation)时
:日志中的部分消息被删除了,自然可能导致 LEO 值发生变化,从而要更新 LEO 对象。
-
Log Start Offset 会被更新的时机:
Log 对象初始化时
:和 LEO 类似,Log 对象初始化时要给Log Start Offset
赋值,一般是将第一个日志段的起始位移值赋值给它。日志截断时
:同理,一旦日志中的部分消息被删除,可能会导致Log Start Offset
发生变化,因此有必要更新该值。Follower 副本同步时
:一旦 Leader 副本的 Log 对象的 Log Start Offset 值发生变化。为了维持和 Leader 副本的一致性,Follower 副本也需要尝试去更新该值。删除日志段时
:这个和日志截断是类似的。凡是涉及消息删除的操作都有可能导致 Log Start Offset 值的变化。删除消息时:严格来说,这个更新时机有点本末倒置了。在 Kafka 中,删除消息就是通过抬高 Log Start Offset 值来实现的,因此,删除消息时必须要更新该值。
2.7.5 读写操作
append 写操作
添加消息到 Log 中,相关写操作有3个: appendAsLeader
、 appendAsFollower
、 append
,调用关系如下:
- appendAsLeader 是用于写
Leader
副本的 - appendAsFollower 是用于
Follower
副本同步的。 - 它们的底层都调用了 append 方法。
下图是 append 方法的执行流程:
private def append(records: MemoryRecords,
origin: AppendOrigin,
interBrokerProtocolVersion: ApiVersion,
assignOffsets: Boolean,
leaderEpoch: Int): LogAppendInfo = {
maybeHandleIOException(s"Error while appending records to $topicPartition in dir ${dir.getParent}") {
// ① 分析和验证待写入消息集合,并返回校验结果,验证消息的长度、crc32校验码、内部offset 是否单调递增,都是对外层消息进行的,并不会解压内部的压缩消息
val appendInfo = analyzeAndValidateRecords(records, origin)
// return if we have no valid messages or if this is a duplicate of the last appended entry
// 如果压根就不需要写入任何消息,直接返回即可(也就是说在第①步中没有发现有message 这个这里 shallowCount 是0)
if (appendInfo.shallowCount == 0)
return appendInfo
// trim any invalid bytes or partial messages before appending it to the on-disk log
// ② 消息格式规整,即删除无效格式消息或无效字节,根据步骤 ① 方法返回的LogAppendInfo 对象,将未通过验证的消息截断
var validRecords = trimInvalidBytes(records, appendInfo)
// they are valid, insert them in the log
lock synchronized {
checkIfMemoryMappedBufferClosed()//确保log对象未关闭
if (assignOffsets) {//需要分配位移
// assign offsets to the message set
//③ 使用当前LEO值作为待写入消息集合的第一条消息位移值
val offset = new LongRef(nextOffsetMetadata.messageOffset)
appendInfo.firstOffset = Some(offset.value)
val now = time.milliseconds
val validateAndOffsetAssignResult = try {
LogValidator.validateMessagesAndAssignOffsets(validRecords,
topicPartition,
offset,
time,
now,
appendInfo.sourceCodec,
appendInfo.targetCodec,
config.compact,
config.messageFormatVersion.recordVersion.value,
config.messageTimestampType,
config.messageTimestampDifferenceMaxMs,
leaderEpoch,
origin,
interBrokerProtocolVersion,
brokerTopicStats)
} catch {
case e: IOException =>
throw new KafkaException(s"Error validating messages while appending to log $name", e)
}
//更新校验结果对象类 LogAppendInfo
validRecords = validateAndOffsetAssignResult.validatedRecords
appendInfo.maxTimestamp = validateAndOffsetAssignResult.maxTimestamp
appendInfo.offsetOfMaxTimestamp = validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp
appendInfo.lastOffset = offset.value - 1
appendInfo.recordConversionStats = validateAndOffsetAssignResult.recordConversionStats
if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
appendInfo.logAppendTime = now
// re-validate message sizes if there's a possibility that they have changed (due to re-compression or message
// format conversion)
//④ 验证消息,保证消息大小不超限
if (validateAndOffsetAssignResult.messageSizeMaybeChanged) {
for (batch <- validRecords.batches.asScala) {
if (batch.sizeInBytes > config.maxMessageSize) {
// we record the original message set size instead of the trimmed size
// to be consistent with pre-compression bytesRejectedRate recording
brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
throw new RecordTooLargeException(s"Message batch size is ${batch.sizeInBytes} bytes in append to" +
s"partition $topicPartition which exceeds the maximum configured size of ${config.maxMessageSize}.")
}
}
}
} else {//直接使用给定的移位值,无需自己分配位移值
// we are taking the offsets we are given
if (!appendInfo.offsetsMonotonic)//确保消息移植的单调递增性
throw new OffsetsOutOfOrderException(s"Out of order offsets found in append to $topicPartition: " +
records.records.asScala.map(_.offset))
if (appendInfo.firstOrLastOffsetOfFirstBatch < nextOffsetMetadata.messageOffset) {
// we may still be able to recover if the log is empty
// one example: fetching from log start offset on the leader which is not batch aligned,
// which may happen as a result of AdminClient#deleteRecords()
val firstOffset = appendInfo.firstOffset match {
case Some(offset) => offset
case None => records.batches.asScala.head.baseOffset()
}
val firstOrLast = if (appendInfo.firstOffset.isDefined) "First offset" else "Last offset of the first batch"
throw new UnexpectedAppendOffsetException(
s"Unexpected offset in append to $topicPartition. $firstOrLast " +
s"${appendInfo.firstOrLastOffsetOfFirstBatch} is less than the next offset ${nextOffsetMetadata.messageOffset}. " +
s"First 10 offsets in append: ${records.records.asScala.take(10).map(_.offset)}, last offset in" +
s" append: ${appendInfo.lastOffset}. Log start offset = $logStartOffset",
firstOffset, appendInfo.lastOffset)
}
}
// update the epoch cache with the epoch stamped onto the message by the leader
//⑤ 更新Leader Epoch缓存
validRecords.batches.asScala.foreach { batch =>
if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
maybeAssignEpochStartOffset(batch.partitionLeaderEpoch, batch.baseOffset)
} else {
// In partial upgrade scenarios, we may get a temporary regression to the message format. In
// order to ensure the safety of leader election, we clear the epoch cache so that we revert
// to truncation by high watermark after the next leader election.
leaderEpochCache.filter(_.nonEmpty).foreach { cache =>
warn(s"Clearing leader epoch cache after unexpected append with message format v${batch.magic}")
cache.clearAndFlush()
}
}
}
// check messages set size may be exceed config.segmentSize
//⑥ 确保消息大小不超限
if (validRecords.sizeInBytes > config.segmentSize) {
throw new RecordBatchTooLargeException(s"Message batch size is ${validRecords.sizeInBytes} bytes in append " +
s"to partition $topicPartition, which exceeds the maximum configured segment size of ${config.segmentSize}.")
}
// maybe roll the log if this segment is full
//⑦ 执行日志切分,当前日志断剩余容量可能无法容纳新消息集合,因此要创建一个(如果满足需要创建一个新的activeSegment,然后返回当前的activeSegment)
// 1. 当前activeSegment 的日志大小加上本次待追加的消息集合大小,超过配置的LogSegment 的最大长度
// 2. 当前activeSegment 的寿命超过了配置的 LogSegment 最长存活时间
// 3. 索引文件满了
val segment = maybeRoll(validRecords.sizeInBytes, appendInfo)
val logOffsetMetadata = LogOffsetMetadata(
messageOffset = appendInfo.firstOrLastOffsetOfFirstBatch,
segmentBaseOffset = segment.baseOffset,
relativePositionInSegment = segment.size)
// now that we have valid records, offsets assigned, and timestamps updated, we need to
// validate the idempotent/transactional state of the producers and collect some metadata
//⑧ 验证事务状态
val (updatedProducers, completedTxns, maybeDuplicate) = analyzeAndValidateProducerState(
logOffsetMetadata, validRecords, origin)
maybeDuplicate.foreach { duplicate =>
appendInfo.firstOffset = Some(duplicate.firstOffset)
appendInfo.lastOffset = duplicate.lastOffset
appendInfo.logAppendTime = duplicate.timestamp
appendInfo.logStartOffset = logStartOffset
return appendInfo
}
//⑨:执行真正的消息写入操作,主要调用日志段对象的append方法实现
segment.append(largestOffset = appendInfo.lastOffset,
largestTimestamp = appendInfo.maxTimestamp,
shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,
records = validRecords)
// ⑩:更新LEO对象,其中,LEO值是消息集合中最后一条消息位移值+1
//前面说过,LEO值永远指向下一条不存在的消息
updateLogEndOffset(appendInfo.lastOffset + 1)
// update the producer state
//⑪:更新事务状态
for (producerAppendInfo <- updatedProducers.values) {
producerStateManager.update(producerAppendInfo)
}
// update the transaction index with the true last stable offset. The last offset visible
// to consumers using READ_COMMITTED will be limited by this value and the high watermark.
for (completedTxn <- completedTxns) {
val lastStableOffset = producerStateManager.lastStableOffset(completedTxn)
segment.updateTxnIndex(completedTxn, lastStableOffset)
producerStateManager.completeTxn(completedTxn)
}
// always update the last producer id map offset so that the snapshot reflects the current offset
// even if there isn't any idempotent data being written
producerStateManager.updateMapEndOffset(appendInfo.lastOffset + 1)
// update the first unstable offset (which is used to compute LSO)
maybeIncrementFirstUnstableOffset()
trace(s"Appended message set with last offset: ${appendInfo.lastOffset}, " +
s"first offset: ${appendInfo.firstOffset}, " +
s"next offset: ${nextOffsetMetadata.messageOffset}, " +
s"and messages: $validRecords")
// 是否需要手动落盘。一般情况下我们不需要设置Broker端参数log.flush.interval.messages
// 落盘操作交由操作系统来完成。但某些情况下,可以设置该参数来确保高可靠性
if (unflushedMessages >= config.flushInterval)
flush()
//⑫ 返回写入结果
appendInfo
}
}
}
-
① 分析和验证待写入消息集合,并返回校验结果,验证消息的长度、crc32校验码、内部offset 是否单调递增,都是对
外层消息
进行的,并不会解压内部的压缩消息private def analyzeAndValidateRecords(records: MemoryRecords, origin: AppendOrigin): LogAppendInfo = { var shallowMessageCount = 0 //记录外层消息的数量 var validBytesCount = 0 //记录通过验证的 records 的字节数之和 var firstOffset: Option[Long] = None //记录第一条消息的offset,就是baseOffset var lastOffset = -1L //记录最后一条消息的offset var sourceCodec: CompressionCodec = NoCompressionCodec //消息的压缩方法 var monotonic = true //标识生产者为消息分配的内部 offset 是否单调递增,使用浅层迭代器进行迭代,如果是压缩消息,并不会解压缩 var maxTimestamp = RecordBatch.NO_TIMESTAMP //最大的时间戳 var offsetOfMaxTimestamp = -1L //最大时间戳的offset var readFirstMessage = false //是否已经读了第一条消息的 offset。也就是这个batch的 baseOffset var lastOffsetOfFirstBatch = -1L //和fistOffset 一个意思,只是这个是旧版本 < v2 版本 for (batch <- records.batches.asScala) { // we only validate V2 and higher to avoid potential compatibility issues with older clients // 消息格式Version 2的消息批次,起始位移值必须从0开始 if (batch.magic >= RecordBatch.MAGIC_VALUE_V2 && origin == AppendOrigin.Client && batch.baseOffset != 0) throw new InvalidRecordException(s"The baseOffset of the record batch in the append to $topicPartition should " + s"be 0, but it is ${batch.baseOffset}") //一开始readFirstMessage 是 false 会去获取这个 batch 的baseOffset。 //这里有两种获取方式: //1. 如果是老版本的kafka 那么通过这个batch的 lastOffset。 //2. 如果是新版本的 >= v2,那么在Batch的Header 里面就有这个baseOffset,可以直接获取 //然后修改 readFirstMessage 为true if (!readFirstMessage) { if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) firstOffset = Some(batch.baseOffset) // 更新firstOffset字段 lastOffsetOfFirstBatch = batch.lastOffset // 更新lastOffsetOfFirstBatch字段 readFirstMessage = true } // check that offsets are monotonically increasing //检查1: 一旦出现当前lastOffset不小于下一个batch的lastOffset,说明上一个batch中有消息的位移值大于后面batch的消息 // 这违反了位移值单调递增性 if (lastOffset >= batch.lastOffset) monotonic = false // update the last offset seen // 使用当前batch最后一条消息的位移值去更新lastOffset lastOffset = batch.lastOffset // Check if the message sizes are valid. //检查2: 检查消息批次总字节数大小是否超限,即是否大于Broker端参数max.message.bytes值 val batchSize = batch.sizeInBytes if (batchSize > config.maxMessageSize) { brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes) brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes) throw new RecordTooLargeException(s"The record batch size in the append to $topicPartition is $batchSize bytes " + s"which exceeds the maximum configured value of ${config.maxMessageSize}.") } // check the validity of the message by checking CRC //检查3: 执行消息批次校验,包括格式是否正确以及CRC校验 if (!batch.isValid) { brokerTopicStats.allTopicsStats.invalidMessageCrcRecordsPerSec.mark() throw new CorruptRecordException(s"Record is corrupt (stored crc = ${batch.checksum()}) in topic partition $topicPartition.") } // 更新maxTimestamp字段和offsetOfMaxTimestamp if (batch.maxTimestamp > maxTimestamp) { maxTimestamp = batch.maxTimestamp offsetOfMaxTimestamp = lastOffset } // 累加消息批次计数器以及有效字节数,更新shallowMessageCount字段 shallowMessageCount += 1 validBytesCount += batchSize // 从消息批次中获取压缩器类型 val messageCodec = CompressionCodec.getCompressionCodec(batch.compressionType.id) if (messageCodec != NoCompressionCodec) sourceCodec = messageCodec } // Apply broker-side compression if any // 获取Broker端设置的压缩器类型,即Broker端参数compression.type值。 // 该参数默认值是producer,表示sourceCodec用的什么压缩器,targetCodec就用什么 val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec) // 最后生成LogAppendInfo对象并返回 LogAppendInfo(firstOffset, lastOffset, maxTimestamp, offsetOfMaxTimestamp, RecordBatch.NO_TIMESTAMP, logStartOffset, RecordConversionStats.EMPTY, sourceCodec, targetCodec, shallowMessageCount, validBytesCount, monotonic, lastOffsetOfFirstBatch) }
-
② 消息格式规整,即删除无效格式消息或无效字节,根据步骤
①
方法返回的LogAppendInfo
对象,将未通过验证的消息截断private def trimInvalidBytes(records: MemoryRecords, info: LogAppendInfo): MemoryRecords = { //写入消息总字节数 val validBytes = info.validBytes if (validBytes < 0) throw new CorruptRecordException(s"Cannot append record batch with illegal length $validBytes to " + s"log for $topicPartition. A possible cause is a corrupted produce request.") if (validBytes == records.sizeInBytes) { records } else { // trim invalid bytes //超过部分就会被截断 val validByteBuffer = records.buffer.duplicate() validByteBuffer.limit(validBytes) MemoryRecords.readableRecords(validByteBuffer) } }
-
③ 使用当前LEO值作为待写入消息集合的第一条消息位移值
val offset = new LongRef(nextOffsetMetadata.messageOffset) appendInfo.firstOffset = Some(offset.value)
-
④ 验证消息,保证消息大小不超限
-
⑤ 更新
Leader Epoch
缓存 -
⑥ 确保消息大小不超限
-
⑦ 执行日志切分,当前日志断剩余容量可能无法容纳新消息集合,因此要创建一个(如果满足需要创建一个新的
activeSegment
,然后返回当前的activeSegment)-
- 当前
activeSegment
的日志大小加上本次待追加的消息集合大小,超过配置的 LogSegment 的最大长度
- 当前
-
- 当前
activeSegment
的寿命超过了配置的 LogSegment 最长存活时间
- 当前
-
- 索引文件满了
private def maybeRoll(messagesSize: Int, appendInfo: LogAppendInfo): LogSegment = { val segment = activeSegment val now = time.milliseconds val maxTimestampInMessages = appendInfo.maxTimestamp val maxOffsetInMessages = appendInfo.lastOffset // 在shouldRoll 方法中 判断是否需要创建一个 segment if (segment.shouldRoll(RollParams(config, appendInfo, messagesSize, now))) { appendInfo.firstOffset match { //创建新的activeSegment 调用 roll 方法 //这里的 activeSegment 其实就是 LogSegment,由于是Kafka 是顺序添加的,所以最后一个LogSegment 就是正在被添加 record 的 Segment 称作为 activeSegment case Some(firstOffset) => roll(Some(firstOffset)) case None => roll(Some(maxOffsetInMessages - Integer.MAX_VALUE)) } } else { //通过上面的 shouldRoll 方法得出不需要重新创建activeSegment ,则直接返回 segment } } //是否要重新创建一个 segment def shouldRoll(rollParams: RollParams): Boolean = { val reachedRollMs = timeWaitedForRoll(rollParams.now, rollParams.maxTimestampInMessages) > rollParams.maxSegmentMs - rollJitterMs size > rollParams.maxSegmentBytes - rollParams.messagesSize || //条件1 logSegment 满了 (size > 0 && reachedRollMs) || //条件2 当前activeSegment 的寿命超过了配置的 LogSegment 最长存活时间 //条件3 索引满了 offsetIndex.isFull || timeIndex.isFull || !canConvertToRelativeOffset(rollParams.maxOffsetInMessages) } //这里是roll方法,通过shouldRoll 方法如果满足条件需要创建一个Segment 则会调用此方法 def roll(expectedNextOffset: Option[Long] = None): LogSegment = { maybeHandleIOException(s"Error while rolling log segment for $topicPartition in dir ${dir.getParent}") { val start = time.hiResClockMs() lock synchronized { // 加锁 多Handler 线程操作会引发线程问题 checkIfMemoryMappedBufferClosed() //获取 LEO: LEO是即将要插入的数据 log end offset val newOffset = math.max(expectedNextOffset.getOrElse(0L), logEndOffset) //新日志文件的文件名是 [LEO].log val logFile = Log.logFile(dir, newOffset) if (segments.containsKey(newOffset)) { // segment with the same base offset already exists and loaded if (activeSegment.baseOffset == newOffset && activeSegment.size == 0) { // We have seen this happen (see KAFKA-6388) after shouldRoll() returns true for an // active segment of size zero because of one of the indexes is "full" (due to _maxEntries == 0). warn(s"Trying to roll a new log segment with start offset $newOffset " + s"=max(provided offset = $expectedNextOffset, LEO = $logEndOffset) while it already " + s"exists and is active with size 0. Size of time index: ${activeSegment.timeIndex.entries}," + s" size of offset index: ${activeSegment.offsetIndex.entries}.") removeAndDeleteSegments(Seq(activeSegment), asyncDelete = true) } else { throw new KafkaException(s"Trying to roll a new log segment for topic partition $topicPartition with start offset $newOffset" + s" =max(provided offset = $expectedNextOffset, LEO = $logEndOffset) while it already exists. Existing " + s"segment is ${segments.get(newOffset)}.") } } else if (!segments.isEmpty && newOffset < activeSegment.baseOffset) { throw new KafkaException( s"Trying to roll a new log segment for topic partition $topicPartition with " + s"start offset $newOffset =max(provided offset = $expectedNextOffset, LEO = $logEndOffset) lower than start offset of the active segment $activeSegment") } else { val offsetIdxFile = offsetIndexFile(dir, newOffset) val timeIdxFile = timeIndexFile(dir, newOffset) val txnIdxFile = transactionIndexFile(dir, newOffset) for (file <- List(logFile, offsetIdxFile, timeIdxFile, txnIdxFile) if file.exists) { warn(s"Newly rolled segment file ${file.getAbsolutePath} already exists; deleting it first") Files.delete(file.toPath) } Option(segments.lastEntry).foreach(_.getValue.onBecomeInactiveSegment()) } producerStateManager.updateMapEndOffset(newOffset) producerStateManager.takeSnapshot() //新创建的LogSegment val segment = LogSegment.open(dir, baseOffset = newOffset, config, time = time, fileAlreadyExists = false, initFileSize = initFileSize, preallocate = config.preallocate) //将新创建的Segment 添加到 segments 这个跳表中。 addSegment(segment) //更新 nextOffsetMetadata 这次更新的目的是为了更新其中记录的 activeSegment.baseOffset 和 activeSegment.size ,而 LEO 并不会改变。 updateLogEndOffset(nextOffsetMetadata.messageOffset) // schedule an asynchronous flush of the old segment //定时任务 执行flush 操作 scheduler.schedule("flush-log", () => flush(newOffset), delay = 0L) info(s"Rolled new log segment at offset $newOffset in ${time.hiResClockMs() - start} ms.") segment //返回新建的 activeSegment } } }
-
-
⑧ 验证事务状态
-
⑨:执行真正的消息写入操作,主要调用
LogSegment
对象的append
方法实现segment.append(largestOffset = appendInfo.lastOffset, largestTimestamp = appendInfo.maxTimestamp, shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp, records = validRecords)
- 调用
LogSegment.append
添加日志,在 Kafka 日志模块(二)LogSegment 一文中进行分析
- 调用
-
⑩:更新LEO对象,其中,LEO值是消息集合中最后一条消息位移值+1
private def updateLogEndOffset(offset: Long): Unit = { nextOffsetMetadata = LogOffsetMetadata(offset, activeSegment.baseOffset, activeSegment.size) // Update the high watermark in case it has gotten ahead of the log end offset following a truncation // or if a new segment has been rolled and the offset metadata needs to be updated. if (highWatermark >= offset) { updateHighWatermarkMetadata(nextOffsetMetadata) } if (this.recoveryPoint > offset) { this.recoveryPoint = offset } }
- 更新
HW
在2.7.2.2 更新高水位小节
中有分析
- 更新
-
⑪:更新事务状态
-
⑫:返回写入结果
read
def read(startOffset: Long,
maxLength: Int,
isolation: FetchIsolation,
minOneMessage: Boolean): FetchDataInfo = {
maybeHandleIOException(s"Exception while reading from $topicPartition in dir ${dir.getParent}") {
trace(s"Reading $maxLength bytes from offset $startOffset of length $size bytes")
val includeAbortedTxns = isolation == FetchTxnCommitted
// 读取消息时没有使用Monitor锁同步机制,因此这里取巧了,用本地变量的方式把LEO对象保存起来,避免争用(race condition)
val endOffsetMetadata = nextOffsetMetadata
val endOffset = nextOffsetMetadata.messageOffset
//① 如果从LEO处开始读取,那么自然不会返回任何数据,直接返回空消息集合即可
if (startOffset == endOffset)
return emptyFetchDataInfo(endOffsetMetadata, includeAbortedTxns)
//② 找到startOffset值所在的日志段对象。注意要使用floorEntry方法
var segmentEntry = segments.floorEntry(startOffset)
// return error on attempt to read beyond the log end offset or read below log start offset
//③ 满足以下条件之一将被视为消息越界,即你要读取的消息不在该Log对象中:
// 1. 要读取的消息位移超过了LEO值
// 2. 没找到对应的日志段对象
// 3. 要读取的消息在Log Start Offset之下,同样是对外不可见的消息
if (startOffset > endOffset || segmentEntry == null || startOffset < logStartOffset)
throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " +
s"but we only have log segments in the range $logStartOffset to $endOffset.")
//④ 查看一下读取隔离级别设置。
// 普通消费者能够看到[Log Start Offset, LEO)之间的消息
// 事务型消费者只能看到[Log Start Offset, Log Stable Offset]之间的消息。Log Stable Offset(LSO)是比LEO值小的位移值,为Kafka事务使用
// Follower副本消费者能够看到[Log Start Offset,高水位值]之间的消息
val maxOffsetMetadata = isolation match {
case FetchLogEnd => nextOffsetMetadata
case FetchHighWatermark => fetchHighWatermarkMetadata
case FetchTxnCommitted => fetchLastStableOffsetMetadata
}
//⑤ 如果要读取的起始位置超过了能读取的最大位置,返回空的消息集合,因为没法读取任何消息
if (startOffset > maxOffsetMetadata.messageOffset) {
val startOffsetMetadata = convertToOffsetMetadataOrThrow(startOffset)
return emptyFetchDataInfo(startOffsetMetadata, includeAbortedTxns)
}
//⑥ 开始遍历日志段对象,直到读出东西来或者读到日志末尾
while (segmentEntry != null) {
val segment = segmentEntry.getValue
val maxPosition = {
// Use the max offset position if it is on this segment; otherwise, the segment size is the limit.
if (maxOffsetMetadata.segmentBaseOffset == segment.baseOffset) {
maxOffsetMetadata.relativePositionInSegment
} else {
segment.size
}
}
//⑦ 调用Segment.read方法执行真正的读取消息操作
val fetchInfo = segment.read(startOffset, maxLength, maxPosition, minOneMessage)
if (fetchInfo == null) { // 如果没有返回任何消息,去下一个日志段对象试试
segmentEntry = segments.higherEntry(segmentEntry.getKey)
} else { // 否则返回
return if (includeAbortedTxns)
addAbortedTransactions(startOffset, segmentEntry, fetchInfo)
else
fetchInfo
}
}
//⑧ 已经读到日志末尾还是没有数据返回,只能返回空消息集合
FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY)
}
}
-
startOffset
参数:开始读取消息的offset位置 -
maxLength
参数:最大读取多少字节 -
isolation
参数:提取隔离,控制我们允许读取的最大偏移量,它是FetchIsolation
有三个实现类如下:FetchHighWatermark
: 只能消费到 HW 的位置FetchLogEnd
: 能消费到LEO的位置FetchTxnCommitted
: 只能消费到事务提交的offset的位置。
-
minOneMessage
参数:拉取的消息如果超过maxLength
设置的值,是否要返回,true是返回。 -
① 如果从LEO处开始读取,那么自然不会返回任何数据,直接返回空消息集合即可
-
② 找到startOffset值所在的日志段对象。注意要使用floorEntry方法
-
③ 满足以下条件之一将被视为消息越界,即你要读取的消息不在该Log对象中:
- 要读取的消息位移超过了LEO值
- 没找到对应的日志段对象
- 要读取的消息在Log Start Offset之下,同样是对外不可见的消息
-
④ 查看一下读取隔离级别设置。根据
isolation
参数 -
⑤ 如果要读取的起始位置超过了能读取的最大位置,返回空的消息集合,因为没法读取任何消息
-
⑥ 开始遍历日志段对象,直到读出东西来或者读到日志末尾
-
⑦ 调用
Segment#read
方法执行真正的读取消息操作 -
⑧ 已经读到日志末尾还是没有数据返回,只能返回空消息集合
由于篇幅问题,在 跳表在 Kafka 中的实现 一文中会详细分析跳表
在Kafka中的使用。
参考:
Kafka-2.5.0 源码
《Apache Kafka源码剖析》
《深入理解Kafka:核心设计与实践》