0%

kafka之控制器

控制器

  • controller_epoch:用于记录控制器发生变更次数,即记录当前控制器是第几代控制器,初始值为0,每选出一个新的控制器该字段加一,每个控制器发送请求都会带上这个字段,如果请求的controller_epoch小于内存中的controller_epoch,责任无这个请求是已过期的控制器发送的请求,此请求无效。若该值大于内存中的controller_epoch的值,说明已经有新的控制器当选了。通过该值来保证集群控制器的唯一性,进而保证相关操作的一致性。该字段可以使用zookeeper执行get/controller_epoch查看

  • zkVersion:类似于数据库的乐观锁,用于更新zookeeper路径下相应元数据信息

  • leader_epoch:分区Leader更新次数。controller_epoch是相对代理而言的,而leader_epoch是相对分区来说的,由于各请求到达顺序不同,控制器通过controller_epoch和leader_epoch来确定具体执行哪个命令操作

  • 已分配副本(assigned replica):每个分区的所有副本集合被称为已分配副本(AR)

  • LeaderAndIsr:kafka将Leader对应的brokeId和ISR列表(In-Sync-Replicas 副本同步队列)封装为一个LeaderAndIsr类

  • 优先副本(preferred replica):在AR(Assigned Replicas 所有副本)中,第一个副本称为优先副本。理想情况下,优先副本应该是该分区的Leader,kafka要确保所有主题的优先副本在kafka集群中均衡分布,这样就保证了所有分区的Leader均衡分布。保证Leader在集群中均衡分布很重要,因为所有的读写请求都由分区Leader副本处理,如果Leader过于集中,就会造成集群负载不均衡。为了保证优先副本的均衡分布,kafka提供了5种分区选举器(PartitionLeaderSelector),当分区数发生变化或是分区Leader宕机时就会通过分区选举器及时选出分区新的Leader

在启动kafka集群时,每一个代理都会实例化并启动一个KafkaController,并将该代理的brokerId注册到zookeeper的相应节点中。kafka集群中各代理会根据选举及只选出其中一个代理作为Leader,即Leader控制器。当控制器发生宕机后其他代理再次竞选出新的控制器。控制器负责主题的创建与删除、分区和副本的管理以及代理故障转移处理等。当一个代理被选举称为控制器时,该代理对应的KafkaController就会注册(Register)控制器相应的操作权限,同时标记自己是Leader。当代理不再是控制器时,就要注销掉(DeRegist)相应的权限。

这些功能的入口是在kafka核心core工程下的kafka.controller.KafkaController类

控制器初始化

