Kafka-副本分配策略

news/2024/11/24 7:38:32/

一、上下文

《Kafka-创建topic源码》我们大致分析了topic创建的流程,为了保持它的完整性和清晰度。细节并没有展开分析。下面我们就来分析下副本的分配策略以及副本中的leader角色的确定逻辑。当有了副本分配策略,才会得到分区对应的broker,才可以在topic目录下写入对应的数据。Controller端才可以让这些分区和副本上线去提供服务。副本的leader角色确定后才能使producer生产的数据知道第一个写入的broker节点是哪个?以及follower的同步工作。

二、目的

1、将副本均匀地分布在broker之间

2、对于分配给特定broker的分区,其其他副本分布在其他broker上

3、如果所有broker都有机架信息,请尽可能将每个分区的副本分配给不同的机架

如果不考虑机架的情况下也要实现副本分配的目标,我们的做法是这样的

1、从broker列表中的随机位置开始,通过循环分配每个分区的第一个副本

2、以递增的移位分配每个分区的剩余副本

三、示例

1、场景描述

假如一个topic有6个分区(0, 1, 2, 3, 4, 5)且副本因子为3,对应的集群情况如下图:

每个机架对应的brokerid如下:

机架brokerId列表
rack10,5
rack23,4
rack31,2

获取机架交错的broker列表:(0, 3, 1, 5, 4, 2)

有了这个列表就课可以以简单的循环方式将副本分配给broker,确保每个broker上的leader和follower数量均匀分布,并将副本分配到所有机架。

2、分配结果

分区副本所在的brokerId列表
00,3,1
13,1,5
21,5,4
35,4,2
44,2,0
52,0,3

机架感知分配总是使用机架交替broker列表上的轮询来选择分区的第一个副本。对于其余的副本,它将偏向于机架上没有任何副本分配的broker,直到每个机架都有一个副本。然后,任务将回到broker 列表上的循环。

因此,如果副本的数量 >= 机架的数量,它将确保每个机架至少获得一个副本。否则,每个机架最多只能获得一个副本。在副本数量与机架数量相同并且每个机架具有相同数量的代理的完美情况下,它保证了副本在broker和机架之间的分布是均匀的。

此时如果再增加一个分区(6分区)呢?按照规律分配的副本所在的broker列表应该是0,3,1。但如果这样就违背了目的3。一旦它完成了第一轮循环,如果有更多的分区要分配,算法将开始转移follower 。这是为了确保我们不会总是得到相同的序列集。因此分区6可能分配到的副本broker列表为0,4,2。

四、源码

    //fixedStartIndex =-1 , startPartitionId = -1private static Map<Integer, List<Integer>> assignReplicasToBrokersRackAware(int nPartitions,int replicationFactor,Collection<BrokerMetadata> brokerMetadatas,int fixedStartIndex,int startPartitionId) {//存放broker和机架的对应关系Map<Integer, String> brokerRackMap = new HashMap<>();//获取broker和机架的对应关系brokerMetadatas.forEach(m -> brokerRackMap.put(m.id, m.rack.get()));//机架数量int numRacks = new HashSet<>(brokerRackMap.values()).size();//获取交替机架的broker列表List<Integer> arrangedBrokerList = getRackAlternatedBrokerList(brokerRackMap);//broker 数量int numBrokers = arrangedBrokerList.size();Map<Integer, List<Integer>> ret = new HashMap<>();// fixedStartIndex 的初始值是 -1 ,因此 startIndex = 一个 0 至 broker 数量 之间的随机整数int startIndex = fixedStartIndex >= 0 ? fixedStartIndex : RAND.nextInt(arrangedBrokerList.size());// startPartitionId 的初始值是 -1 ,因此 currentPartitionId = 0int currentPartitionId = Math.max(0, startPartitionId);//下一个要分配的副本 ,第一次应该是 一个 0 至 broker 数量 之间的随机整数int nextReplicaShift = fixedStartIndex >= 0 ? fixedStartIndex : RAND.nextInt(arrangedBrokerList.size());//循环分区列表,对每个分区进行副本分配for (int i = 0; i < nPartitions; i++) {if (currentPartitionId > 0 && (currentPartitionId % arrangedBrokerList.size() == 0))nextReplicaShift += 1;//第一个副本索引int firstReplicaIndex = (currentPartitionId + startIndex) % arrangedBrokerList.size();//默认将第一个副本 作为leaderint leader = arrangedBrokerList.get(firstReplicaIndex);List<Integer> replicaBuffer = new ArrayList<>();replicaBuffer.add(leader);Set<String> racksWithReplicas = new HashSet<>();racksWithReplicas.add(brokerRackMap.get(leader));Set<Integer> brokersWithReplicas = new HashSet<>();brokersWithReplicas.add(leader);//根据副本因子,进行副本的分配,因为有了leader,因此只用循环处理 replicationFactor - 1 的甚于副本分配int k = 0;for (int j = 0; j < replicationFactor - 1; j++) {boolean done = false;while (!done) {Integer broker = arrangedBrokerList.get(replicaIndex(firstReplicaIndex, nextReplicaShift * numRacks, k, arrangedBrokerList.size()));String rack = brokerRackMap.get(broker);// 跳过这个broker ,如果满足以下2个条件中的1个//1、同一机架中已经有一个broker分配了副本,并且有一个或多个机架没有任何副本,或者//2、broker已经分配了副本,但有一个或多个broker没有分配副本if ((!racksWithReplicas.contains(rack) || racksWithReplicas.size() == numRacks)&& (!brokersWithReplicas.contains(broker) || brokersWithReplicas.size() == numBrokers)) {replicaBuffer.add(broker);racksWithReplicas.add(rack);brokersWithReplicas.add(broker);done = true;}k += 1;}}//返回分区对应的副本的broker列表ret.put(currentPartitionId, replicaBuffer);currentPartitionId += 1;}return ret;}

