Kafka 日志模块(三)索引

2020年5月3日 | 作者 Siran | 4600字 | 阅读大约需要10分钟
归档于 消息队列 | 标签 #kafka

1. 概述

Kafka 日志模块(二)LogSegment 一文中说过,每次添加 log 的时候都会,添加相对应的索引,索引是用来快速定义 message (通过二分查找)。


2. 源码分析

与索引相关的一些类:

  • LazyIndex:它定义了 AbstractIndex 上的一个包装类,实现索引项延迟加载。这个类主要是为了提高性能。
  • AbstractIndex:它定义了最顶层的抽象类,这个类封装了所有索引类型的公共操作。
  • OffsetIndex:定义位移索引,保存“< 位移值,文件磁盘物理位置 >”对。
  • TimeIndex:定义时间戳索引,保存“< 时间戳,位移值 >”对。
  • TransactionIndex:定义事务索引,为已中止事务(Aborted Transcation)保存重要的元数据信息。只有启用 Kafka 事务后,这个索引才有可能出现。本篇不会对事务索引进行分析。

2.1 AbstractIndex

abstract class AbstractIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSize: Int = -1,
                             val writable: Boolean) extends Closeable {

  protected def entrySize: Int
  private[this] var _maxEntries: Int = mmap.limit() / entrySize
  protected var _entries: Int = mmap.position() / entrySize
}                                 
  • file :每个索引对象在磁盘上都对应了一个索引文件。
  • baseOffset :索引对象对应日志段对象的起始位移值。

    举个例子:如果你查看 Kafka 日志路径的话,就会发现,日志文件和索引文件都是成组出现的。比如说,如果日志文件是 00000000000000000123.log,正常情况下,一定还有一组索引文件 00000000000000000123.index00000000000000000123.timeindex 等。这里的 "123" 就是这组文件的起始位移值,也就是 baseOffset 值。

  • maxIndexSize :它控制索引文件的最大长度,通过 segment.index.bytes 进行设置,默认 10MB
  • writable :索引文件打开方式
    • True 表示以“读写”方式打开
    • False 表示以“只读”方式打开
  • entrySize:表示不同索引项的大小。
    • OffsetIndex : 8

    在 OffsetIndex 中,位移值用 4 个字节来表示,物理磁盘位置也用 4 个字节来表示,所以总共是 8 个字节。你可能会说,位移值不是长整型吗,应该是 8 个字节才对啊。还记得 AbstractIndex 已经保存了 baseOffset 了吗?这里的位移值,实际上是相对于 baseOffset 的相对位移值,即真实位移值减去 baseOffset 的值。

    • TimeIndex :12

    TimeIndex 中的时间戳类型是长整型,占用 8 个字节,位移依然使用相对位移值,占用 4 个字节,因此总共需要 12 个字节。

  • _maxEntries:最大索引项个数
  • _entries:索引项个数

2.1.1 MappedByteBuffer

MMAP 内存映射,实现零拷贝

@volatile
  protected var mmap: MappedByteBuffer = {
    //① 创建索引文件  
    val newlyCreated = file.createNewFile()
    //② 以 writable 指定的方式(读写方式或只读方法)打开索引文件
    val raf = if (writable) new RandomAccessFile(file, "rw") else new RandomAccessFile(file, "r")
    try {
      /* pre-allocate the file if necessary */
      if(newlyCreated) {
        // 预设的索引文件大小不能太小,如果连一个索引项都保存不了,直接抛出异常  
        if(maxIndexSize < entrySize)
          throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize)
        // ③ 设置索引文件长度,roundDownToExactMultiple计算的是不超过maxIndexSize的最大整数倍entrySize 比如maxIndexSize=1234567,entrySize=8,那么调整后的文件长度为1234560
        raf.setLength(roundDownToExactMultiple(maxIndexSize, entrySize))
      }

      /* memory-map the file */
      //④ 更新索引长度字段_length
      _length = raf.length()
      //⑤ 创建MappedByteBuffer对象
      val idx = {
        if (writable)
          raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, _length)
        else
          raf.getChannel.map(FileChannel.MapMode.READ_ONLY, 0, _length)
      }
      /* set the position in the index for the next entry */
      //⑥ 如果是新创建的索引文件,将MappedByteBuffer对象的当前位置置成0 如果索引文件已存在,将MappedByteBuffer对象的当前位置设置成最后一个索引项所在的位置
      if(newlyCreated)
        idx.position(0)
      else
        // if this is a pre-existing index, assume it is valid and set position to last entry
        idx.position(roundDownToExactMultiple(idx.limit(), entrySize))
        // ⑦ 返回创建的MappedByteBuffer对象
      idx
    } finally {
        // 关闭打开索引文件句柄
      CoreUtils.swallow(raf.close(), AbstractIndex)
    }
  }
  • ① 创建索引文件
  • ② 以 writable 指定的方式(读写方式或只读方法)打开索引文件
  • ③ 设置索引文件长度, roundDownToExactMultiple 计算的是不超过maxIndexSize的最大整数倍entrySize 比如maxIndexSize=1234567,entrySize=8,那么调整后的文件长度为1234560
  • ④ 更新索引长度字段_length
  • ⑤ 创建 MappedByteBuffer 对象
  • ⑥ 如果是新创建的索引文件,将 MappedByteBuffer 对象的当前位置置成0 如果索引文件已存在,将 MappedByteBuffer 对象的当前位置设置成最后一个索引项所在的位置
  • ⑦ 返回创建的 MappedByteBuffer 对象