每个代理在启动时都会实例化并启动一个KafkaController。初始化过程如下

  • 创建ControllerContext实例对象,用于缓存控制器各种处理操作所需要的数据结构(epoch、zkVersion、代理列表、主题列表、各主题对应的分区和副本的AR列表等)

  • 实例化用于维护和管理分区状态的状态机PartitionStateMachine。kafka分区有四种状态

    • NewPartition状态值0,分区被创建后为0,该状态下已得到副本分配,但还没有Leader和ISR信息

    • OnlinePartition状态值1,分区的Leader被选举出来后为1

    • OfflinePratition状态值2, 分区Leader成功选举出来之后,Leader的代理仍处于宕机状态,则该分区转为离线状态

    • NonExistentPartition状态值3,分区可能还没有创建,也可能是曾经创建过但是被删除了

    分区状态机会注册两个监听器(//TODO不过后续版本主题不存储在zookeeper了,处理逻辑变了)

    • TopicChangeListener用于监听/brokers/topics路径下子节点变化,当创建一个主题时,会在该路径下创建一个与该主题相同名称的子节点。主题发生变化时会触发该监听器,handleChildChange()方法会更新ControllerContext中维护的主题列表信息以及各主题对应分区的AR信息
    • DeleteTopicsListener用于监听/admin/delete_topics子节点的变更,当删除一个主题时,会在该路径下创建一个与待删除主体名称相同的子节点,发生变化时触发该监听器,该监听器中的handleChildChange()方法会将待删除的主题从/brokers/topic路径下移除,并将该主题加入到TopicDeletionManager维护的记录待删除主题队列中,交给TopicDeletionManager执行删除
  • 实例化一个对副本状态管理的状态机ReplicaStateMachine,kafka定义了7种状态

    • NewRelica状态值为1,新创建的副本状态,该状态的副本只能接收成为Follower副本的转换请求

    • OnlineReplica状态值为2,副本被启动或者已是分区AR的一部分时,该副本处于在线状态,该状态下的副本可以接受Leader或Follower的转换请求

    • OfflineReplica状态值为3,副本所在代理已宕机,则该副本被设置为离线状态,表示该副本将要从ISR中下线

    • ReplicaDeletionStarted状态值为4,对处于离线状态的副本进行删除操作时,先将副本状态标记为此状态,表示正在进行删除离线副本的操作

    • ReplicaDeletionSucessful状态值为5,删除副本成功,删除请求没有错误码应答

    • ReplicaDeletionInelgible状态值为6,副本删除失败

    • NonExistentReplica状态值为7,副本删除成功

    在副本状态机内有一个BrokerChangeListen监听器,监听zookeeper的brokers/ids/路径下各代理对应的brokeId节点的变化

  • 创建用于将当前代理选举为控制器的ZookeeperLeaderElector选举器对象,有两个回调函数

    • 完成控制器相应初始化操作的onControllerFailover()方法

    • 当新的控制器当选时让先前的控制器注销控制器权限的onControllerResignation()方法

    kafka控制器的选举策略是在zookeeper的/controller路径下创建临时节点,并注册一个LeaderChangeListener,通过该监听器监听该临时节点,当临时节点信息发生变更时,触发监听器。

  • 创建一个独立定时任务KafkaScheduler,用于控制器进行平衡操作,生命周期只有在代理成为Leader控制器期间有效

  • 声明一个对主题操作管理的TopicDeletionManager对象。

  • 创建一个用于在分区状态发生变化时为分区选举出Leader副本的分区选举器PartitionLeaderSelector,kafka提供了5种分区选举器

    • OfflinePartitionLeaderSelector:分区状态机宕机、新创建一个分区或是将一个分区状态由NewPartition状态、OfflinePartition状态转换到OnlinePartition状态会使用该选举器。该选举器的策略是首先判断是否有存活的ISR,若ISR中至少有一个存活的代理,则从ISR列表中选第一个存活的代理作为Leader,存活的ISR作为新的ISR;否则,若配置项unclean.leader.election.enable为true,则表示允许从不在ISR列表中的副本选举Leader,同时AR中若有存活的副本,则从AR列表中选第一个代理作为Leader,存活的AR作为新的ISR。

    • ReassignedPartitionLeaderSelector:当分区进行重分配时会调用该选举器。该选举器的选举策略是从AR列表中找出存活的副本列表,若有存活的副本则去存活副本列表中的第一个副本作为Leader,将当前ISR作为新的ISR,将AR作为接受LeaderAndIsr请求的副本集合。

    • PreferredReplicaPartitionLeaderSelector:该选举器直接将优先副本设置为分区的Leader。

    • ControllerdShutdownLeaderSelector:该选举器将从ISR中剔除已关闭的节点,将剔除后的ISR作为新的ISR,同时从新的ISR中选举第一个作为Leader副本。将AR中剔除已关闭节点的副本节点作为接受LeaderAndIsr请求的副本集合

    • NoOpLeaderSelector:该选举器只返回当前分区的Leader和ISR

  • 实例化ControllerBrokerRequestBatch,用来记录和缓存分区状态机以及副本状态机中handleStateChange()方法中产生的request,控制器将这些request交给ControllerBrokerRequestBatch.sendRequestsToBrokers()方法批量发送出去,交由KafkaApis调用相应的handle方法进行处理

  • 实例化3个监听器,用于监听分区重分配的PartitionsReassignedListener,用于监听当分区变化时触发的PreferredReplicaPartitionLeaderSelector选举器将优先副本选举为Leader的PerferredReplicaElectionListener、用于监听当ISR发生变化时将ISR变化通知给Zookeeper进行更新操作,同时向所有的代理节点发送元数据修改请求的IsrChangeNotificationListener

控制器选举过程

每个代理启动时都会创建一个KafkaController实例,当kafkaController启动后就会从所有代理中选择一个代理作为Leader,称为Leader选举。除了在启动时选举外,当控制器所在代理发生故障或zookeeper通过心跳机制感知控制器与自己的连接Session已过期时,也会再次进行选举。

ZookeeperLeaderElector这个类负责控制器选举(0.11版本,之后换了),ZookeeperLeaderElector启动时首先注册一个LeaderChangeListener,负责监听zookeeper的/controller节点数据变化,该节点存储了当前Leader的brokerId,数据格式为json。当该节点数据发生变化时,比较当前代理的brokeId与当前Leader的brokeId是否相同,若不同,表示当前代理已不是Leader,则回调onControllerResignnation退位,注销Leader的权限,表示当前代理状态设置为RunningAsBroker,同时将代理的leader_epoch和zkVersion设置为0.当该节点数据被删除时,若当前代理是Leader,则先退位,然后再触发选举,否则直接触发选举

控制器选举算法入口为ZooKeeperLeaderElector.select()方法

每个代理先从zookeeper的/controller节点获取Leader信息,解析当前Leader的leaderId,若leaderId等于-1,表示还没有成功选举出Leader,则该代理将封装有自己brokeId的信息请求zookeeper将该数据写入/controller节点。如果leaderId不为-1,则表示已有代理抢先成为了Leader,则停止选举。若写入代理信息成功,则当前代理即为所选出的Leader。

在抢占写/controller节点时若发生非ZkNodeExistsException异常,则会将leaderId设置为-1,同时删除存储在/controller节点的元数据信息,以便让请求最先到达zookeeper的代理成为Leader,由于删除了/controller节点将会触发LeaderChangeListener.handleDataDelete()方法,就会重新选举Leader。同时/controller节点数据变化,会触发LeaderChangeListener.handleDataChange()方法,其他代理通过当前的leaderId与自己的brokerId比较,若在节点数据发生前自己是Leader,现在leaderId与自己的brokeId不同,则自己退位,回调onControllerResignation函数

故障转移

触发控制器进行选举的3种情况:一是控制器启动的时候,二是当控制器发生故障转移的时候,三是当心跳检测超时的时候

控制器故障转移的本质是控制权的转移,重新选出新的控制器。还是ZookeeperLeaderElector对象的两个回调方法

  • onControllerFailover()当选为控制器时注册相应权限
    • 从zookeeper的/controller_epoch路径读取当前控制器的轮值次数,并更新到ControllerContext中
    • 将控制器轮值数加一,并尝试更新/controller_epoch中记录的次数值,若更新失败表示当前控制器已经被其他控制器替代
    • 注册分区管理相关的监听器。用于监听zookeeper下的/admin/reassign_partitions节点引发分区重分配操作的PartitionsReassignedListener;用于监听zookeeper的/isr_change_notification节点用于处理分区ISR发生变化的IsrChangeNotificationListener;用于监听/admin/preferred_replica_election节点将优先副本选为分区Leader操作的PreferredReplicaElectionListner
    • 注册主题管理的监听器。
    • 注册代理变化处理的监听器
    • 初始化ControllerContext,当一个代理称为控制器后,原控制器所持有的ControllerContext会被重新赋值
    • 启动分区状态机和副本状态机
    • 从ControllerContext中读取所有主题,为每个主题添加用于监听分区变化的PartitionModificationsListener
    • 检测当前是否有分区需要触发分区重分配操作
    • 检测当前是否又需要将优先副本选举为Leader的分区
    • 向kafka集群中所有存活的代理发送更新元数据请求
    • 根据auto.leader.reblance.enable配置决定是否创建用于分区平衡的定时任务。
    • 启动TopicDeletionManager组件
  • onControllerResignation()不再是控制器时注销权限的
    • 取消该控制器在zookeeper中注册的用于对分区及副本变化感知的监听器的监听
    • 关闭删除主题操作的TopicDeletionManager组件,并关闭分区平衡的定时任务
    • 在获取ControllerContext维护的重入锁的条件下取消对ISR变化的监听,关闭分区状态机和副本状态机,关闭控制器与其他代理之间进行通信的ControllerChannelManager
    • 将ControllerContext中用于记录控制器轮值次数以及轮值数对应的epochZkVersion字段置零,并将当前代理状态设置为RunningAsBroker

代理上下线

代理上下线操作BrokerChangeListener进行处理

  • 代理上线

    代理上线时,在代理启动时会向Zookeeper的/broker/ids节点下注册该代理的brokerId,此时会被副本状态机在Zookeeper所注册的BrokerChangeListener监听器监听到该节点信息的变化,通过zookeeper中记录的信息以及ControllerContext缓存的节点信息,计算出新上线的节点集合,对新上线的代理节点调用ControllerChangeManager.addBroker()方法完成新上线代理网络层相关初始化处理。然后调用KafkaController.onBrokerStartup()方法进行处理

    • 向集群当前所有代理发送UpdateMetadataRequest请求,使得代理知道有新的代理加入
    • 查找被分配到新上线节点的副本集合,通过副本状态机对副本状态进行相应变迁处理,将这些副本变成OnlineReplica,并通过分区状态机对分区状态为NewPartition和OfflinePartition的分区进行处理,将其状态转至OnlinePartition状态,并触发一次分区Leader选举,以确认新增的代理是否为Leader
    • 轮询被分配到新上线代理的副本,进行分区副本操作
    • 恢复由于新代理上线而被暂停的删除主题操作线程

代理下线

代理下线时,该代理在zookeeper的brokers/ids节点中注册的与该代理对应的节点将被删除,BrokerChangeListener的handleChildChange()方法被触发

  • 查找下线节点集合
  • 轮询下线节点,调用ControllerChannelManager.removeBroker()方法,关闭每个下线节点的网络连接,清空下线节点的消息队列,关闭下线节点发送Request请求的线程
  • 最后调用KafkaController的onBrokerFailure()方法进行处理
    • 查找Leader副本下线节点上的分区,将这些分区状态设置为OfflinePartition,并处理相应状态变迁,调用分区状态机的triggerOnlinePartitionStateChange()方法将处于分区选举器为分区选出Leader,并将Leader和ISR信息写入到zookeeper中,同时发送UpdateMatadataRequest请求更新元数据信息
    • 查找所有下线节点上的副本集合,将该集合分成两部分,一部分是待删除主题副本,将这些副本状态转换为ReplicaDeletionInteligible,标记副本对应的主题暂时不可被删除。另一部分为当前处于正常使用的主题的副本,将副本状态由OnlineReplica转化为OfflineReplica,将该副本节点从分区ISR中删除,并发送StopReplicaRequest请求,停止该副本从Leader副本同步消息的操作,发送LeaderAndIsrRequest请求,该分区Leader副本和Follower副本根据角色不同分别进行相应处理,同时发送UpdateMatadataRequest请求,更新当前所有存活代理的缓存的元数据信息
    • 若分区Leader副本分配在下线节点上的所有分区状态转换操作执行完成,则向集群所有存活的代理发送更新元数据的UpdateMetadataRequest请求,执行元数据更新操作