Kafka 日志模块(二)LogSegment

2020年5月3日 | 作者 Siran | 2400字 | 阅读大约需要5分钟
归档于 消息队列 | 标签 #kafka

概述

Kafka 日志模块(一)Log 一文中讲到了Kafka 为了防止日志文件太大,把日志文件分成多个 LogSegment ,而在每个 Segment 中又有索引文件,为了快速查找所需要的消息。并且Log类中的Append 方法和 Read 方法最终都是调用的 LogSegment 相对应的方法,也就是说 Log 是LogSegment的抽象。


源码分析

LogSegment

class LogSegment private[log] (val log: FileRecords,
                               val lazyOffsetIndex: LazyIndex[OffsetIndex],
                               val lazyTimeIndex: LazyIndex[TimeIndex],
                               val txnIndex: TransactionIndex,
                               val baseOffset: Long,
                               val indexIntervalBytes: Int,
                               val rollJitterMs: Long,
                               val time: Time) extends Logging {
  • log : 用于操作对应日志文件的 FileRecords 对象
  • lazyOffsetIndex:索引,对应LogSegment中的index索引
  • lazyTimeIndex:索引,对应LogSegment中的timeindex索引
  • txnIndex:事务有关索引
  • baseOffset:LogSegment 中第一条消息的 offset 值,称作为 BaseOffset
  • indexIntervalBytes:索引项之间间隔的的最小字节数
  • rollJitterMs:生成Segment的时间,由定时生成segment的时间 - 随机生成的值

append 方法

由 Log 中的 append 方法调用,在 Kafka 日志模块(一)Log 一文中的 2.7.5 读写操作中有分析。

def append(largestOffset: Long, 
             largestTimestamp: Long,
             shallowOffsetOfMaxTimestamp: Long,
             records: MemoryRecords): Unit = {
    if (records.sizeInBytes > 0) {
      trace(s"Inserting ${records.sizeInBytes} bytes at end offset $largestOffset at position ${log.sizeInBytes} " +
            s"with largest timestamp $largestTimestamp at shallow offset $shallowOffsetOfMaxTimestamp")
      //该条目需要添加的位置
      val physicalPosition = log.sizeInBytes()
      if (physicalPosition == 0)
        rollingBasedTimestamp = Some(largestTimestamp)

      //确保索引在这个分段范围内
      ensureOffsetInRange(largestOffset)

      // append the messages
      //① 添加日志
      val appendedBytes = log.append(records)
      trace(s"Appended $appendedBytes to ${log.file} at end offset $largestOffset")
      // Update the in memory max timestamp and corresponding offset.
      if (largestTimestamp > maxTimestampSoFar) {
        maxTimestampSoFar = largestTimestamp
        offsetOfMaxTimestampSoFar = shallowOffsetOfMaxTimestamp
      }
      // append an entry to the index (if needed)
      if (bytesSinceLastIndexEntry > indexIntervalBytes) {
        //② 给offsetIndex 添加记录
        offsetIndex.append(largestOffset, physicalPosition)
        //③ 给timeIndex 添加记录
        timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar)
        bytesSinceLastIndexEntry = 0
      }
      bytesSinceLastIndexEntry += records.sizeInBytes
    }
  }
  • 参数 largestOffset :在这个分段中最大的位移量

  • 参数 largestTimestamp :在这个分段中最大的时间戳

  • 参数 shallowOffsetOfMaxTimestamp :外层消息中最大的时间戳的位移量

  • 参数 records :要追加的日志条目, MemoryRecords 对象

  • log.append :添加日志,底层是往 ByteBuffer 中添加

    //org.apache.kafka.common.record.FileRecords
    public int append(MemoryRecords records) throws IOException {
          if (records.sizeInBytes() > Integer.MAX_VALUE - size.get())
              throw new IllegalArgumentException("Append of size " + records.sizeInBytes() +
                      " bytes is too large for segment with current file position at " + size.get());
    
          //调用MemoryRecords 的方法
          int written = records.writeFullyTo(channel);
          size.getAndAdd(written);
          return written;
      }
        
    //org.apache.kafka.common.record.MemoryRecords
    public int writeFullyTo(GatheringByteChannel channel) throws IOException {
          //java.nio.bytebuffer
          buffer.mark();
          int written = 0;
          while (written < sizeInBytes())
              written += channel.write(buffer);
          buffer.reset();
          return written;
      }  
    
  • offsetIndex.append:给 offsetIndex 添加记录

  • timeIndex.maybeAppend:给 timeIndex 添加记录