AbstractIndex 中最重要的就是这个 mmap 变量了。事实上,AbstractIndex 继承类实现添加索引项的主要逻辑,也就是向 mmap 中添加对应的字段。


2.1.2 indexSlotRangeFor

索引中最重要的方法,通过二分查找,获取小于给定offset最大的offset和对应的position。

2.5.0版本中,kafka 通过改进了这个二分搜索,将索引项分成两个部分:热区(Warm Area)冷区(Cold Area),然后分别在这两个区域内执行二分查找。

  • 它能保证那些经常需要被访问的 Page 组合是固定的。
  • 查询最热那部分数据所遍历的 Page 永远是固定的,因此大概率在页缓存中,从而避免无意义的 Page Fault

Kafka 通过冷热区改进原本二分查找的原因:

  • 大多数操作系统使用页缓存来实现内存映射,而目前几乎所有的操作系统都使用 LRU(Least Recently Used)或类似于 LRU 的机制来管理页缓存。
  • Kafka 写入索引文件的方式是在文件末尾追加写入,而几乎所有的索引查询都集中在索引的尾部。这么来看的话,LRU 机制是非常适合 Kafka 的索引访问场景的。
  • 但,这里有个问题是,当 Kafka 在查询索引的时候,原版的二分查找算法并没有考虑到缓存的问题,因此很可能会导致一些不必要的缺页中断(Page Fault)
  • 此时,Kafka 线程会被阻塞,等待对应的索引项从物理磁盘中读出并放入到页缓存中。

下面我举个例子来说明一下这个情况。假设 Kafka 的某个索引占用了操作系统页缓存 13 个页(Page),如果待查找的位移值位于最后一个页上,也就是 Page 12,那么标准的二分查找算法会依次读取页号 0、6、9、1112,具体的推演流程如下所示:

  • 通常来说,一个页上保存了成百上千的索引项数据。随着索引文件不断被写入,Page #12 不断地被填充新的索引项。
  • 如果此时索引查询方都来自 ISR 副本或 Lag 很小的消费者,那么这些查询大多集中在对 Page #12 的查询,
  • 因此,Page #0、6、9、11、12 一定经常性地被源码访问。也就是说,这些页一定保存在页缓存上。
  • 后面当新的索引项填满了 Page #12,页缓存就会申请一个新的 Page 来保存索引项,即 Page #13
  • 现在,最新索引项保存在 Page #13 中。如果要查找最新索引项,原版二分查找算法将会依次访问 Page #0、7、10、1213
  • 此时,问题来了:Page 710 已经很久没有被访问过了,它们大概率不在页缓存中,因此,一旦索引开始征用 Page #13,就会发生 Page Fault,等待那些冷页数据从磁盘中加载到页缓存。

根据国外用户的测试,这种加载过程可能长达 1 秒。显然,这是一个普遍的问题,即每当索引文件占用 Page 数发生变化时,就会强行变更二分查找的搜索路径,从而出现不在页缓存的冷数据必须要加载到页缓存的情形,而这种加载过程是非常耗时的。

