Kafka 日志模块(一)Log

2020年5月1日 | 作者 Siran | 15200字 | 阅读大约需要31分钟
归档于 消息队列 | 标签 #kafka

1. 概述

Kafka 使用日志文件的方式保存生产者发送的消息。每条消息都有一个 offset 值来标识它在分区中的偏移量,这个offset 是逻辑值,并不是消息实际存放的物理地址。

  • offset 值类似与数据库表中的主键,主键唯一确定了数据库表中的一条记录
  • offset 唯一确定了分区的一条消息。

如下图所示:

  • Topic:Kafka 消息以 Topic 为基本单位进行归类,各个主题在逻辑上相互独立。

  • Partition:每个主题有可以分为一个或者多个 partition ,可以在创建主题的时候指定,也可以之后修改

  • Offset:每条消息在发送的时候会根据分区规则被追加到指定的分区中,分区中的每条消息都会被分配一个唯一的序列号

Kafka 为了避免存储的日志文件太大,并不是直接对应磁盘上的一个日志文件,而是对应磁盘上的一个目录,如下图所示:

  • 通过 topic_name - partition_id 来命名,则对应topic 和对应的partition 的log的存储在此目录下的文件中。

Kafka 还引入了 LogSegment 的概念,把Log分隔成多个分段也就是多个 LogSegment,如下如所示:

  • .log(日志文件)

  • .index(偏移量索引文件)

  • .timeindex(时间戳索引文件)

  • 其他文件

  • activeSegment:向Log 中追加消息时是顺序写入的,只有最后一个LogSegment 才能执行写入操作,之前的都不行。最后一个LogSegment 称之为ActiveSegment即表示活跃的日志分段,

    • 当activeSegment满足一定的条件时,就需要创建新的activeSegment,之后追加的消息将写入新的activeSegment
  • 每个 LogSegment 都有一个基准偏移量 baseOffset,用来表示当前 LogSegment中第一条消息的offset。偏移量是一个64位的长整型数,日志文件和两个索引文件都是根据基准偏移量(baseOffset)命名的,名称固定为20位数字,没有达到的位数则用0填充。比如第一个LogSegment的基准偏移量为0,对应的日志文件为 00000000000000000000.log。

Kafka 服务端(三)kafka-server-start.sh 一文中讲述了我们启动Kafka之后,Kafka 每个模块初始化的入口类。

Kafka 的所有的消息都是通过日志来存储的,它是通过 LogManager 来进行初始化的,Log 类是真正操作日志的,LogManager 是用来管理 Log 的。

在LogManager 初始化的时候会通过 log.dirs 配置的位置来生成我们的Log,并且会生成5个文件,如下图:

  • cleaner-offset-checkPoint:清理检查点文件,用来记录每个主题的每个分区中已清理的偏移量

  • log-start-offset-checkpoint:它用来标识日志的起始偏移量。各个副本在变动 LEOHW 的过程中,logStartOffset 也有可能随之而动,Kafka 也有一个定时任务来负责将所有分区的logStartOffset书写到起始点文件log-start-offset-checkpoint中,定时周期由broker端参数log.flush.start.offset.checkpoint.interval.ms来配置,默认值为 60000

  • meta.properties:记录一些元数据

    cluster.id=7YpDHjejRY2QU_U5Wk_CDQ 
    version=0
    broker.id=0
    
  • recovery-point-offset-checkpoint:Kafka 中会有一个定时任务负责将所有分区的 LEO 刷写到恢复点文件 recovery-point-offset-checkpoint 中,定时周期由 broker 端参数log.flush.offset.checkpoint.interval.ms来配置,默认值为 60000

  • replication-offset-checkpoint:定时任务负责将所有分区的 HW 刷写到复制点文件replication-offset-checkpoint中,定时周期由broker端参数replica.high.watermark.checkpoint.interval.ms来配置,默认值为5000

这些定时任务在 []() 文中进行分析。这篇文章主要来分析 Kafka 是如何操作日志的。


2. 源码解析

Log 类结构如下图:

  • LogAppendInfo:一组待写入消息的各种信息。
  • Log:核心类,对日志进行一些操作比如添加
  • RollParams:用于日志是否切分
  • LogOffsetSnapshot:分区内所有位移元数据的容器类
  • LogMetricNames:Log的监控指标
  • LogReadInfo:读取日志返回的数据及其元数据
  • CompletedTxn:记录已完成事务的元数据,主要用于构建事务

2.1 LogAppendInfo

保存了一组待写入消息的各种信息,比如这组消息中最后一条消息的offset,最大的时间戳是什么等信息。

case class LogAppendInfo(var firstOffset: Option[Long],
                         var lastOffset: Long,  // 消息集合最后一条消息的位移值
                         var maxTimestamp: Long,  // 消息集合最大消息时间戳
                         var offsetOfMaxTimestamp: Long, // 消息集合最大消息时间戳所属消息的位移值
                         var logAppendTime: Long, // 写入消息时间戳
                         var logStartOffset: Long, 
                         var recordConversionStats: RecordConversionStats,  // 消息转换统计类,里面记录了执行了格式转换的消息数等数据
                         sourceCodec: CompressionCodec, // 消息集合中消息使用的压缩器(Compressor)类型,比如是Snappy还是LZ4
                         targetCodec: CompressionCodec, // 写入消息时需要使用的压缩器类型
                         shallowCount: Int, // 消息批次数,每个消息批次下可能包含多条消息
                         validBytes: Int, // 写入消息总字节数
                         offsetsMonotonic: Boolean, // 消息位移值是否是顺序增加的
                         lastOffsetOfFirstBatch: Long, // 首个消息批次中最后一条消息的位移
                         recordErrors: Seq[RecordError] = List(), // 写入消息时出现的异常列表
                         errorMessage: String = null) { // 错误码
  def firstOrLastOffsetOfFirstBatch: Long = firstOffset.getOrElse(lastOffsetOfFirstBatch)

  def numMessages: Long = {
    firstOffset match {
      case Some(firstOffsetVal) if (firstOffsetVal >= 0 && lastOffset >= 0) => (lastOffset - firstOffsetVal + 1)
      case _ => 0
    }
  }
}
  • firstOffset:消息集合中第一条消息的位移值
  • lastOffset:消息集合最后一条消息的位移值
  • maxTimestamp:消息集合最大消息时间戳
  • offsetOfMaxTimestamp:消息集合最大消息时间戳所属消息的位移值
  • logAppendTime:写入消息时间戳
  • logStartOffset:日志文件的起始偏移量等于第一个日志分段的 baseOffset,但这并不是绝对的,logStartOffset 的值可以通过 DeleteRecordsRequest 请求(比如使用KafkaAdminClient的deleteRecords()方法、使用kafka-delete-records.sh脚本)、日志的清理和截断等操作进行修改。
  • recordConversionStats:消息转换统计类,里面记录了执行了格式转换的消息数等数据
  • sourceCodec:消息集合中消息使用的压缩器(Compressor)类型,比如是Snappy还是LZ4
  • targetCodec:写入消息时需要使用的压缩器类型
  • shallowCount:消息批次数,每个消息批次下可能包含多条消息
  • validBytes:写入消息总字节数
  • offsetsMonotonic:消息位移值是否是顺序增加的
  • lastOffsetOfFirstBatch:首个消息批次中最后一条消息的位移
  • recordErrors:写入消息时出现的异常列表
  • errorMessage:错误码

2.2 RollParams

定义用于控制日志是否要切分的数据结构

case class RollParams(maxSegmentMs: Long,
                      maxSegmentBytes: Int,
                      maxTimestampInMessages: Long,
                      maxOffsetInMessages: Long,
                      messagesSize: Int,
                      now: Long)
  • maxSegmentMs:这个 Segment 持续多久,当到了这个时间如果这个 Segment 文件没有满,那么会强制创建新的 Segment ,确保可以删除或者压缩日志。
  • maxSegmentBytes:当前 Segment 能存储最大的字节数
  • maxTimestampInMessages:当前 Segment 中最大的时间戳
  • maxOffsetInMessages:当前 Segment 中最大的位移量
  • messagesSize:当前 Segment 中消息的数量
  • now:切分 Segment 的时间

2.3 LogReadInfo

case class LogReadInfo(fetchedData: FetchDataInfo,
                       highWatermark: Long,
                       logStartOffset: Long,
                       logEndOffset: Long,
                       lastStableOffset: Long)


2.4 LogOffsetSnapshot

分区内所有位移元数据的容器类