索引操作在此 Kafka 日志模块(三)索引 文中分析。


read 方法

Log 中的 read 方法调用,在 Kafka 日志模块(一)Log 一文中的 2.7.5 读写操作中有分析。

def read(startOffset: Long,
           maxSize: Int,
           maxPosition: Long = size,
           minOneMessage: Boolean = false): FetchDataInfo = {
    if (maxSize < 0)
      throw new IllegalArgumentException(s"Invalid max size $maxSize for log read from segment $log")
    //① 把 startOffset 转换成 物理地址
    val startOffsetAndSize = translateOffset(startOffset)

    // if the start position is already off the end of the log, return null
    if (startOffsetAndSize == null)
      return null

    //② 获取物理位置
    val startPosition = startOffsetAndSize.position
    // 封装成LogOffsetMetadata 对象
    val offsetMetadata = LogOffsetMetadata(startOffset, this.baseOffset, startPosition)

    //③ 调整这次读取的最大字节数,根据 minOneMessage 进行判断
    // 如果 `minOneMessage` 为true,则从第`①`步中返回的值与 maxSize 取最大值
    // 反之直接以 maxSize 为标准。
    val adjustedMaxSize =
      if (minOneMessage) math.max(maxSize, startOffsetAndSize.size)
      else maxSize

    // return a log segment but with zero size in the case below
    if (adjustedMaxSize == 0)
      return FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY)

    // calculate the length of the message set to read based on whether or not they gave us a maxOffset
    //④ 计算这次读取多少个字节数
    val fetchSize: Int = min((maxPosition - startPosition).toInt, adjustedMaxSize)

    //⑤ 通过position 和 fetchSize 获取消息 封装成 FetchDataInfo 并且返回
    FetchDataInfo(offsetMetadata, log.slice(startPosition, fetchSize),
      firstEntryIncomplete = adjustedMaxSize < startOffsetAndSize.size)
  }
  • 参数 startOffset:这次从哪个偏移量开始读取

  • 参数 maxSize:读取的最大的字节数

  • 参数 maxPosition:读取的最大的物理地址

  • 参数 minOneMessage:当读取大容量的消息的时候(超过了maxSize的限制),如果为则允许读取,反正不允许。

  • translateOffset:把 startOffset 转换成 物理地址

    private[log] def translateOffset(offset: Long, startingFilePosition: Int = 0): LogOffsetPosition = {
      //通过 offsetIndex 完成索引文件的查找
      val mapping = offsetIndex.lookup(offset)
    
      log.searchForOffsetWithSize(offset, max(mapping.position, startingFilePosition))
    }
    
    • offsetIndex#lookup :通过 offsetIndex 完成索引文件的查找,
    • log#searchForOffsetWithSize:通过 FileRecords 对日志文件的查找
    • 以上两步操作会在 Kafka 日志模块(三)索引 文中分析。
  • ② 获取物理地址 position

  • ③ 调整这次读取的最大字节数,根据 minOneMessage 进行判断

    • 如果 minOneMessage 为true,则从第步中返回的值与 maxSize 取最大值
    • 反之直接以 maxSize 为标准。
  • ④ 计算这次读取多少个字节数

    • 根据 最大的物理地址 - 开始读取的物理地址 与 最大消息字节数 取最小值。
  • ⑤ 通过 positionfetchSize 获取消息 封装成 FetchDataInfo 并且返回

    public FileRecords slice(int position, int size) throws IOException {
          if (position < 0)
              throw new IllegalArgumentException("Invalid position: " + position + " in read from " + this);
          if (position > sizeInBytes() - start)
              throw new IllegalArgumentException("Slice from position " + position + " exceeds end position of " + this);
          if (size < 0)
              throw new IllegalArgumentException("Invalid size: " + size + " in read from " + this);
    
          int end = this.start + position + size;
          // handle integer overflow or if end is beyond the end of the file
          if (end < 0 || end >= start + sizeInBytes())
              end = start + sizeInBytes();
          return new FileRecords(file, channel, this.start + position, end, true);
      }
    