private def indexSlotRangeFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchEntity): (Int, Int) = {
    // check if the index is empty
    // ① 如果索引为空,直接返回<-1,-1> 对
    if(_entries == 0)
      return (-1, -1)

    // 二分查找算法
    def binarySearch(begin: Int, end: Int) : (Int, Int) = {
      // binary search for the entry
      var lo = begin
      var hi = end
      while(lo < hi) {
        val mid = (lo + hi + 1) >>> 1
        val found = parseEntry(idx, mid)
        // 查找的位移值不能小于当前最小位移值
        val compareResult = compareIndexEntry(found, target, searchEntity)
        if(compareResult > 0)
          hi = mid - 1
        else if(compareResult < 0)
          lo = mid
        else
          return (mid, mid)
      }
      (lo, if (lo == _entries - 1) -1 else lo + 1)
    }

    //② 确认热区首个索引项位于哪个槽,_warmEntries 就是所谓的分割线,固定为8
    // 如果是OffsetIndex,_warmEntries = 8192 / 8 = 1024,即第1024个槽 
    // 如果是TimeIndex,_warmEntries = 8192 / 12 = 682,即第682个槽
    val firstHotEntry = Math.max(0, _entries - 1 - _warmEntries)
    // check if the target offset is in the warm section of the index
    //③ 判断 target 位移值在热区还是冷区
    if(compareIndexEntry(parseEntry(idx, firstHotEntry), target, searchEntity) < 0) {
       // 如果在热区,搜索热区 
      return binarySearch(firstHotEntry, _entries - 1)
    }

    // check if the target offset is smaller than the least offset
    //④ 确保 target 位移值不能小于当前最小位移值
    if(compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0)
      return (-1, 0)

    //⑤ 如果在冷区,搜索冷区。
    binarySearch(0, firstHotEntry)
  }    

2.2 OffsetIndex

AbstractIndex 的实现类,OffsetIndex