case class LogOffsetSnapshot(logStartOffset: Long,
                             logEndOffset: LogOffsetMetadata,
                             highWatermark: LogOffsetMetadata,
                             lastStableOffset: LogOffsetMetadata)
  • logStartOffset:日志文件的起始偏移量等于第一个日志分段的 baseOffset,但这并不是绝对的,logStartOffset 的值可以通过 DeleteRecordsRequest 请求(比如使用KafkaAdminClient的deleteRecords()方法、使用kafka-delete-records.sh脚本)、日志的清理和截断等操作进行修改。
  • logEndOffset:下一条插入日志的的位移值
  • highWatermark:高水位,消费者只能消费 <= 此值的消息,当一条消息发送给Server 端,只有当所有的 Replica 同步后才会增加此值
  • lastStableOffset:这个值和高水位一个意思,但是这个值只有在开启事务才有用,也就是说当开启事务后,消费者只能消费 <= lastStableOffset 的消息。它不会超过 HW

2.5 LogMetricNames

Log的监控指标

object LogMetricNames {
  val NumLogSegments: String = "NumLogSegments"
  val LogStartOffset: String = "LogStartOffset"
  val LogEndOffset: String = "LogEndOffset"
  val Size: String = "Size"
}

2.6 CompletedTxn

记录已完成事务的元数据,主要用于构建事务

case class CompletedTxn(producerId: Long, firstOffset: Long, lastOffset: Long, isAborted: Boolean) {
  override def toString: String = {
    "CompletedTxn(" +
      s"producerId=$producerId, " +
      s"firstOffset=$firstOffset, " +
      s"lastOffset=$lastOffset, " +
      s"isAborted=$isAborted)"
  }
}

2.7 Log

核心类,对日志进行一些操作比如 添加,Log 是对多个 LogSegment 对象顺序的组合,形成一个逻辑的日志,使用跳表来对 LogSegment 进行管理。

class Log(@volatile var dir: File, // dir 就是这个日志所在的文件夹路径,也就是主题分区的路径
          @volatile var config: LogConfig,
          @volatile var logStartOffset: Long, //日志的当前最早位移
          @volatile var recoveryPoint: Long,
          scheduler: Scheduler,
          brokerTopicStats: BrokerTopicStats,
          val time: Time,
          val maxProducerIdExpirationMs: Int,
          val producerIdExpirationCheckIntervalMs: Int,
          val topicPartition: TopicPartition,
          val producerStateManager: ProducerStateManager,
          logDirFailureChannel: LogDirFailureChannel) extends Logging with KafkaMetricsGroup{
  private val lastFlushedTime = new AtomicLong(time.milliseconds)
  @volatile private var nextOffsetMetadata: LogOffsetMetadata = _
  @volatile private var highWatermarkMetadata: LogOffsetMetadata = LogOffsetMetadata(logStartOffset)
  private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
  @volatile var leaderEpochCache: Option[LeaderEpochFileCache] = None

}
      
  • dir :日志所在的文件夹路径,也就是主题分区的路径
  • config :配置
  • logStartOffset :当前日志最早的位移值
  • recoveryPoint :日志的恢复点
  • scheduler :定时器
  • brokerTopicStats:broker中topic的状态
  • maxProducerIdExpirationMs:在ProducerId 被视为过期之后最长等待时间
  • producerIdExpirationCheckIntervalMs:检查 Producer Id 是否过期的间隔时间
  • topicPartition:topic - partition 的映射关系
  • producerStateManager
  • logDirFailureChannel:日志失败的channel
  • lastFlushedTime:最后一次flush的时间
  • nextOffsetMetadata:下一个offset的一些元数据
  • highWatermarkMetadata:hw的元数据
  • segments:日志分段通过跳表来存储
  • leaderEpochCache:leader epoch 的缓存,用于一致性。

2.7.1 locally

Log 类的初始化方法

流程图如下:

locally {
    val startMs = time.milliseconds

    // create the log directory if it doesn't exist
    //① 创建log文件,如果不存在就创建。通过dir属性
    Files.createDirectories(dir.toPath)

    //② 初始化 Leader Epoch Cache
    //创建Leader Epoch 检查点文件
    //生成Leader Epoch Cache 对象
    initializeLeaderEpochCache()

    //③ 加载所有日志分段
    val nextOffset = loadSegments()

    /* Calculate the offset of the next message */
    //④ 更新nextOffsetMetadata 和 logStartOffset
    nextOffsetMetadata = LogOffsetMetadata(nextOffset, activeSegment.baseOffset, activeSegment.size)

    leaderEpochCache.foreach(_.truncateFromEnd(nextOffsetMetadata.messageOffset))

    updateLogStartOffset(math.max(logStartOffset, segments.firstEntry.getValue.baseOffset))

    // The earliest leader epoch may not be flushed during a hard failure. Recover it here.
    //⑤ 更新Leader Epoch Cache ,清除无效处理
    leaderEpochCache.foreach(_.truncateFromStart(logStartOffset))

    // Any segment loading or recovery code must not use producerStateManager, so that we can build the full state here
    // from scratch.
    if (!producerStateManager.isEmpty)
      throw new IllegalStateException("Producer state must be empty during log initialization")
    loadProducerState(logEndOffset, reloadFromCleanShutdown = hasCleanShutdownFile)

    info(s"Completed load of log with ${segments.size} segments, log start offset $logStartOffset and " +
      s"log end offset $logEndOffset in ${time.milliseconds() - startMs} ms")
  }
  • createDirectories( ):创建 log 文件夹,如果不存在就创建。

  • initializeLeaderEpochCache( ):初始化 Leader Epoch Cache

    private def initializeLeaderEpochCache(): Unit = lock synchronized {
      //2.1 创建 leader-epoch-checkpoint 文件
      val leaderEpochFile = LeaderEpochCheckpointFile.newFile(dir)
    
      def newLeaderEpochFileCache(): LeaderEpochFileCache = {
        val checkpointFile = new LeaderEpochCheckpointFile(leaderEpochFile, logDirFailureChannel)
        new LeaderEpochFileCache(topicPartition, logEndOffset _, checkpointFile)
      }
    
      //2.2 实例化 LeaderEpochFileCache 对象,此对象用来缓存 tp 和 leo的关系
      if (recordVersion.precedes(RecordVersion.V2)) {
        val currentCache = if (leaderEpochFile.exists())
          Some(newLeaderEpochFileCache())
        else
          None
    
        if (currentCache.exists(_.nonEmpty))
          warn(s"Deleting non-empty leader epoch cache due to incompatible message format $recordVersion")
    
        Files.deleteIfExists(leaderEpochFile.toPath)
        leaderEpochCache = None
      } else {
        leaderEpochCache = Some(newLeaderEpochFileCache())
      }
    }
    
    • 2.1 创建 leader-epoch-checkpoint 文件,此文件是用于记录每个当前Kafka Leader 的epoch,用于一致性。
    • 2.2 实例化 LeaderEpochFileCache 对象,此对象用来缓存 tpleo 的关系
  • loadSegments( ):加载所有日志分段

  • ④ 更新 nextOffsetMetadatalogStartOffset

  • ⑤ 更新 Leader Epoch Cache ,清除无效处理

2.7.1.1 loadSegments

Log 类的初始化 locally 方法中的第 步 通过loadSegments( )方法加载当前所有的 Segments 。 大致逻辑:

  1. 移除上次 Failure 遗留下来的各种临时文件(包括.cleaned、.swap、.deleted 文件等)
  2. 清空所有日志段对象,并且再次遍历分区路径,重建日志段 segments Map 以及索引文件。
  3. 待执行完这两次遍历之后,它会完成未完成的 swap 操作,即调用 completeSwapOperations 方法。
  4. 等这些都做完之后,再调用 recoverLog 方法恢复日志段对象,然后返回恢复之后的分区日志 LEO 值。
