kafka__Controller__0">kafka 管理节点 Controller 角色分析
kafka controller 如何管理分区的创建、状态监测、故障切换、内容复制、如何管控分区副本的状态检测故障切换、数据同步、learder 选举?
Kafka Controller 是 Kafka 集群中的一个关键组件,负责管理分区的创建、状态监测、故障切换、内容
复制等任务。它通过 ZooKeeper 协调和管理这些任务,确保 Kafka 集群的高可用性和一致性。以下是 Kafka Controller 管理分区和副本的详细实现原理和示例代码:
1. 管理分区的创建
当创建一个新的 Topic 时,Kafka Controller 会负责分区和副本的分配。
创建 Topic 示例
# 创建 Topic "my_topic",有 3 个分区和 3 个副本
bin/kafka-topics.sh --create --topic my_topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 3
Controller 处理分区创建的核心代码
// KafkaController.scala
class KafkaController(zooKeeperClient: KafkaZkClient, config: KafkaConfig) {def onPartitionModifications() {val partitions = zkClient.getAllPartitions()partitions.foreach { partition =>val leaderIsrAndControllerEpoch = zkClient.getLeaderAndIsrForPartition(partition)if (leaderIsrAndControllerEpoch.isEmpty) {// 如果没有 Leader,选举新的 LeaderpartitionStateMachine.handleStateChanges(Set(partition), OnlinePartition, Map.empty)}}}
}
2. 分区状态监测
Kafka Controller 通过 ZooKeeper 监控分区的状态,包括分区的 Leader 和 ISR 列表的变化。
分区状态监测的核心代码
class PartitionStateMachine(controller: KafkaController) {def handleStateChanges(partitions: Set[TopicPartition], targetState: PartitionState, assignments: Map[TopicPartition, Seq[Int]