聊聊Flink:Flink的分区机制

server/2024/11/19 20:05:05/

一、前言

flink任务在执行过程中,一个流(stream)包含一个或多个分区(Stream partition)。TaskManager中的一个slot的subtask就是一个stream partition(流分区),一个Job的流(stream)分布在多个不同的Slot上执行。每一个算子可以包含一个或多个子任务(subtask),这些subtask执行在不同的分区中,本质是在不同的线程、不同的物理机或不同的容器中彼此互不依赖地执行。

1.1 Flink数据传输

  • 组件之间的通信消息传输,即Client、JobManager、TaskManager之间的信息传递,采用Akka框架(主要用作组件间的协同,如心跳检测、状态上报、指标统计、作业提交和部署等)。
  • 算子之间的流数据传输
    • 本地线程内的流数据传输(同一个SubTask中):同一个SubTask内的两个Operator(属于同一个OperatorChain)之间的数据传输是方法调用,即上游算子处理完数据后,直接调用下游算子的processElement方法。
    • 本地线程间的流数据传输(同一个TaskManager的不同SubTask中):即同一个TaskManager(JVM进程)中的不同Task(线程,本质上是SubTask)的算子之间的数据传输,通过本地内存进行数据传递,存在数据序列化和反序列过程。
    • 跨网络的流数据传输(不同TaskManager的SubTask中):采用Netty框架,通过Socket传递,也存在数据序列化和反序列过程。

flink中的重分区算子定义上下游subtask之间数据传递的方式,SubTask之间进行数据传递模式有两种,一种是one-to-oneforwarding)模式,另一种是redistributing的模式。

1.2 重分区算子数据传递的两种方式

  • One-to-one:数据不需要重新分布,上游SubTask生产的数据与下游SubTask受到的数据完全一致,数据不需要重分区,也就是数据不需要经过IO,比如下图中source->map的数据传递形式就是One-to-One方式。常见的map、fliter、flatMap等算子的SubTask的数据传递都是one-to-one的对应关系。类似于spark中的窄依赖。
  • Redistributing:数据需要通过shuffle过程重新分区,需要经过IO,比如上图中的map->keyBy。创建的keyBy、broadcast、rebalance、shuffle等算子的SubTask的数据传递都是Redistributing方式,但它们具体数据传递方式是不同的。类似于spark中的宽依赖。

在这里插入图片描述

flink中的重分区算子除了keyBy以外,还有broadcast、rebalance、shuffle、rescale、global、partitionCustom等多种算子,它们的分区方式各不相同。需要注意的是,这些算子中除了keyBy能将DataStream转化为KeyedStream外,其它重分区算子均不会改变Stream的类型。

二、分区策略

数据在算子之间流动需要依靠分区策略(分区器),Flink目前内置了以下几种分区策略和自定义分区策略。已实现的分区策略对应的API为:
在这里插入图片描述
自定义分区策略的API为CustomPartitionerWrapper。

各个API的继承关系如下图所示:
在这里插入图片描述
ChannelSelector是分区策略的顶层接口,其决定了记录应该写入哪个逻辑通道,通道可理解为下游算子的某个实例,或下游并行算子的某个子任务。该接口的定义源码如下:
在这里插入图片描述

抽象类StreamPartitioner实现了ChannelSelector接口,是一个用于流程序的特殊的ChannelSelector,其中定义了一些通用的分区策略方法。Flink中的所有分区策略(分区器)都继承了StreamPartitioner类,并且实现了各自独有的分区规则。

三、内置分区策略

3.1 BinaryHashPartitioner

该分区策略位于Blink的Table API的org.apache.flink.table.runtime.partitioner包中,是一种针对BinaryRowData的哈希分区器。BinaryRowData是RowData的实现,可以显著减少Java对象的序列化/反序列化。RowData用于表示结构化数据类型,运行时通过Table API或SQL管道传递的所有顶级记录都是RowData的实例。关于BinaryHashPartitioner,我们这里不做过多讲解。

3.2 BroadcastPartitioner

广播分区策略将上游数据记录输出到下游算子的每个并行实例中,即下游每个分区都会有上游的所有数据。使用DataStream的broadcast()方法即可设置该DataStream向下游发送数据时使用广播分区策略。

来一段代码演示下:

/*** 微信公众号:老周聊架构*/
public class PartitionerTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);DataStream<Integer> dataStream = env.fromElements(1, 2, 3, 4, 5, 6);//1.分区策略前的操作//输出dataStream每个元素及所属的子任务编号dataStream.map(new RichMapFunction<Integer, Object>() {@Overridepublic Object map(Integer value) throws Exception {System.out.println(String.format("元素值: %s, 分区策略前,子任务编号: %s", value,getRuntimeContext().getIndexOfThisSubtask()));return value;}});//2.设置分区策略//设置DataStream向下游发送数据时使用的策略DataStream<Integer> dataStreamAfter = dataStream.broadcast();//3.分区策略后的操作dataStreamAfter.map(new RichMapFunction<Integer, Object>() {@Overridepublic Object map(Integer value) throws Exception {System.out.println(String.format("元素值: %s, 分区策略后,子任务编号: %s", value,getRuntimeContext().getIndexOfThisSubtask()));return value;}}).print();env.execute("PartitionerTest Job");}
}

直接IDEA控制台输出:
在这里插入图片描述

从输出结果可以看出,数据共分为3个分区(编号为0、1、2)。执行分区策略前,每个元素所属的分区:
在这里插入图片描述
执行分区策略后,每个元素所属的分区如下:
在这里插入图片描述
对比表发现,广播分区策略将上游每个元素分别发送到了下游算子的所有分区,这种策略会把数据复制多份,向下游算子的每个分区发送一份。
在这里插入图片描述
我们把上面的任务提交到Flink,同样也可以看出前面分区前每个子任务两条数据,分区后每个子任务六条数据。
在这里插入图片描述
在这里插入图片描述
3.3 ForwardPartitioner

转发分区策略只将元素转发给本地运行的下游算子的实例,即将元素发送到与当前算子实例在同一个TaskManager的下游算子实例,而不需要进行网络传输。要求上下游算子并行度一样,这样上下游算子可以同属一个子任务。

这里把上面的代码调整下:

dataStream.forward()

IDEA控制台输出:
在这里插入图片描述
从输出结果可以看出,数据共分为3个分区(编号为0、1、2)。执行分区策略前,每个元素所属的分区:
在这里插入图片描述

执行分区策略后,每个元素所属的分区如下:
在这里插入图片描述

对比发现,转发分区策略将上游同一个分区的元素发送到了下游同一个分区中。使用数据流图表示如下图:

在这里插入图片描述
在上下游的算子没有指定分区策略的情况下,如果上下游的算子并行度一致,则默认使用ForwardPartitioner,否则使用RebalancePartitioner。在StreamGraph类的源码中可以看到该规则:
在这里插入图片描述
对于ForwardPartitioner,必须保证上下游算子并行度一致,否则会抛出异常。
在这里插入图片描述
3.4 GlobalPartitioner

全局分区策略将上游所有元素发送到下游子任务编号等于0的分区算子实例上(下游第一个实例)。

这里把上面的代码调整下:

dataStream.global()

IDEA控制台输出:
在这里插入图片描述
分区前:
在这里插入图片描述

分区后:
在这里插入图片描述

全局分区策略将上游所有分区中的所有元素发送到了下游编号为0的分区中:
在这里插入图片描述

3.5 .KeyGroupStreamPartitioner

Key分区策略根据元素Key的Hash值输出到下游算子指定的实例。keyBy()算子底层正是使用的该分区策略,底层最终会调用KeyGroupStreamPartitioner的selectChannel()方法,计算每个Key对应的通道索引(通道编号,可理解为分区编号),根据通道索引将Key发送到下游相应的分区中。selectChannel()方法源码如下:
在这里插入图片描述
在这里插入图片描述

总的来说,Flink底层计算通道索引(分区编号)的流程如下:

  • 计算Key的HashCode值。
  • 将Key的HashCode值进行特殊的Hash处理,即MathUtils.murmurHash(keyHash),返回一个非负哈希码。
  • 将非负哈希码除以最大并行度取余数,得到keyGroupId,即Key组索引。
  • 使用公式keyGroupId×parallelism/maxParallelism得到分区编号。parallelism为当前算子的并行度,即通道数量;maxParallelism为系统默认支持的最大并行度,即128。

3.6 RebalancePartitioner

平衡分区策略使用循环遍历下游分区的方式,将上游元素均匀分配给下游算子的每个实例。每个下游算子的实例都具有相等的负载。当数据流中的元素存在数据倾斜时,使用该策略对性能有很大的提升。

这里把上面的代码调整下:

dataStream.setParallelism(2);

dataStreamAfter.setParallelism(3);

dataStream.rebalance()

IDEA控制台输出:
在这里插入图片描述

分区前:
在这里插入图片描述
分区后:

