概述
在 Kafka 日志模块(一)Log 一文中讲到了Kafka 为了防止日志文件太大,把日志文件分成多个 LogSegment
,而在每个 Segment
中又有索引文件
,为了快速查找所需要的消息。并且Log类中的Append
方法和 Read
方法最终都是调用的 LogSegment
相对应的方法,也就是说 Log 是LogSegment的抽象。
源码分析
LogSegment
class LogSegment private[log] (val log: FileRecords,
val lazyOffsetIndex: LazyIndex[OffsetIndex],
val lazyTimeIndex: LazyIndex[TimeIndex],
val txnIndex: TransactionIndex,
val baseOffset: Long,
val indexIntervalBytes: Int,
val rollJitterMs: Long,
val time: Time) extends Logging {
log
: 用于操作对应日志文件的FileRecords
对象lazyOffsetIndex
:索引,对应LogSegment中的index索引lazyTimeIndex
:索引,对应LogSegment中的timeindex索引txnIndex
:事务有关索引baseOffset
:LogSegment 中第一条消息的 offset 值,称作为BaseOffset
indexIntervalBytes
:索引项之间间隔的的最小字节数rollJitterMs
:生成Segment的时间,由定时生成segment的时间 - 随机生成的值
append 方法
由 Log 中的 append
方法调用,在 Kafka 日志模块(一)Log 一文中的 2.7.5 读写操作
中有分析。
def append(largestOffset: Long,
largestTimestamp: Long,
shallowOffsetOfMaxTimestamp: Long,
records: MemoryRecords): Unit = {
if (records.sizeInBytes > 0) {
trace(s"Inserting ${records.sizeInBytes} bytes at end offset $largestOffset at position ${log.sizeInBytes} " +
s"with largest timestamp $largestTimestamp at shallow offset $shallowOffsetOfMaxTimestamp")
//该条目需要添加的位置
val physicalPosition = log.sizeInBytes()
if (physicalPosition == 0)
rollingBasedTimestamp = Some(largestTimestamp)
//确保索引在这个分段范围内
ensureOffsetInRange(largestOffset)
// append the messages
//① 添加日志
val appendedBytes = log.append(records)
trace(s"Appended $appendedBytes to ${log.file} at end offset $largestOffset")
// Update the in memory max timestamp and corresponding offset.
if (largestTimestamp > maxTimestampSoFar) {
maxTimestampSoFar = largestTimestamp
offsetOfMaxTimestampSoFar = shallowOffsetOfMaxTimestamp
}
// append an entry to the index (if needed)
if (bytesSinceLastIndexEntry > indexIntervalBytes) {
//② 给offsetIndex 添加记录
offsetIndex.append(largestOffset, physicalPosition)
//③ 给timeIndex 添加记录
timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar)
bytesSinceLastIndexEntry = 0
}
bytesSinceLastIndexEntry += records.sizeInBytes
}
}
-
参数
largestOffset
:在这个分段中最大的位移量 -
参数
largestTimestamp
:在这个分段中最大的时间戳 -
参数
shallowOffsetOfMaxTimestamp
:外层消息中最大的时间戳的位移量 -
参数
records
:要追加的日志条目,MemoryRecords
对象 -
①
log.append
:添加日志,底层是往 ByteBuffer 中添加//org.apache.kafka.common.record.FileRecords public int append(MemoryRecords records) throws IOException { if (records.sizeInBytes() > Integer.MAX_VALUE - size.get()) throw new IllegalArgumentException("Append of size " + records.sizeInBytes() + " bytes is too large for segment with current file position at " + size.get()); //调用MemoryRecords 的方法 int written = records.writeFullyTo(channel); size.getAndAdd(written); return written; } //org.apache.kafka.common.record.MemoryRecords public int writeFullyTo(GatheringByteChannel channel) throws IOException { //java.nio.bytebuffer buffer.mark(); int written = 0; while (written < sizeInBytes()) written += channel.write(buffer); buffer.reset(); return written; }
-
②
offsetIndex.append
:给 offsetIndex 添加记录 -
③
timeIndex.maybeAppend
:给 timeIndex 添加记录
索引操作在此 Kafka 日志模块(三)索引 文中分析。
read 方法
由 Log
中的 read
方法调用,在 Kafka 日志模块(一)Log 一文中的 2.7.5 读写操作
中有分析。
def read(startOffset: Long,
maxSize: Int,
maxPosition: Long = size,
minOneMessage: Boolean = false): FetchDataInfo = {
if (maxSize < 0)
throw new IllegalArgumentException(s"Invalid max size $maxSize for log read from segment $log")
//① 把 startOffset 转换成 物理地址
val startOffsetAndSize = translateOffset(startOffset)
// if the start position is already off the end of the log, return null
if (startOffsetAndSize == null)
return null
//② 获取物理位置
val startPosition = startOffsetAndSize.position
// 封装成LogOffsetMetadata 对象
val offsetMetadata = LogOffsetMetadata(startOffset, this.baseOffset, startPosition)
//③ 调整这次读取的最大字节数,根据 minOneMessage 进行判断
// 如果 `minOneMessage` 为true,则从第`①`步中返回的值与 maxSize 取最大值
// 反之直接以 maxSize 为标准。
val adjustedMaxSize =
if (minOneMessage) math.max(maxSize, startOffsetAndSize.size)
else maxSize
// return a log segment but with zero size in the case below
if (adjustedMaxSize == 0)
return FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY)
// calculate the length of the message set to read based on whether or not they gave us a maxOffset
//④ 计算这次读取多少个字节数
val fetchSize: Int = min((maxPosition - startPosition).toInt, adjustedMaxSize)
//⑤ 通过position 和 fetchSize 获取消息 封装成 FetchDataInfo 并且返回
FetchDataInfo(offsetMetadata, log.slice(startPosition, fetchSize),
firstEntryIncomplete = adjustedMaxSize < startOffsetAndSize.size)
}
-
参数
startOffset
:这次从哪个偏移量开始读取 -
参数
maxSize
:读取的最大的字节数 -
参数
maxPosition
:读取的最大的物理地址 -
参数
minOneMessage
:当读取大容量的消息的时候(超过了maxSize的限制
),如果为则允许读取,反正不允许。 -
①
translateOffset
:把 startOffset 转换成 物理地址private[log] def translateOffset(offset: Long, startingFilePosition: Int = 0): LogOffsetPosition = { //通过 offsetIndex 完成索引文件的查找 val mapping = offsetIndex.lookup(offset) log.searchForOffsetWithSize(offset, max(mapping.position, startingFilePosition)) }
offsetIndex#lookup
:通过 offsetIndex 完成索引文件的查找,log#searchForOffsetWithSize
:通过 FileRecords 对日志文件的查找- 以上两步操作会在 Kafka 日志模块(三)索引 文中分析。
-
② 获取物理地址
position
-
③ 调整这次读取的最大字节数,根据
minOneMessage
进行判断- 如果
minOneMessage
为true,则从第①
步中返回的值与 maxSize 取最大值 - 反之直接以
maxSize
为标准。
- 如果
-
④ 计算这次读取多少个字节数
- 根据 最大的物理地址 - 开始读取的物理地址 与 最大消息字节数 取最小值。
-
⑤ 通过
position
和fetchSize
获取消息 封装成FetchDataInfo
并且返回public FileRecords slice(int position, int size) throws IOException { if (position < 0) throw new IllegalArgumentException("Invalid position: " + position + " in read from " + this); if (position > sizeInBytes() - start) throw new IllegalArgumentException("Slice from position " + position + " exceeds end position of " + this); if (size < 0) throw new IllegalArgumentException("Invalid size: " + size + " in read from " + this); int end = this.start + position + size; // handle integer overflow or if end is beyond the end of the file if (end < 0 || end >= start + sizeInBytes()) end = start + sizeInBytes(); return new FileRecords(file, channel, this.start + position, end, true); }
recover 方法
根据日志文件重建索引文件,同时验证日志文件中消息的合法性。在重新索引文件过程中,如果遇到了压缩消息需要进行解压,主要原因是因为索引项中保存的相对offset是第一条消息的offset而外层消息的offset是压缩消息集合中的最后一条消息的offset
def recover(producerStateManager: ProducerStateManager, leaderEpochCache: Option[LeaderEpochFileCache] = None): Int = {
//① 分别调用 offsetIndex、timeIndex、txnIndex 索引的reset 方法,清空索引文件,并且移动 position 指针
offsetIndex.reset()
timeIndex.reset()
txnIndex.reset()
// 记录了已经通过验证的字节数
var validBytes = 0
// 最后一个索引项对应的物理地址
var lastIndexEntry = 0
// 最大的时间戳
maxTimestampSoFar = RecordBatch.NO_TIMESTAMP
try {
for (batch <- log.batches.asScala) {
//② 验证message 是否合法,验证失败就抛异常
batch.ensureValid()
ensureOffsetInRange(batch.lastOffset)
// The max timestamp is exposed at the batch level, so no need to iterate the records
if (batch.maxTimestamp > maxTimestampSoFar) {
maxTimestampSoFar = batch.maxTimestamp
offsetOfMaxTimestampSoFar = batch.lastOffset
}
// Build offset index
// 开始构建索引项
if (validBytes - lastIndexEntry > indexIntervalBytes) {
//③ 构建offsetIndex timeIndex 索引
offsetIndex.append(batch.lastOffset, validBytes)
timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar)
lastIndexEntry = validBytes
}
validBytes += batch.sizeInBytes()
if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
leaderEpochCache.foreach { cache =>
if (batch.partitionLeaderEpoch > 0 && cache.latestEpoch.forall(batch.partitionLeaderEpoch > _))
cache.assign(batch.partitionLeaderEpoch, batch.baseOffset)
}
updateProducerState(producerStateManager, batch)
}
}
} catch {
case e@ (_: CorruptRecordException | _: InvalidRecordException) =>
warn("Found invalid messages in log segment %s at byte offset %d: %s. %s"
.format(log.file.getAbsolutePath, validBytes, e.getMessage, e.getCause))
}
val truncated = log.sizeInBytes - validBytes
if (truncated > 0)
debug(s"Truncated $truncated invalid bytes at the end of segment ${log.file.getAbsoluteFile} during recovery")
//④ 截断验证失败的日志,对索引文件进行相对应的截断
log.truncateTo(validBytes)
offsetIndex.trimToValidSize()
// A normally closed segment always appends the biggest timestamp ever seen into log segment, we do this as well.
timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar, skipFullCheck = true)
timeIndex.trimToValidSize()
truncated
}
- ① 分别调用
offsetIndex
、timeIndex
、txnIndex
索引的reset
方法,清空索引文件,并且移动position
指针 - ② 验证 message 是否合法,验证失败就抛异常
- ③ 构建
offsetIndex
timeIndex
索引 - ④ 截断验证失败的日志,对索引文件进行相对应的截断
这一系列对索引的操作,在 Kafka 日志模块(三)索引 文中分析。