Kafka Controller 模块(一)概述

2020年6月6日 | 作者 Siran | 5400字 | 阅读大约需要11分钟
归档于 消息队列 | 标签 #kafka

概述

在 Kafka 集群中会有一个或多个 broker,其中有一个 broker 会被选举为控制器(KafkaController),它负责管理整个集群中所有分区和副本的状态

  • 当某个分区的leader副本出现故障时,由控制器负责为该分区选举新的leader副本。
  • 当检测到某个分区的ISR集合发生变化时,由控制器负责通知所有broker更新其元数据信息。
  • 当使用kafka-topics.sh脚本为某个topic增加分区数量时,同样还是由控制器负责分区的重新分配

1. 控制器的选举及异常恢复

Kafka中的控制器选举工作依赖于ZooKeeper,成功竞选为控制器的broker会在ZooKeeper中创建/controller这个临时(EPHEMERAL)节点此临时节点的内容参考如下:

{"version":"1","brokerid":"0","timestamp":"1529210289997"}
  • version:版本号
  • brokerid:表示成为控制器的broker的id编号
  • timestamp:表示竞选成为控制器时的时间戳。

过程

  • 在任意时刻,集群中有且仅有一个控制器。

  • 每个 broker 启动的时候会去尝试读取/controller节点的brokerid的值,如果读取到brokerid的值不为-1,则表示已经有其他 broker 节点成功竞选为控制器,所以当前 broker 就会放弃竞选;

  • 如果 ZooKeeper 中不存在/controller节点,或者这个节点中的数据异常,那么就会尝试去创建/controller节点。

    • 当前broker去创建节点的时候,也有可能其他broker同时去尝试创建这个节点,只有创建成功的那个broker才会成为控制器,而创建失败的broker竞选失败。
  • 每个broker都会在内存中保存当前控制器的brokerid值,这个值可以标识为activeControllerId

  • ZooKeeper 中还有一个与控制器有关的/controller_epoch 节点,这个节点是持久(PERSISTENT)节点,节点中存放的是一个整型的controller_epoch值。

  • controller_epoch:用于记录控制器发生变更的次数,即记录当前的控制器是第几代控制器,我们也可以称之为“控制器的纪元”。

    • controller_epoch的初始值为1,即集群中第一个控制器的纪元为1,当控制器发生变更时,每选出一个新的控制器就将该字段值加1
    • 每个和控制器交互的请求都会携带controller_epoch这个字段,如果请求的controller_epoch值小于内存中的controller_epoch值,则认为这个请求是向已经过期的控制器所发送的请求,那么这个请求会被认定为无效的请求。
    • 如果请求的controller_epoch值大于内存中的controller_epoch值,那么说明已经有新的控制器当选了。由此可见,Kafka 通过 controller_epoch 来保证控制器的唯一性,进而保证相关操作的一致性。

    具备控制器身份的broker需要比其他普通的broker多一份职责,具体细节如下:

    • 监听分区相关的变化:为ZooKeeper中的 /admin/reassign_partitions 节点注册PartitionReassignmentHandler,用来处理分区重分配的动作。为 ZooKeeper 中的 /isr_change_notification 节点注册IsrChangeNotificetionHandler,用来处理ISR集合变更的动作。为ZooKeeper中的 /admin/preferred-replica-election 节点添加 PreferredReplicaElectionHandler,用来处理优先副本的选举动作。

    • 监听主题相关的变化:为 ZooKeeper 中的 /brokers/topics 节点添加 TopicChangeHandler,用来处理主题增减的变化;为 ZooKeeper 中的 /admin/delete_topics 节点添加TopicDeletionHandler,用来处理删除主题的动作。

    • 监听broker相关的变化:为ZooKeeper中的 /brokers/ids 节点添加 BrokerChangeHandler,用来处理broker增减的变化。

    • 从ZooKeeper中读取获取当前所有与主题分区broker有关的信息并进行相应的管理:对所有主题对应的 ZooKeeper 中的 /brokers/topics/<topic> 节点添加PartitionModificationsHandler,用来监听主题中的分区分配变化。

    • 启动并管理分区状态机和副本状态机

    • 更新集群的元数据信息

    • 如果参数 auto.leader.rebalance.enable 设置为 true,则还会开启一个名为“auto-leader-rebalance-task”的定时任务来负责维护分区的优先副本的均衡。

