Kafka-Controller选举

server/2024/11/17 23:32:56/

一、上下文

《Kafka-broker粗粒度启动流程》博客中我们分析了broker的大致启动流程,这个时候每个broker都不是controller角色,下面我们就来看下它是如何选举出来的吧

二、设置ZooKeeper

‌ZooKeeper是一个开源的分布式协调服务,主要用于分布式系统中各节点的协调和管理。Kafka的Controller选举也一样用到了它。

  override def startup(): Unit = {//....initZkClient(time)configRepository = new ZkConfigRepository(new AdminZkClient(zkClient))//....}private def initZkClient(time: Time): Unit = {//config.zkConnect//zookeeper.connect=hostname1:2181,hostname2:2181,hostname2:2181/kafkainfo(s"Connecting to zookeeper on ${config.zkConnect}")_zkClient = KafkaZkClient.createZkClient("Kafka server", time, config, zkClientConfig)//如果需要,在ZK中预先创建顶级路径。_zkClient.createTopLevelPaths()}def createTopLevelPaths(): Unit = {//创建 Persistent 持久化的 zk 路径ZkData.PersistentZkPaths.foreach(makeSurePersistentPathExists)}//确保ZK中存在持久路径def makeSurePersistentPathExists(path: String): Unit = {createRecursive(path, data = null, throwIfPathExists = false)}

1、KafkaZkClient

KafkaZkClient是在Kafka.zookeeper.ZooKeeperClient之上提供更高级别的Kafka特定操作。

实现说明:此类包括各种组件(Controller, Configs, Old Consumer等)的方法,在某些情况下会从调用包中返回类的实例。

  def createZkClient(name: String, time: Time, config: KafkaConfig, zkClientConfig: ZKClientConfig): KafkaZkClient = {//...KafkaZkClient(...)}

2、AdminZkClient

它提供与ZooKeeper交互的管理员相关方法。

class AdminZkClient(...){//创建topicdef createTopic(...){...}//获取broker元数据def getBrokerMetadatas(...){...}//创建主题并可选地验证其参数。请注意,TopicCommand也使用此方法。def createTopicWithAssignment(...){...}//验证主题创建参数def validateTopicCreate(...){...}//删除topic //为给定主题创建删除路径def deleteTopic(...){...}//使用可选的副本分配向现有主题添加分区。请注意,TopicCommand使用此方法。def addPartitions(...){...}//将broker从实体名称解析为整数iddef parseBroker(...){...}//.....
}

3、ZkConfigRepository

zookeeper的配置仓库,也就是kafka在zookeeper中配置信息。

class ZkConfigRepository(adminZkClient: AdminZkClient) extends ConfigRepository {override def config(configResource: ConfigResource): Properties = {//....//从zookeeper的目录下读取数据,并封装成实体(topic、broker、client-id、user、user/clients/client-id、ip)adminZkClient.fetchEntityConfig(configTypeForZk, effectiveName)}
}

4、ZkData

object ZkData {//这些是kafka broker 启动时应该存在的持久ZK路径。val PersistentZkPaths: Seq[String] = Seq(ConsumerPathZNode.path, // old consumer pathBrokerIdsZNode.path,TopicsZNode.path,ConfigEntityChangeNotificationZNode.path,DeleteTopicsZNode.path,BrokerSequenceIdZNode.path,IsrChangeNotificationZNode.path,ProducerIdBlockZNode.path,LogDirEventNotificationZNode.path) ++ ConfigType.ALL.asScala.map(ConfigEntityTypeZNode.path)
}//旧的consumer在zk上的路径
object ConsumerPathZNode {def path = "/consumers"
}object BrokerIdsZNode {def path = s"${BrokersZNode.path}/ids"def encode: Array[Byte] = null
}object TopicsZNode {def path = s"${BrokersZNode.path}/topics"
}object ConfigEntityChangeNotificationZNode {def path = s"${ConfigZNode.path}/changes"
}object DeleteTopicsZNode {def path = s"${AdminZNode.path}/delete_topics"
}object BrokerSequenceIdZNode {def path = s"${BrokersZNode.path}/seqid"
}object IsrChangeNotificationZNode {def path = "/isr_change_notification"
}object ProducerIdBlockZNode {val CurrentVersion: Long = 1Ldef path = "/latest_producer_id_block"def generateProducerIdBlockJson(producerIdBlock: ProducerIdsBlock): Array[Byte] = {Json.encodeAsBytes(Map("version" -> CurrentVersion,"broker" -> producerIdBlock.assignedBrokerId,"block_start" -> producerIdBlock.firstProducerId.toString,"block_end" -> producerIdBlock.lastProducerId.toString).asJava)}object LogDirEventNotificationZNode {def path = "/log_dir_event_notification"
}

三、验证元数据属性集成是否有效

1、meta.properties文件是否始终设置了cluster.id

2、meta.properties文件是否始终设置了node.id或者 broker.id

initialMetaPropsEnsemble.verify(Optional.of(_clusterId), verificationId, verificationFlags)

四、动态broker初始化

动态broker配置存储在ZooKeeper中,可以在两个级别定义:

1、每个代理的配置持久化在/configs/brokers/{brokerId} 这些可以使用AdminClient使用资源名称brokerId进行描述/更改

2、整个集群的默认值持续存在于/configs/brokers/<default> 这些可以使用AdminClient使用空资源名称进行描述/更改。

broker配置的优先级顺序为:

1、DYNAMIC_BROKER_CONFIG:存储在ZK中的/configs/brokers/{brokerId}

2、DYNAMIC_DEFAULT_BROKER_CONFIG: 存储在ZK中的//configs/brokers/<default>

3、STATIC_BROKER_CONFIG:启动代理时使用的属性,通常来自server.properties文件

4、DEFAULT_CONFIG:KafkaConfig中定义的默认配置

config.dynamicConfig.initialize(Some(zkClient), clientMetricsReceiverPluginOpt = None)

五、启动KafkaController

_kafkaController = new KafkaController(config, zkClient, time, metrics, brokerInfo, brokerEpoch, tokenManager, brokerFeatures, metadataCache, threadNamePrefix)
kafkaController.startup()

1、KafkaController结构

class KafkaController(...){//事件管理private[controller] val eventManager = new ControllerEventManager(config.brokerId, this, time,controllerContext.stats.rateAndTimeMetrics)//如果brokerid = 当前的controllerid 那么就返回truedef isActive: Boolean = activeControllerId == config.brokerId@volatile private var brokerInfo = initialBrokerInfo@volatile private var _brokerEpoch = initialBrokerEpoch//启动def startup(): Unit = {zkClient.registerStateChangeHandler(new StateChangeHandler {//ControllerHandler = "controller-state-change-handler"override val name: String = StateChangeHandlers.ControllerHandleroverride def afterInitializingSession(): Unit = {eventManager.put(RegisterBrokerAndReelect)}override def beforeInitializingSession(): Unit = {val queuedEvent = eventManager.clearAndPut(Expire)//阻止新会话的初始化,直到处理过期事件,这确保在创建新会话之前已处理所有挂起的事件queuedEvent.awaitProcessing()}})eventManager.put(Startup)eventManager.start()}override def process(event: ControllerEvent): Unit = {event match {//.....case RegisterBrokerAndReelect =>processRegisterBrokerAndReelect()case Startup =>processStartup()//.....}}
}

1、ControllerEventManager结构

class ControllerEventManager(...){//用串行化队列代替锁private val queue = new LinkedBlockingQueue[QueuedEvent]//ControllerEventThreadName = "controller-event-thread"private[controller] var thread = new ControllerEventThread(ControllerEventThreadName)def start(): Unit = thread.start()def put(event: ControllerEvent): QueuedEvent = inLock(putLock) {val queuedEvent = new QueuedEvent(event, time.milliseconds())queue.put(queuedEvent)queuedEvent}class ControllerEventThread(name: String)  extends ShutdownableThread(...){override def doWork(): Unit = {//从队列获取事件,主要是controller相关的事件val dequeued = pollFromEventQueue()dequeued.event match {case controllerEvent =>def process(): Unit = dequeued.process(processor)}}}}
}

ControllerEventManager中的ControllerEventThread的父类是ShutdownableThread,它里面有真正的run()且调起了doWork(),doWork()又调起了process(),因此真正执行的是process()

public abstract class ShutdownableThread extends Thread {public abstract void doWork();public void run() {while (isRunning())doWork();}
}

这是一个死循环,也就是后面只要往队列中添加事件,会自动执行对应方法。从KafkaController的startup()中我们知道放了两个事件:RegisterBrokerAndReelect和Startup,下面我们来看看它们里面做了什么

2、RegisterBrokerAndReelect事件处理

  private def processRegisterBrokerAndReelect(): Unit = {_brokerEpoch = zkClient.registerBroker(brokerInfo)processReelect()}

1、向zookeeper注册broker

class KafkaZkClient private[zk] (...{def registerBroker(brokerInfo: BrokerInfo): Long = {//brokers/ids/brokeridval path = brokerInfo.path//创建 对应的 brokerid 的 临时znode节点,说明:当该brokers挂掉后会随之消失val stat = checkedEphemeralCreate(path, brokerInfo.toJsonBytes)info(s"Registered broker ${brokerInfo.broker.id} at path $path with addresses: " +s"${brokerInfo.broker.endPoints.map(_.connectionString).mkString(",")}, czxid (broker epoch): ${stat.getCzxid}")//返回czxid (broker epoch)stat.getCzxid}}

2、开始选举

class KafkaController(...){private def processReelect(): Unit = {maybeResign()elect()}private def maybeResign(): Unit = {val wasActiveBeforeChange = isActive//在zk上注册节点改变事件,当controller改变时触发,为下面的选举做铺垫zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)activeControllerId = zkClient.getControllerId.getOrElse(-1)if (wasActiveBeforeChange && !isActive) {//当当前broker辞去controller职务时触发onControllerResignation()}}private def elect(): Unit = {//获取 活动状态 contoller ,如果 集群已经启动了很长时间,新增了一台broker,那么此时会获得 当下的controller ,//如果此时集群刚刚启动,那么此时没有 活动状态的 controller ,返回的结果就是  -1activeControllerId = zkClient.getControllerId.getOrElse(-1)/** 我们可以在初始启动和handleDeleted ZK回调期间到达这里。由于潜在的竞争条件,当我们到达这里时,控制器可能已经被选中了。如果此代理已经是控制器,则此检查将防止以下createEphemeralPath方法进入无限循环。*/if (activeControllerId != -1) {//如果当下已经有 activeControllerId 那么就停止选举 ,否则继续往下走//Broker $activeControllerId 已被选为控制器,因此停止选举过程debug(s"Broker $activeControllerId has been elected as the controller, so stopping the election process.")return}//try中会发生如下情况//1、正常运行:当选controller//2、异常://    1、ControllerMovedException //       1、其他broker成功当选controller//       2、controller已经当选,但刚刚离职,需要重新选举//    2、Throwable  该节点当选controller,但是就职时出错了。删除该controller,重新选举//       try {val (epoch, epochZkVersion) = zkClient.registerControllerAndIncrementControllerEpoch(config.brokerId)controllerContext.epoch = epochcontrollerContext.epochZkVersion = epochZkVersionactiveControllerId = config.brokerId//${config.brokerId}已成功当选为控制器。Epoch增加到${controllerContext.eepoch},Epoch zk版本现在是${controller Context.eepoch ZkVersion}”info(s"${config.brokerId} successfully elected as the controller. Epoch incremented to ${controllerContext.epoch} " +s"and epoch zk version is now ${controllerContext.epochZkVersion}")//成功当选controller,并开始履行作为该角色的责任onControllerFailover()} catch {case e: ControllerMovedException =>//重新开始监听目录变化maybeResign()if (activeControllerId != -1)debug(s"代理$activeControllerId被选为控制器,而不是代理${config.brokerId}", e)elsewarn("管制员已经当选,但刚刚辞职,这将导致另一轮选举", e)case t: Throwable =>error(s"在代理${config.brokerId}上选择或成为控制器时出错。立即触发控制器移动", t)triggerControllerMove()}}}

在选举前zkClient注册的 controllerChangeHandler 事件其实就是观察 controller目录的变化

class ControllerChangeHandler(eventManager: ControllerEventManager) extends ZNodeChangeHandler {//controller目录override val path: String = ControllerZNode.pathoverride def handleCreation(): Unit = eventManager.put(ControllerChange)override def handleDeletion(): Unit = eventManager.put(Reelect)override def handleDataChange(): Unit = eventManager.put(ControllerChange)
}

六、总结

1、设置zookeeper,如:zookeeper.connect=hostname1:2181,hostname2:2181,hostname2:2181/kafka并创建持久化目录:consumers、brokers/ids、brokers/topics、config/changes、admin/delete_topics、brokers/seqid、isr_change_notification、latest_producer_id_block、log_dir_event_notification

2、验证元数据属性集成是否有效,主要时看每个broker是否有了唯一的id

3、将每个broker的id注册到zookeeper

4、启动KafkaController

5、启动ControllerEventThread线程并不断消费LinkedBlockingQueue中事件

6、向队列注册RegisterBrokerAndReelect事件、Startup事件

7、首先处理RegisterBrokerAndReelect事件

8、向zookeeper注册broker,并建立临时znode

9、注册controllerChangeHandler 事件其实就是观察 controller目录的变化

10、每个broker开始向zookeeper将自己注册为controller

11、正常情况下只有一个broker成功注册成功,其他broker抛出ControllerMovedException继续监控controller目录的变化

12、如果选举controller成功,但是在就职时失败会里面进行卸任工作,并进行新一轮选举


http://www.ppmy.cn/server/142445.html

相关文章

机器学习-4:机器学习的建模流程

机器学习的建模流程 流程为&#xff1a; 原始数据 --> 数据预处理 --> 特征工程 --> 建模 --> 验证。 原始数据收集 所有AI或机器学习的基础就是数据&#xff0c;没有数据就什么都做不了&#xff0c;在搭建一个系统之前首要考虑的就是有没有足够多的数据可以支撑这…

VMware 17虚拟Ubuntu 22.04设置共享目录

VMware 17虚拟Ubuntu 22.04设置共享目录 共享文件夹挂载命令&#xff01;&#xff01;&#xff01;<font colorred>配置启动自动挂载Chapter1 VMware 17虚拟Ubuntu 22.04设置共享目录一、卸载老版本二、安装open-vm-tools<font colorred>三、配置启动自动挂载四、添…

【计网不挂科】计算机网络第一章< 概述 >习题库(含答案)

前言 大家好吖&#xff0c;欢迎来到 YY 滴计算机网络 系列 &#xff0c;热烈欢迎&#xff01; 本章主要内容面向接触过C的老铁 本博客主要内容&#xff0c;收纳了一部门基本的计算机网络题目&#xff0c;供yy应对期中考试复习。大家可以参考 本章为分章节的习题内容题库&#x…

【青牛科技】视频监控器应用

1、简介&#xff1a; 我司安防产品广泛应用在视频监控器上&#xff0c;产品具有性能优良&#xff0c;可 靠性高等特点。 2、图示&#xff1a; 实物图如下&#xff1a; 3、具体应用&#xff1a; 标题&#xff1a;视频监控器应用 简介&#xff1a;视频监控器工作原理是光&#x…

Spring Boot编程训练系统:架构设计与实现技巧

1系统概述 1.1 研究背景 随着计算机技术的发展以及计算机网络的逐渐普及&#xff0c;互联网成为人们查找信息的重要场所&#xff0c;二十一世纪是信息的时代&#xff0c;所以信息的管理显得特别重要。因此&#xff0c;使用计算机来管理编程训练系统的相关信息成为必然。开发合适…

【SQL】E-R模型(实体-联系模型)

目录 一、介绍 1、实体集 定义和性质 属性 E-R图表示 2. 联系集 定义和性质 属性 E-R图表示 一、介绍 实体-联系数据模型&#xff08;E-R数据模型&#xff09;被开发来方便数据库的设计&#xff0c;它是通过允许定义代表数据库全局逻辑结构的企业模式&#xf…

并发编程(9)——Actor/CSP设计模式

文章目录 九、day91. Actor设计模式2. CSP设计模式2.1 使用C实现CSP2.2 通过CSP实现ATM取款逻辑 九、day9 在并发编程中&#xff0c;多个线程可能需要同时访问相同的内存资源。为了防止不同线程之间的资源冲突&#xff0c;传统并发设计方法通常使用共享内存和加锁机制来确保线…

RHEL/CENTOS 7 ORACLE 19C-RAC安装(纯命令版)

一 首先需要安装两个CENTOS 7虚拟机(此处省略)。 由于我们是要安装ORCLE-RAC双节点集群所以至少每个CENTOS虚拟机上需要两块网卡&#xff0c;并且两块网卡都是HOST-ONLY具体步骤请看视频一《为虚拟机添加网卡》 这里大家需要注意的是&#xff0c;我们需要绑定两台机器的IP一共…