class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable: Boolean = true)
    extends AbstractIndex(_file, baseOffset, maxIndexSize, writable) {
  import OffsetIndex._

  override def entrySize = 8
  • file :每个索引对象在磁盘上都对应了一个索引文件。
  • baseOffset :索引对象对应日志段对象的起始位移值。

    举个例子,如果你查看 Kafka 日志路径的话,就会发现,日志文件和索引文件都是成组出现的。比如说,如果日志文件是 00000000000000000123.log,正常情况下,一定还有一组索引文件 00000000000000000123.index、00000000000000000123.timeindex 等。这里的“123”就是这组文件的起始位移值,也就是 baseOffset 值。

  • maxIndexSize :它控制索引文件的最大长度,通过 segment.index.bytes 进行设置,默认 10MB
  • writable :索引文件打开方式
    • True 表示以“读写”方式打开
    • False 表示以“只读”方式打开
  • entrySize:表示不同索引项的大小。

    在 OffsetIndex 中,位移值用 4 个字节来表示,物理磁盘位置也用 4 个字节来表示,所以总共是 8 个字节。你可能会说,位移值不是长整型吗,应该是 8 个字节才对啊。还记得 AbstractIndex 已经保存了 baseOffset 了吗?这里的位移值,实际上是相对于 baseOffset 的相对位移值,即真实位移值减去 baseOffset 的值。


2.2.1 append

添加索引操作流程图:

def append(offset: Long, position: Int): Unit = {
    inLock(lock) {
      //① 判断索引文件是否未写满
      require(!isFull, "Attempt to append to a full index (size = " + _entries + ").")
      //② 必须满足以下条件之一才允许写入索引项
      //1. 当前索引文件为空
      //2. 要写入的位移大于当前所有已写入的索引项的位移 — kafka 规定索引项中的位移必须是单调增加的
      if (_entries == 0 || offset > _lastOffset) {
        trace(s"Adding index entry $offset => $position to ${file.getAbsolutePath}")
        //③ 在 mmap 中写入物理位置信息
        mmap.putInt(relativeOffset(offset))
        mmap.putInt(position)
        //④ 更新其他元数据统计信息,如当前索引项计数器 _entries 和当前索引项最新位移值 _lastOffset
        _entries += 1
        _lastOffset = offset
        //⑤ 执行校验,写入索引项格式必须符合要求,即索引项个数 * 单个索引项占用字节数匹配当前文件物理大小,否则说明文件已损坏
        require(_entries * entrySize == mmap.position(), entries + " entries but file position in index is " + mmap.position() + ".")
      } else {
        // 如果第 ② 步中两个条件都不满足,不能执行写入索引项操作,抛出异常
        throw new InvalidOffsetException(s"Attempt to append an offset ($offset) to position $entries no larger than" +
          s" the last offset appended (${_lastOffset}) to ${file.getAbsolutePath}.")
      }
    }
  }
  • ① 判断索引文件是否未写满
  • ② 必须满足以下条件之一才允许写入索引项
      1. 当前索引文件为空
      1. 要写入的位移大于当前所有已写入的索引项的位移 — kafka 规定索引项中的位移必须是单调增加的
  • ③ 在 mmap 中写入物理位置信息
  • ④ 更新其他元数据统计信息,如当前索引项计数器 _entries 和当前索引项最新位移值 _lastOffset
  • ⑤ 执行校验,写入索引项格式必须符合要求,即索引项个数 * 单个索引项占用字节数匹配当前文件物理大小,否则说明文件已损坏

2.2.2 lookup

找到小于给定的 offset 的最大的 offset 的 物理地址。

  def lookup(targetOffset: Long): OffsetPosition = {
    //加锁
    maybeLock(lock) {
      //创建一个副本
      val idx = mmap.duplicate
      //二分查找
      val slot = largestLowerBoundSlotFor(idx, targetOffset, IndexSearchType.KEY)
      //如果找到则调用 parseEntry 方法 返回offset和postition的映射关系
      //如果找不到则返回 baseOffset, 0
      if(slot == -1)
        OffsetPosition(baseOffset, 0)
      else
        parseEntry(idx, slot)
    }
  }

2.3 TimeIndex

TimeIndex 索引和 OffsetIndex 一样。

2.3.1 maybeAppend

def maybeAppend(timestamp: Long, offset: Long, skipFullCheck: Boolean = false): Unit = {
    inLock(lock) {
      if (!skipFullCheck)
        require(!isFull, "Attempt to append to a full time index (size = " + _entries + ").")
      // We do not throw exception when the offset equals to the offset of last entry. That means we are trying
      // to insert the same time index entry as the last entry.
      // If the timestamp index entry to be inserted is the same as the last entry, we simply ignore the insertion
      // because that could happen in the following two scenarios:
      // 1. A log segment is closed.
      // 2. LogSegment.onBecomeInactiveSegment() is called when an active log segment is rolled.
      if (_entries != 0 && offset < lastEntry.offset)
        throw new InvalidOffsetException(s"Attempt to append an offset ($offset) to slot ${_entries} no larger than" +
          s" the last offset appended (${lastEntry.offset}) to ${file.getAbsolutePath}.")
      if (_entries != 0 && timestamp < lastEntry.timestamp)
        throw new IllegalStateException(s"Attempt to append a timestamp ($timestamp) to slot ${_entries} no larger" +
          s" than the last timestamp appended (${lastEntry.timestamp}) to ${file.getAbsolutePath}.")
      // We only append to the time index when the timestamp is greater than the last inserted timestamp.
      // If all the messages are in message format v0, the timestamp will always be NoTimestamp. In that case, the time
      // index will be empty.
      if (timestamp > lastEntry.timestamp) {
        trace(s"Adding index entry $timestamp => $offset to ${file.getAbsolutePath}.")
        mmap.putLong(timestamp)
        mmap.putInt(relativeOffset(offset))
        _entries += 1
        _lastEntry = TimestampOffset(timestamp, offset)
        require(_entries * entrySize == mmap.position(), _entries + " entries but file position in index is " + mmap.position() + ".")
      }
    }
  }

2.3.2 lookup

def lookup(targetTimestamp: Long): TimestampOffset = {
    maybeLock(lock) {
      val idx = mmap.duplicate
      val slot = largestLowerBoundSlotFor(idx, targetTimestamp, IndexSearchType.KEY)
      if (slot == -1)
        TimestampOffset(RecordBatch.NO_TIMESTAMP, baseOffset)
      else
        parseEntry(idx, slot)
    }
  }

参考自:Kafka核心源码解读