private def loadSegments(): Long = {
    //① 移除上次 Failure 遗留下来的各种临时文件(包括.cleaned、.swap、.deleted 文件等)
    val swapFiles = removeTempFilesAndCollectSwapFiles()

    retryOnOffsetOverflow {
      logSegments.foreach(_.close())
      segments.clear()
      //② 重新加载日志段文件
      loadSegmentFiles()
    }

    //③ 处理第①步返回的有效.swap 文件集合
    completeSwapOperations(swapFiles)

    if (!dir.getAbsolutePath.endsWith(Log.DeleteDirSuffix)) {
      val nextOffset = retryOnOffsetOverflow {
        //④ 恢复日志
        recoverLog()
      }

      activeSegment.resizeIndexes(config.maxIndexSize)
      nextOffset
    } else {
       if (logSegments.isEmpty) {
          addSegment(LogSegment.open(dir = dir,
            baseOffset = 0,
            config,
            time = time,
            fileAlreadyExists = false,
            initFileSize = this.initFileSize,
            preallocate = false))
       }
      0
    }
  }
  • removeTempFilesAndCollectSwapFiles( ):移除上次 Failure 遗留下来的各种临时文件(包括.cleaned、.swap、.deleted 文件等)
    private def removeTempFilesAndCollectSwapFiles(): Set[File] = {
    
      //在方法内部定义一个名为deleteIndicesIfExist的方法,用于删除日志文件对应的索引文件
      def deleteIndicesIfExist(baseFile: File, suffix: String = ""): Unit = {
        info(s"Deleting index files with suffix $suffix for baseFile $baseFile")
        val offset = offsetFromFile(baseFile)
        Files.deleteIfExists(Log.offsetIndexFile(dir, offset, suffix).toPath)
        Files.deleteIfExists(Log.timeIndexFile(dir, offset, suffix).toPath)
        Files.deleteIfExists(Log.transactionIndexFile(dir, offset, suffix).toPath)
      }
    
      var swapFiles = Set[File]()
      var cleanFiles = Set[File]()
      var minCleanedFileOffset = Long.MaxValue
    
      //① 遍历分区日志路径下的所有文件
      for (file <- dir.listFiles if file.isFile) {
        if (!file.canRead) //如果不可读,直接抛出IOException
          throw new IOException(s"Could not read file $file")
        val filename = file.getName
        //② 如果文件是以.deleted结尾,说明是上次Failure遗留下来的文件,直接删除
        if (filename.endsWith(DeletedFileSuffix)) {
          debug(s"Deleting stray temporary file ${file.getAbsolutePath}")
          Files.deleteIfExists(file.toPath) 
        //③ 如果是以.cleaned结尾
        } else if (filename.endsWith(CleanedFileSuffix)) {
          //选取文件名中位移值最小的.cleaned文件,获取其位移值,并将该文件加入待删除文件集合中
          minCleanedFileOffset = Math.min(offsetFromFileName(filename), minCleanedFileOffset)
          cleanFiles += file
        //④ 如果以.swap结尾 
        } else if (filename.endsWith(SwapFileSuffix)) { 
          // we crashed in the middle of a swap operation, to recover:
          // if a log, delete the index files, complete the swap operation later
          // if an index just delete the index files, they will be rebuilt
          val baseFile = new File(CoreUtils.replaceSuffix(file.getPath, SwapFileSuffix, ""))
          info(s"Found file ${file.getAbsolutePath} from interrupted swap operation.")
          if (isIndexFile(baseFile)) { // 如果该.swap文件原来是索引文件
            deleteIndicesIfExist(baseFile) // 删除原来的索引文件
          } else if (isLogFile(baseFile)) { // 如果该.swap文件原来是日志文件
            deleteIndicesIfExist(baseFile) // 删除掉原来的索引文件
            swapFiles += file // 加入待恢复的.swap文件集合中
          }
        }
      }
    
      // KAFKA-6264: Delete all .swap files whose base offset is greater than the minimum .cleaned segment offset. Such .swap
      // files could be part of an incomplete split operation that could not complete. See Log#splitOverflowedSegment
      // for more details about the split operation.
      // 从待恢复swap集合中找出那些起始位移值大于minCleanedFileOffset值的文件,直接删掉这些无效的.swap文件
      val (invalidSwapFiles, validSwapFiles) = swapFiles.partition(file => offsetFromFile(file) >= minCleanedFileOffset)
      invalidSwapFiles.foreach { file =>
        debug(s"Deleting invalid swap file ${file.getAbsoluteFile} minCleanedFileOffset: $minCleanedFileOffset")
        val baseFile = new File(CoreUtils.replaceSuffix(file.getPath, SwapFileSuffix, ""))
        deleteIndicesIfExist(baseFile, SwapFileSuffix)
        Files.deleteIfExists(file.toPath)
      }
    
      // Now that we have deleted all .swap files that constitute an incomplete split operation, let's delete all .clean files
      //清除所有待删除文件集合中的文件
      cleanFiles.foreach { file =>
        debug(s"Deleting stray .clean file ${file.getAbsolutePath}")
        Files.deleteIfExists(file.toPath)
      }
    
      //最后返回当前有效的.swap文件集合
      validSwapFiles
    }
    
  • loadSegmentFiles( ) :重新加载日志段文件
    private def loadSegmentFiles(): Unit = {
      // load segments in ascending order because transactional data from one segment may depend on the
      // segments that come before it
      //按照日志段文件名中的位移值正序排列,然后遍历每个文件
      for (file <- dir.listFiles.sortBy(_.getName) if file.isFile) {
        if (isIndexFile(file)) { // 如果是索引文件
          // if it is an index file, make sure it has a corresponding .log file
          val offset = offsetFromFile(file)
          val logFile = Log.logFile(dir, offset)
          // 确保存在对应的日志文件,否则记录一个警告,并删除该索引文件
          if (!logFile.exists) {
            warn(s"Found an orphaned index file ${file.getAbsolutePath}, with no corresponding log file.")
            Files.deleteIfExists(file.toPath)
          }
        } else if (isLogFile(file)) { // 如果是日志文件
          // if it's a log file, load the corresponding log segment
          val baseOffset = offsetFromFile(file)
          val timeIndexFileNewlyCreated = !Log.timeIndexFile(dir, baseOffset).exists()
          // 创建对应的LogSegment对象实例,并加入segments中
          val segment = LogSegment.open(dir = dir,
            baseOffset = baseOffset,
            config,
            time = time,
            fileAlreadyExists = true)
    
          try segment.sanityCheck(timeIndexFileNewlyCreated)
          catch {
            case _: NoSuchFileException =>
              error(s"Could not find offset index file corresponding to log file ${segment.log.file.getAbsolutePath}, " +
                "recovering segment and rebuilding index files...")
              recoverSegment(segment)
            case e: CorruptIndexException =>
              warn(s"Found a corrupted index file corresponding to log file ${segment.log.file.getAbsolutePath} due " +
                s"to ${e.getMessage}}, recovering segment and rebuilding index files...")
              recoverSegment(segment)
          }
          addSegment(segment)
        }
      }
    }
    
  • completeSwapOperations( ) :处理第①步返回的有效.swap 文件集合
    private def completeSwapOperations(swapFiles: Set[File]): Unit = {
      //① 遍历所有有效.swap文件
      for (swapFile <- swapFiles) {
        val logFile = new File(CoreUtils.replaceSuffix(swapFile.getPath, SwapFileSuffix, ""))
        //拿到日志文件的起始位移值
        val baseOffset = offsetFromFile(logFile)
        //② 创建对应的LogSegment实例
        val swapSegment = LogSegment.open(swapFile.getParentFile,
          baseOffset = baseOffset,
          config,
          time = time,
          fileSuffix = SwapFileSuffix)
        info(s"Found log file ${swapFile.getPath} from interrupted swap operation, repairing.")
        //③ 执行日志段恢复操作
        recoverSegment(swapSegment)
    
        // We create swap files for two cases:
        // (1) Log cleaning where multiple segments are merged into one, and
        // (2) Log splitting where one segment is split into multiple.
        //
        // Both of these mean that the resultant swap segments be composed of the original set, i.e. the swap segment
        // must fall within the range of existing segment(s). If we cannot find such a segment, it means the deletion
        // of that segment was successful. In such an event, we should simply rename the .swap to .log without having to
        // do a replace with an existing segment.
        //④确认之前删除日志段是否成功,是否还存在老的日志段文件
        val oldSegments = logSegments(swapSegment.baseOffset, swapSegment.readNextOffset).filter { segment =>
          segment.readNextOffset > swapSegment.baseOffset
        }
        // ⑤ 如果存在,直接把.swap文件重命名成.log
        replaceSegments(Seq(swapSegment), oldSegments.toSeq, isRecoveredSwapFile = true)
      }
    }
    
  • recoverLog( ):恢复日志
    private def recoverLog(): Long = {
      // if we have the clean shutdown marker, skip recovery
      //① 如果不存在以.kafka_cleanshutdown结尾的文件。通常都不存在
      if (!hasCleanShutdownFile) {
        // okay we need to actually recover this log
        //获取到上次恢复点以外的所有unflushed日志段对象
        val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).toIterator
        var truncated = false
    
        //遍历这些unflushed日志段
        while (unflushed.hasNext && !truncated) {
          val segment = unflushed.next
          info(s"Recovering unflushed segment ${segment.baseOffset}")
          val truncatedBytes =
            try {
              //执行恢复日志段操作
              recoverSegment(segment, leaderEpochCache)
            } catch {
              case _: InvalidOffsetException =>
                val startOffset = segment.baseOffset
                warn("Found invalid offset during recovery. Deleting the corrupt segment and " +
                  s"creating an empty one with starting offset $startOffset")
                segment.truncateTo(startOffset)
            }
          // 如果有无效的消息导致被截断的字节数不为0,直接删除剩余的日志段对象
          if (truncatedBytes > 0) {
            // we had an invalid message, delete all remaining log
            warn(s"Corruption found in segment ${segment.baseOffset}, truncating to offset ${segment.readNextOffset}")
            removeAndDeleteSegments(unflushed.toList, asyncDelete = true)
            truncated = true
          }
        }
      }
    
      // 这些都做完之后,如果日志段集合不为空
      if (logSegments.nonEmpty) {
        val logEndOffset = activeSegment.readNextOffset
        // 验证分区日志的LEO值不能小于Log Start Offset值,否则删除这些日志段对象
        if (logEndOffset < logStartOffset) {
          warn(s"Deleting all segments because logEndOffset ($logEndOffset) is smaller than logStartOffset ($logStartOffset). " +
            "This could happen if segment files were deleted from the file system.")
          removeAndDeleteSegments(logSegments, asyncDelete = true)
        }
      }
    
      // 这些都做完之后,如果日志段集合为空了
      if (logSegments.isEmpty) {
        // no existing segments, create a new mutable segment beginning at logStartOffset
        // 至少创建一个新的日志段,以logStartOffset为日志段的起始位移,并加入日志段集合中
        addSegment(LogSegment.open(dir = dir,
          baseOffset = logStartOffset,
          config,
          time = time,
          fileAlreadyExists = false,
          initFileSize = this.initFileSize,
          preallocate = config.preallocate))
      }
      // 更新上次恢复点属性,并返回
      recoveryPoint = activeSegment.readNextOffset
      recoveryPoint
    }
    
  • recoverSegment:循环恢复日志分段,在Kafka 日志模块(二)LogSegment一文中进行分析。

