Kafka Delayed Operations (Part 1): DelayedOperationPurgatory

May 7, 2020 | Author: Siran | 3,800 characters | about an 8-minute read
Filed under Message Queues | Tags #kafka

Overview

Kafka has many delayed operations, such as delayed produce, delayed fetch, and delayed delete. DelayedOperationPurgatory is the component that manages them.

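Instead of the JDK's built-in Timer or DelayQueue, Kafka implements its own timer (SystemTimer) on top of a timing wheel. Timer and DelayQueue are both backed by binary heaps, so insertion and removal cost O(log n), which does not meet Kafka's performance requirements. A timing wheel brings both insertion and removal down to O(1).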

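In Kafka, the timing wheel (TimingWheel) is a circular queue that stores timer tasks. It is backed by an array, and each array element holds a list of timer tasks (TimerTaskList). A TimerTaskList is a circular doubly linked list whose entries are timer task entries (TimerTaskEntry), each wrapping the actual timer task (TimerTask). On top of this, Kafka introduces hierarchical (multi-level) timing wheels.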

As shown in the figure below:

For a detailed explanation of the timing wheel, see 厮大's article Kafka解惑之时间轮(TimingWheel) (roughly, "Kafka Explained: the Timing Wheel").


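Source Code Analysis

With the description above in mind, let's look at the source code to see how the timing wheel is actually implemented.

TimingWheel

The concrete implementation of the timing wheel.

TimingWheel fields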

private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, taskCounter: AtomicInteger, queue: DelayQueue[TimerTaskList]) {

  private[this] val interval = tickMs * wheelSize
  
  private[this] val buckets = Array.tabulate[TimerTaskList](wheelSize) { _ => new TimerTaskList(taskCounter) }

  private[this] var currentTime = startMs - (startMs % tickMs) // rounding down to multiple of tickMs

  @volatile private[this] var overflowWheel: TimingWheel = null
}  
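  • tickMs: the time span represented by one slot of this wheel
  • wheelSize: the number of slots in this wheel, which is also the size of the buckets array
  • taskCounter: the total number of tasks across all levels of the timing wheel
  • startMs: the time at which this wheel was created
  • queue: a DelayQueue shared by every level of the hierarchical wheel; its elements are TimerTaskList buckets rather than individual tasks
  • interval: the total span of this wheel, tickMs * wheelSize. This wheel only handles tasks expiring in [currentTime + tickMs, currentTime + interval); tasks expiring later are handed to the parent wheel
  • buckets: one entry per slot of the wheel, each holding a TimerTaskList. Tasks in the same TimerTaskList may have different expiration times, but they all fall within the same slot, i.e. within one tickMs of each other
  • currentTime: the wheel's hand, which divides the wheel into an expired part and a not-yet-expired part. On construction, currentTime is rounded down to a multiple of tickMs, so it approximates, but is not exactly, the creation time
  • overflowWheel: a reference to the parent (overflow) wheel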

add method: adds a timer task to the timing wheel, and also checks whether the task has already expired or been cancelled.

def add(timerTaskEntry: TimerTaskEntry): Boolean = {
    val expiration = timerTaskEntry.expirationMs

    if (timerTaskEntry.cancelled) {
      // Cancelled: the task has been cancelled, so return false
      false
    } else if (expiration < currentTime + tickMs) {
      // Already expired: return false so the caller can run it right away
      false
    } else if (expiration < currentTime + interval) {
      // Put in its own bucket: the task falls within this wheel's span.
      // Find the slot from the expiration time and add the task to that slot's TimerTaskList
      val virtualId = expiration / tickMs
      val bucket = buckets((virtualId % wheelSize.toLong).toInt)
      bucket.add(timerTaskEntry)

      // Set the bucket expiration time. setExpiration returns true only when the value changes,
      // i.e. the slot is being reused for a new round, so each bucket is enqueued once per round
      if (bucket.setExpiration(virtualId * tickMs)) {
        queue.offer(bucket)
      }
      true
    } else {
      // Out of the interval. Put it into the parent timer (the overflow wheel)
      if (overflowWheel == null) addOverflowWheel()
      overflowWheel.add(timerTaskEntry)
    }
  }
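The branch structure of add can be reproduced as a pure function, which makes the bucket arithmetic easy to check by hand. This is a minimal sketch, not Kafka code: Placement, Expired, Bucket, Overflow and place are hypothetical names, and the cancelled check and the DelayQueue side effect are left out.

```scala
// Hypothetical pure-function model of the three-way decision in TimingWheel.add.
sealed trait Placement
case object Expired extends Placement                              // caller runs the task now
final case class Bucket(index: Int, bucketExpirationMs: Long) extends Placement
case object Overflow extends Placement                             // handed to the parent wheel

def place(expirationMs: Long, currentTime: Long, tickMs: Long, wheelSize: Int): Placement = {
  val interval = tickMs * wheelSize
  if (expirationMs < currentTime + tickMs) Expired
  else if (expirationMs < currentTime + interval) {
    val virtualId = expirationMs / tickMs
    // The slot index wraps around the ring; the bucket expiration is the start of that slot.
    Bucket((virtualId % wheelSize).toInt, virtualId * tickMs)
  } else Overflow
}

// Lowest wheel: tickMs = 1, wheelSize = 20, hand at 100.
place(105, 100, 1, 20)   // Bucket(5, 105)
place(120, 100, 1, 20)   // Overflow: 120 is not < 100 + 20
// A parent wheel: tickMs = 20, same hand.
place(135, 100, 20, 20)  // Bucket(6, 120): expiration rounded down to the slot start
```

The last call shows why tasks in one TimerTaskList can have different expiration times: any task expiring in [120, 140) on that wheel lands in slot 6 with bucket expiration 120.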

addOverflowWheel method: creates the parent (overflow) wheel. The parent wheel's tickMs equals the current wheel's total span, interval.

private[this] def addOverflowWheel(): Unit = {
    synchronized {
      if (overflowWheel == null) {
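        // Create the parent wheel. Its tickMs is larger and its wheelSize is unchanged, so it covers a
        // longer time span. As the parent wheel's hand advances, tasks cascade back down to the
        // lowest-level wheel, where they finally expire.
        overflowWheel = new TimingWheel(
          tickMs = interval,
          wheelSize = wheelSize,
          startMs = currentTime,
          taskCounter = taskCounter, // task counter shared by all levels
          queue                      // DelayQueue shared by all levels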
        )
      }
    }
  }
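Because each parent wheel takes the child's interval as its tickMs while keeping wheelSize, the covered span grows geometrically with the level. The hypothetical helper levelSpans below just computes (tickMs, interval) per level; with the SystemTimer defaults shown later (tickMs = 1, wheelSize = 20), the first four levels cover 20 ms, 400 ms, 8 s and 160 s.

```scala
// (tickMs, interval) for each level: level n+1 uses level n's interval as its tickMs,
// mirroring addOverflowWheel (tickMs = interval, wheelSize unchanged).
def levelSpans(tickMs: Long, wheelSize: Int, levels: Int): List[(Long, Long)] =
  Iterator.iterate(tickMs)(_ * wheelSize) // tickMs of level 0, 1, 2, ...
    .take(levels)
    .map(t => (t, t * wheelSize))         // interval = tickMs * wheelSize
    .toList

levelSpans(1L, 20, 4).zipWithIndex.foreach { case ((tick, interval), level) =>
  println(s"level $level: tickMs = $tick ms, interval = $interval ms")
}
```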

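advanceClock method: tries to advance the current wheel's hand, currentTime, and then tries to advance the parent wheel's hand as well. As the current wheel's hand keeps moving forward, the parent wheel's hand will eventually advance too.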

def advanceClock(timeMs: Long): Unit = {
    // Try to move the hand currentTime; it may advance by more than one tick at a time
    if (timeMs >= currentTime + tickMs) {
      currentTime = timeMs - (timeMs % tickMs)

      // Try to advance the clock of the overflow wheel if present
      if (overflowWheel != null) overflowWheel.advanceClock(currentTime)
    }
  }
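To see how the hands of the different levels move together, here is a sketch that models only the clock part of advanceClock. WheelClock is a hypothetical class: unlike Kafka, it creates all levels up front instead of lazily in addOverflowWheel, and it holds no buckets.

```scala
// Hypothetical model of the hand movement in TimingWheel.advanceClock (no buckets, no tasks).
final class WheelClock(val tickMs: Long, wheelSize: Int, startMs: Long, levels: Int) {
  var currentTime: Long = startMs - (startMs % tickMs) // rounded down to a multiple of tickMs

  // Assumption for the sketch: parent levels are built eagerly, each with tickMs = child interval.
  val overflow: Option[WheelClock] =
    if (levels > 1) Some(new WheelClock(tickMs * wheelSize, wheelSize, currentTime, levels - 1))
    else None

  def advanceClock(timeMs: Long): Unit =
    if (timeMs >= currentTime + tickMs) {
      currentTime = timeMs - (timeMs % tickMs)      // may jump several ticks at once
      overflow.foreach(_.advanceClock(currentTime)) // the parent only moves once a full parent tick passes
    }
}

val clock = new WheelClock(tickMs = 1, wheelSize = 20, startMs = 0, levels = 3)
clock.advanceClock(25)  // level 0 -> 25, level 1 -> 20, level 2 stays at 0
clock.advanceClock(450) // level 0 -> 450, level 1 -> 440, level 2 -> 400
```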

SystemTimer

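SystemTimer is Kafka's timer implementation. On top of TimingWheel, it adds the ability to execute expired tasks and to block while waiting for the earliest task to expire.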

SystemTimer fields

class SystemTimer(executorName: String,
                  tickMs: Long = 1,
                  wheelSize: Int = 20,
                  startMs: Long = Time.SYSTEM.hiResClockMs) extends Timer {

  // timeout timer
  private[this] val taskExecutor = Executors.newFixedThreadPool(1,
    (runnable: Runnable) => KafkaThread.nonDaemon("executor-" + executorName, runnable))

  private[this] val delayQueue = new DelayQueue[TimerTaskList]()

  private[this] val taskCounter = new AtomicInteger(0)
  
  private[this] val timingWheel = new TimingWheel(
    tickMs = tickMs,//1
    wheelSize = wheelSize,//20
    startMs = startMs,
    taskCounter = taskCounter,
    delayQueue
  )

  // Locks used to protect data structures while ticking
  private[this] val readWriteLock = new ReentrantReadWriteLock()
  private[this] val readLock = readWriteLock.readLock()
  private[this] val writeLock = readWriteLock.writeLock()
}  
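  • taskExecutor: a fixed-size JDK thread pool (a single thread here) that runs expired tasks
  • delayQueue: the DelayQueue shared by all levels of the timing wheel. Its main role is to block the thread that advances the wheel's hand (ExpiredOperationReaper) until the earliest bucket expires.
  • taskCounter: the task counter shared by all levels of the timing wheel
  • timingWheel: the lowest-level timing wheel
  • readWriteLock: a read-write lock; add takes the read lock, while advanceClock takes the write lock when ticking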
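The blocking behaviour that delayQueue provides can be tried in isolation with java.util.concurrent.DelayQueue. DemoBucket below is a hypothetical stand-in for TimerTaskList that keeps only what matters here: an expiration, a setExpiration that reports whether the value changed, and the Delayed interface. Kafka's own class measures time with Time.SYSTEM.hiResClockMs; this sketch uses System.nanoTime converted to milliseconds instead.

```scala
import java.util.concurrent.{DelayQueue, Delayed, TimeUnit}
import java.util.concurrent.atomic.AtomicLong

// Monotonic milliseconds (assumption: similar in spirit to hiResClockMs).
def nowMs(): Long = TimeUnit.NANOSECONDS.toMillis(System.nanoTime())

// Hypothetical stand-in for TimerTaskList: only the Delayed behaviour is kept.
final class DemoBucket(val name: String) extends Delayed {
  private val expiration = new AtomicLong(-1L)

  // True only if the expiration changed, so the caller offers the bucket once per round.
  def setExpiration(ms: Long): Boolean = expiration.getAndSet(ms) != ms
  def getExpiration: Long = expiration.get()

  override def getDelay(unit: TimeUnit): Long =
    unit.convert(math.max(getExpiration - nowMs(), 0L), TimeUnit.MILLISECONDS)

  override def compareTo(other: Delayed): Int =
    java.lang.Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS))
}

val queue = new DelayQueue[DemoBucket]()
val start = nowMs()
val soon = new DemoBucket("soon")
val later = new DemoBucket("later")
if (later.setExpiration(start + 5000)) queue.offer(later)
if (soon.setExpiration(start + 50)) queue.offer(soon)

// Blocks for roughly 50 ms (never more than 1 s) and returns the earliest-expiring bucket.
val first = queue.poll(1000, TimeUnit.MILLISECONDS)
// Non-blocking poll: "later" has not expired yet, so this returns null.
val second = queue.poll()
```

This is the same pattern SystemTimer.advanceClock uses: one timed poll that may block, followed by non-blocking polls to drain any other buckets that are already due.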

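add method: wraps the task in a TimerTaskEntry whose expiration is delayMs plus the current time. If the task has not expired, it is added to the timing wheel through TimingWheel.add to run later; otherwise it is submitted to taskExecutor right away.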

def add(timerTask: TimerTask): Unit = {
    readLock.lock()
    try {
      addTimerTaskEntry(new TimerTaskEntry(timerTask, timerTask.delayMs + Time.SYSTEM.hiResClockMs))
    } finally {
      readLock.unlock()
    }
  }

private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = {
    // timingWheel.add returned false: the task has already expired or been cancelled
    if (!timingWheel.add(timerTaskEntry)) {
      // Already expired or cancelled
      if (!timerTaskEntry.cancelled)
        // Expired but not cancelled: submit it to taskExecutor to run now
        taskExecutor.submit(timerTaskEntry.timerTask)
    }
  }

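advanceClock method: waits on delayQueue for at most timeoutMs. If a bucket expires in the meantime, it advances the timing wheel and flushes that bucket along with any other bucket that has already expired.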

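def advanceClock(timeoutMs: Long): Boolean = {
    // Block for up to timeoutMs waiting for the earliest bucket to expire
    var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)
    // A TimerTaskList expired while we were waiting
    if (bucket != null) {
      writeLock.lock()
      try {
        while (bucket != null) {
          // Advance the wheel's hand to the bucket's expiration time
          timingWheel.advanceClock(bucket.getExpiration())
          // Re-add every task in the bucket to the timing wheel. This does not necessarily submit the task to
          // taskExecutor: a task that has not expired yet just moves down to a lower-level wheel and keeps waiting
          bucket.flush(reinsert)
          // Non-blocking poll for any other expired bucket
          bucket = delayQueue.poll()
        }
      } finally {
        writeLock.unlock()
      }
      true
    } else {
      false
    }
  }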

// Re-add a timerTaskEntry to the timing wheel (passed to bucket.flush in advanceClock)
private[this] val reinsert = (timerTaskEntry: TimerTaskEntry) => addTimerTaskEntry(timerTaskEntry)  
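Putting TimingWheel.add, advanceClock and the flush/reinsert loop together, the following single-threaded sketch shows how a task cascades from a higher-level wheel down to the lowest one before it runs. It is an illustration under simplifying assumptions, not Kafka's implementation: Task, Wheel and MiniTimer are hypothetical names, time is a logical Long instead of the system clock, scanning for the earliest due bucket stands in for DelayQueue, and "running" a task only records it in fired.

```scala
import scala.collection.mutable

final case class Task(name: String, expirationMs: Long)

// Hypothetical single-threaded hierarchical timing wheel driven by a logical clock.
final class Wheel(tickMs: Long, wheelSize: Int, startMs: Long) {
  private val interval = tickMs * wheelSize
  private val buckets = Array.fill(wheelSize)(mutable.ListBuffer.empty[Task])
  private val bucketExp = Array.fill(wheelSize)(-1L) // -1 means "not scheduled"
  private var currentTime = startMs - (startMs % tickMs)
  private var overflow: Option[Wheel] = None

  // Same three-way decision as TimingWheel.add; false means "already expired".
  def add(t: Task): Boolean =
    if (t.expirationMs < currentTime + tickMs) false
    else if (t.expirationMs < currentTime + interval) {
      val virtualId = t.expirationMs / tickMs
      val idx = (virtualId % wheelSize).toInt
      buckets(idx) += t
      bucketExp(idx) = virtualId * tickMs
      true
    } else {
      if (overflow.isEmpty) overflow = Some(new Wheel(interval, wheelSize, currentTime))
      overflow.get.add(t)
    }

  def advanceClock(timeMs: Long): Unit =
    if (timeMs >= currentTime + tickMs) {
      currentTime = timeMs - (timeMs % tickMs)
      overflow.foreach(_.advanceClock(currentTime))
    }

  // Stand-in for DelayQueue.poll(): earliest scheduled bucket on any level that is due by `now`.
  def earliestDue(now: Long): Option[(Long, Wheel, Int)] = {
    val here: IndexedSeq[(Long, Wheel, Int)] = buckets.indices.collect {
      case i if bucketExp(i) >= 0 && bucketExp(i) <= now => (bucketExp(i), this, i)
    }
    (here ++ overflow.flatMap(_.earliestDue(now))).sortBy(_._1).headOption
  }

  // Empties one bucket and returns its tasks, like TimerTaskList.flush.
  def drain(idx: Int): List[Task] = {
    val tasks = buckets(idx).toList
    buckets(idx).clear()
    bucketExp(idx) = -1L
    tasks
  }
}

final class MiniTimer(tickMs: Long, wheelSize: Int, startMs: Long) {
  private val wheel = new Wheel(tickMs, wheelSize, startMs)
  val fired = mutable.ListBuffer.empty[(String, Long)] // (task name, logical time it ran)

  // Like addTimerTaskEntry: if the wheel rejects the task, "run" it immediately.
  def add(t: Task, now: Long): Unit = if (!wheel.add(t)) fired += ((t.name, now))

  // Like SystemTimer.advanceClock: take due buckets in expiration order, move the hand to each
  // bucket's expiration, then reinsert its tasks (each either runs or drops to a lower level).
  def advanceTo(now: Long): Unit = {
    var due = wheel.earliestDue(now)
    while (due.isDefined) {
      val (exp, level, idx) = due.get
      wheel.advanceClock(exp)
      level.drain(idx).foreach(t => add(t, exp))
      due = wheel.earliestDue(now)
    }
  }
}

val timer = new MiniTimer(tickMs = 1, wheelSize = 20, startMs = 0)
Seq(Task("d", 0), Task("a", 5), Task("b", 30), Task("c", 450)).foreach(t => timer.add(t, 0))
timer.advanceTo(1000)
println(timer.fired)
```

Tracing task c (expiring at 450) shows the cascade: it first lands in the third level (a slot expiring at 400), is reinserted into the second level (a slot expiring at 440), then into the lowest level, and finally runs at 450. Task d is already expired when added, so it runs immediately, just as SystemTimer submits such a task straight to taskExecutor.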

DelayedOperation

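Kafka uses SystemTimer to detect request timeouts, but the real purpose of these requests is not to run when they time out; it is to run once some other condition is met, for example a produce request waiting for replicas to acknowledge the messages, or a fetch request waiting for enough data to accumulate. SystemTimer alone cannot express such conditions, so Kafka provides the abstract class DelayedOperation for these delayed operations.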

The figure below shows some of the implementations of DelayedOperation (such as DelayedProduce and DelayedFetch):

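// Template method pattern: subclasses provide the concrete logic
abstract class DelayedOperation(override val delayMs: Long,
                                lockOpt: Option[Lock] = None)
  extends TimerTask with Logging {

  // Whether this operation has been completed
  private val completed = new AtomicBoolean(false)
  // Set by a thread that called maybeTryComplete but could not acquire the lock,
  // so that the lock holder knows it must call tryComplete again
  private val tryCompletePending = new AtomicBoolean(false)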
  // Visible for testing
  private[server] val lock: Lock = lockOpt.getOrElse(new ReentrantLock)

  /*
   * Force completing the delayed operation, if not already completed.
   * This function can be triggered when
   *
   * 1. The operation has been verified to be completable inside tryComplete()
   * 2. The operation has expired and hence needs to be completed right now
   *
   * Return true iff the operation is completed by the caller: note that
   * concurrent threads can try to complete the same operation, but only
   * the first thread will succeed in completing the operation and return
   * true, others will still return false
   */
  def forceComplete(): Boolean = {
    if (completed.compareAndSet(false, true)) {
      // cancel the timeout timer
      cancel()
      onComplete()
      true
    } else {
      false
    }
  }

  /**
   * Check if the delayed operation is already completed
   */
  def isCompleted: Boolean = completed.get()

  /**
   * Call-back to execute when a delayed operation gets expired and hence forced to complete.
   */
  def onExpiration(): Unit

  /**
   * Process for completing an operation; This function needs to be defined
   * in subclasses and will be called exactly once in forceComplete()
   */
  def onComplete(): Unit

  /**
   * Try to complete the delayed operation by first checking if the operation
   * can be completed by now. If yes execute the completion logic by calling
   * forceComplete() and return true iff forceComplete returns true; otherwise return false
   *
   * This function needs to be defined in subclasses
   */
  def tryComplete(): Boolean

  /**
   * Thread-safe variant of tryComplete() that attempts completion only if the lock can be acquired
   * without blocking.
   *
   * If threadA acquires the lock and performs the check for completion before completion criteria is met
   * and threadB satisfies the completion criteria, but fails to acquire the lock because threadA has not
   * yet released the lock, we need to ensure that completion is attempted again without blocking threadA
   * or threadB. `tryCompletePending` is set by threadB when it fails to acquire the lock and at least one
   * of threadA or threadB will attempt completion of the operation if this flag is set. This ensures that
   * every invocation of `maybeTryComplete` is followed by at least one invocation of `tryComplete` until
   * the operation is actually completed.
   */
  private[server] def maybeTryComplete(): Boolean = {
    var retry = false
    var done = false
    do {
      if (lock.tryLock()) {
        try {
          tryCompletePending.set(false)
          done = tryComplete()
        } finally {
          lock.unlock()
        }
        // While we were holding the lock, another thread may have invoked `maybeTryComplete` and set
        // `tryCompletePending`. In this case we should retry.
        retry = tryCompletePending.get()
      } else {
        // Another thread is holding the lock. If `tryCompletePending` is already set and this thread failed to
        // acquire the lock, then the thread that is holding the lock is guaranteed to see the flag and retry.
        // Otherwise, we should set the flag and retry on this thread since the thread holding the lock may have
        // released the lock and returned by the time the flag is set.
        retry = !tryCompletePending.getAndSet(true)
      }
    } while (!isCompleted && retry)
    done
  }

  /*
   * run() method defines a task that is executed on timeout
   */
  override def run(): Unit = {
    if (forceComplete())
      onExpiration()
  }
}
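Because forceComplete uses compareAndSet on completed, at most one caller ever runs onComplete, even when the expiration path and a completion check race each other. The sketch below demonstrates only that guarantee with eight threads; DemoOperation is a hypothetical stripped-down class, not Kafka's DelayedOperation.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

// Hypothetical operation keeping only the completed flag and forceComplete.
final class DemoOperation {
  private val completed = new AtomicBoolean(false)
  val onCompleteCalls = new AtomicInteger(0)

  def forceComplete(): Boolean =
    if (completed.compareAndSet(false, true)) { onCompleteCalls.incrementAndGet(); true }
    else false
}

val op = new DemoOperation
val pool = Executors.newFixedThreadPool(8)
val startGate = new CountDownLatch(1) // release all threads at once to maximise contention
val winners = new AtomicInteger(0)

(1 to 8).foreach { _ =>
  pool.submit(new Runnable {
    def run(): Unit = {
      startGate.await()
      if (op.forceComplete()) winners.incrementAndGet()
    }
  })
}
startGate.countDown()
pool.shutdown()
pool.awaitTermination(5, TimeUnit.SECONDS)
```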

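A DelayedOperation is completed in one of two ways: it expires and is submitted to the SystemTimer.taskExecutor thread pool, or another thread checks its completion condition, finds it satisfied, and completes it.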

As shown in the figure below:


DelayedOperationPurgatory

DelayedOperationPurgatory is a helper class that manages DelayedOperations.

DelayedOperationPurgatory fields

final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String,
                                                             timeoutTimer: Timer,
                                                             brokerId: Int = 0,
                                                             purgeInterval: Int = 1000,
                                                             reaperEnabled: Boolean = true,
                                                             timerEnabled: Boolean = true)
        extends Logging with KafkaMetricsGroup {
  /* a list of operation watching keys */
  private class WatcherList {
    // watch key -> Watchers; the Pool creates a Watchers on demand for a key it has not seen yet
    val watchersByKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers(key)))

    val watchersLock = new ReentrantLock()

    /*
     * Return all the current watcher lists,
     * note that the returned watchers may be removed from the list by other threads
     */
    def allWatchers = {
      watchersByKey.values
    }
  }

  private val watcherLists = Array.fill[WatcherList](DelayedOperationPurgatory.Shards)(new WatcherList)
  private def watcherList(key: Any): WatcherList = {
    watcherLists(Math.abs(key.hashCode() % watcherLists.length))
  }

  // the number of estimated total operations in the purgatory
  private[this] val estimatedTotalOperations = new AtomicInteger(0)

  /* background thread expiring operations that have timed out */
  private val expirationReaper = new ExpiredOperationReaper()
}  
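  • timeoutTimer: the SystemTimer instance
  • watchersByKey: a Pool of Watchers keyed by watch key, backed by a ConcurrentHashMap. The purgatory keeps DelayedOperationPurgatory.Shards WatcherList instances (watcherLists), each with its own watchersByKey and watchersLock, and watcherList(key) routes every key to one of them to reduce lock contention
  • estimatedTotalOperations: the estimated number of DelayedOperations in this DelayedOperationPurgatory
  • expirationReaper: a ShutdownableThread with two jobs:
    • advancing the timing wheel's hand
    • periodically purging completed DelayedOperations from watchersByKey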
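watcherList(key) computes Math.abs(key.hashCode() % watcherLists.length), taking the remainder before the absolute value. A small check shows why that order is safe: Math.abs(Int.MinValue) is still negative, so abs-then-mod can yield a negative index, while mod-then-abs always lands in [0, shards). The helper names and the shard count of 10 are illustrative assumptions; with a power-of-two count, Int.MinValue happens to map to 0 either way.

```scala
// Mapping a hash code to a shard index in [0, shards).
def modThenAbs(hash: Int, shards: Int): Int = Math.abs(hash % shards) // the order watcherList(key) uses
def absThenMod(hash: Int, shards: Int): Int = Math.abs(hash) % shards // tempting, but unsafe

val shards = 10 // hypothetical shard count, chosen so the edge case is visible
println(absThenMod(Int.MinValue, shards)) // negative: Math.abs(Int.MinValue) overflows
println(modThenAbs(Int.MinValue, shards)) // a valid index
```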

private class ExpiredOperationReaper extends ShutdownableThread(
    "ExpirationReaper-%d-%s".format(brokerId, purgatoryName),
    false) {

    override def doWork(): Unit = {
      // Block for at most 200 ms per iteration
      advanceClock(200L)
    }
  }

def advanceClock(timeoutMs: Long): Unit = {
    // Call SystemTimer.advanceClock to try to advance the timing wheel's hand
    timeoutTimer.advanceClock(timeoutMs)

    // When a DelayedOperation expires and is completed on SystemTimer.taskExecutor, DelayedOperationPurgatory is
    // not notified to remove it from the watch lists. Once the purgatory's estimated operation count exceeds the
    // number still pending in SystemTimer (numDelayed) by more than purgeInterval, purge completed operations
    if (estimatedTotalOperations.get - numDelayed > purgeInterval) {
      estimatedTotalOperations.getAndSet(numDelayed)
      debug("Begin purging watch lists")
      // Call Watchers.purgeCompleted on every watcher list to remove completed DelayedOperations
      val purged = watcherLists.foldLeft(0) {
        case (sum, watcherList) => sum + watcherList.allWatchers.map(_.purgeCompleted()).sum
      }
      debug("Purged %d elements from watch lists.".format(purged))
    }
  }  
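With purgeInterval at its default of 1000, the purge only runs once the estimate exceeds the operations still pending in the timer by more than 1000. The hypothetical helper purgeDecision mirrors that condition and returns the value the estimate is reset to; numDelayed is treated as a plain input here (in the purgatory it reflects the tasks still held by timeoutTimer).

```scala
// Hypothetical mirror of the purge check in DelayedOperationPurgatory.advanceClock.
// Returns Some(newEstimate) when a purge should run, None otherwise.
def purgeDecision(estimated: Int, numDelayed: Int, purgeInterval: Int): Option[Int] =
  if (estimated - numDelayed > purgeInterval) Some(numDelayed) // purge, then reset the estimate
  else None

purgeDecision(1500, 300, 1000) // Some(300): 1200 operations are likely completed but still watched
purgeDecision(1300, 300, 1000) // None: the gap of exactly 1000 is not above the threshold
```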

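tryCompleteElseWatch method: checks whether the DelayedOperation can already be completed; if not, it registers the operation in watchersByKey under every watch key and adds it to SystemTimer.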

def tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean = {
    assert(watchKeys.nonEmpty, "The watch key list can't be empty")

    // First attempt to complete the delayed operation
    var isCompletedByMe = operation.tryComplete()
    if (isCompletedByMe) // completed on the first attempt
      return true

    var watchCreated = false
    // There may be several watch keys; register the pending operation under each of them in watchersByKey
    for(key <- watchKeys) {
      // If the operation is already completed, stop adding it to the rest of the watcher list.
      if (operation.isCompleted)
        return false
      watchForOperation(key, operation)

      if (!watchCreated) {
        watchCreated = true
        // Count the operation only once, even if it watches several keys
        estimatedTotalOperations.incrementAndGet()
      }
    }

    // Second attempt, using the thread-safe maybeTryComplete
    isCompletedByMe = operation.maybeTryComplete()
    if (isCompletedByMe)
      return true

    // if it cannot be completed by now and hence is watched, add to the expire queue also
    if (!operation.isCompleted) {
      if (timerEnabled)
        timeoutTimer.add(operation)
      // It may have completed concurrently while being added to the timer
      if (operation.isCompleted) {
        // cancel the timer task
        operation.cancel()
      }
    }

    false
  }
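The watch-then-retry pattern of tryCompleteElseWatch, and the reason purgeCompleted is needed, can be illustrated with a small single-threaded model. Everything here is hypothetical: WaitForBytes, MiniPurgatory and their methods are illustrative names, there is no timer, sharding or locking, and checkAndComplete only mimics the role of the purgatory method of the same name. The scenario watches one operation under two keys, completes it through the first key, and shows it lingering under the second key until purgeCompleted runs.

```scala
import scala.collection.mutable

// Hypothetical operation: completes once available() reaches required,
// loosely modelled on a fetch waiting for enough bytes. Not thread-safe.
final class WaitForBytes(required: Int, available: () => Int) {
  private var completed = false
  def isCompleted: Boolean = completed
  def tryComplete(): Boolean =
    if (!completed && available() >= required) { completed = true; true } else false
}

// Hypothetical single-threaded purgatory: no timer, no shards, no locks.
final class MiniPurgatory {
  private val watchers = mutable.Map.empty[String, mutable.ListBuffer[WaitForBytes]]

  // Same shape as tryCompleteElseWatch: try, watch under every key, then try once more.
  def tryCompleteElseWatch(op: WaitForBytes, keys: Seq[String]): Boolean = {
    require(keys.nonEmpty, "The watch key list can't be empty")
    if (op.tryComplete()) return true
    keys.foreach(k => watchers.getOrElseUpdate(k, mutable.ListBuffer.empty[WaitForBytes]) += op)
    op.tryComplete()
  }

  // Retry the operations watching `key` and drop the completed ones from that key only.
  def checkAndComplete(key: String): Int = watchers.get(key) match {
    case None => 0
    case Some(ops) =>
      val completedNow = ops.count(_.tryComplete())
      keepPending(key, ops)
      completedNow
  }

  // Completed operations can linger under other keys; this sweeps every key.
  def purgeCompleted(): Int =
    watchers.toList.map { case (key, ops) =>
      val before = ops.size
      keepPending(key, ops)
      before - watched(key)
    }.sum

  def watched(key: String): Int = watchers.get(key).map(_.size).getOrElse(0)

  private def keepPending(key: String, ops: mutable.ListBuffer[WaitForBytes]): Unit = {
    val pending = ops.filterNot(_.isCompleted)
    if (pending.isEmpty) watchers -= key
    else { ops.clear(); ops ++= pending }
  }
}

var bytes = 0
val purgatory = new MiniPurgatory
val op = new WaitForBytes(100, () => bytes)
val firstTry = purgatory.tryCompleteElseWatch(op, Seq("topic-0", "topic-1")) // false: no data yet
bytes = 150                                                                  // data arrives
val completedNow = purgatory.checkAndComplete("topic-0")                     // completes op
val staleBefore = purgatory.watched("topic-1")                               // still listed here
val purged = purgatory.purgeCompleted()
```

In Kafka the stale entries come from the same situation, plus operations completed by expiration on taskExecutor, which is why the reaper triggers a purge based on estimatedTotalOperations.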