集群成员关系
Kafka 使用 Zookeeper 来维护集群成员的信息。每个 broker 都有一个唯一标识符,这个标识符可以在配置文件里指定,也可以自动生成。
在 broker 启动的时候,它通过创建临时节点把自己的 ID 注册到 Zookeeper。
Kafka 组件订阅 Zookeeper 的 **/brokers/ids **路径(broker 在 Zookeeper 上的注册路径),当有 broker 加入集群或退出集群时,这些组件就可以获得通知。
如果你要启动另一个具有相同 ID 的 broker,会得到一个错误——新 broker 会试着进行注册,但不会成功,因为 Zookeeper里已经有一个具有相同 ID 的 broker。
在 broker 停机、出现网络分区或长时间垃圾回收停顿时,broker 会从 Zookeeper 上断开连接,此时 broker 在启动时创建的临时节点会自动从 Zookeeper 上移除。监听 broker 列表的Kafka 组件会被告知该 broker 已移除。
在关闭 broker 时,它对应的节点也会消失,不过它的 ID 会继续存在于其他数据结构中。
例如,主题的副本列表(下面会介绍)里就可能包含这些 ID。
在完全关闭一个 broker 之后,如果使用相同的 ID 启动另一个全新的 broker,它会立即加入集群,并拥有与旧 broker相同的分区和主题。
控制器
控制器其实就是一个broker,只不过除了具有普通broker的功能之外,还会负责选举分区Leader(后面讨论)。
集群里第一个启动的 broker 通过在 Zookeeper 里创建一个临时节点** /controller** 让自己成为控制器。
其他 broker 在启动时也 会尝试创建这个节点,不过它们会收到一个“节点已存在”的异常,然后“意识”到控制器节点已存在,也就是说集群里已经有一个控制器了。
其他 broker 在控制器节点上创建 Zookeeper watch 对象,这样它们就可以收到这个节点的变更通知。这种方式可以确保集群里一次只有一个控制器存在。
如果控制器被关闭或者与 Zookeeper 断开连接,Zookeeper 上的临时节点就会消失。集群里的其他 broker 通过 watch 对象得到控制器节点消失的通知,它们会尝试让自己成为新的控制器。
第一个在 Zookeeper 里成功创建控制器节点的 broker 就会成为新的控制器,其他节点会收到“节点已存在”的异常,然后在新的控制器节点上再次创建 watch 对象。
每个新选出的控制器通过 Zookeeper 的条件递增操作获得一个全新的、数值更大的 controller epoch。其他 broker 在知道当前 controller epoch 后,如果收到由控制器发出的包含较旧 epoch 的消息,就会忽略它们。
当控制器发现一个 broker 已经离开集群(通过观察相关的 Zookeeper 路径),它会知道,那些失去Leader的分区需要一个新Leader(这些分区的Leader刚好是在这个 broker 上)。
控制器遍历这些分区,并确定谁应该成为新Leader(简单来说就是分区副本列表里的下一个副本),
然后向所有包含新Leader或现有跟随者的 broker 发送请求。该请求消息包含了谁是新Leader以及谁是分区跟随者的信息。
随后,新Leader开始处理来自生产者和消费者的请求,而跟随者开始从新Leader那里复制消息。
当控制器发现一个 broker 加入集群时,它会使用 broker ID 来检查新加入的 broker 是否包含现有分区的副本。
如果有,控制器就把变更通知发送给新加入的 broker 和其他 broker, 新 broker 上的副本开始从Leader那里复制消息。
简而言之,Kafka 使用 Zookeeper 的临时节点来选举控制器,并在节点加入集群或退出集群时通知控制器。
控制器负责在节点加入或离开集群时进行分区Leader选举。控制器使用epoch 来避免“脑裂”。
脑裂”是指两个节点同时认为自己是当前的控制器。
KRaft: 基于Raft的新的Kafka控制器
从 2019 年开始,Apache Kafka 社区开始了一个重大的项目:从基于 ZooKeeper 的控制器转移到基于 Raft quorum的控制器。新控制器的预览版名为KRaft,是Apache Kafka 2.8版本的一部分。
计划于 2021 年年中发布的 Apache Kafka 3.0 版本将包括 KRaft 的第一个生产版本,Kafka 集群将能够与传统的基于 ZooKeeper 的控制器或 KRaft 一起运行
为什么 Kafka 社区决定更换控制器?
Kafka 现有的控制器已经进行了多次重写,但尽管改进了使用 ZooKeeper 存储主题、分区和副本信息的方式,但很明显,现有模型无法扩展到我们希望 Kafka 支持的分区数量。
下面几个已知的问题促使了这一变化:
- 元数据更新是同步写给Zookeeper的,但会异步发送给Broker. 同时从 ZooKeeper 接收更新是异步的.
所有这些都会导致Broker、控制器和 ZooKeeper 之间的元数据不一致的边界case。且这些case很难检测到。
- 每当控制器重新启动时,它必须从ZooKeeper读取所有Broker和分区的所有元数据,然后将这些元数据发送给所有Broker。
尽管经过多年优化,这仍然是一个主要瓶颈——随着分区和代理数量的增加,重启控制器变得越来越慢。
- 围绕元数据所有权的内部架构不是很好——一些操作是通过控制器完成的,其他通过任何代理完成的,还有一些直接在 ZooKeeper 上完成的。
- ZooKeeper 是一个独立的分布式系统,就像 Kafka 一样,它需要一些专业知识才能操作。因此,想要使用Kafka 的开发人员需要学习两种分布式系统,而不仅仅是一种。
基于上面这些问题,Apache Kafka社区选择替换现有的基于ZooKeeper的控制器。
在现有架构中,ZooKeeper 有两个重要功能:用于选举控制器和存储集群元数据——注册的代理、配置、主题、分区和副本。此外,控制器本身管理元数据——它用于选举领导者、创建和删除主题以及重新分配副本。所有这些功能都必须在新的控制器中被取代。
KRaft的理念
在新的架构中,控制器节点是一个管理元数据事件日志的Raft quorum。该日志包含了对集群元数据的每一次改变的信息。 当前存储在 ZooKeeper 中的所有内容,例如主题、分区、ISR、配置等,都将存储在此日志中。
使用Raft算法,控制器节点可以自己从它们中间选出一个领导者,而不依赖任何外部系统。
元数据日志的负责人称为Active Controller。Active Controller处理所有来自brokers的RPC。追随者控制器去复制active controller中最新数据,并且如果Active Controller退出,则将其用作热备份。
因为现在所有的控制器都会跟踪最新的状态,控制器的故障切换将不需要一个漫长的重新加载期(即将所有的状态转移到新的控制器上)。
控制器不再将更新推送给其他Broker,相反这些代理将通过新的 MetadataFetch API 从Active Controller获取更新的数据。
与获取请求类似,Broker将跟踪他们获取的最新元数据更改的偏移量,并且只会从控制器请求较新的更新。
Broker会将元数据持久保存到磁盘,这将使它们能够快速启动,即使有数百万个分区。
Broker将在控制器的法定人数中注册,并将保持注册,直到被管理员取消注册,因此,一旦Broker关闭,它就会下线,但仍然注册。
已上线但未更新最新元数据的Broker将被fenced,无法为客户请求提供服务。
新的fenced状态将防止出现这样的情况:客户向不再是领导者的Broker发送事件,但由于自己太过落后而没有意识到它不是领导者。
作为迁移到controller quorum的一部分,以前涉及直接与 ZooKeeper 通信的客户端或broker的所有操作都将通过控制器进行路由
因此可以通过替换控制器进行无缝迁移,而无需更改任何broker上的任何内容。
复制
复制功能是 Kafka 架构的核心。事实上,Kafka 把自己描述成“一个分布式的、可分区的、可复制的提交日志服务”。
复制之所以这么关键,是因为它可以在个别节点失效时仍能保证 Kafka 的可用性和持久性。
副本分类
Kafka 使用主题来组织数据,每个主题被分为若干个分区,每个分区有多个副本。
那些副本被保存在 broker 上,每个 broker 可以保存成百上千个属于不同主题和分区的副本。
副本有以下两种类型:
- Leader副本:每个分区都有一个Leader副本。为了保证一致性,所有生产者请求和消费者请求都会经过这个副本。
- 跟随者副本:leader以外的副本都是跟随者副本。跟随者副本不处理来自客户端的请求,它们唯一的任务就是从leader那里复制消息,保持与leader一致的状态。如果leader发生崩溃,其中的一个跟随者会被提升为新leader。
副本间的通信与交互
Leader的另一个任务是搞清楚哪个跟随者的状态与自己是一致的。
跟随者为了保持与Leader的状态一致,在有新消息到达时尝试从Leader那里复制消息,不过有各种原因会导致同步失败。例如,网络拥塞导致复制变慢,broker 发生崩溃导致复制滞后,直到重启 broker 后复制才会继续。
为了与Leader保持同步,跟随者向Leader发送获取数据的请求,这种请求与消费者为了读取消息而发送的请求是一样的。Leader将响应消息发给跟随者。请求消息里包含了跟随者想要获取消息的偏移量,而且这些偏移量总是有序的。
一个跟随者副本先请求消息 1,接着请求消息 2,然后请求消息 3,在收到这 3 个请求的响应之前,它是不会发送第 4 个请求消息的。如果跟随者发送了请求消息 4,那么Leader就知道它已经收到了前面 3 个请求的响应。
通过查看每个跟随者请求的最新偏移量,Leader就会知道每个跟随者复制的进度。
如果跟随者在 10s 内没有请求任何消息,或者虽然在请求消息,但在10s 内没有请求最新的数据,那么它就会被认为是不同步的。如果一个副本无法与Leader保持一致,在Leader发生失效时,它就不可能成为新Leader——毕竟它没有包含全部的消息。
相反,持续请求得到的最新消息副本被称为同步的副本。在Leader发生失效时,只有同步副本才有可能被选为新Leader。
跟随者的正常不活跃时间或在成为不同步副本之前的时间是通过 replica.lag.time.max.ms参数来配置的。这个时间间隔直接影响着Leader选举期间的客户端行为和数据保留机制。我们将在后面讨论可靠性保证,到时候会深入讨论这个问题。
首选Leader
除了当前Leader之外,每个分区都有一个首选Leader——创建主题时选定的Leader就是分区的首选Leader。
之所以把它叫作首选Leader,是因为在创建分区时,需要在 broker 之间均衡Leader(后面会介绍在 broker 间分布副本和Leader的算法)。
因此,我们希望首选Leader在成为真正的Leader时,broker 间的负载最终会得到均衡。默认情况下,Kafka 的 auto.leader.rebalance.enable 被设为 true,它会检查首选Leader是不是当前Leader,如果不是,并且该副本是同步的,那么就会触发Leader选举,让首选Leader成为当前Leader。
找到首选Leader
从分区的副本清单里可以很容易找到首选Leader(可以使用 kafka.topics.sh 工具查看副本和分区的详细信息)。
清单里的第一个副本一般就是首选Leader。不管当前Leader是哪一个副本,都不会改变这个事实,即使使用副本分配工具将副本重新分配给其他 broker。
要记住,如果你手动进行副本分配,第一个指定的副本就是首选Leader,所以要确保首选Leader被传播到其他 broker 上,避免让包含了Leader的 broker 负载过重,而其他broker 却无法为它们分担负载。
参考这里