Log 初始化完成之后接下来就是对Log的一些操作,比如重要的就是 HW、LogSegment、日志位移、读写日志

2.7.2 HW 高水位

高水位值的初始值是 Log Start Offset 值。每个 Log 对象都会维护一个 Log Start Offset 值。当首次构建高水位时,它会被赋值成 Log Start Offset 值。它是表示消费者只能看到 <= HW 的值。

@volatile private var highWatermarkMetadata: LogOffsetMetadata = LogOffsetMetadata(logStartOffset)

可以看到它是一个 LogOffsetMetadata 对象

case class LogOffsetMetadata(messageOffset: Long,
                             segmentBaseOffset: Long = Log.UnknownOffset,
                             relativePositionInSegment: Int = LogOffsetMetadata.UnknownFilePosition) {

  def onOlderSegment(that: LogOffsetMetadata): Boolean = {
    if (messageOffsetOnly)
      throw new KafkaException(s"$this cannot compare its segment info with $that since it only has message offset info")

    this.segmentBaseOffset < that.segmentBaseOffset
  }

  def onSameSegment(that: LogOffsetMetadata): Boolean = {
    if (messageOffsetOnly)
      throw new KafkaException(s"$this cannot compare its segment info with $that since it only has message offset info")

    this.segmentBaseOffset == that.segmentBaseOffset
  }

  def offsetDiff(that: LogOffsetMetadata): Long = {
    this.messageOffset - that.messageOffset
  }

  def positionDiff(that: LogOffsetMetadata): Int = {
    if(!onSameSegment(that))
      throw new KafkaException(s"$this cannot compare its segment position with $that since they are not on the same segment")
    if(messageOffsetOnly)
      throw new KafkaException(s"$this cannot compare its segment position with $that since it only has message offset info")

    this.relativePositionInSegment - that.relativePositionInSegment
  }

  def messageOffsetOnly: Boolean = {
    segmentBaseOffset == Log.UnknownOffset && relativePositionInSegment == LogOffsetMetadata.UnknownFilePosition
  }
  • messageOffset 参数:消息位移值,这是最重要的信息。我们总说高水位值,其实指的就是这个变量的值。

  • segmentBaseOffset 参数:保存该位移值所在日志段的起始位移。日志段起始位移值辅助计算两条消息在物理磁盘文件中位置的差值,即两条消息彼此隔了多少字节。

    • 这个计算有个前提条件,即两条消息必须处在同一个日志段对象上,不能跨日志段对象。否则它们就位于不同的物理文件上,计算这个值就没有意义了。
    • 这里的 segmentBaseOffset,就是用来判断两条消息是否处于同一个日志段的。
  • relativePositionInSegment 参数:保存该位移值所在日志段的物理磁盘位置。这个字段在计算两个位移值之间的物理磁盘位置差值时非常有用。

  • onOlderSegment( )方法:判断给定的 LogOffsetMetadata 对象是否是在新的日志段

  • onSameSegment( )方法:来判断给定的两个 LogOffsetMetadata 对象是否处于同一个日志段的

  • offsetDiff( )方法:计算两个LogOffsetMetadata 对象中隔了多少消息(offset 差值)

  • positionDiff( )方法:计算两个 LogOffsetMetadata 对象的物理位置差值。

2.7.2.1 获取和设置高水位

// getter method:读取高水位的位移值
def highWatermark: Long = highWatermarkMetadata.messageOffset

// setter method:设置高水位值
private def updateHighWatermarkMetadata(newHighWatermark: LogOffsetMetadata): Unit = {
    if (newHighWatermark.messageOffset < 0) // 高水位值不能是负数
      throw new IllegalArgumentException("High watermark offset should be non-negative")

    lock synchronized { // 保护Log对象修改的Monitor锁
      highWatermarkMetadata = newHighWatermark // 赋值新的高水位值
      producerStateManager.onHighWatermarkUpdated(newHighWatermark.messageOffset) // 处理事务状态管理器的高水位值更新逻辑,忽略它……
      maybeIncrementFirstUnstableOffset() // First Unstable Offset是Kafka事务机制的一部分
    }
    trace(s"Setting high watermark $newHighWatermark")
  }

2.7.2.2 更新高水位

// updateHighWatermark method
def updateHighWatermark(hw: Long): Long = {
    // 新高水位值一定介于[Log Start Offset,Log End Offset]之间
    val newHighWatermark = if (hw < logStartOffset)  
      logStartOffset
    else if (hw > logEndOffset)
      logEndOffset
    else
  hw
    // 调用Setter方法来更新高水位值
    updateHighWatermarkMetadata(LogOffsetMetadata(newHighWatermark))
    newHighWatermark  // 最后返回新高水位值
  }

// maybeIncrementHighWatermark method
def maybeIncrementHighWatermark(newHighWatermark: LogOffsetMetadata): Option[LogOffsetMetadata] = {
    // 新高水位值不能越过Log End Offset
    if (newHighWatermark.messageOffset > logEndOffset)
      throw new IllegalArgumentException(s"High watermark $newHighWatermark update exceeds current " +
        s"log end offset $logEndOffsetMetadata")

    lock.synchronized {
      val oldHighWatermark = fetchHighWatermarkMetadata  // 获取老的高水位值

      // 新高水位值要比老高水位值大以维持单调增加特性,否则就不做更新!
      // 另外,如果新高水位值在新日志段上,也可执行更新高水位操作
      if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset ||
        (oldHighWatermark.messageOffset == newHighWatermark.messageOffset && oldHighWatermark.onOlderSegment(newHighWatermark))) {
        updateHighWatermarkMetadata(newHighWatermark)
        Some(oldHighWatermark) // 返回老的高水位值
      } else {
        None
      }
    }
  }

更新高水位有两个方法一个是 updateHighWatermarkmaybeIncrementHighWatermark

  • updateHighWatermark:主要是Follower 副本从 Leader 副本获取到消息后更新高水位值。一旦拿到立即更新
  • maybeIncrementHighWatermark:主要是Leader 副本接收Producer 消息,它不会立即更新,而是要根据一定的条件来会更新。
    • 新高水位值要比老高水位值大以维持单调增加特性,否则就不做更新。
    • 如果新高水位值在新日志段上,也可执行更新高水位操作。

2.7.2.3 读取高水位

private def fetchHighWatermarkMetadata: LogOffsetMetadata = {
    checkIfMemoryMappedBufferClosed() // 读取时确保日志不能被关闭

    val offsetMetadata = highWatermarkMetadata // 保存当前高水位值到本地变量,避免多线程访问干扰
    if (offsetMetadata.messageOffsetOnly) { //没有获得到完整的高水位元数据
      lock.synchronized {
        val fullOffset = convertToOffsetMetadataOrThrow(highWatermark) // 通过读日志文件的方式把完整的高水位元数据信息拉出来
        updateHighWatermarkMetadata(fullOffset) // 然后再更新一下高水位对象
        fullOffset
      }
    } else { // 否则,直接返回即可
      offsetMetadata
    }
  }

2.7.3 LogSegment 日志分段

通过Java中的 ConcurrentSkipListMap 进行存储,线程安全。

private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]