控制器在选举成功之后会读取 ZooKeeper 中各个节点的数据来初始化上下文信息(ControllerContext),并且需要管理这些上下文信息。

比如为某个主题增加了若干分区,控制器在负责创建这些分区的同时要更新上下文信息,并且需要将这些变更信息同步到其他普通的broker 节点中。

不管是监听器触发的事件,还是定时任务触发的事件,或者是其他事件(比如ControlledShutdown)都会读取或更新控制器中的上下文信息,那么这样就会涉及多线程间的同步。

如果单纯使用锁机制来实现,那么整体的性能会大打折扣。

针对这一现象,Kafka 的控制器使用单线程基于事件队列的模型,将每个事件都做一层封装,然后按照事件发生的先后顺序暂存到 LinkedBlockingQueue 中,最后使用一个专用的线程(ControllerEventThread)按照FIFO(First Input First Output,先入先出)的原则顺序处理各个事件,这样不需要锁机制就可以在多线程间维护线程安全

在Kafka的早期版本中,并没有采用Kafka Controller这样一个概念来对分区和副本的状态进行管理,而是依赖于ZooKeeper,每个broker都会在ZooKeeper上为分区和副本注册大量的监听器(Watcher)。当分区或副本状态变化时,会唤醒很多不必要的监听器,这种严重依赖ZooKeeper 的设计会有脑裂、羊群效应,以及造成 ZooKeeper 过载的隐患。在目前的新版本的设计中,只有KafkaController在ZooKeeper上注册相应的监听器,其他的broker极少需要再监听ZooKeeper中的数据变化,这样省去了很多不必要的麻烦。不过每个broker还是会对/controller节点添加监听器,以此来监听此节点的数据变化(ControllerChangeHandler)。

  • 当/controller 节点的数据发生变化时,每个 broker 都会更新自身内存中保存的activeControllerId。如果broker 在数据变更前是控制器,在数据变更后自身的 brokerid 值与新的 activeControllerId 值不一致,那么就需要“退位”,关闭相应的资源,比如关闭状态机、注销相应的监听器等。有可能控制器由于异常而下线,造成/controller 这个临时节点被自动删除;也有可能是其他原因将此节点删除了。

  • 当/controller节点被删除时,每个broker都会进行选举,如果broker在节点被删除前是控制器,那么在选举前还需要有一个“退位”的动作。如果有特殊需要,则可以手动删除/controller 节点来触发新一轮的选举。当然关闭控制器所对应的 broker,以及手动向/controller节点写入新的brokerid的所对应的数据,同样可以触发新一轮的选举。


2.优雅关闭

使用ControlledShutdown的方式关闭Kafka有两个优点:

  • 一是可以让消息完全同步到磁盘上,在服务下次重新上线时不需要进行日志的恢复操作;
  • 二是 ControllerShutdown 在关闭服务之前,会对其上的leader副本进行迁移,这样就可以减少分区的不可用时间。

若要成功执行 ControlledShutdown 动作还需要有一个先决条件,就是参数controlled.shutdown.enable的值需要设置为true,不过这个参数的默认值就为true,

即默认开始此项功能。ControlledShutdown 动作如果执行不成功还会重试执行,这个重试的动作由参数controlled.shutdown.max.retries 配置,默认为 3 次,每次重试的间隔由参数controlled.shutdown.retry.backoff.ms设置,默认为5000ms。

ControlledShutdown的整个执行过程。

