1. 概述
在 Kafka 日志模块(二)LogSegment 一文中说过,每次添加 log
的时候都会,添加相对应的索引,索引是用来快速定义 message
(通过二分查找
)。
2. 源码分析
与索引相关的一些类:
LazyIndex
:它定义了AbstractIndex
上的一个包装类,实现索引项延迟加载
。这个类主要是为了提高性能。AbstractIndex
:它定义了最顶层的抽象类,这个类封装了所有索引类型的公共操作。OffsetIndex
:定义位移索引,保存“< 位移值,文件磁盘物理位置 >”
对。TimeIndex
:定义时间戳索引,保存“< 时间戳,位移值 >”
对。TransactionIndex
:定义事务索引,为已中止事务(Aborted Transcation)保存重要的元数据信息。只有启用 Kafka 事务后,这个索引才有可能出现。本篇不会对事务索引进行分析。
2.1 AbstractIndex
abstract class AbstractIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSize: Int = -1,
val writable: Boolean) extends Closeable {
protected def entrySize: Int
private[this] var _maxEntries: Int = mmap.limit() / entrySize
protected var _entries: Int = mmap.position() / entrySize
}
file
:每个索引对象在磁盘上都对应了一个索引文件。baseOffset
:索引对象对应日志段对象的起始位移值。举个例子:如果你查看 Kafka 日志路径的话,就会发现,日志文件和索引文件都是成组出现的。比如说,如果日志文件是
00000000000000000123.log
,正常情况下,一定还有一组索引文件00000000000000000123.index
、00000000000000000123.timeindex
等。这里的"123"
就是这组文件的起始位移值,也就是baseOffset
值。maxIndexSize
:它控制索引文件的最大长度,通过segment.index.bytes
进行设置,默认 10MBwritable
:索引文件打开方式True
表示以“读写
”方式打开False
表示以“只读
”方式打开
entrySize
:表示不同索引项的大小。OffsetIndex
: 8
在 OffsetIndex 中,位移值用 4 个字节来表示,物理磁盘位置也用 4 个字节来表示,所以总共是 8 个字节。你可能会说,位移值不是长整型吗,应该是 8 个字节才对啊。还记得 AbstractIndex 已经保存了 baseOffset 了吗?这里的位移值,实际上是相对于 baseOffset 的相对位移值,即真实位移值减去 baseOffset 的值。
TimeIndex
:12
TimeIndex 中的时间戳类型是长整型,占用 8 个字节,位移依然使用相对位移值,占用 4 个字节,因此总共需要 12 个字节。
_maxEntries
:最大索引项个数_entries
:索引项个数
2.1.1 MappedByteBuffer
MMAP 内存映射,实现零拷贝
。
@volatile
protected var mmap: MappedByteBuffer = {
//① 创建索引文件
val newlyCreated = file.createNewFile()
//② 以 writable 指定的方式(读写方式或只读方法)打开索引文件
val raf = if (writable) new RandomAccessFile(file, "rw") else new RandomAccessFile(file, "r")
try {
/* pre-allocate the file if necessary */
if(newlyCreated) {
// 预设的索引文件大小不能太小,如果连一个索引项都保存不了,直接抛出异常
if(maxIndexSize < entrySize)
throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize)
// ③ 设置索引文件长度,roundDownToExactMultiple计算的是不超过maxIndexSize的最大整数倍entrySize 比如maxIndexSize=1234567,entrySize=8,那么调整后的文件长度为1234560
raf.setLength(roundDownToExactMultiple(maxIndexSize, entrySize))
}
/* memory-map the file */
//④ 更新索引长度字段_length
_length = raf.length()
//⑤ 创建MappedByteBuffer对象
val idx = {
if (writable)
raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, _length)
else
raf.getChannel.map(FileChannel.MapMode.READ_ONLY, 0, _length)
}
/* set the position in the index for the next entry */
//⑥ 如果是新创建的索引文件,将MappedByteBuffer对象的当前位置置成0 如果索引文件已存在,将MappedByteBuffer对象的当前位置设置成最后一个索引项所在的位置
if(newlyCreated)
idx.position(0)
else
// if this is a pre-existing index, assume it is valid and set position to last entry
idx.position(roundDownToExactMultiple(idx.limit(), entrySize))
// ⑦ 返回创建的MappedByteBuffer对象
idx
} finally {
// 关闭打开索引文件句柄
CoreUtils.swallow(raf.close(), AbstractIndex)
}
}
- ① 创建索引文件
- ② 以
writable
指定的方式(读写方式或只读方法)打开索引文件 - ③ 设置索引文件长度,
roundDownToExactMultiple
计算的是不超过maxIndexSize的最大整数倍entrySize 比如maxIndexSize=1234567,entrySize=8,那么调整后的文件长度为1234560 - ④ 更新索引长度字段
_length
- ⑤ 创建
MappedByteBuffer
对象 - ⑥ 如果是新创建的索引文件,将
MappedByteBuffer
对象的当前位置置成0 如果索引文件已存在,将MappedByteBuffer
对象的当前位置设置成最后一个索引项所在的位置 - ⑦ 返回创建的
MappedByteBuffer
对象
AbstractIndex
中最重要的就是这个 mmap
变量了。事实上,AbstractIndex 继承类实现添加索引项的主要逻辑,也就是向 mmap
中添加对应的字段。
2.1.2 indexSlotRangeFor
索引中最重要的方法,通过二分查找
,获取小于给定offset最大的offset和对应的position。
在2.5.0
版本中,kafka 通过改进了这个二分搜索,将索引项分成两个部分:热区(Warm Area)
和 冷区(Cold Area)
,然后分别在这两个区域内执行二分查找。
- 它能保证那些经常需要被访问的
Page
组合是固定的。 - 查询最热那部分数据所遍历的
Page
永远是固定的,因此大概率在页缓存中,从而避免无意义的Page Fault
。
Kafka 通过冷热区改进原本二分查找的原因:
- 大多数操作系统使用页缓存来实现内存映射,而目前几乎所有的操作系统都使用
LRU(Least Recently Used)
或类似于 LRU 的机制来管理页缓存。 - Kafka 写入索引文件的方式是在文件
末尾追加写入
,而几乎所有的索引查询都集中在索引的尾部。这么来看的话,LRU 机制是非常适合 Kafka 的索引访问场景的。 - 但,这里有个问题是,当 Kafka 在查询索引的时候,原版的二分查找算法并没有考虑到缓存的问题,因此很可能会导致一些不必要的
缺页中断(Page Fault)
。 - 此时,Kafka 线程会被阻塞,等待对应的索引项从物理磁盘中读出并放入到页缓存中。
下面我举个例子来说明一下这个情况。假设 Kafka 的某个索引占用了操作系统页缓存 13 个页(Page),如果待查找的位移值位于最后一个页上,也就是 Page 12,那么标准的二分查找算法会依次读取页号
0、6、9、11
和12
,具体的推演流程如下所示:
- 通常来说,一个页上保存了成百上千的索引项数据。随着索引文件不断被写入,
Page #12
不断地被填充新的索引项。 - 如果此时索引查询方都来自
ISR
副本或Lag
很小的消费者,那么这些查询大多集中在对Page #12
的查询, - 因此,
Page #0、6、9、11、12
一定经常性地被源码访问。也就是说,这些页一定保存在页缓存上。 - 后面当新的索引项填满了
Page #12
,页缓存就会申请一个新的Page
来保存索引项,即Page #13
。 - 现在,最新索引项保存在
Page #13
中。如果要查找最新索引项,原版二分查找算法将会依次访问Page #0、7、10、12
和13
。 - 此时,问题来了:
Page 7
和10
已经很久没有被访问过了,它们大概率不在页缓存中,因此,一旦索引开始征用Page #13
,就会发生Page Fault
,等待那些冷页数据从磁盘中加载到页缓存。
根据国外用户的测试,这种加载过程可能长达 1 秒。显然,这是一个普遍的问题,即每当索引文件占用 Page 数发生变化时,就会强行变更二分查找的搜索路径,从而出现不在页缓存的冷数据必须要加载到页缓存的情形,而这种加载过程是非常耗时的。
private def indexSlotRangeFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchEntity): (Int, Int) = {
// check if the index is empty
// ① 如果索引为空,直接返回<-1,-1> 对
if(_entries == 0)
return (-1, -1)
// 二分查找算法
def binarySearch(begin: Int, end: Int) : (Int, Int) = {
// binary search for the entry
var lo = begin
var hi = end
while(lo < hi) {
val mid = (lo + hi + 1) >>> 1
val found = parseEntry(idx, mid)
// 查找的位移值不能小于当前最小位移值
val compareResult = compareIndexEntry(found, target, searchEntity)
if(compareResult > 0)
hi = mid - 1
else if(compareResult < 0)
lo = mid
else
return (mid, mid)
}
(lo, if (lo == _entries - 1) -1 else lo + 1)
}
//② 确认热区首个索引项位于哪个槽,_warmEntries 就是所谓的分割线,固定为8
// 如果是OffsetIndex,_warmEntries = 8192 / 8 = 1024,即第1024个槽
// 如果是TimeIndex,_warmEntries = 8192 / 12 = 682,即第682个槽
val firstHotEntry = Math.max(0, _entries - 1 - _warmEntries)
// check if the target offset is in the warm section of the index
//③ 判断 target 位移值在热区还是冷区
if(compareIndexEntry(parseEntry(idx, firstHotEntry), target, searchEntity) < 0) {
// 如果在热区,搜索热区
return binarySearch(firstHotEntry, _entries - 1)
}
// check if the target offset is smaller than the least offset
//④ 确保 target 位移值不能小于当前最小位移值
if(compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0)
return (-1, 0)
//⑤ 如果在冷区,搜索冷区。
binarySearch(0, firstHotEntry)
}
2.2 OffsetIndex
AbstractIndex
的实现类,OffsetIndex
class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable: Boolean = true)
extends AbstractIndex(_file, baseOffset, maxIndexSize, writable) {
import OffsetIndex._
override def entrySize = 8
file
:每个索引对象在磁盘上都对应了一个索引文件。baseOffset
:索引对象对应日志段对象的起始位移值。举个例子,如果你查看 Kafka 日志路径的话,就会发现,日志文件和索引文件都是成组出现的。比如说,如果日志文件是 00000000000000000123.log,正常情况下,一定还有一组索引文件 00000000000000000123.index、00000000000000000123.timeindex 等。这里的“123”就是这组文件的起始位移值,也就是 baseOffset 值。
maxIndexSize
:它控制索引文件的最大长度,通过segment.index.bytes
进行设置,默认 10MBwritable
:索引文件打开方式True
表示以“读写”方式打开False
表示以“只读”方式打开
entrySize
:表示不同索引项的大小。在 OffsetIndex 中,位移值用 4 个字节来表示,物理磁盘位置也用 4 个字节来表示,所以总共是 8 个字节。你可能会说,位移值不是长整型吗,应该是 8 个字节才对啊。还记得 AbstractIndex 已经保存了 baseOffset 了吗?这里的位移值,实际上是相对于 baseOffset 的相对位移值,即真实位移值减去 baseOffset 的值。
2.2.1 append
添加索引操作流程图:
def append(offset: Long, position: Int): Unit = {
inLock(lock) {
//① 判断索引文件是否未写满
require(!isFull, "Attempt to append to a full index (size = " + _entries + ").")
//② 必须满足以下条件之一才允许写入索引项
//1. 当前索引文件为空
//2. 要写入的位移大于当前所有已写入的索引项的位移 — kafka 规定索引项中的位移必须是单调增加的
if (_entries == 0 || offset > _lastOffset) {
trace(s"Adding index entry $offset => $position to ${file.getAbsolutePath}")
//③ 在 mmap 中写入物理位置信息
mmap.putInt(relativeOffset(offset))
mmap.putInt(position)
//④ 更新其他元数据统计信息,如当前索引项计数器 _entries 和当前索引项最新位移值 _lastOffset
_entries += 1
_lastOffset = offset
//⑤ 执行校验,写入索引项格式必须符合要求,即索引项个数 * 单个索引项占用字节数匹配当前文件物理大小,否则说明文件已损坏
require(_entries * entrySize == mmap.position(), entries + " entries but file position in index is " + mmap.position() + ".")
} else {
// 如果第 ② 步中两个条件都不满足,不能执行写入索引项操作,抛出异常
throw new InvalidOffsetException(s"Attempt to append an offset ($offset) to position $entries no larger than" +
s" the last offset appended (${_lastOffset}) to ${file.getAbsolutePath}.")
}
}
}
- ① 判断索引文件是否未写满
- ② 必须满足以下条件之一才允许写入索引项
-
- 当前索引文件为空
-
- 要写入的位移大于当前所有已写入的索引项的位移 — kafka 规定索引项中的位移必须是单调增加的
-
- ③ 在 mmap 中写入物理位置信息
- ④ 更新其他元数据统计信息,如当前索引项计数器
_entries
和当前索引项最新位移值_lastOffset
- ⑤ 执行校验,写入索引项格式必须符合要求,即
索引项个数
*单个索引项占用字节数
匹配当前文件物理大小,否则说明文件已损坏
2.2.2 lookup
找到小于给定的 offset 的最大的 offset 的 物理地址。
def lookup(targetOffset: Long): OffsetPosition = {
//加锁
maybeLock(lock) {
//创建一个副本
val idx = mmap.duplicate
//二分查找
val slot = largestLowerBoundSlotFor(idx, targetOffset, IndexSearchType.KEY)
//如果找到则调用 parseEntry 方法 返回offset和postition的映射关系
//如果找不到则返回 baseOffset, 0
if(slot == -1)
OffsetPosition(baseOffset, 0)
else
parseEntry(idx, slot)
}
}
2.3 TimeIndex
TimeIndex 索引和 OffsetIndex 一样。
2.3.1 maybeAppend
def maybeAppend(timestamp: Long, offset: Long, skipFullCheck: Boolean = false): Unit = {
inLock(lock) {
if (!skipFullCheck)
require(!isFull, "Attempt to append to a full time index (size = " + _entries + ").")
// We do not throw exception when the offset equals to the offset of last entry. That means we are trying
// to insert the same time index entry as the last entry.
// If the timestamp index entry to be inserted is the same as the last entry, we simply ignore the insertion
// because that could happen in the following two scenarios:
// 1. A log segment is closed.
// 2. LogSegment.onBecomeInactiveSegment() is called when an active log segment is rolled.
if (_entries != 0 && offset < lastEntry.offset)
throw new InvalidOffsetException(s"Attempt to append an offset ($offset) to slot ${_entries} no larger than" +
s" the last offset appended (${lastEntry.offset}) to ${file.getAbsolutePath}.")
if (_entries != 0 && timestamp < lastEntry.timestamp)
throw new IllegalStateException(s"Attempt to append a timestamp ($timestamp) to slot ${_entries} no larger" +
s" than the last timestamp appended (${lastEntry.timestamp}) to ${file.getAbsolutePath}.")
// We only append to the time index when the timestamp is greater than the last inserted timestamp.
// If all the messages are in message format v0, the timestamp will always be NoTimestamp. In that case, the time
// index will be empty.
if (timestamp > lastEntry.timestamp) {
trace(s"Adding index entry $timestamp => $offset to ${file.getAbsolutePath}.")
mmap.putLong(timestamp)
mmap.putInt(relativeOffset(offset))
_entries += 1
_lastEntry = TimestampOffset(timestamp, offset)
require(_entries * entrySize == mmap.position(), _entries + " entries but file position in index is " + mmap.position() + ".")
}
}
}
2.3.2 lookup
def lookup(targetTimestamp: Long): TimestampOffset = {
maybeLock(lock) {
val idx = mmap.duplicate
val slot = largestLowerBoundSlotFor(idx, targetTimestamp, IndexSearchType.KEY)
if (slot == -1)
TimestampOffset(RecordBatch.NO_TIMESTAMP, baseOffset)
else
parseEntry(idx, slot)
}
}
参考自:Kafka核心源码解读