//主要方法
def deleteOldSegments(): Int = {
    if (config.delete) {
      deleteRetentionMsBreachedSegments() + deleteRetentionSizeBreachedSegments() + deleteLogStartOffsetBreachedSegments()
    } else {
      deleteLogStartOffsetBreachedSegments()
    }
  }


// deleteRetentionMsBreachedSegments 方法
private def deleteRetentionMsBreachedSegments(): Int = {
    if (config.retentionMs < 0) return 0
    val startMs = time.milliseconds
    deleteOldSegments((segment, _) => startMs - segment.largestTimestamp > config.retentionMs,
      reason = s"retention time ${config.retentionMs}ms breach")
  }

// deleteRetentionSizeBreachedSegments 方法
private def deleteRetentionSizeBreachedSegments(): Int = {
    if (config.retentionSize < 0 || size < config.retentionSize) return 0
    var diff = size - config.retentionSize
    def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]) = {
      if (diff - segment.size >= 0) {
        diff -= segment.size
        true
      } else {
        false
      }
    }

    deleteOldSegments(shouldDelete, reason = s"retention size in bytes ${config.retentionSize} breach")
  }

// deleteLogStartOffsetBreachedSegments 方法
private def deleteLogStartOffsetBreachedSegments(): Int = {
    def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]) =
      nextSegmentOpt.exists(_.baseOffset <= logStartOffset)

    deleteOldSegments(shouldDelete, reason = s"log start offset $logStartOffset breach")
  }

这三个方法都调用了 deleteOldSegments 方法

private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean, reason: String): Int = {
    lock synchronized {
      //① 使用传入的函数计算哪些日志段对象能够被删除
      val deletable = deletableSegments(predicate)
      if (deletable.nonEmpty)
        info(s"Found deletable segments with base offsets [${deletable.map(_.baseOffset).mkString(",")}] due to $reason")
      //② 删除这些日志段
      deleteSegments(deletable)
    }
  }
  • ① 使用传入的函数计算哪些日志段对象能够被删除
    private def deletableSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean): Iterable[LogSegment] = {
      // 如果当前压根就没有任何日志段对象,直接返回
      if (segments.isEmpty) {
        Seq.empty
      } else {
        val deletable = ArrayBuffer.empty[LogSegment]
        var segmentEntry = segments.firstEntry
        // 从具有最小起始位移值的日志段对象开始遍历,直到满足以下条件之一便停止遍历:
        //1. 测定条件函数predicate = false
        //2. 扫描到包含Log对象高水位值所在的日志段对象
        //3. 最新的日志段对象不包含任何消息
        //最新日志段对象是segments中Key值最大对应的那个日志段,也就是我们常说的Active Segment。
        //完全为空的Active Segment如果被允许删除,后面还要重建它,故代码这里不允许删除大小为空的Active Segment。
        //在遍历过程中,同时不满足以上3个条件的所有日志段都是可以被删除的!
        while (segmentEntry != null) {
          val segment = segmentEntry.getValue
          val nextSegmentEntry = segments.higherEntry(segmentEntry.getKey)
          val (nextSegment, upperBoundOffset, isLastSegmentAndEmpty) = if (nextSegmentEntry != null)
            (nextSegmentEntry.getValue, nextSegmentEntry.getValue.baseOffset, false)
          else
            (null, logEndOffset, segment.size == 0)
    
          if (highWatermark >= upperBoundOffset && predicate(segment, Option(nextSegment)) && !isLastSegmentAndEmpty) {
            deletable += segment
            segmentEntry = nextSegmentEntry
          } else {
            segmentEntry = null
          }
        }
        deletable
      }
    }
    
  • ② 删除这些日志段
    private def deleteSegments(deletable: Iterable[LogSegment]): Int = {
      maybeHandleIOException(s"Error while deleting segments for $topicPartition in dir ${dir.getParent}") {
        val numToDelete = deletable.size
        if (numToDelete > 0) {
          // we must always have at least one segment, so if we are going to delete all the segments, create a new one first
          // 不允许删除所有日志段对象。如果一定要做,先创建出一个新的来,然后再把前面N个删掉
          if (segments.size == numToDelete)
            roll()
          lock synchronized {
            checkIfMemoryMappedBufferClosed()// 确保Log对象没有被关闭
            // remove the segments for lookups
            // 删除给定的日志段对象以及底层的物理文件
            removeAndDeleteSegments(deletable, asyncDelete = true)
            // 尝试更新日志的Log Start Offset值
            maybeIncrementLogStartOffset(segments.firstEntry.getValue.baseOffset)
          }
        }
        numToDelete
      }
    }
    

2.7.4 日志位移

Log 对象维护了一些关键位移值数据,比如 Log Start OffsetLEO 等。

//LEO的定义
@volatile private var nextOffsetMetadata: LogOffsetMetadata = _

//更新 LEO 的方法
private def updateLogEndOffset(offset: Long): Unit = {
  nextOffsetMetadata = LogOffsetMetadata(offset, activeSegment.baseOffset, activeSegment.size)
  if (highWatermark >= offset) {
    updateHighWatermarkMetadata(nextOffsetMetadata)
  }
  if (this.recoveryPoint > offset) {
    this.recoveryPoint = offset
  }
}
  • LEO 会被更新的时机

    • Log 对象初始化时:当 Log 对象初始化时,我们必须要创建一个 LEO 对象,并对其进行初始化。
    • 写入新消息时:以上面的图为例,当不断向 Log 对象插入新消息时,LEO 值就像一个指针一样,需要不停地向右移动,也就是不断地增加。
    • Log 对象发生日志切分(Log Roll)时:在创建一个新的 LogSegment 的时候,并且关闭当前写入的日志段对象。这通常发生在当前日志段对象已满的时候。一旦发生日志切分,说明 Log 对象切换了 Active Segment,那么,LEO 中的起始位移值和段大小数据都要被更新,因此,在进行这一步操作时,我们必须要更新 LEO 对象。
    • 日志截断(Log Truncation)时:日志中的部分消息被删除了,自然可能导致 LEO 值发生变化,从而要更新 LEO 对象。
  • Log Start Offset 会被更新的时机

    • Log 对象初始化时:和 LEO 类似,Log 对象初始化时要给 Log Start Offset 赋值,一般是将第一个日志段的起始位移值赋值给它。
    • 日志截断时:同理,一旦日志中的部分消息被删除,可能会导致 Log Start Offset 发生变化,因此有必要更新该值。
    • Follower 副本同步时:一旦 Leader 副本的 Log 对象的 Log Start Offset 值发生变化。为了维持和 Leader 副本的一致性,Follower 副本也需要尝试去更新该值。
    • 删除日志段时:这个和日志截断是类似的。凡是涉及消息删除的操作都有可能导致 Log Start Offset 值的变化。删除消息时:严格来说,这个更新时机有点本末倒置了。在 Kafka 中,删除消息就是通过抬高 Log Start Offset 值来实现的,因此,删除消息时必须要更新该值。

2.7.5 读写操作

append 写操作

添加消息到 Log 中,相关写操作有3个: appendAsLeaderappendAsFollowerappend ,调用关系如下:

  • appendAsLeader 是用于写 Leader 副本的
  • appendAsFollower 是用于 Follower 副本同步的。
  • 它们的底层都调用了 append 方法。

下图是 append 方法的执行流程:

