Overview
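Kafka has many delayed operations, such as delayed produce, delayed fetch, and delayed delete. DelayedOperationPurgatory is the component that manages them.
Kafka does not use the JDK's Timer or DelayQueue to implement the delay itself. Instead, it implements its own timer (SystemTimer) on top of a timing wheel. The JDK's Timer and DelayQueue are both backed by a priority queue, so inserting or deleting a task costs O(log n), which does not meet Kafka's performance requirements. A timing wheel brings both insertion and deletion down to O(1).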
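The timing wheel (TimingWheel) in Kafka is a circular queue that stores timer tasks. It is backed by an array, and each array element holds a timer task list (TimerTaskList). A TimerTaskList is a circular doubly linked list whose items are timer task entries (TimerTaskEntry), each wrapping the actual TimerTask. Kafka also organizes timing wheels into multiple levels (hierarchical timing wheels).
As shown in the figure below: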
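For a description of the timing wheel, see 厮大's article Kafka解惑之时间轮(TimingWheel).
Source code analysis
With the description above in mind, let's look at how the timing wheel is actually implemented in the source code.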
TimingWheel
The concrete implementation of the timing wheel
TimingWheel fields
private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, taskCounter: AtomicInteger, queue: DelayQueue[TimerTaskList]) {
private[this] val interval = tickMs * wheelSize
private[this] val buckets = Array.tabulate[TimerTaskList](wheelSize) { _ => new TimerTaskList(taskCounter) }
private[this] var currentTime = startMs - (startMs % tickMs) // rounding down to multiple of tickMs
@volatile private[this] var overflowWheel: TimingWheel = null
}
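- tickMs: the time span represented by one slot of this wheel
- wheelSize: the number of slots in this wheel, which is also the size of the buckets array
- taskCounter: the total number of tasks across all levels of the wheel hierarchy
- startMs: the time at which this wheel was created
- queue: a DelayQueue shared by the whole wheel hierarchy; its element type is TimerTaskList
- interval: the span of this wheel. The wheel only handles tasks that expire between currentTime and currentTime + tickMs * wheelSize; tasks beyond that range must be added to the upper-level wheel
- buckets: each element corresponds to one slot of the wheel and holds a TimerTaskList. Within a TimingWheel, tasks in the same TimerTaskList may have different expiration times, but they differ by less than one slot
- currentTime: the wheel's pointer, which divides the wheel into an expired part and an unexpired part. At initialization it is rounded down to a multiple of tickMs, so it approximates the creation time but is not exactly equal to it
- overflowWheel: a reference to the upper-level wheel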
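The arithmetic behind interval and currentTime can be checked with a few lines of Scala. This is a minimal sketch, not Kafka code: the object name WheelFieldsSketch and the sample values are assumptions chosen for illustration.

```scala
object WheelFieldsSketch {
  // Span covered by one full rotation of a wheel (mirrors `interval = tickMs * wheelSize`).
  def interval(tickMs: Long, wheelSize: Int): Long = tickMs * wheelSize

  // Round a timestamp down to a multiple of tickMs, as the constructor does for currentTime.
  def roundDown(timeMs: Long, tickMs: Long): Long = timeMs - (timeMs % tickMs)
}
```

For a hypothetical wheel with tickMs = 20 and wheelSize = 20 created at startMs = 1003, the span is 400 ms and currentTime starts at 1000, which is close to the creation time but not identical to it.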
add method
Adds a timer task to the wheel. It also checks whether the task being added has already expired.
def add(timerTaskEntry: TimerTaskEntry): Boolean = {
  val expiration = timerTaskEntry.expirationMs
  if (timerTaskEntry.cancelled) {
    // Cancelled: return false right away
    false
  } else if (expiration < currentTime + tickMs) {
    // Already expired: return false right away
    false
  } else if (expiration < currentTime + interval) {
    // Within this wheel's span: put the task in its own bucket.
    // Find the slot from the task's expiration time and add the task to that TimerTaskList.
    val virtualId = expiration / tickMs
    val bucket = buckets((virtualId % wheelSize.toLong).toInt)
    bucket.add(timerTaskEntry)
    // Set the bucket expiration time
    if (bucket.setExpiration(virtualId * tickMs)) {
      queue.offer(bucket)
    }
    true
  } else {
    // Out of the interval: the task exceeds this wheel's span, so put it into the parent wheel
    if (overflowWheel == null) addOverflowWheel()
    overflowWheel.add(timerTaskEntry)
  }
}
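To make the three branches of add concrete, here is a small sketch that reproduces only the placement decision, without buckets or queues. The names BucketIndexSketch, Placement, and place are hypothetical and exist only for this illustration.

```scala
object BucketIndexSketch {
  sealed trait Placement
  case object Expired extends Placement
  final case class InBucket(index: Int, bucketExpirationMs: Long) extends Placement
  case object Overflow extends Placement

  // Same comparisons and index arithmetic as TimingWheel.add, minus cancellation handling.
  def place(expirationMs: Long, currentTime: Long, tickMs: Long, wheelSize: Int): Placement = {
    val interval = tickMs * wheelSize
    if (expirationMs < currentTime + tickMs) Expired
    else if (expirationMs < currentTime + interval) {
      val virtualId = expirationMs / tickMs
      InBucket((virtualId % wheelSize.toLong).toInt, virtualId * tickMs)
    } else Overflow
  }
}
```

With tickMs = 1, wheelSize = 20, and currentTime = 1000, a task expiring at 1007 lands in slot 7, while a task expiring at 1020 is already outside the span and would go to the overflow wheel. On a wheel with tickMs = 20, a task expiring at 1047 shares slot 12 with every other task in the range 1040 to 1059, which is why the bucket expiration is set to 1040 rather than to the task's own expiration.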
addOverflowWheel method
Creates the upper-level wheel. The upper-level wheel's tickMs equals the full span (interval) of the current wheel.
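private[this] def addOverflowWheel(): Unit = {
  synchronized {
    if (overflowWheel == null) {
      // Create the upper-level wheel. Its tickMs is larger while wheelSize stays the same,
      // so its span is larger as well.
      // As the upper wheel's pointer advances, tasks still move back down to the lowest wheel
      // to wait for their final expiration.
      overflowWheel = new TimingWheel(
        tickMs = interval,
        wheelSize = wheelSize,
        startMs = currentTime,
        taskCounter = taskCounter, // the single task counter shared by all levels
        queue                      // the single task queue shared by all levels
      )
    }
  }
}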
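Because each overflow wheel uses the lower wheel's interval as its tickMs, the span grows by a factor of wheelSize per level. The following sketch lists the (tickMs, interval) pair for the first few levels; WheelLevelsSketch and levels are hypothetical names, not part of Kafka.

```scala
object WheelLevelsSketch {
  // Returns (tickMs, interval) for `count` levels, starting from the lowest wheel.
  def levels(baseTickMs: Long, wheelSize: Int, count: Int): List[(Long, Long)] =
    List.iterate((baseTickMs, baseTickMs * wheelSize), count) {
      // The next level's tick is the previous level's whole interval.
      case (_, interval) => (interval, interval * wheelSize)
    }
}
```

With the SystemTimer defaults shown later in this article (tickMs = 1, wheelSize = 20), three levels give spans of 20 ms, 400 ms, and 8000 ms.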
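advanceClock method
Tries to advance the current wheel's pointer, currentTime, and then tries to advance the upper-level wheel's pointer as well. As the current wheel's pointer keeps advancing, the upper-level wheel's pointer will sooner or later advance too.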
def advanceClock(timeMs: Long): Unit = {
  // Try to move the pointer currentTime; it may advance by more than one slot
  if (timeMs >= currentTime + tickMs) {
    currentTime = timeMs - (timeMs % tickMs)
    // Try to advance the clock of the overflow wheel if present
    if (overflowWheel != null) overflowWheel.advanceClock(currentTime)
  }
}
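The cascading behaviour of advanceClock can be reproduced with a stripped-down clock that keeps only the pointer and an optional parent. ClockSketch is a hypothetical class written for this illustration; it holds no buckets or tasks.

```scala
final class ClockSketch(val tickMs: Long, startMs: Long, val parent: Option[ClockSketch]) {
  // Pointer rounded down to a multiple of tickMs, as in TimingWheel.
  private var current: Long = startMs - (startMs % tickMs)

  def currentTime: Long = current

  def advanceClock(timeMs: Long): Unit =
    if (timeMs >= current + tickMs) {
      current = timeMs - (timeMs % tickMs)
      // The parent only moves once a full parent tick has elapsed.
      parent.foreach(_.advanceClock(current))
    }
}
```

With a lower clock of tickMs = 1 and an upper clock of tickMs = 20, both starting at 1000, advancing the lower clock to 1007 leaves the upper clock at 1000. Advancing it to 1025 moves the upper clock to 1020.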
SystemTimer
SystemTimer is Kafka's timer implementation. On top of TimingWheel it adds the ability to execute expired tasks and to block until the nearest task expires.
SystemTimer fields
class SystemTimer(executorName: String,
tickMs: Long = 1,
wheelSize: Int = 20,
startMs: Long = Time.SYSTEM.hiResClockMs) extends Timer {
// timeout timer
private[this] val taskExecutor = Executors.newFixedThreadPool(1,
(runnable: Runnable) => KafkaThread.nonDaemon("executor-" + executorName, runnable))
private[this] val delayQueue = new DelayQueue[TimerTaskList]()
private[this] val taskCounter = new AtomicInteger(0)
private[this] val timingWheel = new TimingWheel(
tickMs = tickMs,//1
wheelSize = wheelSize,//20
startMs = startMs,
taskCounter = taskCounter,
delayQueue
)
// Locks used to protect data structures while ticking
private[this] val readWriteLock = new ReentrantReadWriteLock()
private[this] val readLock = readWriteLock.readLock()
private[this] val writeLock = readWriteLock.writeLock()
}
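- taskExecutor: a fixed-size thread pool provided by the JDK
- delayQueue: the DelayQueue shared by all levels of the wheel hierarchy. Its main job is to block the thread that advances the wheel's pointer (ExpiredOperationReaper) until the nearest task expires
- taskCounter: the task counter shared by all levels of the wheel hierarchy
- timingWheel: the lowest-level timing wheel
- readWriteLock: a read-write lock; add takes the read lock and advanceClock takes the write lock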
add method
Adds a task. If the task has not yet expired, TimingWheel.add places it in the wheel to be executed later.
def add(timerTask: TimerTask): Unit = {
  readLock.lock()
  try {
    addTimerTaskEntry(new TimerTaskEntry(timerTask, timerTask.delayMs + Time.SYSTEM.hiResClockMs))
  } finally {
    readLock.unlock()
  }
}
private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = {
  // The wheel rejected the task: it has either already expired or been cancelled
  if (!timingWheel.add(timerTaskEntry)) {
    // Already expired or cancelled
    if (!timerTaskEntry.cancelled)
      // Submit the expired task to taskExecutor for execution
      taskExecutor.submit(timerTaskEntry.timerTask)
  }
}
advanceClock method
def advanceClock(timeoutMs: Long): Boolean = {
  // Block for up to timeoutMs waiting for a bucket to expire
  var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)
  // A TimerTaskList expired while we were blocked
  if (bucket != null) {
    writeLock.lock()
    try {
      while (bucket != null) {
        // Advance the timing wheel's pointer
        timingWheel.advanceClock(bucket.getExpiration())
        // Try to re-add the tasks in the bucket to the wheel. This does not necessarily submit them to
        // taskExecutor: tasks that have not expired yet are only demoted to a lower-level wheel to keep waiting
        bucket.flush(reinsert)
        // Non-blocking poll
        bucket = delayQueue.poll()
      }
    } finally {
      writeLock.unlock()
    }
    true
  } else {
    false
  }
}
// Re-add a timerTaskEntry to the timing wheel
private[this] val reinsert = (timerTaskEntry: TimerTaskEntry) => addTimerTaskEntry(timerTaskEntry)
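The two poll calls in advanceClock rely on standard java.util.concurrent.DelayQueue behaviour: the timed poll blocks until the head element's delay has elapsed, while the no-argument poll returns null immediately if nothing has expired. The sketch below demonstrates that with a hypothetical ExpiringBucket class standing in for TimerTaskList; it uses System.currentTimeMillis rather than Kafka's Time abstraction.

```scala
import java.util.concurrent.{DelayQueue, Delayed, TimeUnit}

// Hypothetical stand-in for TimerTaskList: an element that becomes available at expirationMs.
final class ExpiringBucket(val name: String, val expirationMs: Long) extends Delayed {
  override def getDelay(unit: TimeUnit): Long =
    unit.convert(expirationMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS)

  override def compareTo(other: Delayed): Int =
    java.lang.Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS))
}

object DelayQueueSketch {
  // Returns (was the immediate poll empty?, name returned by the blocking poll).
  def demo(): (Boolean, Option[String]) = {
    val queue = new DelayQueue[ExpiringBucket]()
    queue.offer(new ExpiringBucket("bucket-a", System.currentTimeMillis() + 300))
    val early = queue.poll()                    // non-blocking: nothing has expired yet
    val later = queue.poll(2, TimeUnit.SECONDS) // blocks until bucket-a expires
    (early == null, Option(later).map(_.name))
  }
}
```

This is why the reaper thread can sleep inside the timed poll without spinning: the queue wakes it only when the nearest bucket is due or the timeout elapses.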
DelayedOperation
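Kafka uses SystemTimer to periodically check whether requests have timed out. The real purpose of these requests, however, is not to run on timeout but to run once some other condition is satisfied, for example a producer sending messages or a consumer consuming messages. SystemTimer alone cannot meet that need, so Kafka provides the abstract class DelayedOperation to support these delayed operations.
The figure below shows some of the DelayedOperation implementations: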
// Template method pattern: subclasses provide the concrete implementations.
abstract class DelayedOperation(override val delayMs: Long,
lockOpt: Option[Lock] = None)
extends TimerTask with Logging {
// Whether this TimerTask has completed
private val completed = new AtomicBoolean(false)
// Signals that a maybeTryComplete attempt is pending, so completion should be tried again
private val tryCompletePending = new AtomicBoolean(false)
// Visible for testing
private[server] val lock: Lock = lockOpt.getOrElse(new ReentrantLock)
/*
* Force completing the delayed operation, if not already completed.
* This function can be triggered when
*
* 1. The operation has been verified to be completable inside tryComplete()
* 2. The operation has expired and hence needs to be completed right now
*
* Return true iff the operation is completed by the caller: note that
* concurrent threads can try to complete the same operation, but only
* the first thread will succeed in completing the operation and return
* true, others will still return false
*/
def forceComplete(): Boolean = {
if (completed.compareAndSet(false, true)) {
// cancel the timeout timer
cancel()
onComplete()
true
} else {
false
}
}
/**
* Check if the delayed operation is already completed
*/
def isCompleted: Boolean = completed.get()
/**
* Call-back to execute when a delayed operation gets expired and hence forced to complete.
*/
def onExpiration(): Unit
/**
* Process for completing an operation; This function needs to be defined
* in subclasses and will be called exactly once in forceComplete()
*/
def onComplete(): Unit
/**
* Try to complete the delayed operation by first checking if the operation
* can be completed by now. If yes execute the completion logic by calling
* forceComplete() and return true iff forceComplete returns true; otherwise return false
*
* This function needs to be defined in subclasses
*/
def tryComplete(): Boolean
/**
* Thread-safe variant of tryComplete() that attempts completion only if the lock can be acquired
* without blocking.
*
* If threadA acquires the lock and performs the check for completion before completion criteria is met
* and threadB satisfies the completion criteria, but fails to acquire the lock because threadA has not
* yet released the lock, we need to ensure that completion is attempted again without blocking threadA
* or threadB. `tryCompletePending` is set by threadB when it fails to acquire the lock and at least one
* of threadA or threadB will attempt completion of the operation if this flag is set. This ensures that
* every invocation of `maybeTryComplete` is followed by at least one invocation of `tryComplete` until
* the operation is actually completed.
*/
private[server] def maybeTryComplete(): Boolean = {
var retry = false
var done = false
do {
if (lock.tryLock()) {
try {
tryCompletePending.set(false)
done = tryComplete()
} finally {
lock.unlock()
}
// While we were holding the lock, another thread may have invoked `maybeTryComplete` and set
// `tryCompletePending`. In this case we should retry.
retry = tryCompletePending.get()
} else {
// Another thread is holding the lock. If `tryCompletePending` is already set and this thread failed to
// acquire the lock, then the thread that is holding the lock is guaranteed to see the flag and retry.
// Otherwise, we should set the flag and retry on this thread since the thread holding the lock may have
// released the lock and returned by the time the flag is set.
retry = !tryCompletePending.getAndSet(true)
}
} while (!isCompleted && retry)
done
}
/*
* run() method defines a task that is executed on timeout
*/
override def run(): Unit = {
if (forceComplete())
onExpiration()
}
}
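The guarantee that onComplete runs exactly once comes from the compareAndSet call in forceComplete. The sketch below isolates that pattern; OnceOnlySketch and OnceOnlyDemo are hypothetical names, and a counter stands in for the real onComplete callback.

```scala
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

final class OnceOnlySketch {
  private val completed = new AtomicBoolean(false)
  val completions = new AtomicInteger(0) // counts how often the "onComplete" body ran

  // Only the caller that flips completed from false to true runs the completion logic.
  def forceComplete(): Boolean =
    if (completed.compareAndSet(false, true)) {
      completions.incrementAndGet()
      true
    } else false
}

object OnceOnlyDemo {
  // Races `threads` callers against one operation; returns (winning callers, completions).
  def run(threads: Int): (Int, Int) = {
    val op = new OnceOnlySketch
    val winners = new AtomicInteger(0)
    val workers = (1 to threads).map { _ =>
      new Thread(new Runnable {
        def run(): Unit = { if (op.forceComplete()) winners.incrementAndGet(); () }
      })
    }
    workers.foreach(_.start())
    workers.foreach(_.join())
    (winners.get, op.completions.get)
  }
}
```

However many threads race, whether the timer's executor on expiration or a request thread that found the condition satisfied, exactly one of them sees true.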
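A DelayedOperation may be submitted to the SystemTimer.taskExecutor thread pool because it expired, or it may be executed by another thread that checks its completion condition and finds it already satisfied.
As shown in the figure below: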
DelayedOperationPurgatory
DelayedOperationPurgatory is a helper class that manages DelayedOperation instances.
DelayedOperationPurgatory fields
final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String,
timeoutTimer: Timer,
brokerId: Int = 0,
purgeInterval: Int = 1000,
reaperEnabled: Boolean = true,
timerEnabled: Boolean = true)
extends Logging with KafkaMetricsGroup {
/* a list of operation watching keys */
private class WatcherList {
//
val watchersByKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers(key)))
val watchersLock = new ReentrantLock()
/*
* Return all the current watcher lists,
* note that the returned watchers may be removed from the list by other threads
*/
def allWatchers = {
watchersByKey.values
}
}
private val watcherLists = Array.fill[WatcherList](DelayedOperationPurgatory.Shards)(new WatcherList)
private def watcherList(key: Any): WatcherList = {
watcherLists(Math.abs(key.hashCode() % watcherLists.length))
}
// the number of estimated total operations in the purgatory
//
private[this] val estimatedTotalOperations = new AtomicInteger(0)
/* background thread expiring operations that have timed out */
private val expirationReaper = new ExpiredOperationReaper()
}
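- timeoutTimer: the SystemTimer instance
- watchersByKey: a Pool that manages Watchers, backed by a ConcurrentHashMap
- estimatedTotalOperations: records the (estimated) number of DelayedOperation instances in this DelayedOperationPurgatory
- expirationReaper: a ShutdownableThread with two main jobs:
  - advancing the timing wheel's pointer
  - periodically purging completed DelayedOperation instances from watchersByKey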
private class ExpiredOperationReaper extends ShutdownableThread(
  "ExpirationReaper-%d-%s".format(brokerId, purgatoryName),
  false) {
  override def doWork(): Unit = {
    // Block for at most 200 ms
    advanceClock(200L)
  }
}
def advanceClock(timeoutMs: Long): Unit = {
  // Call SystemTimer.advanceClock to try to advance the timing wheel's pointer
  timeoutTimer.advanceClock(timeoutMs)
  // When an expired DelayedOperation is completed by SystemTimer.taskExecutor, the
  // DelayedOperationPurgatory is not notified to remove it. Once the operation counts in the
  // DelayedOperationPurgatory and the SystemTimer differ by more than a threshold, purgeCompleted cleans up.
  if (estimatedTotalOperations.get - numDelayed > purgeInterval) {
    estimatedTotalOperations.getAndSet(numDelayed)
    debug("Begin purging watch lists")
    // Call Watchers.purgeCompleted to remove completed DelayedOperations
    val purged = watcherLists.foldLeft(0) {
      case (sum, watcherList) => sum + watcherList.allWatchers.map(_.purgeCompleted()).sum
    }
    debug("Purged %d elements from watch lists.".format(purged))
  }
}
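The purge trigger is a simple comparison between the purgatory's own estimate and the number of operations still pending in the timer. A minimal sketch of that condition, with the hypothetical name PurgeThresholdSketch and made-up sample counts:

```scala
object PurgeThresholdSketch {
  // Mirrors `estimatedTotalOperations.get - numDelayed > purgeInterval`.
  def shouldPurge(estimatedTotal: Int, numDelayed: Int, purgeInterval: Int): Boolean =
    estimatedTotal - numDelayed > purgeInterval
}
```

With the default purgeInterval of 1000, an estimate of 2500 against 1200 pending operations triggers a purge, while 2000 against 1000 does not, because the difference must strictly exceed the threshold.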
tryCompleteElseWatch method
Checks whether the DelayedOperation has already completed. If not, adds it to watchersByKey and to the SystemTimer.
def tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean = {
  assert(watchKeys.nonEmpty, "The watch key list can't be empty")
  // First attempt to complete the delayed operation
  var isCompletedByMe = operation.tryComplete()
  if (isCompletedByMe) // completed, return
    return true
  var watchCreated = false
  // Several keys may be passed in; add the still-incomplete operation to watchersByKey under each one
  for(key <- watchKeys) {
    // If the operation is already completed, stop adding it to the rest of the watcher list.
    if (operation.isCompleted)
      return false
    watchForOperation(key, operation)
    if (!watchCreated) {
      watchCreated = true
      // Count the operation once, however many keys it is watched under
      estimatedTotalOperations.incrementAndGet()
    }
  }
  // Second attempt to complete the operation
  isCompletedByMe = operation.maybeTryComplete()
  if (isCompletedByMe)
    return true
  // if it cannot be completed by now and hence is watched, add to the expire queue also
  // (that is, add it to the SystemTimer's timing wheel)
  if (!operation.isCompleted) {
    if (timerEnabled)
      timeoutTimer.add(operation)
    // If it completed in the meantime, remove it from the timer
    if (operation.isCompleted) {
      // cancel the timer task
      operation.cancel()
    }
  }
  false
}
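The overall control flow (try, watch, try again, then hand over to the timer) can be followed in a simplified, single-threaded sketch. Everything here is hypothetical: CountdownOp is a toy operation that completes on its Nth attempt, PurgatorySketch uses plain mutable collections instead of Watchers and SystemTimer, and the second attempt calls tryComplete directly because there is no lock contention to handle.

```scala
import scala.collection.mutable

// Toy operation that completes on its Nth tryComplete call.
final class CountdownOp(attemptsNeeded: Int) {
  private var attempts = 0
  private var done = false
  def isCompleted: Boolean = done
  def tryComplete(): Boolean = {
    attempts += 1
    if (!done && attempts >= attemptsNeeded) { done = true; true } else false
  }
}

final class PurgatorySketch {
  val watchers = mutable.Map.empty[String, mutable.ListBuffer[CountdownOp]]
  val timer = mutable.ListBuffer.empty[CountdownOp] // stands in for the timing wheel

  def tryCompleteElseWatch(op: CountdownOp, keys: Seq[String]): Boolean = {
    require(keys.nonEmpty, "The watch key list can't be empty")
    if (op.tryComplete()) true // first attempt succeeded, nothing is registered
    else {
      // Register the operation under every key so later events can trigger it.
      keys.foreach { key =>
        if (!op.isCompleted) watchers.getOrElseUpdate(key, mutable.ListBuffer.empty) += op
      }
      if (op.tryComplete()) true // second attempt, after watching
      else {
        timer += op // still pending: let the timeout take care of it
        false
      }
    }
  }
}
```

An operation that completes on the second attempt stays in the watcher lists even though it is done. In this sketch nothing removes it, which illustrates why the real purgatory needs the periodic purgeCompleted pass described earlier.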