0%

控制器

Kafka 集群中有一个broker 会被选举为Leader控制器,控制器是kafka集群的中心,它的主要作用是在 ZooKeeper 的帮助下管理和协调整个 Kafka 集群。一个kafka集群中,除控制器这台Broker之外的其他Broker会根据控制器的指挥来实现相应的功能

这个Leader控制器是整个集群的,要与分区的Leader区分开

  • 控制器负责管理kafka分区的状态
  • 管理每个分区的副本状态
  • 监听Zookeeper中数据的变化并作出相应的反馈
  • 主题的创建与删除
  • 代理故障转移等功能

只有Controller在zookeeper上注册相应的监听器,其他的broker尽量不要监听zookeeper的数据变化,降低zookeeper的负载,也减少了zookeeper数据变化导致的羊群效应

所有的Broker都会监听控制器Leader的状态,负责管理集群 broker 的上下线,所有topic 的分区副本分配和leader 选举等工作。
Controller 的管理工作都是依赖于 Zookeeper的

  • controller_epoch:用于记录控制器发生变更次数,即记录当前控制器是第几代控制器,初始值为0,每选出一个新的控制器该字段加一,每个向控制器发送的请求都会带上这个字段,如果请求的controller_epoch小于内存中的controller_epoch,则认为这个请求是向已过期的控制器发送的请求,此请求无效。若该值大于内存中的controller_epoch的值,说明已经有新的控制器当选了。通过该值来保证集群控制器的唯一性,进而保证相关操作的一致性。该字段可以使用zookeeper执行get/controller_epoch查看

  • zkVersion:类似于数据库的乐观锁,用于更新zookeeper路径下相应元数据信息

  • leader_epoch:分区Leader更新次数。controller_epoch是相对代理而言的,而leader_epoch是相对分区来说的,由于各请求到达顺序不同,控制器通过controller_epoch和leader_epoch来确定具体执行哪个命令操作

  • 已分配副本(assigned replica):每个分区的所有副本集合被称为已分配副本(AR)

  • LeaderAndIsr:kafka将Leader对应的brokeId和ISR列表(In-Sync-Replicas 副本同步队列)封装为一个LeaderAndIsr类

  • 优先副本(preferred replica):在AR中,第一个副本称为优先副本。理想情况下,优先副本应该是该分区的Leader,kafka要确保所有主题的优先副本在kafka集群中均衡分布,这样就保证了所有分区的Leader均衡分布。保证Leader在集群中均衡分布很重要,因为所有的读写请求都由分区Leader副本处理,如果Leader过于集中,就会造成集群负载不均衡。为了保证优先副本的均衡分布,kafka提供了5种分区选举器(PartitionLeaderSelector),当分区数发生变化或是分区Leader宕机时就会通过分区选举器及时选出分区新的Leader

在启动kafka集群时,每一个代理都会实例化并启动一个KafkaController,并将该代理的brokerId注册到zookeeper的相应节点中。kafka集群中各代理会根据选举及只选出其中一个代理作为Leader,即Leader控制器。当控制器发生宕机后其他代理再次竞选出新的控制器。控制器负责主题的创建与删除、分区和副本的管理以及代理故障转移处理等。当一个代理被选举成为控制器时,该代理对应的KafkaController就会注册(Register)控制器相应的操作权限,同时标记自己是Leader。当代理不再是控制器时,就要注销掉(DeRegist)相应的权限。

这些功能的入口是在kafka核心core工程下的kafka.controller.KafkaController

控制器初始化