在这里插入图片描述

平衡分区策略将上游所有元素均匀发送到了下游算子的所有分区:
在这里插入图片描述

3.7 RescalePartitioner

重新调节分区策略基于上下游算子的并行度,将元素以循环的方式输出到下游算子的每个实例。类似于平衡分区策略,但又与平衡分区策略不同。

上游算子将元素发送到下游哪一个算子实例,取决于上游和下游算子的并行度。例如,如果上游算子的并行度为2,而下游算子的并行度为4,那么一个上游算子实例将把元素均匀分配给两个下游算子实例,而另一个上游算子实例将把元素均匀分配给另外两个下游算子实例。相反,如果下游算子的并行度为2,而上游算子的并行度为4,那么两个上游算子实例将分配给一个下游算子实例,而另外两个上游算子实例将分配给另一个下游算子实例。

假设上游算子并行度为2,分区编号为A和B,下游算子并行度为4,分区编号为1、2、3、4,那么A将把数据循环发送给1和2,B则把数据循环发送给3和4。假设上游算子并行度为4,编号为A、B、C、D,下游算子并行度为2,编号为1、2,那么A和B把数据发送给1,C和D则把数据发送给2。

这里把上面的代码调整下:

dataStream.rescale()

同时将第一个map算子的并行度设置为2,第二个map算子的并行度设置为4。

IDEA控制台输出:
在这里插入图片描述

分区前:
在这里插入图片描述

分区后:
在这里插入图片描述
在这里插入图片描述

接下来改变map算子的并行度,将第一个map算子的并行度设置为4,第二个map算子的并行度设置为2。

在这里插入图片描述

如果想将元素均匀地输出到下游算子的每个实例,以实现负载均衡,同时又不希望使用平衡分区策略的全局负载均衡,则可以使用重新调节分区策略。该策略会尽可能避免数据在网络间传输,而能否避免还取决于TaskManager的Task Slot数量、上下游算子的并行度等。

3.8 ShufflePartitioner

随机分区策略将上游算子元素输出到下游算子的随机实例中。元素会被均匀分配到下游算子的每个实例。这种策略可以实现计算任务的负载均衡。

这里把上面的代码调整下:

dataStream.shuffle()

这里就不做过多演示了。我们下面来看下自定义分区策略。

四、自定义分区策略

自定义分区策略的API为CustomPartitionerWrapper。该策略允许开发者自定义规则将上游算子元素发送到下游指定的算子实例中。

4.1 新建自定义分区器

新建分区器类MyCustomPartitioner并实现接口Partitioner(Object表示分区Key的数据类型),实现其中未实现的方法partition(),在该方法中添加相应的分区逻辑。

/*** 自定义分区策略* 微信公众号:老周聊架构*/
public class MyCustomPartitioner implements Partitioner {@Overridepublic int partition(Object key, int numPartitions) {if (key.equals("chinese")) {return 0;} else if (key.equals("math")) {return 1;} else {return 2;}}
}

上述代码通过partition()方法取得分区编号,将Key值等于chinese的元素分配到编号为0的分区,将Key值等于math的元素分配到编号为1的分区,其余元素分配到编号为2的分区。

4.2 使用自定义分区器

调用DataStream的partitionCustom()方法传入自定义分区器类MyCustomPartitioner的实例,可以对DataStream按照自定义规则进行重新分区,代码如下:

/*** 自定义分区策略* 微信公众号:老周聊架构*/
public class CustomPartitionerTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);DataStream<String> dataStream = env.fromElements("chinese,98", "math,88", "english,96");//1.分区策略前的操作//输出dataStream每个元素及所属的子任务编号SingleOutputStreamOperator<Map<String, Integer>> dataStreamBefore =dataStream.map(new RichMapFunction<String, Map<String, Integer>>() {@Overridepublic Map<String, Integer> map(String value) throws Exception {System.out.println(String.format("元素值: %s, 分区策略前,子任务编号: %s", value,getRuntimeContext().getIndexOfThisSubtask()));Map<String, Integer> map = new HashMap<>();map.put(value.split(",")[0], Integer.parseInt(value.split(",")[1]));return map;}}).setParallelism(2);//2.设置分区策略//设置DataStream向下游发送数据时使用的策略DataStream<Map<String, Integer>> dataStreamAfter = dataStreamBefore.partitionCustom(new MyCustomPartitioner(), value -> value);//3.分区策略后的操作dataStreamAfter.map(new RichMapFunction<Map<String, Integer>, Map<String, Integer>>() {@Overridepublic Map<String, Integer> map(Map<String, Integer> value) throws Exception {System.out.println(String.format("元素值: %s, 分区策略后,子任务编号: %s", value,getRuntimeContext().getIndexOfThisSubtask()));return value;}}).setParallelism(3).print();env.execute("CustomPartitionerTest Job");}
}