private def append(records: MemoryRecords,
                     origin: AppendOrigin,
                     interBrokerProtocolVersion: ApiVersion,
                     assignOffsets: Boolean,
                     leaderEpoch: Int): LogAppendInfo = {
    maybeHandleIOException(s"Error while appending records to $topicPartition in dir ${dir.getParent}") {
      // ① 分析和验证待写入消息集合,并返回校验结果,验证消息的长度、crc32校验码、内部offset 是否单调递增,都是对外层消息进行的,并不会解压内部的压缩消息
      val appendInfo = analyzeAndValidateRecords(records, origin)

      // return if we have no valid messages or if this is a duplicate of the last appended entry
      // 如果压根就不需要写入任何消息,直接返回即可(也就是说在第①步中没有发现有message 这个这里 shallowCount 是0)
      if (appendInfo.shallowCount == 0)
        return appendInfo

      // trim any invalid bytes or partial messages before appending it to the on-disk log
      // ② 消息格式规整,即删除无效格式消息或无效字节,根据步骤 ① 方法返回的LogAppendInfo 对象,将未通过验证的消息截断
      var validRecords = trimInvalidBytes(records, appendInfo)

      // they are valid, insert them in the log
      lock synchronized {
        checkIfMemoryMappedBufferClosed()//确保log对象未关闭
        if (assignOffsets) {//需要分配位移
          // assign offsets to the message set
          //③ 使用当前LEO值作为待写入消息集合的第一条消息位移值
          val offset = new LongRef(nextOffsetMetadata.messageOffset)
          appendInfo.firstOffset = Some(offset.value)
          val now = time.milliseconds
          val validateAndOffsetAssignResult = try {
            LogValidator.validateMessagesAndAssignOffsets(validRecords,
              topicPartition,
              offset,
              time,
              now,
              appendInfo.sourceCodec,
              appendInfo.targetCodec,
              config.compact,
              config.messageFormatVersion.recordVersion.value,
              config.messageTimestampType,
              config.messageTimestampDifferenceMaxMs,
              leaderEpoch,
              origin,
              interBrokerProtocolVersion,
              brokerTopicStats)
          } catch {
            case e: IOException =>
              throw new KafkaException(s"Error validating messages while appending to log $name", e)
          }
          //更新校验结果对象类 LogAppendInfo
          validRecords = validateAndOffsetAssignResult.validatedRecords
          appendInfo.maxTimestamp = validateAndOffsetAssignResult.maxTimestamp
          appendInfo.offsetOfMaxTimestamp = validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp
          appendInfo.lastOffset = offset.value - 1
          appendInfo.recordConversionStats = validateAndOffsetAssignResult.recordConversionStats
          if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
            appendInfo.logAppendTime = now

          // re-validate message sizes if there's a possibility that they have changed (due to re-compression or message
          // format conversion)
          //④ 验证消息,保证消息大小不超限
          if (validateAndOffsetAssignResult.messageSizeMaybeChanged) {
            for (batch <- validRecords.batches.asScala) {
              if (batch.sizeInBytes > config.maxMessageSize) {
                // we record the original message set size instead of the trimmed size
                // to be consistent with pre-compression bytesRejectedRate recording
                brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
                brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
                throw new RecordTooLargeException(s"Message batch size is ${batch.sizeInBytes} bytes in append to" +
                  s"partition $topicPartition which exceeds the maximum configured size of ${config.maxMessageSize}.")
              }
            }
          }
        } else {//直接使用给定的移位值,无需自己分配位移值
          // we are taking the offsets we are given
          if (!appendInfo.offsetsMonotonic)//确保消息移植的单调递增性
            throw new OffsetsOutOfOrderException(s"Out of order offsets found in append to $topicPartition: " +
                                                 records.records.asScala.map(_.offset))

          if (appendInfo.firstOrLastOffsetOfFirstBatch < nextOffsetMetadata.messageOffset) {
            // we may still be able to recover if the log is empty
            // one example: fetching from log start offset on the leader which is not batch aligned,
            // which may happen as a result of AdminClient#deleteRecords()
            val firstOffset = appendInfo.firstOffset match {
              case Some(offset) => offset
              case None => records.batches.asScala.head.baseOffset()
            }

            val firstOrLast = if (appendInfo.firstOffset.isDefined) "First offset" else "Last offset of the first batch"
            throw new UnexpectedAppendOffsetException(
              s"Unexpected offset in append to $topicPartition. $firstOrLast " +
              s"${appendInfo.firstOrLastOffsetOfFirstBatch} is less than the next offset ${nextOffsetMetadata.messageOffset}. " +
              s"First 10 offsets in append: ${records.records.asScala.take(10).map(_.offset)}, last offset in" +
              s" append: ${appendInfo.lastOffset}. Log start offset = $logStartOffset",
              firstOffset, appendInfo.lastOffset)
          }
        }

        // update the epoch cache with the epoch stamped onto the message by the leader
        //⑤ 更新Leader Epoch缓存
        validRecords.batches.asScala.foreach { batch =>
          if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
            maybeAssignEpochStartOffset(batch.partitionLeaderEpoch, batch.baseOffset)
          } else {
            // In partial upgrade scenarios, we may get a temporary regression to the message format. In
            // order to ensure the safety of leader election, we clear the epoch cache so that we revert
            // to truncation by high watermark after the next leader election.
            leaderEpochCache.filter(_.nonEmpty).foreach { cache =>
              warn(s"Clearing leader epoch cache after unexpected append with message format v${batch.magic}")
              cache.clearAndFlush()
            }
          }
        }

        // check messages set size may be exceed config.segmentSize
        //⑥ 确保消息大小不超限
        if (validRecords.sizeInBytes > config.segmentSize) {
          throw new RecordBatchTooLargeException(s"Message batch size is ${validRecords.sizeInBytes} bytes in append " +
            s"to partition $topicPartition, which exceeds the maximum configured segment size of ${config.segmentSize}.")
        }

        // maybe roll the log if this segment is full
        //⑦ 执行日志切分,当前日志断剩余容量可能无法容纳新消息集合,因此要创建一个(如果满足需要创建一个新的activeSegment,然后返回当前的activeSegment)
        // 1. 当前activeSegment 的日志大小加上本次待追加的消息集合大小,超过配置的LogSegment 的最大长度
        // 2. 当前activeSegment 的寿命超过了配置的 LogSegment 最长存活时间
        // 3. 索引文件满了
        val segment = maybeRoll(validRecords.sizeInBytes, appendInfo)

        val logOffsetMetadata = LogOffsetMetadata(
          messageOffset = appendInfo.firstOrLastOffsetOfFirstBatch,
          segmentBaseOffset = segment.baseOffset,
          relativePositionInSegment = segment.size)

        // now that we have valid records, offsets assigned, and timestamps updated, we need to
        // validate the idempotent/transactional state of the producers and collect some metadata
        //⑧ 验证事务状态
        val (updatedProducers, completedTxns, maybeDuplicate) = analyzeAndValidateProducerState(
          logOffsetMetadata, validRecords, origin)

        maybeDuplicate.foreach { duplicate =>
          appendInfo.firstOffset = Some(duplicate.firstOffset)
          appendInfo.lastOffset = duplicate.lastOffset
          appendInfo.logAppendTime = duplicate.timestamp
          appendInfo.logStartOffset = logStartOffset
          return appendInfo
        }
        //⑨:执行真正的消息写入操作,主要调用日志段对象的append方法实现
        segment.append(largestOffset = appendInfo.lastOffset,
          largestTimestamp = appendInfo.maxTimestamp,
          shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,
          records = validRecords)

        // ⑩:更新LEO对象,其中,LEO值是消息集合中最后一条消息位移值+1
        //前面说过,LEO值永远指向下一条不存在的消息
        updateLogEndOffset(appendInfo.lastOffset + 1)

        // update the producer state
        //⑪:更新事务状态
        for (producerAppendInfo <- updatedProducers.values) {
          producerStateManager.update(producerAppendInfo)
        }

        // update the transaction index with the true last stable offset. The last offset visible
        // to consumers using READ_COMMITTED will be limited by this value and the high watermark.
        for (completedTxn <- completedTxns) {
          val lastStableOffset = producerStateManager.lastStableOffset(completedTxn)
          segment.updateTxnIndex(completedTxn, lastStableOffset)
          producerStateManager.completeTxn(completedTxn)
        }

        // always update the last producer id map offset so that the snapshot reflects the current offset
        // even if there isn't any idempotent data being written
        producerStateManager.updateMapEndOffset(appendInfo.lastOffset + 1)

        // update the first unstable offset (which is used to compute LSO)
        maybeIncrementFirstUnstableOffset()

        trace(s"Appended message set with last offset: ${appendInfo.lastOffset}, " +
          s"first offset: ${appendInfo.firstOffset}, " +
          s"next offset: ${nextOffsetMetadata.messageOffset}, " +
          s"and messages: $validRecords")

        // 是否需要手动落盘。一般情况下我们不需要设置Broker端参数log.flush.interval.messages
        // 落盘操作交由操作系统来完成。但某些情况下,可以设置该参数来确保高可靠性
        if (unflushedMessages >= config.flushInterval)
          flush()

        //⑫ 返回写入结果
        appendInfo
      }
    }
  }
  • ① 分析和验证待写入消息集合,并返回校验结果,验证消息的长度、crc32校验码、内部offset 是否单调递增,都是对外层消息进行的,并不会解压内部的压缩消息

    private def analyzeAndValidateRecords(records: MemoryRecords, origin: AppendOrigin): LogAppendInfo = {
      var shallowMessageCount = 0 //记录外层消息的数量
      var validBytesCount = 0 //记录通过验证的 records 的字节数之和
      var firstOffset: Option[Long] = None //记录第一条消息的offset,就是baseOffset
      var lastOffset = -1L //记录最后一条消息的offset
      var sourceCodec: CompressionCodec = NoCompressionCodec //消息的压缩方法
      var monotonic = true //标识生产者为消息分配的内部 offset 是否单调递增,使用浅层迭代器进行迭代,如果是压缩消息,并不会解压缩
      var maxTimestamp = RecordBatch.NO_TIMESTAMP //最大的时间戳
      var offsetOfMaxTimestamp = -1L //最大时间戳的offset
      var readFirstMessage = false //是否已经读了第一条消息的 offset。也就是这个batch的 baseOffset
      var lastOffsetOfFirstBatch = -1L //和fistOffset 一个意思,只是这个是旧版本 < v2 版本
    
      for (batch <- records.batches.asScala) {
        // we only validate V2 and higher to avoid potential compatibility issues with older clients
        // 消息格式Version 2的消息批次,起始位移值必须从0开始
        if (batch.magic >= RecordBatch.MAGIC_VALUE_V2 && origin == AppendOrigin.Client && batch.baseOffset != 0)
          throw new InvalidRecordException(s"The baseOffset of the record batch in the append to $topicPartition should " +
            s"be 0, but it is ${batch.baseOffset}")
    
        //一开始readFirstMessage 是 false 会去获取这个 batch 的baseOffset。
        //这里有两种获取方式:
        //1. 如果是老版本的kafka 那么通过这个batch的 lastOffset。
        //2. 如果是新版本的 >= v2,那么在Batch的Header 里面就有这个baseOffset,可以直接获取
        //然后修改 readFirstMessage 为true
        if (!readFirstMessage) {
          if (batch.magic >= RecordBatch.MAGIC_VALUE_V2)
            firstOffset = Some(batch.baseOffset) // 更新firstOffset字段
          lastOffsetOfFirstBatch = batch.lastOffset // 更新lastOffsetOfFirstBatch字段
          readFirstMessage = true
        }
    
        // check that offsets are monotonically increasing
        //检查1: 一旦出现当前lastOffset不小于下一个batch的lastOffset,说明上一个batch中有消息的位移值大于后面batch的消息
        // 这违反了位移值单调递增性
        if (lastOffset >= batch.lastOffset)
          monotonic = false
    
        // update the last offset seen
        // 使用当前batch最后一条消息的位移值去更新lastOffset
        lastOffset = batch.lastOffset
    
        // Check if the message sizes are valid.
        //检查2: 检查消息批次总字节数大小是否超限,即是否大于Broker端参数max.message.bytes值
        val batchSize = batch.sizeInBytes
        if (batchSize > config.maxMessageSize) {
          brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
          brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
          throw new RecordTooLargeException(s"The record batch size in the append to $topicPartition is $batchSize bytes " +
            s"which exceeds the maximum configured value of ${config.maxMessageSize}.")
        }
    
        // check the validity of the message by checking CRC
        //检查3: 执行消息批次校验,包括格式是否正确以及CRC校验
        if (!batch.isValid) {
          brokerTopicStats.allTopicsStats.invalidMessageCrcRecordsPerSec.mark()
          throw new CorruptRecordException(s"Record is corrupt (stored crc = ${batch.checksum()}) in topic partition $topicPartition.")
        }
    
        // 更新maxTimestamp字段和offsetOfMaxTimestamp
        if (batch.maxTimestamp > maxTimestamp) {
          maxTimestamp = batch.maxTimestamp
          offsetOfMaxTimestamp = lastOffset
        }
    
        // 累加消息批次计数器以及有效字节数,更新shallowMessageCount字段
        shallowMessageCount += 1
        validBytesCount += batchSize
    
        // 从消息批次中获取压缩器类型
        val messageCodec = CompressionCodec.getCompressionCodec(batch.compressionType.id)
        if (messageCodec != NoCompressionCodec)
          sourceCodec = messageCodec
      }
    
      // Apply broker-side compression if any
      // 获取Broker端设置的压缩器类型,即Broker端参数compression.type值。
      // 该参数默认值是producer,表示sourceCodec用的什么压缩器,targetCodec就用什么
      val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec)
      // 最后生成LogAppendInfo对象并返回
      LogAppendInfo(firstOffset, lastOffset, maxTimestamp, offsetOfMaxTimestamp, RecordBatch.NO_TIMESTAMP, logStartOffset,
        RecordConversionStats.EMPTY, sourceCodec, targetCodec, shallowMessageCount, validBytesCount, monotonic, lastOffsetOfFirstBatch)
    }
    
  • ② 消息格式规整,即删除无效格式消息或无效字节,根据步骤 方法返回的 LogAppendInfo 对象,将未通过验证的消息截断

    private def trimInvalidBytes(records: MemoryRecords, info: LogAppendInfo): MemoryRecords = {
      //写入消息总字节数
      val validBytes = info.validBytes
      if (validBytes < 0)
        throw new CorruptRecordException(s"Cannot append record batch with illegal length $validBytes to " +
          s"log for $topicPartition. A possible cause is a corrupted produce request.")
      if (validBytes == records.sizeInBytes) {
        records
      } else {
        // trim invalid bytes
        //超过部分就会被截断
        val validByteBuffer = records.buffer.duplicate()
        validByteBuffer.limit(validBytes)
        MemoryRecords.readableRecords(validByteBuffer)
      }
    }
    
  • ③ 使用当前LEO值作为待写入消息集合的第一条消息位移值

    val offset = new LongRef(nextOffsetMetadata.messageOffset)
    appendInfo.firstOffset = Some(offset.value)
    
  • ④ 验证消息,保证消息大小不超限

  • ⑤ 更新Leader Epoch缓存

  • ⑥ 确保消息大小不超限

  • ⑦ 执行日志切分,当前日志断剩余容量可能无法容纳新消息集合,因此要创建一个(如果满足需要创建一个新的activeSegment,然后返回当前的activeSegment)

      1. 当前 activeSegment 的日志大小加上本次待追加的消息集合大小,超过配置的 LogSegment 的最大长度
      1. 当前 activeSegment 的寿命超过了配置的 LogSegment 最长存活时间
      1. 索引文件满了
    private def maybeRoll(messagesSize: Int, appendInfo: LogAppendInfo): LogSegment = {
      val segment = activeSegment
      val now = time.milliseconds
    
      val maxTimestampInMessages = appendInfo.maxTimestamp
      val maxOffsetInMessages = appendInfo.lastOffset
    
      // 在shouldRoll 方法中 判断是否需要创建一个 segment
      if (segment.shouldRoll(RollParams(config, appendInfo, messagesSize, now))) {
          
        appendInfo.firstOffset match {
            //创建新的activeSegment 调用 roll 方法
            //这里的 activeSegment 其实就是 LogSegment,由于是Kafka 是顺序添加的,所以最后一个LogSegment 就是正在被添加 record 的 Segment 称作为 activeSegment
          case Some(firstOffset) => roll(Some(firstOffset))
          case None => roll(Some(maxOffsetInMessages - Integer.MAX_VALUE))
        }
      } else {
        //通过上面的 shouldRoll 方法得出不需要重新创建activeSegment ,则直接返回
        segment
      }
    }
    
    //是否要重新创建一个 segment
    def shouldRoll(rollParams: RollParams): Boolean = {
      val reachedRollMs = timeWaitedForRoll(rollParams.now, rollParams.maxTimestampInMessages) > rollParams.maxSegmentMs - rollJitterMs
      size > rollParams.maxSegmentBytes - rollParams.messagesSize ||  //条件1 logSegment 满了
        (size > 0 && reachedRollMs) || //条件2 当前activeSegment 的寿命超过了配置的 LogSegment 最长存活时间
        //条件3 索引满了
        offsetIndex.isFull || timeIndex.isFull || !canConvertToRelativeOffset(rollParams.maxOffsetInMessages)
    }
    
    
    //这里是roll方法,通过shouldRoll 方法如果满足条件需要创建一个Segment 则会调用此方法
    def roll(expectedNextOffset: Option[Long] = None): LogSegment = {
      maybeHandleIOException(s"Error while rolling log segment for $topicPartition in dir ${dir.getParent}") {
        val start = time.hiResClockMs()
        lock synchronized { // 加锁 多Handler 线程操作会引发线程问题
          checkIfMemoryMappedBufferClosed()
          //获取 LEO: LEO是即将要插入的数据 log end offset
          val newOffset = math.max(expectedNextOffset.getOrElse(0L), logEndOffset)
          //新日志文件的文件名是 [LEO].log
          val logFile = Log.logFile(dir, newOffset)
    
          if (segments.containsKey(newOffset)) {
            // segment with the same base offset already exists and loaded
            if (activeSegment.baseOffset == newOffset && activeSegment.size == 0) {
              // We have seen this happen (see KAFKA-6388) after shouldRoll() returns true for an
              // active segment of size zero because of one of the indexes is "full" (due to _maxEntries == 0).
              warn(s"Trying to roll a new log segment with start offset $newOffset " +
                   s"=max(provided offset = $expectedNextOffset, LEO = $logEndOffset) while it already " +
                   s"exists and is active with size 0. Size of time index: ${activeSegment.timeIndex.entries}," +
                   s" size of offset index: ${activeSegment.offsetIndex.entries}.")
              removeAndDeleteSegments(Seq(activeSegment), asyncDelete = true)
            } else {
              throw new KafkaException(s"Trying to roll a new log segment for topic partition $topicPartition with start offset $newOffset" +
                                       s" =max(provided offset = $expectedNextOffset, LEO = $logEndOffset) while it already exists. Existing " +
                                       s"segment is ${segments.get(newOffset)}.")
            }
          } else if (!segments.isEmpty && newOffset < activeSegment.baseOffset) {
            throw new KafkaException(
              s"Trying to roll a new log segment for topic partition $topicPartition with " +
              s"start offset $newOffset =max(provided offset = $expectedNextOffset, LEO = $logEndOffset) lower than start offset of the active segment $activeSegment")
          } else {
            val offsetIdxFile = offsetIndexFile(dir, newOffset)
            val timeIdxFile = timeIndexFile(dir, newOffset)
            val txnIdxFile = transactionIndexFile(dir, newOffset)
    
            for (file <- List(logFile, offsetIdxFile, timeIdxFile, txnIdxFile) if file.exists) {
              warn(s"Newly rolled segment file ${file.getAbsolutePath} already exists; deleting it first")
              Files.delete(file.toPath)
            }
    
            Option(segments.lastEntry).foreach(_.getValue.onBecomeInactiveSegment())
          }
    
          producerStateManager.updateMapEndOffset(newOffset)
          producerStateManager.takeSnapshot()
    
          //新创建的LogSegment
          val segment = LogSegment.open(dir,
            baseOffset = newOffset,
            config,
            time = time,
            fileAlreadyExists = false,
            initFileSize = initFileSize,
            preallocate = config.preallocate)
          //将新创建的Segment 添加到 segments 这个跳表中。
          addSegment(segment)
    
          //更新 nextOffsetMetadata 这次更新的目的是为了更新其中记录的 activeSegment.baseOffset 和 activeSegment.size ,而 LEO 并不会改变。
          updateLogEndOffset(nextOffsetMetadata.messageOffset)
    
          // schedule an asynchronous flush of the old segment
          //定时任务 执行flush 操作
          scheduler.schedule("flush-log", () => flush(newOffset), delay = 0L)
    
          info(s"Rolled new log segment at offset $newOffset in ${time.hiResClockMs() - start} ms.")
    
          segment //返回新建的 activeSegment
        }
      }
    }
    
  • ⑧ 验证事务状态

  • ⑨:执行真正的消息写入操作,主要调用 LogSegment 对象的 append 方法实现

    segment.append(largestOffset = appendInfo.lastOffset,
            largestTimestamp = appendInfo.maxTimestamp,
            shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,
            records = validRecords)
    
  • ⑩:更新LEO对象,其中,LEO值是消息集合中最后一条消息位移值+1

    private def updateLogEndOffset(offset: Long): Unit = {
      nextOffsetMetadata = LogOffsetMetadata(offset, activeSegment.baseOffset, activeSegment.size)
    
      // Update the high watermark in case it has gotten ahead of the log end offset following a truncation
      // or if a new segment has been rolled and the offset metadata needs to be updated.
      if (highWatermark >= offset) {
        updateHighWatermarkMetadata(nextOffsetMetadata)
      }
    
      if (this.recoveryPoint > offset) {
        this.recoveryPoint = offset
      }
    }
    
    • 更新 HW2.7.2.2 更新高水位小节中有分析
  • ⑪:更新事务状态

  • ⑫:返回写入结果