每个代理在启动时都会实例化并启动一个KafkaController。初始化过程如下

  • 创建ControllerContext实例对象,用于缓存控制器各种处理操作所需要的数据结构(epoch、zkVersion、代理列表、主题列表、各主题对应的分区和副本的AR列表等)

  • 实例化用于维护和管理分区状态的状态机PartitionStateMachine。kafka分区有四种状态

    • NewPartition状态值0,分区被创建后为0,该状态下已得到副本分配,但还没有Leader和ISR信息

    • OnlinePartition状态值1,分区的Leader被选举出来后为1

    • OfflinePratition状态值2, 分区Leader成功选举出来之后,Leader的代理仍处于宕机状态,则该分区转为离线状态

    • NonExistentPartition状态值3,分区可能还没有创建,也可能是曾经创建过但是被删除了

  • 实例化一个对副本状态管理的状态机ReplicaStateMachine,kafka定义了7种状态

    • NewRelica状态值为1,新创建的副本状态,该状态的副本只能接收成为Follower副本的转换请求

    • OnlineReplica状态值为2,副本被启动或者已是分区AR的一部分时,该副本处于在线状态,该状态下的副本可以接受Leader或Follower的转换请求

    • OfflineReplica状态值为3,副本所在代理已宕机,则该副本被设置为离线状态,表示该副本将要从ISR中下线

    • ReplicaDeletionStarted状态值为4,对处于离线状态的副本进行删除操作时,先将副本状态标记为此状态,表示正在进行删除离线副本的操作

    • ReplicaDeletionSucessful状态值为5,删除副本成功,删除请求没有错误码应答

    • ReplicaDeletionInelgible状态值为6,副本删除失败

    • NonExistentReplica状态值为7,不存在的副本

  • 创建几个处理器ControllerChangeHandler监听/controller、BrokerChangeHandler监听/brokers/ids子节点改变、BrokerModificationsHandler监听/brokers/ids/id数据改变、TopicChangeHandler监听/brokers/topics子节点改变、TopicDeletionHandler监听/admin/delete_topics子节点改变、PartitionModificationsHandler监听/brokers/topics/topic数据改变、PartitionReassignmentHandler监听/admin/reassign_partitions创建、preferredReplicaElectionHandler监听/admin/preferred_replica_election创建、IsrChangeNotificationHandler监听/isr_change_notification子节点变化、logDirEventNotificationHandler监听/log_dir_event_notification子节点改变

    监听/brokers/ids节点,kafka在启动时会在zookeeper中的/brokers/ids路径下注册一个与当前broker的id相同的临时节点,当有broker加入或退出集群时,会就触发Watcher机制

  • 创建一个独立定时任务KafkaScheduler,用于控制器进行平衡操作,生命周期只有在代理成为Leader控制器期间有效

  • 声明一个对主题操作管理的TopicDeletionManager对象。

  • 实例化ControllerBrokerRequestBatch,用来记录和缓存分区状态机以及副本状态机中handleStateChange()方法中产生的request,控制器将这些request交给ControllerBrokerRequestBatch.sendRequestsToBrokers()方法批量发送出去,交由KafkaApis调用相应的handle方法进行处理

控制器选举过程

每个代理启动时都会创建一个KafkaController实例,并将该代理的brokerId注册到zookeeper的相应节点中。当kafkaController启动后就会从所有代理中选择一个代理作为Leader,称为Leader选举。除了在启动时选举外,当控制器所在代理发生故障或zookeeper通过心跳机制感知控制器与自己的连接Session已过期时,也会再次进行选举。

kafka集群中第一个启动的broker通过在zookeeper里创建一个临时节点/controller让自己成为controller控制器。其他broker在启动时也会尝试创建这个节点,但是由于节点已存在,所以会返回节点已存在异常,然后其他broker会在控制器上注册一个Zookeeper的watch对象,当/controller节点发生变化时,其他broker就会收到节点变更的通知来尝试让自己成为新的控制器。

1
2
3
4
5
6
7
8
9
10
case object Startup extends ControllerEvent {

def state = ControllerState.ControllerChange

override def process(): Unit = {
zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
elect()
}

}

每个代理先从zookeeper的/controller节点获取Leader信息,解析当前Leader的leaderId,若leaderId等于-1,表示还没有成功选举出Leader,则该代理将封装有自己brokeId的信息请求zookeeper将该数据写入/controller节点。如果leaderId不为-1,则表示已有代理抢先成为了Leader,则停止选举。若写入代理信息成功,则当前代理即为所选出的Leader。

1
val (epoch, epochZkVersion) = zkClient.registerControllerAndIncrementControllerEpoch(config.brokerId)

故障转移

触发控制器进行选举的3种情况:一是控制器启动的时候,二是当控制器发生故障转移的时候,三是当心跳检测超时的时候

控制器故障转移的本质是控制权的转移,重新选出新的控制器。

代理上下线

代理上下线操作BrokerChangeHandler进行处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
override def process(): Unit = {
if (!isActive) return
val curBrokers = zkClient.getAllBrokersInCluster.toSet
val curBrokerIds = curBrokers.map(_.id)
val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
val newBrokerIds = curBrokerIds -- liveOrShuttingDownBrokerIds
val deadBrokerIds = liveOrShuttingDownBrokerIds -- curBrokerIds
val newBrokers = curBrokers.filter(broker => newBrokerIds(broker.id))
controllerContext.liveBrokers = curBrokers
val newBrokerIdsSorted = newBrokerIds.toSeq.sorted
val deadBrokerIdsSorted = deadBrokerIds.toSeq.sorted
val liveBrokerIdsSorted = curBrokerIds.toSeq.sorted
info(s"Newly added brokers: ${newBrokerIdsSorted.mkString(",")}, " +
s"deleted brokers: ${deadBrokerIdsSorted.mkString(",")}, all live brokers: ${liveBrokerIdsSorted.mkString(",")}")

newBrokers.foreach(controllerContext.controllerChannelManager.addBroker)
deadBrokerIds.foreach(controllerContext.controllerChannelManager.removeBroker)
if (newBrokerIds.nonEmpty) // 上线
onBrokerStartup(newBrokerIdsSorted)
if (deadBrokerIds.nonEmpty) // 下线
onBrokerFailure(deadBrokerIdsSorted)
}