http://www.ppmy.cn/news/1549484.html

相关文章

RabbitMQ1:初识MQ

欢迎来到“雪碧聊技术”CSDN博客&#xff01; 在这里&#xff0c;您将踏入一个专注于Java开发技术的知识殿堂。无论您是Java编程的初学者&#xff0c;还是具有一定经验的开发者&#xff0c;相信我的博客都能为您提供宝贵的学习资源和实用技巧。作为您的技术向导&#xff0c;我将…

python安装包中的一些问题(一):conda list 已经安装的包为啥在spyder pip list中没有?

出现这种情况的原因可能是因为你在 Spyder 中运行的 Python 环境与 Conda 环境不同&#xff0c;导致它们各自有各自的包列表。以下是一些可能的原因和解决方法&#xff1a; 1. Spyder 使用的 Python 环境与 Conda 环境不同 在 Conda 环境中安装的包&#xff0c;只会在该环境内…

2024年11月22日Github流行趋势

项目名称&#xff1a;twenty 项目维护者&#xff1a;charlesBochet, lucasbordeau, Weiko, FelixMalfait, bosiraphael 项目介绍&#xff1a;正在构建一个由社区驱动的现代Salesforce替代方案。 项目star数&#xff1a;22,938 项目fork数&#xff1a;2,413 项目名称&#xff1…

首次实现!在Docker容器中运行macOS项目,自动化下载与Web体验

# 在 Docker 容器中运行 macOS 项目 ## 项目简介 该项目旨在实现在 Docker 容器中运行 macOS 系统的功能。通过 KVM 加速和基于 Web 的查看器&#xff0c;用户可以在 Docker 环境中体验 macOS 操作系统。此外&#xff0c;该项目还提供自动下载功能&#xff0c;方便用户获取和使…

搜维尔科技:多画面显示3D系统解决方案,数据孪生可视化大屏3D展示技术

集成多画面系统 集成多画面系统解决方案 1.适合多个用户的紧凑型入门级解决方案 2.会议室功能、审批功能、3D模型讨论等多种使用可能性 3.配有组合设备&#xff0c;方便整合 CAVE 多画面显示系统 1.专业的大屏幕多画面解决方案 2.墙壁、天花板和地板三面CAVE 3.专为沉浸…

了解大模型:开启智能科技的新篇章

在当今科技飞速发展的时代,人工智能(AI)已经成为推动社会进步的重要力量。而在AI的众多技术分支中,大模型(Large Model)以其强大的数据处理能力和卓越的性能,正逐渐成为研究和应用的热点。本文旨在科普大模型的基本概念、与大数据的关系以及与人工智能的紧密联系,帮助读…

生产制造领域的多元化模式探索

在当今全球化和信息化的时代背景下&#xff0c;生产制造领域正经历着前所未有的变革。随着消费者需求的多样化、市场竞争的加剧以及技术的不断进步&#xff0c;传统的生产制造模式已经难以满足现代企业的需求。因此&#xff0c;多种生产制造模式应运而生&#xff0c;以适应不同…

神经网络问题之二:梯度爆炸(Gradient Explosion)

梯度爆炸&#xff08;Gradient Explosion&#xff09;是神经网络训练过程中常见的一个问题&#xff0c;它指的是在反向传播过程中&#xff0c;梯度值变得非常大&#xff0c;超出了网络的处理范围&#xff0c;从而导致权重更新变得不稳定甚至不收敛的现象。 一、产生原因 梯度爆…