1. Overview
Kafka's network layer uses the Reactor pattern, in two flavors:
- data-plane: handles requests coming from clients and from other brokers
  - one Acceptor thread, which accepts and handles all new connections
  - N Processor threads; each Processor has its own Selector and reads requests from its connections and writes back the responses
  - M Handler threads, which process the requests, generate the responses, and hand each response back to a Processor
- control-plane: handles requests coming from the controller. It must be configured explicitly via control.plane.listener.name; when it is not configured, the data-plane thread model is used by default
  - one Acceptor thread, which accepts and handles all new connections
  - one Processor thread with its own Selector, reading requests and writing back responses
  - one Handler thread, which processes the requests and returns the generated responses to the Processor
  - it is used for the interaction between the KafkaController and the brokers
If you are not familiar with the Reactor pattern, the following two blog posts may help:
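To make the roles concrete, here is a minimal, self-contained Reactor sketch (illustrative only, not Kafka code; all names are made up): one acceptor thread accepts connections and hands them out round-robin to processor threads, each running its own Selector, with the "handler" work reduced to echoing bytes back.
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.channels.{SelectionKey, Selector, ServerSocketChannel, SocketChannel}
import java.util.concurrent.ConcurrentLinkedQueue

//one "Processor": owns its own Selector, registers handed-over connections for OP_READ
//and echoes whatever it reads (standing in for "handler + response")
class MiniProcessor extends Runnable {
  private val newConnections = new ConcurrentLinkedQueue[SocketChannel]()
  private val selector = Selector.open()

  def accept(ch: SocketChannel): Unit = { newConnections.add(ch); selector.wakeup() }

  override def run(): Unit = while (true) {
    var ch = newConnections.poll()
    while (ch != null) {                    //register connections handed over by the acceptor
      ch.configureBlocking(false)
      ch.register(selector, SelectionKey.OP_READ)
      ch = newConnections.poll()
    }
    selector.select(500)
    val it = selector.selectedKeys().iterator()
    while (it.hasNext) {
      val key = it.next(); it.remove()
      if (key.isReadable) {
        val sc = key.channel().asInstanceOf[SocketChannel]
        val buf = ByteBuffer.allocate(1024)
        if (sc.read(buf) < 0) sc.close()
        else { buf.flip(); sc.write(buf) }  //echo the bytes back
      }
    }
  }
}

//the "Acceptor": accepts new sockets and distributes them round-robin, like Kafka's Acceptor
object MiniReactor extends App {
  val processors = Array.fill(3)(new MiniProcessor) //think num.network.threads = 3
  processors.foreach(p => new Thread(p).start())
  val server = ServerSocketChannel.open()
  server.bind(new InetSocketAddress(9999))
  var next = 0
  while (true) {
    val ch = server.accept()                        //a blocking accept is enough for a sketch
    processors(next % processors.length).accept(ch)
    next += 1
  }
}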
The structure of the data-plane looks like this:
- SocketServer is the core class; it holds an Acceptor thread that accepts and handles all new connections
- each Acceptor owns multiple Processor threads
- each Processor thread has its own Selector and reads requests from its connections and writes back the responses
- M Handler threads process the requests and generate the responses, handing each response back to the Processor
- Processor threads and Handler threads communicate through the RequestChannel
The control-plane is the same picture with the multiple Processors and multiple Handlers collapsed to one of each. With that, let's dig into the source code.
2. Source code walkthrough
2.1 SocketServer
class SocketServer(val config: KafkaConfig,
val metrics: Metrics,
val time: Time,
val credentialProvider: CredentialProvider)
extends Logging with KafkaMetricsGroup with BrokerReconfigurable {
  //① maximum number of requests that can be queued in the RequestChannel's requestQueue; default 500, configurable via queued.max.requests
  private val maxQueuedRequests = config.queuedMaxRequests
  //sensors used for metrics collection
  private val memoryPoolSensor = metrics.sensor("MemoryPoolUtilization")
  private val memoryPoolDepletedPercentMetricName = metrics.metricName("MemoryPoolAvgDepletedPercent", MetricsGroup)
  private val memoryPoolDepletedTimeMetricName = metrics.metricName("MemoryPoolDepletedTimeTotal", MetricsGroup)
  memoryPoolSensor.add(new Meter(TimeUnit.MILLISECONDS, memoryPoolDepletedPercentMetricName, memoryPoolDepletedTimeMetricName))
  //② memory pool, the same idea as the BufferPool in RecordAccumulator
  private val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, memoryPoolSensor) else MemoryPool.NONE
  //③ the set of data-plane Processor threads
  private val dataPlaneProcessors = new ConcurrentHashMap[Int, Processor]()
  //④ data-plane mapping from EndPoint to Acceptor:
  //EndPoint: a server usually has several NICs and can be configured with multiple IPs, and Kafka can listen on multiple ports; an EndPoint wraps one Host, Port and security protocol to listen on
  //each EndPoint gets its own Acceptor, i.e. every (IP, port) pair is watched by one Acceptor
  private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, Acceptor]()
  //⑤ the RequestChannel that queues data-plane requests between Processor and Handler threads
  val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneMetricPrefix)
  // control-plane
  private var controlPlaneProcessorOpt : Option[Processor] = None
  private[network] var controlPlaneAcceptorOpt : Option[Acceptor] = None
  val controlPlaneRequestChannelOpt: Option[RequestChannel] = config.controlPlaneListenerName.map(_ => new RequestChannel(20, ControlPlaneMetricPrefix))
  //⑥ id of the next Processor to be created
  private var nextProcessorId = 0
  //⑦ enforces a cap on connections per IP. Internally a Map records the number of connections established from each IP address.
  //When a new connection is created the count is compared with the limit from maxConnectionsPerIpOverrides (or maxConnectionsPerIp);
  //exceeding it raises an error. Since several Acceptor threads access the underlying Map concurrently, access is synchronized.
private var connectionQuotas: ConnectionQuotas = _
private var stoppedProcessingRequests = false
- ① maxQueuedRequests: the maximum number of requests that can be queued in the RequestChannel's requestQueue; default 500, configurable via queued.max.requests
- ② memoryPool: a memory pool, the same idea as the BufferPool in RecordAccumulator. socket.request.max.bytes caps the size of a single request (default 100 MB); larger requests fail
- ③ dataPlaneProcessors: the data-plane Processor threads, stored in a ConcurrentHashMap
- ④ dataPlaneAcceptors: the data-plane mapping from EndPoint to Acceptor, stored in a ConcurrentHashMap
  - EndPoint: a server usually has several NICs and can be configured with multiple IPs, and Kafka can listen on multiple ports; an EndPoint wraps one Host, Port and security protocol to listen on
  - each EndPoint gets its own Acceptor, i.e. every (ip, port) pair is watched by one Acceptor
- ⑤ dataPlaneRequestChannel: the queue through which Processor threads and Handler threads exchange messages; it holds at most maxQueuedRequests requests
- ⑥ nextProcessorId: the id of the next Processor (ids are handed out round-robin)
- ⑦ connectionQuotas: enforces a cap on connections per IP. Internally a Map records the number of connections established from each IP address; when a new connection is created the count is compared with the limit from maxConnectionsPerIpOverrides (or maxConnectionsPerIp), and exceeding it raises an error. Since several Acceptor threads access the underlying Map concurrently, access is synchronized
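The bookkeeping described in ⑦ can be sketched as follows; this is a simplification (the real ConnectionQuotas, shown in 2.1.2, also tracks per-listener and broker-wide counts):
import java.net.InetAddress
import scala.collection.mutable

//a stripped-down per-IP counter in the spirit of ConnectionQuotas; maxPerIp plays the role
//of max.connections.per.ip and overrides that of max.connections.per.ip.overrides
class MiniConnectionQuotas(maxPerIp: Int, overrides: Map[InetAddress, Int]) {
  private val counts = mutable.Map[InetAddress, Int]()

  def inc(address: InetAddress): Unit = counts.synchronized { //several Acceptor threads may call this
    val limit = overrides.getOrElse(address, maxPerIp)
    val current = counts.getOrElse(address, 0)
    if (current + 1 > limit)
      throw new IllegalStateException(s"Too many connections from $address (limit $limit)")
    counts(address) = current + 1
  }

  def dec(address: InetAddress): Unit = counts.synchronized {
    counts.get(address) match {
      case Some(1) => counts.remove(address) //last connection from this IP closed
      case Some(n) => counts(address) = n - 1
      case None    => throw new IllegalArgumentException(s"$address has no connections")
    }
  }
}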
2.1.1 startup
The initialization method.
def startup(startupProcessors: Boolean = true): Unit = {
  //① synchronize
  this.synchronized {
    //② create the ConnectionQuotas
    connectionQuotas = new ConnectionQuotas(config, time)
    //③ create the control-plane Acceptor and Processor; the threads are created while creating each plane
    createControlPlaneAcceptorAndProcessor(config.controlPlaneListener)
    // create the data plane
    createDataPlaneAcceptorsAndProcessors(config.numNetworkThreads, config.dataPlaneListeners)
    //④ optionally start the Processor threads
    if (startupProcessors) {
      //start the threads
      startControlPlaneProcessor(Map.empty)
      startDataPlaneProcessors(Map.empty)
}
}
}
- ① synchronized for thread safety
- ② initialize the ConnectionQuotas
- ③ initialize the Acceptor and Processor threads
- ④ start the Processor threads
Note: SocketServer.startup is reached via Kafka#main ——> KafkaServerStartable#startup ——> KafkaServer#startup.
That is, SocketServer#startup is called for initialization as soon as Kafka starts.
2.1.2 Initializing ConnectionQuotas
An inner class of SocketServer that caps the number of connections per IP.
class ConnectionQuotas(config: KafkaConfig, time: Time) extends Logging {
  //① maximum number of connections allowed per IP; configured via max.connections.per.ip, default Integer.MAX_VALUE (2^31 - 1)
  @volatile private var defaultMaxConnectionsPerIp: Int = config.maxConnectionsPerIp
  //② per-IP limits for specific addresses; these override the maxConnectionsPerIp value above; configured via max.connections.per.ip.overrides, default ""
  @volatile private var maxConnectionsPerIpOverrides = config.maxConnectionsPerIpOverrides.map { case (host, count) => (InetAddress.getByName(host), count) }
  //③ maximum number of connections for the whole broker; max.connections, default Integer.MAX_VALUE (2^31 - 1)
  @volatile private var brokerMaxConnections = config.maxConnections
  //④ map from IP to its current connection count
  private val counts = mutable.Map[InetAddress, Int]()
  //⑤ map from listener to its current connection count
  private val listenerCounts = mutable.Map[ListenerName, Int]()
  //⑥ map from listener to its connection quota
  private val maxConnectionsPerListener = mutable.Map[ListenerName, ListenerConnectionQuota]()
@volatile private var totalCount = 0
}
- ① defaultMaxConnectionsPerIp: the maximum number of connections allowed per IP; configured via max.connections.per.ip, default Integer.MAX_VALUE (2^31 - 1)
- ② maxConnectionsPerIpOverrides: per-IP limits for specific addresses, overriding the maxConnectionsPerIp value above; configured via max.connections.per.ip.overrides, default "" (see the parsing sketch right after this list)
- ③ brokerMaxConnections: the maximum number of connections for the whole broker; max.connections, default Integer.MAX_VALUE (2^31 - 1)
- ④ counts: map from IP to its current connection count
- ⑤ listenerCounts: map from listener to its current connection count
- ⑥ maxConnectionsPerListener: map from listener to its connection quota
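As a concrete example of ②: the override property value is a comma-separated list of host:count pairs. A sketch of the host-to-InetAddress mapping performed above (the property format is Kafka's; the parsing helper itself is hypothetical):
import java.net.InetAddress

//hypothetical parser for a value such as "10.0.0.1:200,broker2.example.com:100"
//(the host:count format used by max.connections.per.ip.overrides)
object OverridesDemo extends App {
  def parseOverrides(prop: String): Map[InetAddress, Int] =
    prop.split(",").filter(_.nonEmpty).map { entry =>
      val Array(host, count) = entry.trim.split(":")
      InetAddress.getByName(host) -> count.toInt
    }.toMap

  println(parseOverrides("127.0.0.1:200")) //Map(/127.0.0.1 -> 200)
}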
2.1.3 createDataPlaneAcceptorsAndProcessors
Initializes the Acceptor and Processor threads; we use the data-plane as the example.
private def createDataPlaneAcceptorsAndProcessors(dataProcessorsPerListener: Int,
endpoints: Seq[EndPoint]): Unit = synchronized {
  //loop over all endpoints
  endpoints.foreach { endpoint =>
    //③ register the listener with connectionQuotas: if a quota for this listener already exists just notify the waiters, otherwise create it, add it and notify
    connectionQuotas.addListener(config, endpoint.listenerName)
    //④ create the Acceptor thread
    val dataPlaneAcceptor = createAcceptor(endpoint, DataPlaneMetricPrefix)
    //⑤ add Processor threads to the newly created Acceptor
    addDataPlaneProcessors(dataPlaneAcceptor, endpoint, dataProcessorsPerListener)
    //⑥ start the dataPlaneAcceptor thread
    KafkaThread.nonDaemon(s"data-plane-kafka-socket-acceptor-${endpoint.listenerName}-${endpoint.securityProtocol}-${endpoint.port}", dataPlaneAcceptor).start()
    //⑦ block on countDownLatch#await until the Acceptor thread has started and called countDown
    dataPlaneAcceptor.awaitStartup()
    //⑧ store it in dataPlaneAcceptors
    dataPlaneAcceptors.put(endpoint, dataPlaneAcceptor)
info(s"Created data-plane acceptor and processors for endpoint : $endpoint")
}
}
- ① the dataProcessorsPerListener parameter: how many Processor threads to create per listener (the caller passes config.numNetworkThreads)
- ② the endpoints parameter: the endpoints to listen on (the data-plane listeners)
- ③ the connectionQuotas#addListener() method: registers the listener with connectionQuotas; if a quota for this listener already exists it just notifies the waiters, otherwise it creates one, adds it and notifies
private[network] def addListener(config: KafkaConfig, listenerName: ListenerName): Unit = {
  //synchronize
  counts.synchronized {
    //create the quota if the listener is not yet known
    if (!maxConnectionsPerListener.contains(listenerName)) {
      val newListenerQuota = new ListenerConnectionQuota(counts, listenerName)
      maxConnectionsPerListener.put(listenerName, newListenerQuota)
      listenerCounts.put(listenerName, 0)
      config.addReconfigurable(newListenerQuota)
    }
    //notify the waiters
    counts.notifyAll()
  }
}
- ④ the createAcceptor() method: creates the Acceptor thread
private def createAcceptor(endPoint: EndPoint, metricPrefix: String) : Acceptor = synchronized {
  val sendBufferSize = config.socketSendBufferBytes
  val recvBufferSize = config.socketReceiveBufferBytes
  val brokerId = config.brokerId
  //create the Acceptor thread
  new Acceptor(endPoint, sendBufferSize, recvBufferSize, brokerId, connectionQuotas, metricPrefix)
}
  - sendBufferSize: the socket send buffer size in bytes, default 100 * 1024, configurable via socket.send.buffer.bytes
  - recvBufferSize: the socket receive buffer size in bytes, default 100 * 1024, configurable via socket.receive.buffer.bytes
- ⑤ the addDataPlaneProcessors() method: adds Processor threads to the newly created Acceptor
private def addDataPlaneProcessors(acceptor: Acceptor, endpoint: EndPoint, newProcessorsPerListener: Int): Unit = synchronized {
  //the listener of this endpoint
  val listenerName = endpoint.listenerName
  //the endpoint's security protocol
  val securityProtocol = endpoint.securityProtocol
  //the Processor array for this endpoint
  val listenerProcessors = new ArrayBuffer[Processor]()
  //note: how many Processors get created is controlled by num.network.threads, default 3
  for (_ <- 0 until newProcessorsPerListener) {
    val processor = newProcessor(nextProcessorId, dataPlaneRequestChannel, connectionQuotas, listenerName, securityProtocol, memoryPool)
    listenerProcessors += processor
    dataPlaneRequestChannel.addProcessor(processor)
    nextProcessorId += 1
  }
  listenerProcessors.foreach(p => dataPlaneProcessors.put(p.id, p))
  acceptor.addProcessors(listenerProcessors, DataPlaneThreadPrefix)
}
Note: newProcessorsPerListener, i.e. how many Processors are created, is configured via num.network.threads (default 3). That setting controls only the number of Processors; there is always exactly one Acceptor per listener. (See the id-assignment example after this list.)
- ⑥ start the dataPlaneAcceptor thread, named data-plane-kafka-socket-acceptor-${listenerName}-${securityProtocol}-${port}
- ⑦ call countdownLatch#await to block until the Acceptor thread has started and called countDown:
def awaitStartup(): Unit = startupLatch.await
- ⑧ store the Acceptor in dataPlaneAcceptors, which is simply a ConcurrentHashMap#put
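Because nextProcessorId is a field of SocketServer and keeps counting across listeners, Processor ids end up globally unique. A tiny illustration with two listeners and num.network.threads = 3 (the listener names are made up):
//addDataPlaneProcessors runs once per endpoint while nextProcessorId keeps counting,
//so the first listener gets Processors 0, 1, 2 and the second gets 3, 4, 5
object ProcessorIdDemo extends App {
  var nextProcessorId = 0
  for (endpoint <- Seq("PLAINTEXT://:9092", "SSL://:9093")) {
    val ids = (0 until 3).map { _ => val id = nextProcessorId; nextProcessorId += 1; id }
    println(s"$endpoint -> processors ${ids.mkString(", ")}")
  }
}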
2.1.5 startDataPlaneProcessors
Starts the Processor threads; the data-plane is again the example.
def startDataPlaneProcessors(authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = Map.empty): Unit = synchronized {
val interBrokerListener = dataPlaneAcceptors.asScala.keySet
.find(_.listenerName == config.interBrokerListenerName)
.getOrElse(throw new IllegalStateException(s"Inter-broker listener ${config.interBrokerListenerName} not found, endpoints=${dataPlaneAcceptors.keySet}"))
val orderedAcceptors = List(dataPlaneAcceptors.get(interBrokerListener)) ++
dataPlaneAcceptors.asScala.filterKeys(_ != interBrokerListener).values
orderedAcceptors.foreach { acceptor =>
val endpoint = acceptor.endPoint
debug(s"Wait for authorizer to complete start up on listener ${endpoint.listenerName}")
waitForAuthorizerFuture(acceptor, authorizerFutures)
debug(s"Start processors on listener ${endpoint.listenerName}")
//the important call: start this Acceptor's Processors
acceptor.startProcessors(DataPlaneThreadPrefix)
}
info(s"Started data-plane processors for ${dataPlaneAcceptors.size} acceptors")
}
private[network] def startProcessors(processorThreadPrefix: String): Unit = synchronized {
if (!processorsStarted.getAndSet(true)) {
startProcessors(processors, processorThreadPrefix)
}
}
That is, the Processor array created by addDataPlaneProcessors (step ⑤ in 2.1.3) is iterated here and every Processor is started.
This is where initialization ends. Next we analyze, one by one, how the Acceptor thread and the Processor thread do their work.
2.2 AbstractServerThread
AbstractServerThread is an abstract class that implements the Runnable interface and provides the startup/shutdown-related methods; both the Acceptor thread and the Processor thread extend it.
private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQuotas) extends Runnable with Logging {
  //① a CountDownLatch with count 1; indicates whether this thread's startup has completed, as seen in step ⑦ of 2.1.3
  private val startupLatch = new CountDownLatch(1)
  //② a CountDownLatch with count 0; indicates whether this thread's shutdown has completed
  @volatile private var shutdownLatch = new CountDownLatch(0)
  //③ whether the current thread is alive; initialized to true, CAS-ed to false in the shutdown method
  private val alive = new AtomicBoolean(true)
  //④ the wakeup method, implemented by subclasses
  def wakeup(): Unit
  //⑤ the shutdown method, with a default implementation
  def shutdown(): Unit = {
    if (alive.getAndSet(false))
      wakeup()
    shutdownLatch.await()
  }
  //⑥ the awaitStartup method, with a default implementation
  def awaitStartup(): Unit = startupLatch.await
  //⑦ the startupComplete method, with a default implementation
  protected def startupComplete(): Unit = {
    // Replace the open latch with a closed one
    shutdownLatch = new CountDownLatch(1)
    startupLatch.countDown()
  }
  //⑧ the shutdownComplete method, with a default implementation
  protected def shutdownComplete(): Unit = shutdownLatch.countDown()
  protected def isRunning: Boolean = alive.get
  def close(listenerName: ListenerName, channel: SocketChannel): Unit = {
    if (channel != null) {
      debug(s"Closing connection from ${channel.socket.getRemoteSocketAddress()}")
      //close the SocketChannel and decrement the recorded connection count
      connectionQuotas.dec(listenerName, channel.socket.getInetAddress)
      CoreUtils.swallow(channel.socket().close(), this, Level.ERROR)
      CoreUtils.swallow(channel.close(), this, Level.ERROR)
    }
  }
}
- ① startupLatch: a CountDownLatch with count 1; indicates whether this thread's startup has completed (see step ⑦ in 2.1.3)
- ② shutdownLatch: a CountDownLatch with count 0; indicates whether this thread's shutdown has completed
- ③ alive: whether the current thread is alive; initialized to true, CAS-ed to false in the shutdown method
- ④ the wakeup method, implemented by subclasses
- ⑤ the shutdown method, with a default implementation
- ⑥ the awaitStartup method, with a default implementation
- ⑦ the startupComplete method, with a default implementation
- ⑧ the shutdownComplete method, with a default implementation
The methods are all fairly simple; the sketch below shows how the two latches cooperate over a thread's lifecycle. After that we look at how the two subclasses, Acceptor and Processor, implement these methods.
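A compressed lifecycle demo of the latch hand-off (same semantics as above, trimmed to just the latch logic; Worker and LatchDemo are made-up names):
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean

//trimmed-down AbstractServerThread showing only the latch hand-off
abstract class MiniServerThread extends Runnable {
  private val startupLatch = new CountDownLatch(1)            //startup not finished yet
  @volatile private var shutdownLatch = new CountDownLatch(0) //count 0: nothing to wait for until startup completes
  private val alive = new AtomicBoolean(true)

  def awaitStartup(): Unit = startupLatch.await()
  protected def isRunning: Boolean = alive.get
  protected def startupComplete(): Unit = {
    shutdownLatch = new CountDownLatch(1) //now a real shutdown must be awaited
    startupLatch.countDown()              //release the threads blocked in awaitStartup()
  }
  protected def shutdownComplete(): Unit = shutdownLatch.countDown()
  def wakeup(): Unit = ()
  def shutdown(): Unit = {
    if (alive.getAndSet(false)) wakeup()
    shutdownLatch.await()                 //blocks until the run() loop has exited
  }
}

//usage: start the thread, wait until its loop is up, then shut it down
object LatchDemo extends App {
  class Worker extends MiniServerThread {
    override def run(): Unit = {
      startupComplete()
      try while (isRunning) Thread.sleep(10)
      finally shutdownComplete()
    }
  }
  val w = new Worker
  new Thread(w).start()
  w.awaitStartup() //returns once run() has called startupComplete()
  w.shutdown()     //returns once run() has called shutdownComplete()
}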
2.3 Acceptor
Accepts new connection requests from clients, creates the socket connections, and assigns them to Processors.
private[kafka] class Acceptor(val endPoint: EndPoint,
val sendBufferSize: Int,
val recvBufferSize: Int,
brokerId: Int,
connectionQuotas: ConnectionQuotas,
metricPrefix: String) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
  //① the Java NIO selector
  private val nioSelector = NSelector.open()
  //② the ServerSocketChannel that accepts client connections
  val serverChannel = openServerSocket(endPoint.host, endPoint.port)
  //③ the array of Processors
  private val processors = new ArrayBuffer[Processor]()
  //④ whether the Processor threads have been started
  private val processorsStarted = new AtomicBoolean
private val blockedPercentMeter = newMeter(s"${metricPrefix}AcceptorBlockedPercent",
"blocked time", TimeUnit.NANOSECONDS, Map(ListenerMetricTag -> endPoint.listenerName.value))
}
- ① nioSelector: the Java NIO selector
- ② serverChannel: the ServerSocketChannel that accepts client connections
- ③ processors: the array of Processors
- ④ processorsStarted: whether the Processor threads have been started
The constructor initializes the Selector, creates the ServerSocketChannel used to accept client connections, and creates the Processor array managed by this Acceptor.
2.3.1 startProcessors
Starts every Processor thread in the processors array.
private[network] def startProcessors(processorThreadPrefix: String): Unit =
synchronized {//synchronized
if (!processorsStarted.getAndSet(true)) {
startProcessors(processors, processorThreadPrefix)
}
}
private def startProcessors(processors: Seq[Processor], processorThreadPrefix: String): Unit = synchronized {
//start each Processor thread
processors.foreach { processor =>
KafkaThread.nonDaemon(s"${processorThreadPrefix}-kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",
processor).start()
}
}
2.3.2 addProcessors
Adds Processor threads.
private[network] def addProcessors(newProcessors: Buffer[Processor], processorThreadPrefix: String): Unit = synchronized {
processors ++= newProcessors
if (processorsStarted.get)
startProcessors(newProcessors, processorThreadPrefix)
}
2.3.3 removeProcessors
Removes Processor threads.
private[network] def removeProcessors(removeCount: Int, requestChannel: RequestChannel): Unit = synchronized {
val toRemove = processors.takeRight(removeCount)
processors.remove(processors.size - removeCount, removeCount)
toRemove.foreach(_.shutdown())
toRemove.foreach(processor => requestChannel.removeProcessor(processor.id))
}
2.3.4 shutdown
The shutdown method.
override def shutdown(): Unit = {
//call the parent class's shutdown
super.shutdown()
synchronized {
//shut down every Processor in the processors array
processors.foreach(_.shutdown())
}
}
2.3.5 run
The core method. Since Acceptor's parent AbstractServerThread implements Runnable, run is the main entry point; it handles the OP_ACCEPT event.
def run(): Unit = {
//① register the OP_ACCEPT event on the server channel
serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
//② mark this thread's startup as complete
startupComplete()
try {
var currentProcessorIndex = 0
while (isRunning) {//③ check whether the thread should keep running
try {
val ready = nioSelector.select(500) //④ wait for the events of interest
if (ready > 0) {
val keys = nioSelector.selectedKeys()
val iter = keys.iterator()
while (iter.hasNext && isRunning) {
try {
val key = iter.next
iter.remove()
if (key.isAcceptable) { //⑤ for an accept event call accept() to handle it; anything else is an error
accept(key).foreach { socketChannel =>
var retriesLeft = synchronized(processors.length)
var processor: Processor = null
do {
retriesLeft -= 1
processor = synchronized {
//⑥ pick the id of the next Processor, round-robin
currentProcessorIndex = currentProcessorIndex % processors.length
processors(currentProcessorIndex)
}
currentProcessorIndex += 1
//⑦ assign the connection to a Processor thread via assignNewConnection
} while (!assignNewConnection(socketChannel, processor, retriesLeft == 0))
}
} else
throw new IllegalStateException("Unrecognized key state for acceptor thread.")
} catch {
case e: Throwable => error("Error while accepting connection", e)
}
}
}
}
catch {
case e: ControlThrowable => throw e
case e: Throwable => error("Error occurred", e)
}
}
} finally {
debug("Closing server socket and selector.")
CoreUtils.swallow(serverChannel.close(), this, Level.ERROR)
CoreUtils.swallow(nioSelector.close(), this, Level.ERROR)
shutdownComplete()
}
}
- ① register the OP_ACCEPT event on the server channel
- ② mark this thread's startup as complete
- ③ check whether the thread should keep running
- ④ wait for the events of interest
- ⑤ for an accept event, call accept to handle it; any other event is an error
//handles the OP_ACCEPT event: creates the SocketChannel and increments the connection count recorded in ConnectionQuotas
private def accept(key: SelectionKey): Option[SocketChannel] = {
  val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
  //① create the SocketChannel
  val socketChannel = serverSocketChannel.accept()
  try {
    //② increment the connection count recorded in connectionQuotas
    connectionQuotas.inc(endPoint.listenerName, socketChannel.socket.getInetAddress, blockedPercentMeter)
    //non-blocking mode
    socketChannel.configureBlocking(false)
    //disable Nagle's algorithm on the TCP connection, set keepAlive and sendBufferSize
    socketChannel.socket().setTcpNoDelay(true)
    socketChannel.socket().setKeepAlive(true)
    if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
      socketChannel.socket().setSendBufferSize(sendBufferSize)
    Some(socketChannel)
  } catch {
    case e: TooManyConnectionsException =>
      info(s"Rejected connection from ${e.ip}, address already has the configured maximum of ${e.count} connections.")
      close(endPoint.listenerName, socketChannel)
      None
  }
}
- ⑥ pick the id of the next Processor, round-robin
- ⑦ assign the connection to a Processor thread via the assignNewConnection method; this differs from Kafka 2.1.1, which I read earlier and which called Processor#accept directly (a sketch of the hand-off follows below)
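assignNewConnection itself is not listed above; judging from its role, it tries to enqueue the SocketChannel into the chosen Processor's newConnections queue and only blocks on the last retry. A self-contained paraphrase of that hand-off (my reading, not the verbatim source):
import java.nio.channels.SocketChannel
import java.util.concurrent.ArrayBlockingQueue

//paraphrase of the hand-off: the Acceptor first tries a non-blocking offer into the
//Processor's newConnections queue and only blocks with put() on the last retry; wakeup()
//interrupts the Processor's poll so configureNewConnections() picks the channel up promptly
class MiniProcessorQueue(queueSize: Int, wakeup: () => Unit) {
  private val newConnections = new ArrayBlockingQueue[SocketChannel](queueSize)

  def accept(socketChannel: SocketChannel, mayBlock: Boolean): Boolean = {
    val accepted =
      if (newConnections.offer(socketChannel)) true                  //fast path: queue has room
      else if (mayBlock) { newConnections.put(socketChannel); true } //last retry: wait for room
      else false                                                     //full: the Acceptor tries the next Processor
    if (accepted) wakeup()
    accepted
  }
}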
2.4 Processor
Reads requests and writes back responses; the actual business logic is handled by the Handler threads.
//① an ArrayBlockingQueue holding the newly created SocketChannels assigned to this Processor
private val newConnections = new ArrayBlockingQueue[SocketChannel](connectionQueueSize)
//② responses handed to the selector but not yet fully sent. Unlike the client's inflightRequests, the client never
//acknowledges a response the server sent, so entries in inflightResponses are removed as soon as the send completes,
//whereas an InFlightRequest is only removed once its response arrives
private val inflightResponses = mutable.Map[String, RequestChannel.Response]()
//③ a LinkedBlockingDeque of the responses this Processor still has to send
private val responseQueue = new LinkedBlockingDeque[RequestChannel.Response]()
//④ the number of connections killed because they expired
val expiredConnectionsKilledCount = new CumulativeSum()
//⑤ create the selector (a KSelector) through a ChannelBuilder; it manages the network connections
private val selector = createSelector(
ChannelBuilders.serverChannelBuilder(listenerName,
listenerName == config.interBrokerListenerName,
securityProtocol,
config,
credentialProvider.credentialCache,
credentialProvider.tokenCache,
time,
logContext))
- ① newConnections: an ArrayBlockingQueue holding the newly created SocketChannels assigned to this Processor
- ② inflightResponses: the responses not yet fully sent. Unlike the client's inflightRequests, the client never acknowledges a response sent by the server, so an entry in inflightResponses is removed as soon as its send completes, whereas an InFlightRequest is only removed once its response arrives
- ③ responseQueue: a LinkedBlockingDeque of the responses this Processor still has to send
- ④ expiredConnectionsKilledCount: the number of connections killed because they expired
- ⑤ selector: a KSelector created through a ChannelBuilder; it manages the network connections
2.4.1 The run method
The core method. Like Acceptor, Processor extends AbstractServerThread.
override def run(): Unit = {
    //① mark the Processor's initialization as finished, waking up the threads blocked waiting for it
    startupComplete()
    try {
      while (isRunning) {
        try {
          //② handle the newly created SocketChannels in the newConnections queue: each one is registered for read events on the nioSelector.
          //The SocketChannel is wrapped in a KafkaChannel attached to the SelectionKey, so when a read event later fires, the object retrieved from the SelectionKey is a KafkaChannel
          configureNewConnections()
          //③ drain this Processor's responseQueue and process the cached Responses.
          //A SendResponse must be sent to the client: look up its KafkaChannel, register a write event on it, and point kafkaChannel.send at the response to send.
          //The response also moves from the responseQueue into inflightResponses; once a complete response has been sent, the connection's write event is cancelled.
          //A NoOpResponse means this connection has nothing to send for now, so read is re-registered on the KafkaChannel, letting it keep reading requests.
          //A CloseConnectionResponse closes the connection
          processNewResponses()
          //④ send the responses: calls KSelector.poll; completed operations land in completedReceives, completedSends and disconnected for the steps below
          poll()
          //⑤ process the KSelector.completedReceives queue: wrap each NetworkReceive with the processor id and authentication info into a
          //RequestChannel.Request and put it into requestChannel.requestQueue for the Handler threads.
          //Then cancel the read event for the KafkaChannel: no more requests are read from this connection until a response has been sent
          processCompletedReceives()
          //⑥ process the KSelector.completedSends queue: remove the response from inflightResponses and re-register the read event so the connection can be read again
          processCompletedSends()
          //⑦ process the KSelector.disconnected queue: remove all of the connection's responses from inflightResponses, then decrement the ConnectionQuotas count to make room for new connections
          processDisconnected()
          //⑧ close excess connections
          closeExcessConnections()
} catch {
case e: Throwable => processException("Processor got uncaught exception.", e)
}
}
} finally {
debug(s"Closing selector - processor $id")
CoreUtils.swallow(closeAll(), this, Level.ERROR)
shutdownComplete()
}
}
- ① the startupComplete method: marks the Processor's initialization as finished and wakes up the threads blocked waiting for it
- ② the configureNewConnections method: handles the newly created SocketChannels in the newConnections queue; each SocketChannel is registered for read events on the nioSelector. The SocketChannel is wrapped in a KafkaChannel attached to the SelectionKey, so when a read event later fires, the object retrieved from the SelectionKey is a KafkaChannel
- ③ the processNewResponses method: drains this Processor's responseQueue and processes the cached Responses. If a response is a SendResponse it must be sent to the client, so its KafkaChannel is looked up, a write event is registered on it, and kafkaChannel.send is pointed at the response to send. The response is also moved from the responseQueue into inflightResponses; once a complete response has been sent, the write event registered on the connection is cancelled
  - if the response is a NoOpResponse, the connection has nothing to send for now, so read is re-registered on the KafkaChannel, allowing it to keep reading requests
  - if the response is a CloseConnectionResponse, the connection is closed
- ④ the poll method: performs the actual sending; it calls KSelector.poll, and completed operations land in completedReceives, completedSends and disconnected to be processed next
- ⑤ the processCompletedReceives method: processes the KSelector.completedReceives queue. Each NetworkReceive, together with the processor id and the authentication info, is wrapped into a RequestChannel.Request and put into requestChannel.requestQueue for the Handler threads to pick up. The read event registered for that KafkaChannel is then cancelled, meaning no more requests are read from this connection until a response has been sent
- ⑥ the processCompletedSends method: processes the KSelector.completedSends queue. The corresponding response is removed from inflightResponses and the read event is re-registered, allowing data to be read from the connection again
- ⑦ the processDisconnected method: processes the KSelector.disconnected queue. First all responses belonging to the connection are removed from inflightResponses, then the connection count in ConnectionQuotas is decremented to make room for new connections
- ⑧ the closeExcessConnections method: closes excess connections
2.4.2 The startupComplete method
Marks the Processor's initialization as finished and wakes up the threads blocked waiting for it.
protected def startupComplete(): Unit = {
//① create a shutdownLatch with count 1
shutdownLatch = new CountDownLatch(1)
//② wake up the threads blocked on startupLatch
startupLatch.countDown()
}
- ① this explains why the shutdownLatch in 2.2 AbstractServerThread starts with count 0: only once startup completes does it become a latch that shutdown must wait on
- ② wakes up the threads blocked on startupLatch
2.4.3 The configureNewConnections method
Handles the newly created SocketChannels in the newConnections queue: each one is registered for read events on the nioSelector. The SocketChannel is wrapped in a KafkaChannel attached to the SelectionKey, so when a read event later fires, the object retrieved from the SelectionKey is a KafkaChannel.
private def configureNewConnections(): Unit = {
var connectionsProcessed = 0
while (connectionsProcessed < connectionQueueSize && !newConnections.isEmpty) {//iterate over the newConnections queue
val channel = newConnections.poll()
try {
debug(s"Processor $id listening to new connection from ${channel.socket.getRemoteSocketAddress}")
//register the OP_READ event
selector.register(connectionId(channel.socket), channel)
connectionsProcessed += 1
} catch {
// We explicitly catch all exceptions and close the socket to avoid a socket leak.
case e: Throwable =>
val remoteAddress = channel.socket.getRemoteSocketAddress
// need to close the channel here to avoid a socket leak.
close(listenerName, channel)
processException(s"Processor $id closed connection from $remoteAddress", e)
}
}
}
2.4.4 The processNewResponses method
Drains this Processor's responseQueue and processes the cached Responses. If a response is a SendResponse it must be sent to the client, so its KafkaChannel is looked up, a write event is registered on it, and kafkaChannel.send is pointed at the response to send. The response is also moved from the responseQueue into inflightResponses; once a complete response has been sent, the write event registered on the connection is cancelled.
- if the response is a NoOpResponse, the connection has nothing to send for now, so read is re-registered on the KafkaChannel, allowing it to keep reading requests
- if the response is a CloseConnectionResponse, the connection is closed
private def processNewResponses(): Unit = {
var currentResponse: RequestChannel.Response = null
//dequeueResponse pops one response from the responseQueue
while ({currentResponse = dequeueResponse(); currentResponse != null}) {
val channelId = currentResponse.request.context.connectionId
try {
currentResponse match {
case response: NoOpResponse =>
updateRequestMetrics(response)
trace(s"Socket server received empty response to send, registering for read: $response")
handleChannelMuteEvent(channelId, ChannelMuteEvent.RESPONSE_SENT)
tryUnmuteChannel(channelId)
case response: SendResponse =>
sendResponse(response, response.responseSend)
case response: CloseConnectionResponse =>
updateRequestMetrics(response)
trace("Closing socket connection actively according to the response code.")
close(channelId)
case _: StartThrottlingResponse =>
handleChannelMuteEvent(channelId, ChannelMuteEvent.THROTTLE_STARTED)
case _: EndThrottlingResponse =>
// Try unmuting the channel. The channel will be unmuted only if the response has already been sent out to
// the client.
handleChannelMuteEvent(channelId, ChannelMuteEvent.THROTTLE_ENDED)
tryUnmuteChannel(channelId)
case _ =>
throw new IllegalArgumentException(s"Unknown response type: ${currentResponse.getClass}")
}
} catch {
case e: Throwable =>
processChannelException(channelId, s"Exception while processing response for $channelId", e)
}
}
}
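The sendResponse helper invoked for SendResponse above is not listed in this post; in outline (my paraphrase based on the fields shown in 2.4, not the verbatim source) it hands the Send to the KSelector and records the response in inflightResponses:
//paraphrased sketch of sendResponse, using the Processor fields shown earlier
protected[network] def sendResponse(response: RequestChannel.Response, responseSend: Send): Unit = {
  val connectionId = response.request.context.connectionId
  //the connection may already be gone (e.g. closed by the peer); then the response is simply dropped
  if (openOrClosingChannel(connectionId).isDefined) {
    selector.send(responseSend)                     //KSelector queues the Send and registers the write event
    inflightResponses += (connectionId -> response) //remembered until processCompletedSends() sees the send finish
  }
}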
2.4.5 The poll method
Performs the actual network I/O by calling KSelector.poll, just as KafkaProducer does when sending messages; completed operations land in completedReceives, completedSends and disconnected, waiting to be processed.
private def poll(): Unit = {
val pollTimeout = if (newConnections.isEmpty) 300 else 0
try selector.poll(pollTimeout)
catch {
case e @ (_: IllegalStateException | _: IOException) =>
error(s"Processor $id poll failed", e)
}
}
2.4.6 The processCompletedReceives method
Processes the KSelector.completedReceives queue: each NetworkReceive, together with the processor id and the authentication info, is wrapped into a RequestChannel.Request and put into requestChannel.requestQueue, where the Handler threads pick it up. The read event registered for the KafkaChannel is then cancelled, so no more requests are read from this connection until a response has been sent.
private def processCompletedReceives(): Unit = {
selector.completedReceives.asScala.foreach { receive =>
try {
openOrClosingChannel(receive.source) match {
case Some(channel) =>
val header = RequestHeader.parse(receive.payload)
if (header.apiKey == ApiKeys.SASL_HANDSHAKE && channel.maybeBeginServerReauthentication(receive, nowNanosSupplier))
trace(s"Begin re-authentication: $channel")
else {
val nowNanos = time.nanoseconds()
if (channel.serverAuthenticationSessionExpired(nowNanos)) {
// be sure to decrease connection count and drop any in-flight responses
debug(s"Disconnecting expired channel: $channel : $header")
close(channel.id)
expiredConnectionsKilledCount.record(null, 1, 0)
} else {
val connectionId = receive.source
val context = new RequestContext(header, connectionId, channel.socketAddress,
channel.principal, listenerName, securityProtocol,
channel.channelMetadataRegistry.clientInformation)
val req = new RequestChannel.Request(processor = id, context = context,
startTimeNanos = nowNanos, memoryPool, receive.payload, requestChannel.metrics)
// KIP-511: ApiVersionsRequest is intercepted here to catch the client software name
// and version. It is done here to avoid wiring things up to the api layer.
if (header.apiKey == ApiKeys.API_VERSIONS) {
val apiVersionsRequest = req.body[ApiVersionsRequest]
if (apiVersionsRequest.isValid) {
channel.channelMetadataRegistry.registerClientInformation(new ClientInformation(
apiVersionsRequest.data.clientSoftwareName,
apiVersionsRequest.data.clientSoftwareVersion))
}
}
requestChannel.sendRequest(req)
selector.mute(connectionId)
handleChannelMuteEvent(connectionId, ChannelMuteEvent.REQUEST_RECEIVED)
}
}
case None =>
// This should never happen since completed receives are processed immediately after `poll()`
throw new IllegalStateException(s"Channel ${receive.source} removed from selector before processing completed receive")
}
} catch {
// note that even though we got an exception, we can assume that receive.source is valid.
// Issues with constructing a valid receive object were handled earlier
case e: Throwable =>
processChannelException(receive.source, s"Exception while processing request from ${receive.source}", e)
}
}
}
2.4.7 The processCompletedSends method
Processes the KSelector.completedSends queue: removes the corresponding response from inflightResponses and re-registers the read event, allowing data to be read from the connection again.
private def processCompletedSends(): Unit = {
selector.completedSends.asScala.foreach { send =>
try {
val response = inflightResponses.remove(send.destination).getOrElse {
throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`")
}
updateRequestMetrics(response)
response.onComplete.foreach(onComplete => onComplete(send))
handleChannelMuteEvent(send.destination, ChannelMuteEvent.RESPONSE_SENT)
tryUnmuteChannel(send.destination)
} catch {
case e: Throwable => processChannelException(send.destination,
s"Exception while processing completed send to ${send.destination}", e)
}
}
}
2.4.8 The processDisconnected method
Processes the KSelector.disconnected queue: first removes all of the connection's responses from inflightResponses, then decrements the connection count recorded in ConnectionQuotas to make room for new connections.
private def processDisconnected(): Unit = {
selector.disconnected.keySet.asScala.foreach { connectionId =>
try {
val remoteHost = ConnectionId.fromString(connectionId).getOrElse {
throw new IllegalStateException(s"connectionId has unexpected format: $connectionId")
}.remoteHost
inflightResponses.remove(connectionId).foreach(updateRequestMetrics)
// the channel has been closed by the selector but the quotas still need to be updated
connectionQuotas.dec(listenerName, InetAddress.getByName(remoteHost))
} catch {
case e: Throwable => processException(s"Exception while processing disconnection of $connectionId", e)
}
}
}
2.4.9 The closeExcessConnections method
If the listener's connection quota is exceeded, picks the lowest-priority channel and closes it.
private def closeExcessConnections(): Unit = {
if (connectionQuotas.maxConnectionsExceeded(listenerName)) {
val channel = selector.lowestPriorityChannel()
if (channel != null)
close(channel.id)
}
}
2.5 RequestChannel
Processor threads and Handler threads exchange data through the RequestChannel.
class RequestChannel(val queueSize: Int, val metricNamePrefix : String) extends KafkaMetricsGroup {
import RequestChannel._
val metrics = new RequestChannel.Metrics
  //① the queue through which Processor threads pass requests to Handler threads; multiple Processor and Handler threads operate on it concurrently, so a thread-safe queue is used
  private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
  //② the Processor threads
  private val processors = new ConcurrentHashMap[Int, Processor]()
  //③ the corresponding responseQueue lives in each Processor
private val responseQueue = new LinkedBlockingDeque[RequestChannel.Response]()
}
- ① requestQueue: the queue through which Processor threads pass requests to Handler threads; multiple Processor and Handler threads operate on it concurrently, hence a thread-safe queue
- ② processors: the Processor threads
- ③ responseQueue: the corresponding response queue, which lives in each Processor
2.5.1 sendRequest
Puts the request into the requestQueue; the KafkaRequestHandler threads take requests out by repeatedly calling receiveRequest.
def sendRequest(request: RequestChannel.Request): Unit = {
requestQueue.put(request)
}
2.5.2 receiveRequest
This method is called continuously by the KafkaRequestHandler threads.
def receiveRequest(timeout: Long): RequestChannel.BaseRequest =
requestQueue.poll(timeout, TimeUnit.MILLISECONDS)
2.5.3 sendResponse
Called by the KafkaRequestHandler threads to put a response into the responseQueue.
def sendResponse(response: RequestChannel.Response): Unit = {
if (isTraceEnabled) {
val requestHeader = response.request.header
val message = response match {
case sendResponse: SendResponse =>
s"Sending ${requestHeader.apiKey} response to client ${requestHeader.clientId} of ${sendResponse.responseSend.size} bytes."
case _: NoOpResponse =>
s"Not sending ${requestHeader.apiKey} response to client ${requestHeader.clientId} as it's not required."
case _: CloseConnectionResponse =>
s"Closing connection for client ${requestHeader.clientId} due to error during ${requestHeader.apiKey}."
case _: StartThrottlingResponse =>
s"Notifying channel throttling has started for client ${requestHeader.clientId} for ${requestHeader.apiKey}"
case _: EndThrottlingResponse =>
s"Notifying channel throttling has ended for client ${requestHeader.clientId} for ${requestHeader.apiKey}"
}
trace(message)
}
val processor = processors.get(response.processor)
// The processor may be null if it was shutdown. In this case, the connections
// are closed, so the response is dropped.
if (processor != null) {
processor.enqueueResponse(response)
}
}
private[network] def enqueueResponse(response: RequestChannel.Response): Unit = {
//enqueue the response
responseQueue.put(response)
//wake up the Processor's selector
wakeup()
}
These are all very simple methods.
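As a preview of the other side of the channel: a Handler thread (KafkaRequestHandler) essentially loops over receiveRequest, hands each request to KafkaApis, and the API layer eventually calls sendResponse. A compressed sketch of that loop (the shape is mine; the real KafkaRequestHandler also deals with shutdown commands and metrics):
import kafka.network.RequestChannel
import kafka.server.KafkaApis

//sketch of a Handler thread's side of the RequestChannel
class MiniHandler(requestChannel: RequestChannel, apis: KafkaApis) extends Runnable {
  override def run(): Unit = while (true) {
    //blocks up to 300 ms for a request queued by a Processor
    requestChannel.receiveRequest(300) match {
      case request: RequestChannel.Request =>
        try apis.handle(request)        //KafkaApis eventually calls requestChannel.sendResponse(...)
        finally request.releaseBuffer() //return the request's buffer to the memory pool
      case _ => //null on timeout, or a shutdown marker: nothing to do in this sketch
    }
  }
}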
2.6 NetworkClient
Used by producers to send messages, by consumers to fetch messages, and for communication between brokers.
//the important fields
public class NetworkClient implements KafkaClient {
    //selector
    private final Selectable selector;
    //the MetadataUpdater tells whether metadata is currently being updated or is due for an update
    private final MetadataUpdater metadataUpdater;
    //the connection state of each node, backed by a HashMap
    private final ClusterConnectionStates connectionStates;
    //caches the requests that have been sent out but have not yet received a response
    private final InFlightRequests inFlightRequests;
    //socket send buffer size in bytes, default 100 * 1024, configurable via `socket.send.buffer.bytes`
    private final int socketSendBuffer;
    //socket receive buffer size in bytes, default 100 * 1024, configurable via `socket.receive.buffer.bytes`
    private final int socketReceiveBuffer;
    //default request timeout
    private final int defaultRequestTimeoutMs;
    //how long to wait before reconnecting
    private final long reconnectBackoffMs;
}
2.6.1 The ready method
Decides whether requests can be sent to the given node.
public boolean ready(Node node, long now) {
if (node.isEmpty())
throw new IllegalArgumentException("Cannot connect to empty node " + node);
if (isReady(node, now))
return true;
if (connectionStates.canConnect(node.idString(), now))
// if we are interested in sending to a node and we don't have a connection to it, initiate one
initiateConnect(node, now);
return false;
}
A node can be sent to when all of the following hold:
- ① isReady: no metadata update may be in progress or due
public boolean isReady(Node node, long now) {
    return !metadataUpdater.isUpdateDue(now) && canSendRequest(node.idString(), now);
}
- ② canSendRequest: the connection state must be ready, the KafkaChannel must be set up, and inFlightRequests must still have room to cache another request
private boolean canSendRequest(String node, long now) {
    return connectionStates.isReady(node, now) && selector.isChannelReady(node) &&
        inFlightRequests.canSendMore(node);
}
- ③ canConnect: whether a new connection may be initiated; it is true if the node was never connected, or if it is disconnected and the reconnect backoff has elapsed
- ④ initiateConnect: establishes the connection
private void initiateConnect(Node node, long now) {
    String nodeConnectionId = node.idString();
    try {
        //mark the state as connecting
        connectionStates.connecting(nodeConnectionId, now, node.host(), clientDnsLookup);
        InetAddress address = connectionStates.currentAddress(nodeConnectionId);
        log.debug("Initiating connection to node {} using address {}", node, address);
        //call selector.connect to set up the connection
        selector.connect(nodeConnectionId,
            new InetSocketAddress(address, node.port()),
            this.socketSendBuffer,
            this.socketReceiveBuffer);
    } catch (IOException e) {
        //on error, mark the state as disconnected and trigger a metadata update
        log.warn("Error connecting to node {}", node, e);
        connectionStates.disconnected(nodeConnectionId, now);
        metadataUpdater.handleServerDisconnect(now, nodeConnectionId, Optional.empty());
    }
}
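Putting ready and poll together: ready() initiates the connection on the first call and returns false, so callers keep driving poll() until isReady() turns true. A sketch of that spin loop (a hypothetical helper; Kafka ships a similar utility in NetworkClientUtils):
import org.apache.kafka.clients.NetworkClient
import org.apache.kafka.common.Node
import org.apache.kafka.common.utils.Time

object ClientReadiness {
  //hypothetical helper: drive poll() until the node's connection is usable or the timeout expires
  def awaitReady(client: NetworkClient, node: Node, time: Time, timeoutMs: Long): Boolean = {
    val deadline = time.milliseconds() + timeoutMs
    while (!client.ready(node, time.milliseconds()) && time.milliseconds() < deadline)
      client.poll(100, time.milliseconds()) //processes connection setup, authentication, etc.
    client.isReady(node, time.milliseconds())
  }
}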
2.6.2 The send method
Mainly sets the request on the KafkaChannel.send field and adds it to the InFlightRequests queue to wait for its response.
public void send(ClientRequest request, long now) {
doSend(request, false, now);
}
private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now) {
ensureActive();
String nodeId = clientRequest.destination();
//① check whether a request can be sent to the given Node
if (!isInternalRequest) {
if (!canSendRequest(nodeId, now))
throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
}
//② build the versioned request via the ClientRequest's builder
AbstractRequest.Builder<?> builder = clientRequest.requestBuilder();
try {
NodeApiVersions versionInfo = apiVersions.get(nodeId);
short version;
if (versionInfo == null) {
version = builder.latestAllowedVersion();
if (discoverBrokerVersions && log.isTraceEnabled())
log.trace("No version information found when sending {} with correlation id {} to node {}. " +
"Assuming version {}.", clientRequest.apiKey(), clientRequest.correlationId(), nodeId, version);
} else {
version = versionInfo.latestUsableVersion(clientRequest.apiKey(), builder.oldestAllowedVersion(),
builder.latestAllowedVersion());
}
doSend(clientRequest, isInternalRequest, now, builder.build(version));
} catch (UnsupportedVersionException unsupportedVersionException) {
log.debug("Version mismatch when attempting to send {} with correlation id {} to {}", builder,
clientRequest.correlationId(), clientRequest.destination(), unsupportedVersionException);
ClientResponse clientResponse = new ClientResponse(clientRequest.makeHeader(builder.latestAllowedVersion()),
clientRequest.callback(), clientRequest.destination(), now, now,
false, unsupportedVersionException, null, null);
if (!isInternalRequest)
abortedSends.add(clientResponse);
else if (clientRequest.apiKey() == ApiKeys.METADATA)
metadataUpdater.handleFailedRequest(now, Optional.of(unsupportedVersionException));
}
}
private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
String destination = clientRequest.destination();
//build the request header for the ClientRequest
RequestHeader header = clientRequest.makeHeader(request.version());
if (log.isDebugEnabled()) {
int latestClientVersion = clientRequest.apiKey().latestVersion();
if (header.apiVersion() == latestClientVersion) {
log.trace("Sending {} {} with correlation id {} to node {}", clientRequest.apiKey(), request,
clientRequest.correlationId(), destination);
} else {
log.debug("Using older server API v{} to send {} {} with correlation id {} to node {}",
header.apiVersion(), clientRequest.apiKey(), request, clientRequest.correlationId(), destination);
}
}
Send send = request.toSend(destination, header);
//③ put it into inFlightRequests
InFlightRequest inFlightRequest = new InFlightRequest(
clientRequest,
header,
isInternalRequest,
request,
send,
now);
this.inFlightRequests.add(inFlightRequest);
//④ set it on the KafkaChannel.send field
selector.send(send);
}
- ① check whether a request can be sent to the given Node
- ② wrap it as a ClientRequest with the right API version
- ③ put it into inFlightRequests
- ④ set it on the KafkaChannel.send field:
//org.apache.kafka.common.network.Selector
public void send(Send send) {
    String connectionId = send.destination();
    KafkaChannel channel = openOrClosingChannelOrFail(connectionId);
    if (closingChannels.containsKey(connectionId)) {
        this.failedSends.add(connectionId);
    } else {
        try {
            //set the `KafkaChannel.send` field
            channel.setSend(send);
        } catch (Exception e) {
            channel.state(ChannelState.FAILED_SEND);
            this.failedSends.add(connectionId);
            close(channel, CloseMode.DISCARD_NO_NOTIFY);
            if (!(e instanceof CancelledKeyException)) {
                log.error("Unexpected exception during send, closing connection {} and rethrowing exception {}",
                    connectionId, e);
                throw e;
            }
        }
    }
}
2.6.3 The poll method
public List<ClientResponse> poll(long timeout, long now) {
ensureActive();
//① if the aborted-sends queue has entries, handle them first
if (!abortedSends.isEmpty()) {
List<ClientResponse> responses = new ArrayList<>();
handleAbortedSends(responses);
completeResponses(responses);
return responses;
}
//② update the metadata if due
long metadataTimeout = metadataUpdater.maybeUpdate(now);
try {
//③ do the actual I/O
this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));
} catch (IOException e) {
log.error("Unexpected error during I/O", e);
}
// process completed actions
long updatedNow = this.time.milliseconds();
List<ClientResponse> responses = new ArrayList<>();
//④ process the resulting completion queues
handleCompletedSends(responses, updatedNow);
handleCompletedReceives(responses, updatedNow);
handleDisconnections(responses, updatedNow);
handleConnections();
handleInitiateApiVersionRequests(updatedNow);
handleTimedOutRequests(responses, updatedNow);
completeResponses(responses);
return responses;
}
- ① if the aborted-sends queue contains entries, handle them first
- ② update the metadata if an update is due
- ③ do the actual I/O. This is the core call that reads and writes; see the NIO Selector material in the two blog posts mentioned at the beginning
- ④ process the resulting queues: the follow-up work after sending requests or receiving responses, such as state changes. Each handler is analyzed below
2.6.4 The handleCompletedSends method
Handles requests that finished sending. InFlightRequests holds the requests that have been sent but not yet answered, while completedSends holds the requests that finished sending in the most recent poll(); so, per node, the latest request in InFlightRequests is the same one found in completedSends.
private void handleCompletedSends(List<ClientResponse> responses, long now) {
//① iterate over the completedSends set
for (Send send : this.selector.completedSends()) {
//② look at the most recently sent request in inFlightRequests
InFlightRequest request = this.inFlightRequests.lastSent(send.destination());
//③ check whether the request expects a response
if (!request.expectResponse) {
//remove that request from the head of the inFlightRequests queue
this.inFlightRequests.completeLastSent(send.destination());
//build a ClientResponse and add it to the responses list
responses.add(request.completed(null, now));
}
}
}
- ① iterate over the completedSends set
- ② look at the most recently sent request in inFlightRequests
- ③ check whether the request expects a response (e.g. a produce request sent with acks=0 does not); if it does not:
  - remove that request from the head of the inFlightRequests queue
  - build a ClientResponse and add it to the responses list
2.6.5 The handleCompletedReceives method
Iterates over the completedReceives queue, removes the matching ClientRequest from InFlightRequests, and adds the corresponding ClientResponse to the responses list. If it is the response to a metadata update request, MetadataUpdater#handleSuccessfulResponse is called to update the metadata.
private void handleCompletedReceives(List<ClientResponse> responses, long now) {
for (NetworkReceive receive : this.selector.completedReceives()) {
//① the NodeId the response came from
String source = receive.source();
//② take the matching request out of inFlightRequests
InFlightRequest req = inFlightRequests.completeNext(source);
Struct responseStruct = parseStructMaybeUpdateThrottleTimeMetrics(receive.payload(), req.header,
throttleTimeSensor, now);
if (log.isTraceEnabled()) {
log.trace("Completed receive from node {} for {} with correlation id {}, received {}", req.destination,
req.header.apiKey(), req.header.correlationId(), responseStruct);
}
//③ parse the response
AbstractResponse body = AbstractResponse.
parseResponse(req.header.apiKey(), responseStruct, req.header.apiVersion());
maybeThrottle(body, req.header.apiVersion(), req.destination, now);
//④ for a MetadataResponse call metadataUpdater.handleSuccessfulResponse to update the metadata
if (req.isInternalRequest && body instanceof MetadataResponse)
metadataUpdater.handleSuccessfulResponse(req.header, now, (MetadataResponse) body);
else if (req.isInternalRequest && body instanceof ApiVersionsResponse)
handleApiVersionsResponse(responses, req, now, (ApiVersionsResponse) body);
else
responses.add(req.completed(body, now));
}
}
- ① get the NodeId the response came from
- ② take the matching request out of inFlightRequests
- ③ parse the response
- ④ if it is a MetadataResponse, call metadataUpdater.handleSuccessfulResponse to update the metadata
  - if it is an ApiVersionsResponse, call handleApiVersionsResponse
  - otherwise add it to the responses list
2.6.6 The handleDisconnections method
Iterates over the disconnected list, clears the node's ClientRequests from InFlightRequests, and creates a ClientResponse for each of them, added to the responses list. These ClientResponses are marked as caused by the broken connection rather than as normal responses returned by the server. If one of them was a metadata update request, metadataUpdater#handleServerDisconnect is called to handle it.
private void handleDisconnections(List<ClientResponse> responses, long now) {
for (Map.Entry<String, ChannelState> entry : this.selector.disconnected().entrySet()) {
String node = entry.getKey();
log.debug("Node {} disconnected.", node);
//① update the connection state and drop the disconnected node's ClientRequests from InFlightRequests
processDisconnection(responses, node, now, entry.getValue());
}
}
private void processDisconnection(List<ClientResponse> responses,
String nodeId,
long now,
ChannelState disconnectState) {
//② mark the state as disconnected
connectionStates.disconnected(nodeId, now);
//remove the node's entry from apiVersions
apiVersions.remove(nodeId);
nodesNeedingApiVersionsFetch.remove(nodeId);
switch (disconnectState.state()) {
case AUTHENTICATION_FAILED:
AuthenticationException exception = disconnectState.exception();
connectionStates.authenticationFailed(nodeId, now, exception);
log.error("Connection to node {} ({}) failed authentication due to: {}", nodeId,
disconnectState.remoteAddress(), exception.getMessage());
break;
case AUTHENTICATE:
log.warn("Connection to node {} ({}) terminated during authentication. This may happen " +
"due to any of the following reasons: (1) Authentication failed due to invalid " +
"credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS " +
"traffic (eg it may only allow HTTPS traffic), (3) Transient network issue.",
nodeId, disconnectState.remoteAddress());
break;
case NOT_CONNECTED:
log.warn("Connection to node {} ({}) could not be established. Broker may not be available.", nodeId, disconnectState.remoteAddress());
break;
default:
break; // Disconnections in other states are logged at debug level in Selector
}
cancelInFlightRequests(nodeId, now, responses);
metadataUpdater.handleServerDisconnect(now, nodeId, Optional.ofNullable(disconnectState.exception()));
}
2.6.7 The handleConnections method
Processes the connected list.
private void handleConnections() {
for (String node : this.selector.connected()) {
if (discoverBrokerVersions) {
this.connectionStates.checkingApiVersions(node);
nodesNeedingApiVersionsFetch.put(node, new ApiVersionsRequest.Builder());
log.debug("Completed connection to node {}. Fetching API versions.", node);
} else {
this.connectionStates.ready(node);
log.debug("Completed connection to node {}. Ready.", node);
}
}
}
2.6.8 The handleInitiateApiVersionRequests method
private void handleInitiateApiVersionRequests(long now) {
Iterator<Map.Entry<String, ApiVersionsRequest.Builder>> iter = nodesNeedingApiVersionsFetch.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<String, ApiVersionsRequest.Builder> entry = iter.next();
String node = entry.getKey();
if (selector.isChannelReady(node) && inFlightRequests.canSendMore(node)) {
log.debug("Initiating API versions fetch from node {}.", node);
ApiVersionsRequest.Builder apiVersionRequestBuilder = entry.getValue();
ClientRequest clientRequest = newClientRequest(node, apiVersionRequestBuilder, now, true);
doSend(clientRequest, true, now);
iter.remove();
}
}
}
2.6.9 The handleTimedOutRequests method
Handles the requests in InFlightRequests that have timed out.
private void handleTimedOutRequests(List<ClientResponse> responses, long now) {
List<String> nodeIds = this.inFlightRequests.nodesWithTimedOutRequests(now);
for (String nodeId : nodeIds) {
// close connection to the node
this.selector.close(nodeId);
log.debug("Disconnecting from node {} due to request timeout.", nodeId);
//same call as in 2.6.6 handleDisconnections
processDisconnection(responses, nodeId, now, ChannelState.LOCAL_CLOSE);
}
}
2.6.10 The completeResponses method
Completes the responses: iterates over the responses list and invokes each one's onComplete. Taking a producer send as the example, this ultimately calls back into the Sender thread's handleProduceResponse method.
private void completeResponses(List<ClientResponse> responses) {
for (ClientResponse response : responses) {
try {
response.onComplete();
} catch (Exception e) {
log.error("Uncaught error in request completion:", e);
}
}
}
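The onComplete invoked here runs the RequestCompletionHandler that was supplied when the ClientRequest was created. A sketch of the wiring (assumes an already-connected NetworkClient `client`, a target `node`, and a `time: Time` in scope; the choice of a metadata request is arbitrary):
import org.apache.kafka.clients.{ClientResponse, RequestCompletionHandler}
import org.apache.kafka.common.requests.MetadataRequest

//build a request whose callback will fire inside completeResponses()
val request = client.newClientRequest(
  node.idString,
  MetadataRequest.Builder.allTopics(),
  time.milliseconds(),
  true,  //expectResponse: keep it in InFlightRequests until a response (or a disconnect) arrives
  30000, //request timeout in ms
  new RequestCompletionHandler {
    override def onComplete(response: ClientResponse): Unit =
      if (response.wasDisconnected()) println("connection dropped")
      else println(s"got ${response.responseBody}")
  })
client.send(request, time.milliseconds())
client.poll(100, time.milliseconds()) //eventually invokes onComplete via completeResponses()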
That wraps up Kafka's network layer. It uses the Reactor pattern: an Acceptor thread listens on each endpoint and accepts the connections, several Processors read the requests off those connections and write back their responses, several Handler threads actually execute the requests and produce the responses, and Processors and Handlers exchange data through the RequestChannel. Next up are the Handler threads and KafkaApis.
References: the Kafka 2.5.0 source code
《Apache Kafka源码剖析》
《深入理解Kafka:核心设计与实践》