代理上线

代理上线时,在代理启动时会向Zookeeper的/broker/ids节点下注册该代理的brokerId,此时会被副本状态机在Zookeeper所注册的BrokerChangeHandler监听器监听到该节点信息的变化,

1
2
if (newBrokerIds.nonEmpty) // 上线
onBrokerStartup(newBrokerIdsSorted)

通过zookeeper中记录的信息以及ControllerContext缓存的节点信息,计算出新上线的节点集合,对新上线的代理节点调用ControllerChangeManager.addBroker()方法完成新上线代理网络层相关初始化处理。然后调用KafkaController.onBrokerStartup()方法进行处理

  • 向集群当前所有代理发送UpdateMetadataRequest请求,使得代理知道有新的代理加入
  • 查找被分配到新上线节点的副本集合,通过副本状态机ReplicaStateMachine对副本状态进行相应变迁处理,将这些副本变成OnlineReplica,并通过分区状态机PartitionStateMachine对分区状态为NewPartition和OfflinePartition的分区进行处理,将其状态转至OnlinePartition状态,并触发一次分区Leader选举,以确认新增的代理是否为Leader
  • 轮询被分配到新上线代理的副本,进行分区副本操作
  • 恢复由于新代理上线而被暂停的删除主题操作线程

代理下线

代理下线时,该代理在zookeeper的brokers/ids节点中注册的与该代理对应的节点将被删除,同样触发BrokerChangeHandler的handleChildChange()方法

1
2
if (deadBrokerIds.nonEmpty) // 下线
onBrokerFailure(deadBrokerIdsSorted)
  • 查找下线节点集合
  • 轮询下线节点
  • 查找Leader副本下线节点上的分区,将这些分区状态设置为OfflinePartition,并处理相应状态变迁,调用分区状态机PartitionStateMachine的triggerOnlinePartitionStateChange()方法将处于分区选举器为分区选出Leader,并将Leader和ISR信息写入到zookeeper中,同时发送UpdateMatadataRequest请求更新元数据信息
分区副本Leader选举

controller发现分区的Leader挂掉后,会从分区的replicas副本列表中取出第一个broker作为leader,再看该leader是否在isr列表中,如果在,则选为leader;否则继续取出第二个来比较

主题管理

TopicChangeHandler监听/brokers/topics子节点改变

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
override def process(): Unit = {
if (!isActive) return
val topics = zkClient.getAllTopicsInCluster.toSet
val newTopics = topics -- controllerContext.allTopics
val deletedTopics = controllerContext.allTopics -- topics
controllerContext.allTopics = topics
// 向新创建的主题注册PartitionModificationsHandler分区变化的监听器
registerPartitionModificationsHandlers(newTopics.toSeq)
val addedPartitionReplicaAssignment = zkClient.getReplicaAssignmentForTopics(newTopics)
deletedTopics.foreach(controllerContext.removeTopic)
addedPartitionReplicaAssignment.foreach {
case (topicAndPartition, newReplicas) => controllerContext.updatePartitionReplicaAssignment(topicAndPartition, newReplicas)
}
info(s"New topics: [$newTopics], deleted topics: [$deletedTopics], new partition replica assignment " +
s"[$addedPartitionReplicaAssignment]")
if (addedPartitionReplicaAssignment.nonEmpty)
onNewPartitionCreation(addedPartitionReplicaAssignment.keySet)
}
}

分区管理

PartitionReassignmentHandler监听/admin/reassign_partitions创建,进行分区重分配操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
case object PartitionReassignment extends ControllerEvent {
override def state: ControllerState = ControllerState.PartitionReassignment

override def process(): Unit = {
if (!isActive) return

// We need to register the watcher if the path doesn't exist in order to detect future reassignments and we get
// the `path exists` check for free
if (zkClient.registerZNodeChangeHandlerAndCheckExistence(partitionReassignmentHandler)) {
val partitionReassignment = zkClient.getPartitionReassignment

// Populate `partitionsBeingReassigned` with all partitions being reassigned before invoking
// `maybeTriggerPartitionReassignment` (see method documentation for the reason)
partitionReassignment.foreach { case (tp, newReplicas) =>
val reassignIsrChangeHandler = new PartitionReassignmentIsrChangeHandler(KafkaController.this, eventManager,
tp)
controllerContext.partitionsBeingReassigned.put(tp, ReassignedPartitionsContext(newReplicas, reassignIsrChangeHandler))
}

maybeTriggerPartitionReassignment(partitionReassignment.keySet)
}
}
}

欢迎关注我的其它发布渠道