recover 方法

根据日志文件重建索引文件,同时验证日志文件中消息的合法性。在重新索引文件过程中,如果遇到了压缩消息需要进行解压,主要原因是因为索引项中保存的相对offset是第一条消息的offset而外层消息的offset是压缩消息集合中的最后一条消息的offset

  def recover(producerStateManager: ProducerStateManager, leaderEpochCache: Option[LeaderEpochFileCache] = None): Int = {
    //① 分别调用 offsetIndex、timeIndex、txnIndex 索引的reset 方法,清空索引文件,并且移动 position 指针
    offsetIndex.reset()
    timeIndex.reset()
    txnIndex.reset()
    // 记录了已经通过验证的字节数
    var validBytes = 0
    // 最后一个索引项对应的物理地址
    var lastIndexEntry = 0
    // 最大的时间戳
    maxTimestampSoFar = RecordBatch.NO_TIMESTAMP
    try {
      for (batch <- log.batches.asScala) {
        //② 验证message 是否合法,验证失败就抛异常
        batch.ensureValid()
        ensureOffsetInRange(batch.lastOffset)

        // The max timestamp is exposed at the batch level, so no need to iterate the records
        if (batch.maxTimestamp > maxTimestampSoFar) {
          maxTimestampSoFar = batch.maxTimestamp
          offsetOfMaxTimestampSoFar = batch.lastOffset
        }

        // Build offset index
        // 开始构建索引项
        if (validBytes - lastIndexEntry > indexIntervalBytes) {
          //③ 构建offsetIndex timeIndex 索引 
          offsetIndex.append(batch.lastOffset, validBytes)
          timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar)
          lastIndexEntry = validBytes
        }
        validBytes += batch.sizeInBytes()

        if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
          leaderEpochCache.foreach { cache =>
            if (batch.partitionLeaderEpoch > 0 && cache.latestEpoch.forall(batch.partitionLeaderEpoch > _))
              cache.assign(batch.partitionLeaderEpoch, batch.baseOffset)
          }
          updateProducerState(producerStateManager, batch)
        }
      }
    } catch {
      case e@ (_: CorruptRecordException | _: InvalidRecordException) =>
        warn("Found invalid messages in log segment %s at byte offset %d: %s. %s"
          .format(log.file.getAbsolutePath, validBytes, e.getMessage, e.getCause))
    }
    val truncated = log.sizeInBytes - validBytes
    if (truncated > 0)
      debug(s"Truncated $truncated invalid bytes at the end of segment ${log.file.getAbsoluteFile} during recovery")
    
    //④ 截断验证失败的日志,对索引文件进行相对应的截断
    log.truncateTo(validBytes)
    offsetIndex.trimToValidSize()
    // A normally closed segment always appends the biggest timestamp ever seen into log segment, we do this as well.
    timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar, skipFullCheck = true)
    timeIndex.trimToValidSize()
    truncated
  }
  • ① 分别调用 offsetIndextimeIndextxnIndex 索引的 reset 方法,清空索引文件,并且移动 position 指针
  • ② 验证 message 是否合法,验证失败就抛异常
  • ③ 构建 offsetIndex timeIndex 索引
  • ④ 截断验证失败的日志,对索引文件进行相对应的截断

这一系列对索引的操作,在 Kafka 日志模块(三)索引 文中分析。