read

def read(startOffset: Long,
           maxLength: Int,
           isolation: FetchIsolation,
           minOneMessage: Boolean): FetchDataInfo = {
    maybeHandleIOException(s"Exception while reading from $topicPartition in dir ${dir.getParent}") {
      trace(s"Reading $maxLength bytes from offset $startOffset of length $size bytes")

      val includeAbortedTxns = isolation == FetchTxnCommitted

      // 读取消息时没有使用Monitor锁同步机制,因此这里取巧了,用本地变量的方式把LEO对象保存起来,避免争用(race condition)
      val endOffsetMetadata = nextOffsetMetadata
      val endOffset = nextOffsetMetadata.messageOffset
      //① 如果从LEO处开始读取,那么自然不会返回任何数据,直接返回空消息集合即可
      if (startOffset == endOffset)
        return emptyFetchDataInfo(endOffsetMetadata, includeAbortedTxns)

      //② 找到startOffset值所在的日志段对象。注意要使用floorEntry方法
      var segmentEntry = segments.floorEntry(startOffset)

      // return error on attempt to read beyond the log end offset or read below log start offset
      //③ 满足以下条件之一将被视为消息越界,即你要读取的消息不在该Log对象中:
      // 1. 要读取的消息位移超过了LEO值
      // 2. 没找到对应的日志段对象
      // 3. 要读取的消息在Log Start Offset之下,同样是对外不可见的消息
      if (startOffset > endOffset || segmentEntry == null || startOffset < logStartOffset)
        throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " +
          s"but we only have log segments in the range $logStartOffset to $endOffset.")

      //④ 查看一下读取隔离级别设置。
      // 普通消费者能够看到[Log Start Offset, LEO)之间的消息
      // 事务型消费者只能看到[Log Start Offset, Log Stable Offset]之间的消息。Log Stable Offset(LSO)是比LEO值小的位移值,为Kafka事务使用
      // Follower副本消费者能够看到[Log Start Offset,高水位值]之间的消息
      val maxOffsetMetadata = isolation match {
        case FetchLogEnd => nextOffsetMetadata
        case FetchHighWatermark => fetchHighWatermarkMetadata
        case FetchTxnCommitted => fetchLastStableOffsetMetadata
      }

      //⑤ 如果要读取的起始位置超过了能读取的最大位置,返回空的消息集合,因为没法读取任何消息
      if (startOffset > maxOffsetMetadata.messageOffset) {
        val startOffsetMetadata = convertToOffsetMetadataOrThrow(startOffset)
        return emptyFetchDataInfo(startOffsetMetadata, includeAbortedTxns)
      }

      //⑥ 开始遍历日志段对象,直到读出东西来或者读到日志末尾
      while (segmentEntry != null) {
        val segment = segmentEntry.getValue

        val maxPosition = {
          // Use the max offset position if it is on this segment; otherwise, the segment size is the limit.
          if (maxOffsetMetadata.segmentBaseOffset == segment.baseOffset) {
            maxOffsetMetadata.relativePositionInSegment
          } else {
            segment.size
          }
        }

        //⑦ 调用Segment.read方法执行真正的读取消息操作
        val fetchInfo = segment.read(startOffset, maxLength, maxPosition, minOneMessage)
        if (fetchInfo == null) { // 如果没有返回任何消息,去下一个日志段对象试试
          segmentEntry = segments.higherEntry(segmentEntry.getKey)
        } else { // 否则返回
          return if (includeAbortedTxns)
            addAbortedTransactions(startOffset, segmentEntry, fetchInfo)
          else
            fetchInfo
        }
      }

      //⑧ 已经读到日志末尾还是没有数据返回,只能返回空消息集合
      FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY)
    }
  }
  • startOffset 参数:开始读取消息的offset位置

  • maxLength 参数:最大读取多少字节

  • isolation 参数:提取隔离,控制我们允许读取的最大偏移量,它是 FetchIsolation 有三个实现类如下:

    • FetchHighWatermark: 只能消费到 HW 的位置
    • FetchLogEnd: 能消费到LEO的位置
    • FetchTxnCommitted: 只能消费到事务提交的offset的位置。
  • minOneMessage 参数:拉取的消息如果超过 maxLength 设置的值,是否要返回,true是返回。

  • ① 如果从LEO处开始读取,那么自然不会返回任何数据,直接返回空消息集合即可

  • ② 找到startOffset值所在的日志段对象。注意要使用floorEntry方法

  • ③ 满足以下条件之一将被视为消息越界,即你要读取的消息不在该Log对象中:

    1. 要读取的消息位移超过了LEO值
    2. 没找到对应的日志段对象
    3. 要读取的消息在Log Start Offset之下,同样是对外不可见的消息
  • ④ 查看一下读取隔离级别设置。根据 isolation 参数

  • ⑤ 如果要读取的起始位置超过了能读取的最大位置,返回空的消息集合,因为没法读取任何消息

  • ⑥ 开始遍历日志段对象,直到读出东西来或者读到日志末尾

  • ⑦ 调用Segment#read方法执行真正的读取消息操作

  • ⑧ 已经读到日志末尾还是没有数据返回,只能返回空消息集合


由于篇幅问题,在 跳表在 Kafka 中的实现 一文中会详细分析跳表在Kafka中的使用。


参考:

Kafka-2.5.0 源码

Kafka 官网

《Apache Kafka源码剖析》

《深入理解Kafka:核心设计与实践》