分区前:
在这里插入图片描述

分区后:

在这里插入图片描述
自定义分区策略将上游所有元素按照自定义的规则发送到了下游的3个分区中。

把任务给到Flink上去跑,发现:
在这里插入图片描述

这是因为泛型擦除,下面的DataStream泛型需要指定类型,不能
在这里插入图片描述

小知识:

在编译之后程序会采取去泛型化的措施。也就是说Java中的泛型,只在编译阶段有效。在编译过程中,正确检验泛型结果后,在运行时会将泛型的相关信息擦除,编译器只会在对象进入JVM和离开JVM的边界处添加类型检查和转换的方法,泛型的信息不会进入到运行时阶段,这就是所谓的Java类型擦除

类型加好以后,再跑一下任务,会出现任务成功。

在这里插入图片描述
在这里插入图片描述


http://www.ppmy.cn/server/143284.html

相关文章

ChatGPT-o1快速完成论文选题的9类提示词

学境思源&#xff0c;一键生成论文初稿&#xff1a; AcademicIdeas - 学境思源AI论文写作 论文选题往往是学术写作的关键第一步&#xff0c;选题的好坏直接影响整个写作的质量和方向。ChatGPT-o1凭借强大的语言生成能力&#xff0c;能够帮助写作者快速构思和选择合适的论文主题…

MySQL数据库:SQL语言入门 【3】(学习笔记)

目录 5&#xff0c;TCL —— 事务控制语言&#xff08;Transaction Control Language&#xff09; &#xff08;1&#xff09;事务的概念作用 &#xff08;2&#xff09;事务的特性 【1】原子性 【2】一致性 【3】隔离性 【4】持久性 &#xff08;3&#xff09;并发事务带来…

基于Java Springboot宠物猫售卖管理系统

一、作品包含 源码数据库全套环境和工具资源部署教程 二、项目技术 前端技术&#xff1a;Html、Css、Js、Vue、Element-ui 数据库&#xff1a;MySQL 后端技术&#xff1a;Java、Spring Boot、MyBatis 三、运行环境 开发工具&#xff1a;IDEA/eclipse 数据库&#xff1a;…

LLaMA与ChatGLM选用比较

目录 1. 开发背景 2. 目标与应用 3. 训练数据 4. 模型架构与规模 5. 开源与社区支持 6. 对话能力 7. 微调与应用 8. 推理速度与资源消耗 总结 LLaMA(Large Language Model Meta AI)和 ChatGLM(Chat Generative Language Model)都是强大的大型语言模型,但它们有一…

前后端学习

以下是一个后端开发过程中需要从前端浏览器返回数据分析后端相关内容或报错的详细学习笔记&#xff0c;旨在帮助您全面理解和掌握在后端开发中如何有效地处理和分析前端浏览器返回的数据与报错信息。 目录 引言前端与后端通信基础 2.1 HTTP协议概述 2.2 常见的请求方法 2.3 数…

实现了两种不同的图像处理和物体检测方法

这段代码实现了两种不同的图像处理和物体检测方法&#xff1a;一种是基于Canny边缘检测与轮廓分析的方法&#xff0c;另一种是使用TensorFlow加载预训练SSD&#xff08;Single Shot Multibox Detector&#xff09;模型进行物体检测。 1. Canny边缘检测与轮廓分析&#xff1a; …

【专题】2024AIGC创新应用洞察报告汇总PDF洞察(附原数据表)

原文链接&#xff1a;https://tecdat.cn/?p38310 在科技日新月异的今天&#xff0c;人工智能领域正以前所未有的速度发展&#xff0c;AIGC&#xff08;人工智能生成内容&#xff09;成为其中最耀眼的明珠。从其应用场景的不断拓展&#xff0c;到对各行业的深刻变革&#xff0…

【VLANPWN】一款针对VLAN的安全研究和渗透测试工具

关于VLANPWN VLANPWN是一款针对VLAN的安全研究和渗透测试工具&#xff0c;该工具可以帮助广大研究人员通过对VLAN执行渗透测试&#xff0c;来研究和分析目标VLAN的安全状况。该工具专为红队研究人员和安全学习爱好者设计&#xff0c;旨在训练网络工程师提升网络的安全性能&…