假设此时有两个broker,其中待关闭的broker的id为x,Kafka控制器所对应的broker的id为y。

  • 步骤①:待关闭的broker在执行ControlledShutdown动作时首先与Kafka控制器建立专用连接

  • 步骤②:然后发送 ControlledShutdownRequest 请求,ControlledShutdownRequest 请求中只有一个brokerId字段,这个brokerId字段的值设置为自身的brokerId的值,即x

  • 步骤③:Kafka控制器在收到ControlledShutdownRequest请求之后会将与待关闭broker有关联的所有分区进行专门的处理,这里的“有关联”是指分区中有副本位于这个待关闭的broker之上

    • 如果这些分区的副本数大于1且leader副本位于待关闭broker上,那么需要实施leader副本的迁移及新的 ISR 的变更。具体的选举分配的方案由专用的选举器 ControlledShutdown-LeaderSelector提供
    • 如果这些分区的副本数只是大于1,leader副本并不位于待关闭broker上,那么就由Kafka控制器来指导这些副本的关闭。
    • 如果这些分区的副本数只是为 1,那么这个副本的关闭动作会在整个ControlledShutdown动作执行之后由副本管理器来具体实施。
    • 对于分区的副本数大于1且leader副本位于待关闭broker上的这种情况,如果在Kafka控制器处理之后leader副本还没有成功迁移,那么会将这些没有成功迁移leader副本的分区记录下来,并且写入ControlledShutdownResponse 的响应
    • 待关闭的broker在收到ControlledShutdownResponse响应之后,需要判断整个ControlledShutdown动作是否执行成功,以此来进行可能的重试或继续执行接下来的关闭资源的动作。执行成功的标准是ControlledShutdownResponse中error_code字段值为0,并且partitions_remaining数组字段为空。

    注意要点:待关闭的broker同时是Kafka控制器,这也就意味着自己可以给自己发送 ControlledShutdownRequest 请求,以及等待自身的处理并接收ControlledShutdownResponse的响应,具体的执行细节和x!=y的场景相同。


3.分区leader的选举

分区leader副本的选举由控制器负责具体实施。

当创建分区(创建主题或增加分区都有创建分区的动作)或分区上线(比如分区中原先的leader副本下线,此时分区需要选举一个新的leader 上线来对外提供服务)的时候都需要执行 leader 的选举动作,对应的选举策略为OfflinePartitionLeaderElectionStrategy

  • 这种策略的基本思路是按照 AR 集合中副本的顺序查找第一个存活的副本,并且这个副本在ISR集合中。
  • 一个分区的AR集合在分配的时候就被指定,并且只要不发生重分配的情况,集合内部副本的顺序是保持不变的,而分区的ISR集合中副本的顺序可能会改变。
  • 注意这里是根据AR的顺序而不是ISR的顺序进行选举的。

举个例子,集群中有3个节点:broker0、broker1和broker2,在某一时刻具有3个分区且副本因子为3的主题topic-leader的具体信息如下:

bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic topic-leader
Topic:topic-leader   PartitionCount:3 ReplicationFactor:3 Configs:
     Topic: topic-leader Partition: 0 Leader : 1  Replicas: 1,2,0 ISR:2,0,1
     Topic: topic-leader Partition: 1 Leader : 2  Replicas: 2,0,1 ISR:2,0,1
     Topic: topic-leader Partition: 2 Leader : 0  Replicas: 0,1,2 ISR:0,2,1

此时关闭broker0,那么对于分区2而言,存活的AR就变为[1,2],同时ISR变为[2,1]。此时查看主题topic-leader的具体信息(参考如下),分区2的leader就变为了1而不是2。

bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic topic-leader
Topic:topic-leader   PartitionCount:3 ReplicationFactor:3 Configs:
     Topic: topic-leader Partition: 0 Leader : 1  Replicas: 1,2,0 ISR:2,1
     Topic: topic-leader Partition: 1 Leader : 2  Replicas: 2,0,1 ISR:2,1
     Topic: topic-leader Partition: 2 Leader : 1  Replicas: 0,1,2 ISR:2,1
  • 如果ISR集合中没有可用的副本,那么此时还要再检查一下所配置的unclean.leader.election.enable参数(默认值为false)。如果这个参数配置为true,那么表示允许从非ISR列表中的选举leader,从AR列表中找到第一个存活的副本即为leader。
  • 当分区进行重分配的时候也需要执行leader的选举动作,对应的选举策略ReassignPartitionLeaderElectionStrategy
    • 这个选举策略的思路比较简单:从重分配的AR列表中找到第一个存活的副本,且这个副本在目前的ISR列表中。当发生优先副本的选举时,直接将优先副本设置为leader即可,AR集合中的第一个副本即为优先副本(PreferredReplicaPartitionLeaderElectionStrategy)。
    • 还有一种情况会发生 leader 的选举,当某节点被优雅地关闭(也就是执行ControlledShutdown)时,位于这个节点上的leader副本都会下线,所以与此对应的分区需要执行leader的选举。与此对应的选举策略(ControlledShutdownPartitionLeaderElectionStrategy)为:从AR列表中找到第一个存活的副本,且这个副本在目前的ISR列表中,与此同时还要确保这个副本不处于正在被关闭的节点上。