Kafka Server Side (Part 1): The Network Layer

April 22, 2020 | Author: Siran | ~14,000 words | about 28 minutes to read

Filed under: Message Queues | Tags: #kafka

1. Overview

Kafka's network layer uses the Reactor pattern, in two flavors:

  • data-plane: handles requests from clients and other brokers
    • One Acceptor thread, which accepts and handles all new connections
    • N Processor threads; each Processor has its own Selector and is responsible for reading requests from connections and writing back responses
    • M Handler threads, which process requests, generate the responses, and hand the responses back to the Processor
  • control-plane: handles requests from the controller. It must be enabled explicitly via control.plane.listener.name; if not configured, the data-plane threading model is used by default
    • One Acceptor thread, which accepts and handles all new connections
    • One Processor thread with its own Selector, responsible for reading requests from connections and writing back responses
    • One Handler thread, which processes requests, generates the responses, and hands them back to the Processor
    • It is used for the interaction between the KafkaController and the brokers.

If you are not familiar with the Reactor pattern, the following two blog posts are worth reading first:

Here is the structure of the data-plane model:

  • SocketServer is the core class; it contains an Acceptor thread that accepts and handles all new connections
  • Each Acceptor owns multiple Processor threads
  • Each Processor thread has its own Selector for reading requests from connections and writing back responses
  • There are also multiple Handler threads, which process the requests, generate the responses, and return them to the Processor
  • Processor threads and Handler threads communicate through a RequestChannel.

The control-plane simply collapses the multiple Processors and multiple Handlers down to one of each. Before we dig into the source code, the sketch below makes this structure concrete.
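
The following minimal, self-contained Java NIO program shows the same acceptor/processor split. Everything in it (class and thread names, the port, queue sizes) is illustrative, not Kafka's implementation, and it leaves out everything Kafka adds on top (quotas, security, handler threads):

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Toy Reactor: one acceptor thread accepts connections and hands them to
// processor threads round-robin; each processor runs its own Selector.
public class MiniReactor {
    static class Processor implements Runnable {
        final Selector selector;
        final BlockingQueue<SocketChannel> newConnections = new ArrayBlockingQueue<>(20);

        Processor() throws IOException { selector = Selector.open(); }

        void assign(SocketChannel ch) {
            newConnections.add(ch);   // bounded; real code blocks or retries when full
            selector.wakeup();        // break out of select() to register the channel
        }

        @Override public void run() {
            try {
                while (true) {
                    SocketChannel ch;
                    while ((ch = newConnections.poll()) != null) {
                        ch.configureBlocking(false);
                        ch.register(selector, SelectionKey.OP_READ); // read requests
                    }
                    selector.select(300);
                    selector.selectedKeys().clear(); // real code reads/writes here
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Processor[] processors = new Processor[3];   // cf. num.network.threads = 3
        for (int i = 0; i < processors.length; i++) {
            processors[i] = new Processor();
            new Thread(processors[i], "processor-" + i).start();
        }
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress(9092));
        server.configureBlocking(false);
        Selector acceptSelector = Selector.open();
        server.register(acceptSelector, SelectionKey.OP_ACCEPT);
        int next = 0;                                // round-robin index
        while (true) {                               // the acceptor loop
            if (acceptSelector.select(500) == 0) continue;
            acceptSelector.selectedKeys().clear();
            SocketChannel ch = server.accept();
            if (ch != null) {
                processors[next % processors.length].assign(ch);
                next++;
            }
        }
    }
}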


2. Source Code Analysis

2.1 SocketServer

class SocketServer(val config: KafkaConfig,
                   val metrics: Metrics,
                   val time: Time,
                   val credentialProvider: CredentialProvider)
  extends Logging with KafkaMetricsGroup with BrokerReconfigurable {

  //① Maximum number of requests buffered in the RequestChannel's requestQueue; default 500, configurable via queued.max.requests
  private val maxQueuedRequests = config.queuedMaxRequests

  // Sensors for collecting memory-pool metrics
  private val memoryPoolSensor = metrics.sensor("MemoryPoolUtilization")
  private val memoryPoolDepletedPercentMetricName = metrics.metricName("MemoryPoolAvgDepletedPercent", MetricsGroup)
  private val memoryPoolDepletedTimeMetricName = metrics.metricName("MemoryPoolDepletedTimeTotal", MetricsGroup)
  memoryPoolSensor.add(new Meter(TimeUnit.MILLISECONDS, memoryPoolDepletedPercentMetricName, memoryPoolDepletedTimeMetricName))

  //② Memory pool, analogous to the BufferPool in RecordAccumulator
  private val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, memoryPoolSensor) else MemoryPool.NONE
  //③ The set of data-plane Processors
  private val dataPlaneProcessors = new ConcurrentHashMap[Int, Processor]()
  //④ Data-plane mapping from EndPoint to Acceptor:
  // EndPoint: a server usually has several NICs and can be configured with several IPs; Kafka can listen on several ports. An EndPoint encapsulates a Host, Port and the network protocol to use
  // Each EndPoint gets its own Acceptor, i.e. every listened-on (IP, port) pair has a dedicated Acceptor
  private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, Acceptor]()
  //⑤ The RequestChannel used by the data plane to buffer requests
  val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneMetricPrefix)
  // control-plane
  private var controlPlaneProcessorOpt : Option[Processor] = None
  private[network] var controlPlaneAcceptorOpt : Option[Acceptor] = None
  val controlPlaneRequestChannelOpt: Option[RequestChannel] = config.controlPlaneListenerName.map(_ => new RequestChannel(20, ControlPlaneMetricPrefix))

  //⑥ Id of the next Processor to create
  private var nextProcessorId = 0
  //⑦ Enforces the maximum number of connections per IP. A Map records the number of connections established from each IP address.
  // When a new connection is created, the count is compared with the limit from maxConnectionsPerIpOverrides (or maxConnectionsPerIp);
  // exceeding it raises an error. Since multiple Acceptor threads access the underlying Map concurrently, access must be synchronized
  private var connectionQuotas: ConnectionQuotas = _
  private var stoppedProcessingRequests = false
  • maxQueuedRequests: the maximum number of requests buffered in the RequestChannel's requestQueue; default 500, configurable via queued.max.requests
  • memoryPool: a memory pool, analogous to the BufferPool in RecordAccumulator
    • socket.request.max.bytes: the maximum request size in bytes; larger requests are rejected. Default 100 MB
  • dataPlaneProcessors: the data-plane Processor threads, stored in a ConcurrentHashMap.
  • dataPlaneAcceptors: the data-plane mapping from EndPoint to Acceptor, stored in a ConcurrentHashMap.
    • EndPoint: a server usually has several NICs and can be configured with several IPs, and Kafka can listen on several ports; an EndPoint encapsulates a Host, Port and the network protocol to use
    • Each EndPoint gets its own Acceptor, i.e. every listened-on (IP, port) pair has a dedicated Acceptor
  • dataPlaneRequestChannel: the queue through which Processor threads and Handler threads exchange data; it buffers at most maxQueuedRequests entries
  • nextProcessorId: the id of the next Processor (ids are handed out for the round-robin dispatch)
  • connectionQuotas: enforces the maximum number of connections per IP. A Map records the number of connections from each IP address; when a new connection is created the count is compared with the limit from maxConnectionsPerIpOverrides (or maxConnectionsPerIp), and exceeding it raises an error. Since multiple Acceptor threads access the underlying Map concurrently, access must be synchronized

2.1.1 startup

The initialization method.

def startup(startupProcessors: Boolean = true): Unit = {
    //① Synchronize
    this.synchronized {
      //② Create the ConnectionQuotas
      connectionQuotas = new ConnectionQuotas(config, time)
      //③ Create the control plane; the Processor and Acceptor threads are created while the data/control plane is being set up
      createControlPlaneAcceptorAndProcessor(config.controlPlaneListener)
      // Create the data plane
      createDataPlaneAcceptorsAndProcessors(config.numNetworkThreads, config.dataPlaneListeners)
      //④ Whether to start the Processor threads right away
      if (startupProcessors) {
        // Start the threads
        startControlPlaneProcessor(Map.empty)
        startDataPlaneProcessors(Map.empty)
      }
    }
}   
  • ① synchronized for thread safety
  • ② Initialize the ConnectionQuotas
  • ③ Initialize the Acceptor and Processor threads
  • ④ Start the Processor threads

Note: SocketServer.startup is reached via Kafka#main ——> KafkaServerStartable#startup ——> KafkaServer#startup.

In other words, SocketServer#startup is invoked for initialization as soon as Kafka starts.

2.1.2 Initializing ConnectionQuotas

An inner class of SocketServer that limits the maximum number of connections per IP.

class ConnectionQuotas(config: KafkaConfig, time: Time) extends Logging {
  //① Maximum number of connections per IP; configured via max.connections.per.ip, default Integer.MAX_VALUE (2^31 - 1)
  @volatile private var defaultMaxConnectionsPerIp: Int = config.maxConnectionsPerIp
  //② Per-IP overrides of the maximum connection count; these take precedence over the maxConnectionsPerIp value above. Configured via max.connections.per.ip.overrides, default ""
  @volatile private var maxConnectionsPerIpOverrides = config.maxConnectionsPerIpOverrides.map { case (host, count) => (InetAddress.getByName(host), count) }
  //③ Maximum number of connections for the whole broker; max.connections, default Integer.MAX_VALUE (2^31 - 1)
  @volatile private var brokerMaxConnections = config.maxConnections
  //④ Mapping from IP address to connection count
  private val counts = mutable.Map[InetAddress, Int]()

  //⑤ Mapping from listener to its current connection count
  private val listenerCounts = mutable.Map[ListenerName, Int]()
  //⑥ Mapping from listener to its connection quota (maximum connections)
  private val maxConnectionsPerListener = mutable.Map[ListenerName, ListenerConnectionQuota]()
  @volatile private var totalCount = 0
}
  • defaultMaxConnectionsPerIp: the maximum number of connections per IP; configured via max.connections.per.ip, default Integer.MAX_VALUE (2^31 - 1)
  • maxConnectionsPerIpOverrides: per-IP overrides of the maximum connection count, taking precedence over defaultMaxConnectionsPerIp; configured via max.connections.per.ip.overrides, default ""
  • brokerMaxConnections: the maximum number of connections for the whole broker; max.connections, default Integer.MAX_VALUE (2^31 - 1)
  • counts: mapping from IP address to connection count (see the sketch after this list)
  • listenerCounts: mapping from listener to its current connection count
  • maxConnectionsPerListener: mapping from listener to its connection quota
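
As a hedged illustration of the counting scheme these fields implement, the sketch below keeps a synchronized per-IP count and rejects connections over the limit. The names and the exception type are made up, and Kafka's real class additionally tracks per-listener and broker-wide quotas:

import java.net.InetAddress;
import java.util.HashMap;
import java.util.Map;

// A simplified per-IP connection counter in the spirit of ConnectionQuotas.
public class SimpleConnectionQuotas {
    private final int defaultMaxPerIp;                  // cf. max.connections.per.ip
    private final Map<InetAddress, Integer> overrides;  // cf. max.connections.per.ip.overrides
    private final Map<InetAddress, Integer> counts = new HashMap<>();

    public SimpleConnectionQuotas(int defaultMaxPerIp, Map<InetAddress, Integer> overrides) {
        this.defaultMaxPerIp = defaultMaxPerIp;
        this.overrides = overrides;
    }

    // Called by an acceptor thread before handing the connection to a processor.
    // synchronized because several acceptor threads share the counts map.
    public synchronized void inc(InetAddress addr) {
        int max = overrides.getOrDefault(addr, defaultMaxPerIp);
        int current = counts.getOrDefault(addr, 0);
        if (current + 1 > max)
            throw new IllegalStateException("Too many connections from " + addr + " (max = " + max + ")");
        counts.put(addr, current + 1);
    }

    // Called when a connection is closed.
    public synchronized void dec(InetAddress addr) {
        int current = counts.getOrDefault(addr, 0);
        if (current <= 1) counts.remove(addr);
        else counts.put(addr, current - 1);
    }
}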

2.1.3 createDataPlaneAcceptorsAndProcessors

Initializes the Acceptor and Processor threads, using the data plane as the example.

private def createDataPlaneAcceptorsAndProcessors(dataProcessorsPerListener: Int,
                                                    endpoints: Seq[EndPoint]): Unit = synchronized {
    // Iterate over the endpoints
    endpoints.foreach { endpoint =>
      //③ Register the listener with connectionQuotas: create its quota entry if absent, then wake up any waiters
      connectionQuotas.addListener(config, endpoint.listenerName)
      //④ Create the Acceptor thread
      val dataPlaneAcceptor = createAcceptor(endpoint, DataPlaneMetricPrefix)
      //⑤ Attach Processor threads to the newly created Acceptor
      addDataPlaneProcessors(dataPlaneAcceptor, endpoint, dataProcessorsPerListener)
      //⑥ Start the dataPlaneAcceptor thread
      KafkaThread.nonDaemon(s"data-plane-kafka-socket-acceptor-${endpoint.listenerName}-${endpoint.securityProtocol}-${endpoint.port}", dataPlaneAcceptor).start()
      //⑦ Block on CountDownLatch#await; when the Acceptor thread starts it calls countDown to release us
      dataPlaneAcceptor.awaitStartup()
      //⑧ Put the Acceptor into dataPlaneAcceptors
      dataPlaneAcceptors.put(endpoint, dataPlaneAcceptor)
      info(s"Created data-plane acceptor and processors for endpoint : $endpoint")
    }
  }
  • The dataProcessorsPerListener parameter: how many Processors to create per listener (config.numNetworkThreads)

  • The endpoints parameter: the endpoints to listen on (config.dataPlaneListeners)

  • The connectionQuotas#addListener() method: registers the listener with connectionQuotas; if a quota entry for this listener already exists it just wakes up the waiters, otherwise it creates the entry and then wakes them up

    private[network] def addListener(config: KafkaConfig, listenerName: ListenerName): Unit = {
      // Synchronize on counts
      counts.synchronized {
          // Create the quota entry if this listener does not have one yet
        if (!maxConnectionsPerListener.contains(listenerName)) {
          val newListenerQuota = new ListenerConnectionQuota(counts, listenerName)
          maxConnectionsPerListener.put(listenerName, newListenerQuota)
          listenerCounts.put(listenerName, 0)
          config.addReconfigurable(newListenerQuota)
        }
        // Wake up waiters
        counts.notifyAll()
      }
    }
    
  • The createAcceptor() method: creates the Acceptor thread

    private def createAcceptor(endPoint: EndPoint, metricPrefix: String) : Acceptor = synchronized {
      val sendBufferSize = config.socketSendBufferBytes
      val recvBufferSize = config.socketReceiveBufferBytes
      val brokerId = config.brokerId
      // Create the Acceptor thread
      new Acceptor(endPoint, sendBufferSize, recvBufferSize, brokerId, connectionQuotas, metricPrefix)
    }
    
    • sendBufferSize: the socket send buffer size (SO_SNDBUF); default 100 * 1024, configurable via socket.send.buffer.bytes
    • recvBufferSize: the socket receive buffer size (SO_RCVBUF); default 100 * 1024, configurable via socket.receive.buffer.bytes
  • The addDataPlaneProcessors() method: attaches Processor threads to the newly created Acceptor

    private def addDataPlaneProcessors(acceptor: Acceptor, endpoint: EndPoint, newProcessorsPerListener: Int): Unit = synchronized {
      // The listener name of this endpoint
      val listenerName = endpoint.listenerName
      // The security protocol of this endpoint
      val securityProtocol = endpoint.securityProtocol
      // Buffer for the Processors created for this endpoint
      val listenerProcessors = new ArrayBuffer[Processor]()
      // Note: the number of Processors created is controlled by num.network.threads, default 3
      for (_ <- 0 until newProcessorsPerListener) {
        val processor = newProcessor(nextProcessorId, dataPlaneRequestChannel, connectionQuotas, listenerName, securityProtocol, memoryPool)
        listenerProcessors += processor
        dataPlaneRequestChannel.addProcessor(processor)
        nextProcessorId += 1
      }
      listenerProcessors.foreach(p => dataPlaneProcessors.put(p.id, p))
      acceptor.addProcessors(listenerProcessors, DataPlaneThreadPrefix)
    }
    
    • Note: newProcessorsPerListener, i.e. how many Processors are created, is controlled by num.network.threads (default 3). That parameter only sets the number of Processors; there is always exactly one Acceptor per endpoint.
  • ⑥ Start the dataPlaneAcceptor thread, named data-plane-kafka-socket-acceptor-${listenerName}-${securityProtocol}-${port}

  • ⑦ Block on CountDownLatch#await until the Acceptor thread has started; the Acceptor calls countDown on startup to release us.

    def awaitStartup(): Unit = startupLatch.await
    
  • ⑧ Put the Acceptor into dataPlaneAcceptors, which is just a ConcurrentHashMap#put call

2.1.4 startDataPlaneProcessors

Starts the Processor threads, again using the data plane as the example.

def startDataPlaneProcessors(authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = Map.empty): Unit = synchronized {
    val interBrokerListener = dataPlaneAcceptors.asScala.keySet
      .find(_.listenerName == config.interBrokerListenerName)
      .getOrElse(throw new IllegalStateException(s"Inter-broker listener ${config.interBrokerListenerName} not found, endpoints=${dataPlaneAcceptors.keySet}"))
    val orderedAcceptors = List(dataPlaneAcceptors.get(interBrokerListener)) ++
      dataPlaneAcceptors.asScala.filterKeys(_ != interBrokerListener).values
    orderedAcceptors.foreach { acceptor =>
      val endpoint = acceptor.endPoint
      debug(s"Wait for authorizer to complete start up on listener ${endpoint.listenerName}")
      waitForAuthorizerFuture(acceptor, authorizerFutures)
      debug(s"Start processors on listener ${endpoint.listenerName}")
      // This is the key call
      acceptor.startProcessors(DataPlaneThreadPrefix)
    }
    info(s"Started data-plane processors for ${dataPlaneAcceptors.size} acceptors")
  }

private[network] def startProcessors(processorThreadPrefix: String): Unit = synchronized {
    if (!processorsStarted.getAndSet(true)) {
      startProcessors(processors, processorThreadPrefix)
    }
  }

That is, the Processor arrays created by addDataPlaneProcessors (section 2.1.3) are started here one by one.

This concludes initialization. Next we look at how the Acceptor thread and the Processor threads actually work.


2.2 AbstractServerThread

AbstractServerThread is an abstract class that implements the Runnable interface and provides the startup/shutdown plumbing; both the Acceptor and the Processor threads extend it.

private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQuotas) extends Runnable with Logging {
  //① A CountDownLatch with count 1 that marks whether this thread's startup has completed; see step ⑦ in 2.1.3
  private val startupLatch = new CountDownLatch(1)

  //② A CountDownLatch with count 0 that marks whether this thread's shutdown has completed
  @volatile private var shutdownLatch = new CountDownLatch(0)

  //③ Whether the current thread is alive; true initially, CAS-ed to false in shutdown
  private val alive = new AtomicBoolean(true)
 
  //④ wakeup, implemented by subclasses
  def wakeup(): Unit

  //⑤ shutdown, with a default implementation
  def shutdown(): Unit = {
    if (alive.getAndSet(false))
      wakeup()
    shutdownLatch.await()
  }

  //⑥ awaitStartup, with a default implementation
  def awaitStartup(): Unit = startupLatch.await

  //⑦ startupComplete, with a default implementation
  protected def startupComplete(): Unit = {
    // Replace the open latch with a closed one
    shutdownLatch = new CountDownLatch(1)
    startupLatch.countDown()
  }

  //⑧ `shutdownComplete`, with a default implementation
  protected def shutdownComplete(): Unit = shutdownLatch.countDown()

  protected def isRunning: Boolean = alive.get

  def close(listenerName: ListenerName, channel: SocketChannel): Unit = {
    if (channel != null) {
      debug(s"Closing connection from ${channel.socket.getRemoteSocketAddress()}")
      // Close the SocketChannel and decrement the recorded connection count
      connectionQuotas.dec(listenerName, channel.socket.getInetAddress)
      CoreUtils.swallow(channel.socket().close(), this, Level.ERROR)
      CoreUtils.swallow(channel.close(), this, Level.ERROR)
    }
  }
}
  • startupLatch: a CountDownLatch with count 1 that marks whether this thread's startup has completed; see step ⑦ in 2.1.3.
  • shutdownLatch: a CountDownLatch with count 0 that marks whether this thread's shutdown has completed
  • alive: whether the current thread is alive; true initially, CAS-ed to false in shutdown
  • The wakeup method, implemented by subclasses
  • The shutdown method, with a default implementation
  • The awaitStartup method, with a default implementation
  • The startupComplete method, with a default implementation
  • The shutdownComplete method, with a default implementation

The methods are all fairly simple; below we look at how the two subclasses, Acceptor and Processor, build on them.
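
A compact, hypothetical Java re-creation of this latch handshake (not Kafka's code) makes the lifecycle easier to see, in particular why shutdownLatch starts open with count 0 and is only swapped for a count-1 latch once startup completes:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Toy version of the AbstractServerThread lifecycle handshake.
public abstract class LifecycleThread implements Runnable {
    private final CountDownLatch startupLatch = new CountDownLatch(1);
    // Starts open (count 0) so that shutdown() never blocks on a thread
    // that was never started; startupComplete() swaps in a closed one.
    private volatile CountDownLatch shutdownLatch = new CountDownLatch(0);
    private final AtomicBoolean alive = new AtomicBoolean(true);

    public void awaitStartup() throws InterruptedException { startupLatch.await(); }

    protected void startupComplete() {
        shutdownLatch = new CountDownLatch(1); // from now on shutdown() must wait for us
        startupLatch.countDown();              // release the thread that started us
    }

    protected void shutdownComplete() { shutdownLatch.countDown(); }

    protected boolean isRunning() { return alive.get(); }

    public void shutdown() throws InterruptedException {
        if (alive.getAndSet(false))
            wakeup();            // e.g. Selector.wakeup() in a subclass
        shutdownLatch.await();   // wait until the run loop exits
    }

    public abstract void wakeup();
}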


2.3 Acceptor

Accepts clients' connection requests, creates the Socket connections, and assigns them to Processors for handling.

private[kafka] class Acceptor(val endPoint: EndPoint,
                              val sendBufferSize: Int,
                              val recvBufferSize: Int,
                              brokerId: Int,
                              connectionQuotas: ConnectionQuotas,
                              metricPrefix: String) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {

  //① Java NIO selector
  private val nioSelector = NSelector.open()
  //② The ServerSocketChannel that accepts client connections
  val serverChannel = openServerSocket(endPoint.host, endPoint.port)
  //③ The array of Processors
  private val processors = new ArrayBuffer[Processor]()
  //④ Whether the Processor threads have been started
  private val processorsStarted = new AtomicBoolean
  private val blockedPercentMeter = newMeter(s"${metricPrefix}AcceptorBlockedPercent",
    "blocked time", TimeUnit.NANOSECONDS, Map(ListenerMetricTag -> endPoint.listenerName.value))
}    
  • nioSelector: a Java NIO Selector
  • serverChannel: the ServerSocketChannel that accepts client connections
  • processors: the array of Processors
  • processorsStarted: whether the Processor threads have been started

Initialization creates the Selector, the ServerSocketChannel used to accept client requests, and the array of Processors managed by this Acceptor.

2.3.1 startProcessors

Starts all the Processor threads in the processors array.

private[network] def startProcessors(processorThreadPrefix: String): Unit = 
synchronized { // synchronize
    if (!processorsStarted.getAndSet(true)) {
      startProcessors(processors, processorThreadPrefix)
    }
  }

private def startProcessors(processors: Seq[Processor], processorThreadPrefix: String): Unit = synchronized {
    // Start each Processor as a named non-daemon thread
    processors.foreach { processor =>
      KafkaThread.nonDaemon(s"${processorThreadPrefix}-kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",
        processor).start()
    }
  } 

2.3.2 addProcessors

Adds Processor threads; if the existing Processors are already running, the newly added ones are started immediately.

private[network] def addProcessors(newProcessors: Buffer[Processor], processorThreadPrefix: String): Unit = synchronized {
    processors ++= newProcessors
    if (processorsStarted.get)
      startProcessors(newProcessors, processorThreadPrefix)
  }

2.3.3 removeProcessors

Removes Processor threads: the last removeCount Processors are shut down and removed from the RequestChannel.

private[network] def removeProcessors(removeCount: Int, requestChannel: RequestChannel): Unit = synchronized {
    val toRemove = processors.takeRight(removeCount)
    processors.remove(processors.size - removeCount, removeCount)
    toRemove.foreach(_.shutdown())
    toRemove.foreach(processor => requestChannel.removeProcessor(processor.id))
  }

2.3.4 shutdown

The shutdown method.

override def shutdown(): Unit = {
    // Call the parent class's shutdown
    super.shutdown()
    synchronized {
      // Shut down every Processor in the processors array
      processors.foreach(_.shutdown())
    }
  }

2.3.5 run

The core method: since Acceptor's parent class AbstractServerThread implements Runnable, run() is the main entry point. It is essentially the handling of the OP_ACCEPT event.

def run(): Unit = {
    //① Register for ACCEPT events on the channel
    serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
    //② Mark the current thread's startup as complete
    startupComplete()
    try {
      var currentProcessorIndex = 0
      while (isRunning) { //③ check that the thread should keep running
        try {

          val ready = nioSelector.select(500) //④ wait for the events of interest
          if (ready > 0) {
            val keys = nioSelector.selectedKeys()
            val iter = keys.iterator()
            while (iter.hasNext && isRunning) {
              try {
                val key = iter.next
                iter.remove()

                if (key.isAcceptable) { //⑤ for an accept event, call accept() to handle it; any other event is an error
                  accept(key).foreach { socketChannel =>

                    var retriesLeft = synchronized(processors.length)
                    var processor: Processor = null
                    do {
                      retriesLeft -= 1
                      processor = synchronized {
                        //⑥ pick the id of the next processor, round-robin
                        currentProcessorIndex = currentProcessorIndex % processors.length
                        processors(currentProcessorIndex)
                      }
                      currentProcessorIndex += 1
                      //⑦ hand the connection to a Processor via assignNewConnection
                    } while (!assignNewConnection(socketChannel, processor, retriesLeft == 0))
                  }
                } else
                  throw new IllegalStateException("Unrecognized key state for acceptor thread.")
              } catch {
                case e: Throwable => error("Error while accepting connection", e)
              }
            }
          }
        }
        catch {
          case e: ControlThrowable => throw e
          case e: Throwable => error("Error occurred", e)
        }
      }
    } finally {
      debug("Closing server socket and selector.")
      CoreUtils.swallow(serverChannel.close(), this, Level.ERROR)
      CoreUtils.swallow(nioSelector.close(), this, Level.ERROR)
      shutdownComplete()
    }
  }
  • ① Register for ACCEPT events on the channel
  • ② Mark the current thread's startup as complete
  • ③ Check that the thread should keep running
  • ④ Wait for the events of interest
  • ⑤ For an accept event, call accept() to handle it; any other event is an error
    // Handles the `OP_ACCEPT` event: creates the `SocketChannel` and increments the connection count recorded in `ConnectionQuotas`.
    private def accept(key: SelectionKey): Option[SocketChannel] = {
      val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
      //① create the SocketChannel
      val socketChannel = serverSocketChannel.accept()
      try {
        //② increment the connection count recorded in connectionQuotas
        connectionQuotas.inc(endPoint.listenerName, socketChannel.socket.getInetAddress, blockedPercentMeter)
        // non-blocking mode
        socketChannel.configureBlocking(false)
        // disable Nagle's algorithm (TCP no-delay), enable keepAlive, and set sendBufferSize
        socketChannel.socket().setTcpNoDelay(true)
        socketChannel.socket().setKeepAlive(true)
        if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
          socketChannel.socket().setSendBufferSize(sendBufferSize)
        Some(socketChannel)
      } catch {
        case e: TooManyConnectionsException =>
          info(s"Rejected connection from ${e.ip}, address already has the configured maximum of ${e.count} connections.")
          close(endPoint.listenerName, socketChannel)
          None
      }
    }
    
  • ⑥ Pick the id of the next processor, using round-robin
  • ⑦ Hand the connection to a Processor thread via the assignNewConnection method; this differs from the 2.1.1 release I read previously, which called Processor#accept directly. A sketch of the hand-off policy follows.
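
The hand-off policy is easy to re-create in isolation. In the sketch below (illustrative Java, not Kafka's classes), the acceptor offers the connection to each processor's bounded queue in round-robin order without blocking, and only blocks on the final attempt, mirroring the retriesLeft == 0 flag passed to assignNewConnection above:

import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Sketch of the acceptor's hand-off: try each processor's connection queue
// without blocking, and only block on the last attempt.
public class RoundRobinAssigner {
    private final List<BlockingQueue<SocketChannel>> processorQueues = new ArrayList<>();
    private int current = 0;

    public RoundRobinAssigner(int processors, int queueSize) {
        for (int i = 0; i < processors; i++)
            processorQueues.add(new ArrayBlockingQueue<>(queueSize));
    }

    public void assign(SocketChannel channel) throws InterruptedException {
        int retriesLeft = processorQueues.size();
        while (true) {
            retriesLeft--;
            BlockingQueue<SocketChannel> queue =
                processorQueues.get(current % processorQueues.size());
            current++;
            if (retriesLeft == 0) {       // last processor: block until it has room
                queue.put(channel);
                return;
            }
            if (queue.offer(channel))     // otherwise: skip a busy processor
                return;
        }
    }
}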

2.4 Processor

Handles reading requests and writing back responses; the actual request processing is done by the Handler threads.

  //① ArrayBlockingQueue holding the newly created SocketChannels this Processor must handle
  private val newConnections = new ArrayBlockingQueue[SocketChannel](connectionQueueSize)
  //② Holds responses that have not finished sending. Unlike the client's InFlightRequests, the client never acknowledges a response sent by the server,
  // so entries in inflightResponses are removed as soon as they are sent successfully, whereas entries in InFlightRequests are removed only once a response arrives
  private val inflightResponses = mutable.Map[String, RequestChannel.Response]()
  //③ LinkedBlockingDeque of the responses this Processor needs to send
  private val responseQueue = new LinkedBlockingDeque[RequestChannel.Response]()
  //④ Number of connections killed because they expired
  val expiredConnectionsKilledCount = new CumulativeSum()
  //⑤ Create the selector (Kafka's KSelector), built via a builder; it manages the network connections
  private val selector = createSelector(
    ChannelBuilders.serverChannelBuilder(listenerName,
      listenerName == config.interBrokerListenerName,
      securityProtocol,
      config,
      credentialProvider.credentialCache,
      credentialProvider.tokenCache,
      time,
      logContext))
  • newConnections: an ArrayBlockingQueue holding the newly created SocketChannels this Processor must handle
  • inflightResponses: responses that have not finished sending. Unlike the client's InFlightRequests, the client never acknowledges a response sent by the server, so entries in inflightResponses are removed as soon as they are sent successfully, whereas entries in InFlightRequests are removed only once a response arrives (sketched after this list)
  • responseQueue: a LinkedBlockingDeque of the responses this Processor needs to send
  • expiredConnectionsKilledCount: the number of connections killed because they expired
  • selector: the KSelector, created via a builder; it manages the network connections
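
The response bookkeeping around responseQueue and inflightResponses can be sketched as follows (hypothetical names; the real code also registers and cancels write events on the Selector at these points):

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingDeque;

// Sketch: a response leaves responseQueue when the processor starts sending
// it, sits in inflightResponses while its bytes are in flight, and is removed
// as soon as the send completes, because the client never acks a response.
public class ResponseBookkeeping {
    record Response(String connectionId, byte[] payload) {}

    private final LinkedBlockingDeque<Response> responseQueue = new LinkedBlockingDeque<>();
    private final Map<String, Response> inflightResponses = new HashMap<>();

    void enqueue(Response r) { responseQueue.add(r); }   // called from a handler thread

    void startSending() {                                // cf. processNewResponses()
        Response r;
        while ((r = responseQueue.poll()) != null)
            inflightResponses.put(r.connectionId(), r);  // would register OP_WRITE here
    }

    void sendCompleted(String connectionId) {            // cf. processCompletedSends()
        inflightResponses.remove(connectionId);          // gone for good: no client ack
    }
}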

2.4.1 run

The core method; like Acceptor, Processor extends AbstractServerThread.

  override def run(): Unit = {
    //① Mark this Processor's initialization as finished and wake up the threads blocked waiting for it
    startupComplete()
    try {
      while (isRunning) {
        try {
          //② Process the newly created SocketChannels in the newConnections queue; each SocketChannel is registered for read events on the nioSelector.
          // The SocketChannel is wrapped in a KafkaChannel attached to the SelectionKey, so when a read event later fires, the object retrieved from the SelectionKey is a KafkaChannel
          configureNewConnections()
          //③ Take the responses buffered in this Processor's responseQueue (fed by the RequestChannel) and process them.
          // If a response is of SendAction type, it must be sent to the client: look up the corresponding KafkaChannel, register a write event for it, and point kafkaChannel.send at the response.
          // The response is also moved from responseQueue into inflightResponses; once a complete response has been sent, the write event registered on the connection is cancelled.
          // If the response is of NoOpAction type, the connection has nothing to send at the moment, so the KafkaChannel is registered for read again, allowing it to keep reading requests
          // If the response is of closeConnectionAction type, close the corresponding connection
          processNewResponses()
          //④ Perform the I/O via KSelector.poll (the same Selector the NetworkClient uses); completed operations land in completedReceives, completedSends and disconnected for follow-up processing
          poll()
          //⑤ Process the KSelector.completedReceives queue: wrap each NetworkReceive together with the processor id and authentication info into a
          // RequestChannel.Request and put it into requestChannel.requestQueue for the Handler threads to process.
          // Then cancel the read event registered for that KafkaChannel, so no further requests are read from this connection until a response has been sent
          processCompletedReceives()
          //⑥ Process the KSelector.completedSends queue: remove the corresponding response from inflightResponses and re-register the read event for the connection, allowing data to be read from it again
          processCompletedSends()
          //⑦ Process the KSelector.disconnected queue: first remove all responses for that connection from inflightResponses, then decrement the connection count in ConnectionQuotas to make room for new connections
          processDisconnected()
          //⑧ Close excess connections
          closeExcessConnections()
        } catch {
          case e: Throwable => processException("Processor got uncaught exception.", e)
        }
      }
    } finally {
      debug(s"Closing selector - processor $id")
      CoreUtils.swallow(closeAll(), this, Level.ERROR)
      shutdownComplete()
    }
  }
  • The startupComplete method: marks the Processor's initialization as finished and wakes up the threads blocked waiting for it
  • The configureNewConnections method: processes the newly created SocketChannels in the newConnections queue; each SocketChannel is registered for read events on the nioSelector. The SocketChannel is wrapped in a KafkaChannel attached to the SelectionKey, so when a read event later fires, the object retrieved from the SelectionKey is a KafkaChannel
  • The processNewResponses method: takes the responses buffered in this Processor's responseQueue (fed by the RequestChannel) and processes them. If a response is of SendAction type, it must be sent to the client: the corresponding KafkaChannel is looked up, a write event is registered for it, and kafkaChannel.send is pointed at the response. The response is also moved from responseQueue into inflightResponses; once a complete response has been sent, the write event registered on the connection is cancelled.
    • If the response is of NoOpAction type, the connection has nothing to send at the moment, so the KafkaChannel is registered for read again, allowing it to keep reading requests
    • If the response is of closeConnectionAction type, the connection is closed
  • The poll method: performs the I/O via KSelector.poll (the same Selector the NetworkClient uses); completed operations land in completedReceives, completedSends and disconnected for follow-up processing
  • The processCompletedReceives method: processes the KSelector.completedReceives queue. Each NetworkReceive is wrapped together with the processor id and authentication info into a RequestChannel.Request and put into requestChannel.requestQueue for the Handler threads. The read event registered for the KafkaChannel is then cancelled, so no further requests are read from this connection until a response has been sent (the mute/unmute sketch after this list illustrates the rule)
  • The processCompletedSends method: processes the KSelector.completedSends queue. The corresponding response is removed from inflightResponses, and the read event is re-registered for the connection, allowing data to be read from it again
  • The processDisconnected method: processes the KSelector.disconnected queue. All responses for the connection are removed from inflightResponses, then the connection count in ConnectionQuotas is decremented to make room for new connections
  • The closeExcessConnections method: if the quota is exceeded, close the lowest-priority connection
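
Steps ⑤ and ⑥ together enforce one request per connection at a time: after a complete request is read, the channel is muted, and it is only unmuted once the response has been sent, so responses cannot be reordered. A minimal sketch of that interest-ops dance, assuming a plain NIO SelectionKey rather than Kafka's KafkaChannel mute states:

import java.nio.channels.SelectionKey;

// Bare NIO equivalent of the mute/unmute rule the run() loop enforces.
public final class MuteControl {
    private MuteControl() {}

    public static void mute(SelectionKey key) {   // after a request is received
        key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
    }

    public static void unmute(SelectionKey key) { // after the response is sent
        key.interestOps(key.interestOps() | SelectionKey.OP_READ);
    }
}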

2.4.2 startupComplete

Marks the Processor's initialization as finished and wakes up the threads blocked waiting for it.

protected def startupComplete(): Unit = {
    //① create a shutdownLatch with count 1
    shutdownLatch = new CountDownLatch(1)
    //② wake up the threads blocked on startupLatch
    startupLatch.countDown()
  }
  • ① This explains why shutdownLatch in 2.2 AbstractServerThread starts with count 0: only here is it replaced by a count-1 latch that shutdown() will wait on.
  • ② Wake up the threads blocked on startupLatch

2.4.3 configureNewConnections

Processes the newly created SocketChannels in the newConnections queue; each SocketChannel is registered for read events on the nioSelector. The SocketChannel is wrapped in a KafkaChannel attached to the SelectionKey, so when a read event later fires, the object retrieved from the SelectionKey is a KafkaChannel.

private def configureNewConnections(): Unit = {
    var connectionsProcessed = 0
    while (connectionsProcessed < connectionQueueSize && !newConnections.isEmpty) { // drain the newConnections queue, at most connectionQueueSize per call
      val channel = newConnections.poll()
      try {
        debug(s"Processor $id listening to new connection from ${channel.socket.getRemoteSocketAddress}")
        // register for OP_READ events
        selector.register(connectionId(channel.socket), channel)
        connectionsProcessed += 1
      } catch {
        // We explicitly catch all exceptions and close the socket to avoid a socket leak.
        case e: Throwable =>
          val remoteAddress = channel.socket.getRemoteSocketAddress
          // need to close the channel here to avoid a socket leak.
          close(listenerName, channel)
          processException(s"Processor $id closed connection from $remoteAddress", e)
      }
    }
  }

2.4.4 processNewResponses

Takes the responses buffered in this Processor's responseQueue (fed by the RequestChannel) and processes them. If a response is of SendAction type, it must be sent to the client: the corresponding KafkaChannel is looked up, a write event is registered for it, and kafkaChannel.send is pointed at the response. The response is also moved from responseQueue into inflightResponses; once a complete response has been sent, the write event registered on the connection is cancelled.

  • If the response is of NoOpAction type, the connection has nothing to send at the moment, so the KafkaChannel is registered for read again, allowing it to keep reading requests
  • If the response is of closeConnectionAction type, the connection is closed
private def processNewResponses(): Unit = {
    var currentResponse: RequestChannel.Response = null
    // dequeueResponse pops one response from the responseQueue
    while ({currentResponse = dequeueResponse(); currentResponse != null}) {
      val channelId = currentResponse.request.context.connectionId
      try {
        currentResponse match {
          case response: NoOpResponse =>
            updateRequestMetrics(response)
            trace(s"Socket server received empty response to send, registering for read: $response")
            handleChannelMuteEvent(channelId, ChannelMuteEvent.RESPONSE_SENT)
            tryUnmuteChannel(channelId)

          case response: SendResponse =>
            sendResponse(response, response.responseSend)
          case response: CloseConnectionResponse =>
            updateRequestMetrics(response)
            trace("Closing socket connection actively according to the response code.")
            close(channelId)
          case _: StartThrottlingResponse =>
            handleChannelMuteEvent(channelId, ChannelMuteEvent.THROTTLE_STARTED)
          case _: EndThrottlingResponse =>
            // Try unmuting the channel. The channel will be unmuted only if the response has already been sent out to
            // the client.
            handleChannelMuteEvent(channelId, ChannelMuteEvent.THROTTLE_ENDED)
            tryUnmuteChannel(channelId)
          case _ =>
            throw new IllegalArgumentException(s"Unknown response type: ${currentResponse.getClass}")
        }
      } catch {
        case e: Throwable =>
          processChannelException(channelId, s"Exception while processing response for $channelId", e)
      }
    }
  }

2.4.5 poll

Performs the I/O by calling KSelector.poll, just like sending messages in KafkaProducer; completed operations are added to completedReceives, completedSends and disconnected for follow-up processing. Note the timeout: 300 ms when there are no pending new connections, but 0 when newConnections is non-empty, so that freshly accepted connections are registered without delay.

private def poll(): Unit = {
    val pollTimeout = if (newConnections.isEmpty) 300 else 0
    try selector.poll(pollTimeout)
    catch {
      case e @ (_: IllegalStateException | _: IOException) =>
        error(s"Processor $id poll failed", e)
    }
  }

2.4.6 processCompletedReceives

Processes the KSelector.completedReceives queue. Each NetworkReceive is wrapped together with the processor id and authentication info into a RequestChannel.Request and put into requestChannel.requestQueue for the Handler threads to process. Afterwards the read event registered for the KafkaChannel is cancelled, so no further requests are read from this connection until a response has been sent.

private def processCompletedReceives(): Unit = {
    selector.completedReceives.asScala.foreach { receive =>
      try {
        openOrClosingChannel(receive.source) match {
          case Some(channel) =>
            val header = RequestHeader.parse(receive.payload)
            if (header.apiKey == ApiKeys.SASL_HANDSHAKE && channel.maybeBeginServerReauthentication(receive, nowNanosSupplier))
              trace(s"Begin re-authentication: $channel")
            else {
              val nowNanos = time.nanoseconds()
              if (channel.serverAuthenticationSessionExpired(nowNanos)) {
                // be sure to decrease connection count and drop any in-flight responses
                debug(s"Disconnecting expired channel: $channel : $header")
                close(channel.id)
                expiredConnectionsKilledCount.record(null, 1, 0)
              } else {
                val connectionId = receive.source
                val context = new RequestContext(header, connectionId, channel.socketAddress,
                  channel.principal, listenerName, securityProtocol,
                  channel.channelMetadataRegistry.clientInformation)
                val req = new RequestChannel.Request(processor = id, context = context,
                  startTimeNanos = nowNanos, memoryPool, receive.payload, requestChannel.metrics)
                // KIP-511: ApiVersionsRequest is intercepted here to catch the client software name
                // and version. It is done here to avoid wiring things up to the api layer.
                if (header.apiKey == ApiKeys.API_VERSIONS) {
                  val apiVersionsRequest = req.body[ApiVersionsRequest]
                  if (apiVersionsRequest.isValid) {
                    channel.channelMetadataRegistry.registerClientInformation(new ClientInformation(
                      apiVersionsRequest.data.clientSoftwareName,
                      apiVersionsRequest.data.clientSoftwareVersion))
                  }
                }
                requestChannel.sendRequest(req)
                selector.mute(connectionId)
                handleChannelMuteEvent(connectionId, ChannelMuteEvent.REQUEST_RECEIVED)
              }
            }
          case None =>
            // This should never happen since completed receives are processed immediately after `poll()`
            throw new IllegalStateException(s"Channel ${receive.source} removed from selector before processing completed receive")
        }
      } catch {
        // note that even though we got an exception, we can assume that receive.source is valid.
        // Issues with constructing a valid receive object were handled earlier
        case e: Throwable =>
          processChannelException(receive.source, s"Exception while processing request from ${receive.source}", e)
      }
    }
  }

2.4.7 processCompletedSends

Processes the KSelector.completedSends queue. The corresponding response is removed from inflightResponses, and the read event is re-registered for the connection, allowing data to be read from it again.

private def processCompletedSends(): Unit = {
    selector.completedSends.asScala.foreach { send =>
      try {
        val response = inflightResponses.remove(send.destination).getOrElse {
          throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`")
        }
        updateRequestMetrics(response)

        response.onComplete.foreach(onComplete => onComplete(send))

        handleChannelMuteEvent(send.destination, ChannelMuteEvent.RESPONSE_SENT)
        tryUnmuteChannel(send.destination)
      } catch {
        case e: Throwable => processChannelException(send.destination,
          s"Exception while processing completed send to ${send.destination}", e)
      }
    }
  }

2.4.8 processDisconnected

Processes the KSelector.disconnected queue. All responses for the connection are first removed from inflightResponses, then the connection count recorded in ConnectionQuotas is decremented to make room for new connections.

private def processDisconnected(): Unit = {
    selector.disconnected.keySet.asScala.foreach { connectionId =>
      try {
        val remoteHost = ConnectionId.fromString(connectionId).getOrElse {
          throw new IllegalStateException(s"connectionId has unexpected format: $connectionId")
        }.remoteHost
        inflightResponses.remove(connectionId).foreach(updateRequestMetrics)
        // the channel has been closed by the selector but the quotas still need to be updated
        connectionQuotas.dec(listenerName, InetAddress.getByName(remoteHost))
      } catch {
        case e: Throwable => processException(s"Exception while processing disconnection of $connectionId", e)
      }
    }
  }

2.4.9 closeExcessConnections

If the listener's connection quota has been exceeded, the lowest-priority channel is closed.

private def closeExcessConnections(): Unit = {
    if (connectionQuotas.maxConnectionsExceeded(listenerName)) {
      val channel = selector.lowestPriorityChannel()
      if (channel != null)
        close(channel.id)
    }
  }

2.5 RequestChannel

Processor threads and Handler threads pass data to each other through the RequestChannel.

class RequestChannel(val queueSize: Int, val metricNamePrefix : String) extends KafkaMetricsGroup {
  import RequestChannel._
  val metrics = new RequestChannel.Metrics
  //① The queue through which Processor threads pass requests to the Handler threads; since multiple Processor and Handler threads operate on it concurrently, a thread-safe queue is used
  private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
  //② The Processor threads
  private val processors = new ConcurrentHashMap[Int, Processor]()
  //③ The counterpart responseQueue lives inside each Processor
  private val responseQueue = new LinkedBlockingDeque[RequestChannel.Response]()
}
  • requestQueue: the queue through which Processor threads pass requests to the Handler threads; since multiple Processor and Handler threads operate on it concurrently, a thread-safe queue is used.
  • ② processors: the Processor threads
  • responseQueue: the counterpart responseQueue lives inside each Processor

2.5.1 sendRequest

Puts a request into the requestQueue; the KafkaRequestHandler threads continually take them out by calling receiveRequest.

def sendRequest(request: RequestChannel.Request): Unit = {
    requestQueue.put(request)
  }

2.5.2 receiveRequest

Called repeatedly by the KafkaRequestHandler threads.

def receiveRequest(timeout: Long): RequestChannel.BaseRequest =
    requestQueue.poll(timeout, TimeUnit.MILLISECONDS)

2.5.3 sendResponse

Called by a KafkaRequestHandler thread; the response ends up in the owning Processor's responseQueue.

def sendResponse(response: RequestChannel.Response): Unit = {
    if (isTraceEnabled) {
      val requestHeader = response.request.header
      val message = response match {
        case sendResponse: SendResponse =>
          s"Sending ${requestHeader.apiKey} response to client ${requestHeader.clientId} of ${sendResponse.responseSend.size} bytes."
        case _: NoOpResponse =>
          s"Not sending ${requestHeader.apiKey} response to client ${requestHeader.clientId} as it's not required."
        case _: CloseConnectionResponse =>
          s"Closing connection for client ${requestHeader.clientId} due to error during ${requestHeader.apiKey}."
        case _: StartThrottlingResponse =>
          s"Notifying channel throttling has started for client ${requestHeader.clientId} for ${requestHeader.apiKey}"
        case _: EndThrottlingResponse =>
          s"Notifying channel throttling has ended for client ${requestHeader.clientId} for ${requestHeader.apiKey}"
      }
      trace(message)
    }

    val processor = processors.get(response.processor)
    // The processor may be null if it was shutdown. In this case, the connections
    // are closed, so the response is dropped.
    if (processor != null) {
      processor.enqueueResponse(response)
    }
  }

private[network] def enqueueResponse(response: RequestChannel.Response): Unit = {
    // enqueue the response
    responseQueue.put(response)
    // wake up the Processor's selector
    wakeup()
  }

These are all fairly simple methods; the sketch below ties them together.
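
Putting 2.5.1–2.5.3 together, here is a hedged end-to-end sketch of the Processor ↔ Handler hand-off (illustrative names, not Kafka's classes): a shared bounded request queue, one response queue per processor, and a handler loop in the spirit of KafkaRequestHandler:

import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

public class MiniRequestChannel {
    record Request(int processorId, String body) {}
    record Response(int processorId, String body) {}

    private final BlockingQueue<Request> requestQueue;    // capacity cf. queued.max.requests
    private final Map<Integer, BlockingQueue<Response>> responseQueues = new ConcurrentHashMap<>();

    public MiniRequestChannel(int queueSize) {
        requestQueue = new ArrayBlockingQueue<>(queueSize);
    }

    public void registerProcessor(int id) {
        responseQueues.put(id, new LinkedBlockingDeque<>());
    }

    // Called by a processor after it has read a complete request.
    public void sendRequest(Request r) throws InterruptedException {
        requestQueue.put(r);   // blocks when the broker is saturated (back-pressure)
    }

    // Called by handler threads in a loop.
    public Request receiveRequest(long timeoutMs) throws InterruptedException {
        return requestQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
    }

    // Called by a handler thread; routes the response back to the owning processor.
    public void sendResponse(Response r) throws InterruptedException {
        BlockingQueue<Response> q = responseQueues.get(r.processorId());
        if (q != null)         // the processor may have been shut down; then drop it
            q.put(r);
    }

    // A handler thread's main loop.
    public Runnable handlerLoop() {
        return () -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    Request req = receiveRequest(300);
                    if (req == null) continue;  // nothing to do, poll again
                    sendResponse(new Response(req.processorId(), "echo: " + req.body()));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
    }
}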


2.6 NetworkClient

Used by producers to send messages, by consumers to fetch messages, and for communication between brokers on the server side.

// The important fields
public class NetworkClient implements KafkaClient {
    //selector
    private final Selectable selector;

    // MetadataUpdater used to determine whether metadata is currently being updated or needs updating
    private final MetadataUpdater metadataUpdater;

    // Connection state of each node, backed by a HashMap
    private final ClusterConnectionStates connectionStates;

    // Buffers requests that have been sent but have not yet received a response
    private final InFlightRequests inFlightRequests;

    // Socket send buffer size; default 100 * 1024, configurable via `socket.send.buffer.bytes`
    private final int socketSendBuffer;

    // Socket receive buffer size; default 100 * 1024, configurable via `socket.receive.buffer.bytes`
    private final int socketReceiveBuffer;

    // Default request timeout
    private final int defaultRequestTimeoutMs;

    // Back-off time to wait before reconnecting
    private final long reconnectBackoffMs;

}    
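
Among these fields, InFlightRequests deserves a closer look, because both handleCompletedSends and handleCompletedReceives below depend on its ordering. A simplified model (illustrative, and omitting Kafka's extra checks such as whether the head request has finished sending): one deque per node, newest request at the head; since responses come back in order on a TCP connection, the oldest entry is completed first:

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

public class MiniInFlightRequests {
    private final int maxInFlightPerNode;   // cf. max.in.flight.requests.per.connection
    private final Map<String, Deque<String>> requests = new HashMap<>();

    public MiniInFlightRequests(int maxInFlightPerNode) {
        this.maxInFlightPerNode = maxInFlightPerNode;
    }

    public boolean canSendMore(String node) {
        Deque<String> q = requests.get(node);
        return q == null || q.size() < maxInFlightPerNode;
    }

    public void add(String node, String request) {
        requests.computeIfAbsent(node, n -> new ArrayDeque<>()).addFirst(request);
    }

    // Oldest request: matched against the next response that arrives.
    public String completeNext(String node) {
        return requests.get(node).pollLast();
    }

    // Newest request: removed when it was sent successfully but expects no response.
    public String completeLastSent(String node) {
        return requests.get(node).pollFirst();
    }
}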

2.6.1 ready

Determines whether the given node is ready to be sent requests.

public boolean ready(Node node, long now) {
        if (node.isEmpty())
            throw new IllegalArgumentException("Cannot connect to empty node " + node);

        if (isReady(node, now))
            return true;

        if (connectionStates.canConnect(node.idString(), now))
            // if we are interested in sending to a node and we don't have a connection to it, initiate one
            initiateConnect(node, now);

        return false;
    }

The conditions for being allowed to send are:

  • isReady: no metadata update may currently be due or in progress, and canSendRequest must hold
    public boolean isReady(Node node, long now) {
          return !metadataUpdater.isUpdateDue(now) && canSendRequest(node.idString(), now);
      }
    
  • canSendRequest: the connection state must be ready, the KafkaChannel must be set up, and inFlightRequests must not be full, i.e. it can still buffer more requests.
    private boolean canSendRequest(String node, long now) {
          return connectionStates.isReady(node, now) && selector.isChannelReady(node) &&
              inFlightRequests.canSendMore(node);
      }
    
  • canConnect: whether a connection may be initiated; if the state is disconnected (and the reconnect back-off has not elapsed), it cannot
  • initiateConnect: establishes the connection.
    private void initiateConnect(Node node, long now) {
          String nodeConnectionId = node.idString();
          try {
            // mark the state as connecting
              connectionStates.connecting(nodeConnectionId, now, node.host(), clientDnsLookup);
              InetAddress address = connectionStates.currentAddress(nodeConnectionId);
              log.debug("Initiating connection to node {} using address {}", node, address);
            // connect via selector.connect
              selector.connect(nodeConnectionId,
                      new InetSocketAddress(address, node.port()),
                      this.socketSendBuffer,
                      this.socketReceiveBuffer);
          } catch (IOException e) {
            // on error, mark the state as disconnected and update the metadata
              log.warn("Error connecting to node {}", node, e);
              connectionStates.disconnected(nodeConnectionId, now);
              metadataUpdater.handleServerDisconnect(now, nodeConnectionId, Optional.empty());
          }
      }
    

2.6.2 send

Mainly sets the request on the KafkaChannel.send field and adds it to the InFlightRequests queue to await its response.

public void send(ClientRequest request, long now) {
        doSend(request, false, now);
    }

private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now) {
        ensureActive();
        String nodeId = clientRequest.destination();
        //① Check whether requests can be sent to the specified Node
        if (!isInternalRequest) {
            if (!canSendRequest(nodeId, now))
                throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
        }
        //② Build the versioned request from the ClientRequest's builder.
        AbstractRequest.Builder<?> builder = clientRequest.requestBuilder();
        try {
            NodeApiVersions versionInfo = apiVersions.get(nodeId);
            short version;
            if (versionInfo == null) {
                version = builder.latestAllowedVersion();
                if (discoverBrokerVersions && log.isTraceEnabled())
                    log.trace("No version information found when sending {} with correlation id {} to node {}. " +
                            "Assuming version {}.", clientRequest.apiKey(), clientRequest.correlationId(), nodeId, version);
            } else {
                version = versionInfo.latestUsableVersion(clientRequest.apiKey(), builder.oldestAllowedVersion(),
                        builder.latestAllowedVersion());
            }
            doSend(clientRequest, isInternalRequest, now, builder.build(version));
        } catch (UnsupportedVersionException unsupportedVersionException) {
            log.debug("Version mismatch when attempting to send {} with correlation id {} to {}", builder,
                    clientRequest.correlationId(), clientRequest.destination(), unsupportedVersionException);
            ClientResponse clientResponse = new ClientResponse(clientRequest.makeHeader(builder.latestAllowedVersion()),
                    clientRequest.callback(), clientRequest.destination(), now, now,
                    false, unsupportedVersionException, null, null);

            if (!isInternalRequest)
                abortedSends.add(clientResponse);
            else if (clientRequest.apiKey() == ApiKeys.METADATA)
                metadataUpdater.handleFailedRequest(now, Optional.of(unsupportedVersionException));
        }
    }

private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
        String destination = clientRequest.destination();
        // build the request header for this ClientRequest
        RequestHeader header = clientRequest.makeHeader(request.version());
        if (log.isDebugEnabled()) {
            int latestClientVersion = clientRequest.apiKey().latestVersion();
            if (header.apiVersion() == latestClientVersion) {
                log.trace("Sending {} {} with correlation id {} to node {}", clientRequest.apiKey(), request,
                        clientRequest.correlationId(), destination);
            } else {
                log.debug("Using older server API v{} to send {} {} with correlation id {} to node {}",
                        header.apiVersion(), clientRequest.apiKey(), request, clientRequest.correlationId(), destination);
            }
        }
        
        Send send = request.toSend(destination, header);
        //③ add it to inFlightRequests
        InFlightRequest inFlightRequest = new InFlightRequest(
                clientRequest,
                header,
                isInternalRequest,
                request,
                send,
                now);
        this.inFlightRequests.add(inFlightRequest);
        //④ set it on the Send field
        selector.send(send);
    }    
  • ① Check whether requests can be sent to the specified Node
  • ② Build the versioned request (see the version-intersection sketch after this list).
  • ③ Add it to inFlightRequests
  • ④ Set it on the KafkaChannel.send field
    // org.apache.kafka.common.network.Selector
    public void send(Send send) {
          String connectionId = send.destination();
          KafkaChannel channel = openOrClosingChannelOrFail(connectionId);
          if (closingChannels.containsKey(connectionId)) {
              this.failedSends.add(connectionId);
          } else {
              try {
                  // set the `KafkaChannel.send` field
                  channel.setSend(send);
              } catch (Exception e) {
                  channel.state(ChannelState.FAILED_SEND);
                  this.failedSends.add(connectionId);
                  close(channel, CloseMode.DISCARD_NO_NOTIFY);
                  if (!(e instanceof CancelledKeyException)) {
                      log.error("Unexpected exception during send, closing connection {} and rethrowing exception {}",
                              connectionId, e);
                      throw e;
                  }
              }
          }
      }
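
The version selection in step ② above is essentially a range intersection: use the highest version both sides support, and fail when the two supported ranges do not overlap. A hedged, stand-alone sketch of that logic (simplified from what NodeApiVersions.latestUsableVersion does):

public class VersionNegotiation {
    public static short latestUsableVersion(short clientMin, short clientMax,
                                            short brokerMin, short brokerMax) {
        short usable = (short) Math.min(clientMax, brokerMax);  // highest common version
        short floor  = (short) Math.max(clientMin, brokerMin);  // lowest common version
        if (usable < floor)
            throw new IllegalStateException("No usable API version: client supports ["
                + clientMin + ", " + clientMax + "], broker supports ["
                + brokerMin + ", " + brokerMax + "]");
        return usable;
    }

    public static void main(String[] args) {
        // Example: client supports versions 0..7, broker supports 3..5 -> use 5.
        System.out.println(latestUsableVersion((short) 0, (short) 7, (short) 3, (short) 5));
    }
}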
    

2.6.3 poll

public List<ClientResponse> poll(long timeout, long now) {
        ensureActive();
        //① If the aborted-sends queue has entries, process them first
        if (!abortedSends.isEmpty()) {
            List<ClientResponse> responses = new ArrayList<>();
            handleAbortedSends(responses);
            completeResponses(responses);
            return responses;
        }

        //② Update the metadata if an update is due
        long metadataTimeout = metadataUpdater.maybeUpdate(now);
        try {
            //③ Perform the I/O
            this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));
        } catch (IOException e) {
            log.error("Unexpected error during I/O", e);
        }

        // process completed actions
        long updatedNow = this.time.milliseconds();
        List<ClientResponse> responses = new ArrayList<>();
        //④ Process the resulting queues
        handleCompletedSends(responses, updatedNow);
        handleCompletedReceives(responses, updatedNow);
        handleDisconnections(responses, updatedNow);
        handleConnections();
        handleInitiateApiVersionRequests(updatedNow);
        handleTimedOutRequests(responses, updatedNow);
        completeResponses(responses);

        return responses;
    }
  • ① If the aborted-sends queue has entries, process them first
  • ② Update the metadata if an update is due
  • ③ Perform the I/O; this is the core call that does the actual reads and writes (see the NIO Selector discussion in the two blog posts referenced at the beginning).
  • ④ Process the resulting queues: the follow-up bookkeeping after sending requests or receiving responses, such as state changes. Each handler is analyzed one by one below; a usage sketch of the whole cycle follows.
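
For context, the sketch below shows how a caller such as the producer's Sender thread typically drives this API in one iteration, assuming the Kafka clients jar is on the classpath (simplified to a single node and a single request):

import java.util.List;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.common.Node;

public class ClientLoopSketch {
    public static List<ClientResponse> sendOnce(KafkaClient client, Node node,
                                                ClientRequest request, long now) {
        if (client.ready(node, now))   // connected, channel ready, in-flight slot free
            client.send(request, now); // queue on KafkaChannel.send + InFlightRequests
        // else ready() has initiated a connection; the caller retries next iteration
        return client.poll(100, now);  // the actual network I/O happens here;
                                       // returned responses have had onComplete() invoked
    }
}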

2.6.4 handleCompletedSends

Handles requests whose send completed. InFlightRequests holds the requests that were sent but have not yet received a response, while completedSends holds the requests sent successfully in the most recent poll(); so, per connection, the last request in InFlightRequests and the corresponding entry in completedSends refer to the same request.

private void handleCompletedSends(List<ClientResponse> responses, long now) {
        //① iterate over the completedSends collection
        for (Send send : this.selector.completedSends()) {
            //② get the most recently sent request for this node from inFlightRequests
            InFlightRequest request = this.inFlightRequests.lastSent(send.destination());
            //③ check whether the request expects a response.
            if (!request.expectResponse) {
                // if not, remove that request (the most recently sent one) from the inFlightRequests queue
                this.inFlightRequests.completeLastSent(send.destination());
                // build a ClientResponse and add it to the responses list
                responses.add(request.completed(null, now));
            }
        }
    }
  • ① Iterate over the completedSends collection
  • ② Get the most recently sent request for this node from inFlightRequests
  • ③ Check whether the request expects a response.
    • If not, remove that request (the most recently sent one) from inFlightRequests
    • Build a ClientResponse and add it to the responses list

2.6.5 handleCompletedReceives

Iterates over the completedReceives queue, removes the corresponding ClientRequest from InFlightRequests, and adds the corresponding ClientResponse to the responses list. If it is the response to a metadata update request, MetadataUpdater#handleSuccessfulResponse is called to update the metadata.

private void handleCompletedReceives(List<ClientResponse> responses, long now) {
        for (NetworkReceive receive : this.selector.completedReceives()) {
            //① get the NodeId the response came from
            String source = receive.source();
            //② take the corresponding request out of inFlightRequests
            InFlightRequest req = inFlightRequests.completeNext(source);
            Struct responseStruct = parseStructMaybeUpdateThrottleTimeMetrics(receive.payload(), req.header,
                throttleTimeSensor, now);
            if (log.isTraceEnabled()) {
                log.trace("Completed receive from node {} for {} with correlation id {}, received {}", req.destination,
                    req.header.apiKey(), req.header.correlationId(), responseStruct);
            }
            //③ parse the response
            AbstractResponse body = AbstractResponse.
                    parseResponse(req.header.apiKey(), responseStruct, req.header.apiVersion());
            maybeThrottle(body, req.header.apiVersion(), req.destination, now);
            //④ if it is a MetadataResponse, call metadataUpdater.handleSuccessfulResponse to update the metadata
            if (req.isInternalRequest && body instanceof MetadataResponse)
                metadataUpdater.handleSuccessfulResponse(req.header, now, (MetadataResponse) body);
            else if (req.isInternalRequest && body instanceof ApiVersionsResponse)
                handleApiVersionsResponse(responses, req, now, (ApiVersionsResponse) body);
            else
                responses.add(req.completed(body, now));
        }
    }
  • ① Get the NodeId the response came from
  • ② Take the corresponding request out of inFlightRequests
  • ③ Parse the response
  • ④ If it is a MetadataResponse, call metadataUpdater.handleSuccessfulResponse to update the metadata
    • If it is an ApiVersionsResponse, call handleApiVersionsResponse
    • Otherwise add it to the responses list

2.6.6 handleDisconnections

Iterates over the disconnected list, clears the ClientRequests of the corresponding node from inFlightRequests, and for each of them creates a ClientResponse that is added to the responses list. The ClientResponses created here are marked as caused by the disconnection rather than being normal server responses. For a metadata update request, metadataUpdater#handleServerDisconnect is called to handle it.

private void handleDisconnections(List<ClientResponse> responses, long now) {
        for (Map.Entry<String, ChannelState> entry : this.selector.disconnected().entrySet()) {
            String node = entry.getKey();
            log.debug("Node {} disconnected.", node);
            //① update the connection state and clear the ClientRequests of the disconnected Node from InFlightRequests
            processDisconnection(responses, node, now, entry.getValue());
        }
    }

private void processDisconnection(List<ClientResponse> responses,
                                      String nodeId,
                                      long now,
                                      ChannelState disconnectState) {
        //② mark the state as disconnected
        connectionStates.disconnected(nodeId, now);
        // remove the node's entry from apiVersions
        apiVersions.remove(nodeId);
        nodesNeedingApiVersionsFetch.remove(nodeId);
        switch (disconnectState.state()) {
            case AUTHENTICATION_FAILED:
                AuthenticationException exception = disconnectState.exception();
                connectionStates.authenticationFailed(nodeId, now, exception);
                log.error("Connection to node {} ({}) failed authentication due to: {}", nodeId,
                    disconnectState.remoteAddress(), exception.getMessage());
                break;
            case AUTHENTICATE:
                log.warn("Connection to node {} ({}) terminated during authentication. This may happen " +
                    "due to any of the following reasons: (1) Authentication failed due to invalid " +
                    "credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS " +
                    "traffic (eg it may only allow HTTPS traffic), (3) Transient network issue.",
                    nodeId, disconnectState.remoteAddress());
                break;
            case NOT_CONNECTED:
                log.warn("Connection to node {} ({}) could not be established. Broker may not be available.", nodeId, disconnectState.remoteAddress());
                break;
            default:
                break; // Disconnections in other states are logged at debug level in Selector
        }

        cancelInFlightRequests(nodeId, now, responses);
        metadataUpdater.handleServerDisconnect(now, nodeId, Optional.ofNullable(disconnectState.exception()));
    }

2.6.7 handleConnections

Processes the connected list.

private void handleConnections() {
        for (String node : this.selector.connected()) {
            if (discoverBrokerVersions) {
                this.connectionStates.checkingApiVersions(node);
                nodesNeedingApiVersionsFetch.put(node, new ApiVersionsRequest.Builder());
                log.debug("Completed connection to node {}. Fetching API versions.", node);
            } else {
                this.connectionStates.ready(node);
                log.debug("Completed connection to node {}. Ready.", node);
            }
        }
    }

2.6.8 handleInitiateApiVersionRequests

Sends an ApiVersionsRequest to each node whose supported API versions still need to be fetched, once its channel is ready and inFlightRequests has room.

private void handleInitiateApiVersionRequests(long now) {
        Iterator<Map.Entry<String, ApiVersionsRequest.Builder>> iter = nodesNeedingApiVersionsFetch.entrySet().iterator();
        while (iter.hasNext()) {
            Map.Entry<String, ApiVersionsRequest.Builder> entry = iter.next();
            String node = entry.getKey();
            if (selector.isChannelReady(node) && inFlightRequests.canSendMore(node)) {
                log.debug("Initiating API versions fetch from node {}.", node);
                ApiVersionsRequest.Builder apiVersionRequestBuilder = entry.getValue();
                ClientRequest clientRequest = newClientRequest(node, apiVersionRequestBuilder, now, true);
                doSend(clientRequest, true, now);
                iter.remove();
            }
        }
    }

2.6.9 handleTimedOutRequests

Handles requests in InFlightRequests that have timed out.

private void handleTimedOutRequests(List<ClientResponse> responses, long now) {
        List<String> nodeIds = this.inFlightRequests.nodesWithTimedOutRequests(now);
        for (String nodeId : nodeIds) {
            // close connection to the node
            this.selector.close(nodeId);
            log.debug("Disconnecting from node {} due to request timeout.", nodeId);
            // same handling as in 2.6.6 handleDisconnections
            processDisconnection(responses, nodeId, now, ChannelState.LOCAL_CLOSE);
        }
    }

2.6.10 completeResponses

Completes the responses: iterates over the responses list and invokes onComplete on each. Taking a producer send as the example, this ultimately calls back into the Sender thread's handleProduceResponse method.

private void completeResponses(List<ClientResponse> responses) {
        for (ClientResponse response : responses) {
            try {
                response.onComplete();
            } catch (Exception e) {
                log.error("Uncaught error in request completion:", e);
            }
        }
    }

That roughly wraps up Kafka's network layer. It uses the Reactor pattern: an Acceptor thread listens on each endpoint to pick up connections, multiple Processors read the requests and write back their responses, and multiple Handler threads actually execute the requests and generate the responses, with Processors and Handlers exchanging data through the RequestChannel. Next up: the Handler threads and KafkaApis.


References: Kafka 2.5.0 source code

Kafka official website

Kafka Wiki

《Apache Kafka源码剖析》

《深入理解Kafka:核心设计与实践》