2. Flink分区策略

ops/2025/1/24 14:24:17/
一. Flink分区策略概述

Flink任务在执行过程中,一个流(stream)包含一个或多个分区(Stream partition),TaskManager中的一个slot的SubTask就是一个stream partition(流分区)。

Flink分区之间进行数据传递模式有两种。
1. one-to-one模式

  • 数据不需要重新分布,上游SubTask生产的数据与下游SubTask收到的数据完全一致,常见的map,filter等算子的SubTask的数据传递都是one-to-one的对应关系,类似于spark中的窄依赖。

2. redistributing模式

  • 数据需要经过重新分区,比如keyBy算子的SubTask的数据传递都是Redistributing方式,类似于spark中的宽依赖。
二. Flink内置分区策略

Flink内置的分区策略有如下8种:

  • ForwardPartitioner
  • RebalancePartitioner
  • ShufflePartitioner
  • BroadcastPartitioner
  • GlobalPartitioner
  • KeyGroupStreamPartitioner
  • RescalePartitioner
  • BinaryHashPartitioner

在上下游的算子没有指定分区策略的情况下,如果上下游的算子并行度一致且满足one-to-one模式,则默认使用ForwardPartitioner,否则使用RebalancePartitioner。在StreamGraph类的源码中可以看到该规则:

java">private void createActualEdge(Integer upStreamVertexID,Integer downStreamVertexID,int typeNumber,StreamPartitioner<?> partitioner,OutputTag outputTag,StreamExchangeMode exchangeMode,IntermediateDataSetID intermediateDataSetId) {StreamNode upstreamNode = getStreamNode(upStreamVertexID);StreamNode downstreamNode = getStreamNode(downStreamVertexID);// 如果没有指定分区器,上下游并行度一致的情况下使用ForwardPartitioner, // 否则使用RebalancePartitionerif (partitioner == null&& upstreamNode.getParallelism() == downstreamNode.getParallelism()) {partitioner =dynamic ? new ForwardForUnspecifiedPartitioner<>() : new ForwardPartitioner<>();} else if (partitioner == null) {partitioner = new RebalancePartitioner<Object>();}// ...
}

1. ForwardPartitioner
ForwardPartitioner策略将上游同一个分区的元素发送到了下游同一个分区中。
在这里插入图片描述

代码示例:

java">public class PartitionDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());env.setParallelism(1);// 禁止operator chain 优化env.disableOperatorChaining();DataStreamSource<String> source = env.socketTextStream("192.168.47.130", 8888);source.print();env.execute("PartitionDemo");}
}

此时WebUI上算子链的数据流转关系如下:
在这里插入图片描述

2. RebalancePartitioner

在这里插入图片描述
RebalancePartitioner会先随机选一个下游分区,之后轮询(round-robin)遍历下游所有分区进行数据传输。

代码示例:

java">public class PartitionDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());env.setParallelism(4);// 禁止operator chain 优化env.disableOperatorChaining();DataStreamSource<String> source = env.socketTextStream("192.168.47.130", 8888);source.print();env.execute("PartitionDemo");}
}

此时WebUI上算子链的数据流转关系如下:
在这里插入图片描述

3. ShufflePartitioner

在这里插入图片描述

ShufflePartitioner会随机选取下游分区进行数据传输。下游分区由Random生成的随机数决定。

代码示例:

java">public class PartitionDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());env.setParallelism(4);// 禁止operator chain 优化env.disableOperatorChaining();DataStreamSource<String> source = env.socketTextStream("192.168.47.130", 8888);source.shuffle().print();env.execute("PartitionDemo");}
}

此时WebUI上算子链的数据流转关系如下:
在这里插入图片描述

4. BroadcastPartitioner

在这里插入图片描述

BroadcastPartitioner会将每条数据发送给下游每一个分区。

代码示例:

java">public class PartitionDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());env.setParallelism(4);// 禁止operator chain 优化env.disableOperatorChaining();DataStreamSource<String> source = env.socketTextStream("192.168.47.130", 8888);source.broadcast().print();env.execute("PartitionDemo");}
}

运行结果:

# scoket发送hello
[root@hadoop3 ~]# nc -lk 8888
hello

最终输出了4条消息

4> hello
1> hello
2> hello
3> hello

此时WebUI上算子链的数据流转关系如下:

在这里插入图片描述

5. GlobalPartitioner
在这里插入图片描述
GlobalPartitioner只将消息下发给下游算子的第一个分区。

代码示例

java">public class PartitionDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());env.setParallelism(4);// 禁止operator chain 优化env.disableOperatorChaining();DataStreamSource<String> source = env.socketTextStream("192.168.47.130", 8888);source.global().print();env.execute("PartitionDemo");}
}

运行结果:

# 向socket发送2条消息
[root@hadoop3 ~]# nc -lk 8888
hello
world

最终两条消息都被发送到了下游1号分区

1> hello
1> world

此时WebUI上算子链的数据流转关系如下:
在这里插入图片描述

6. KeyGroupStreamPartitioner
在这里插入图片描述
KeyGroupStreamPartitioner将消息发送给key值经过hash计算后的下游分区。

示例代码:

java">public class PartitionDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());env.setParallelism(4);// 禁止operator chain 优化env.disableOperatorChaining();SingleOutputStreamOperator<Tuple2<String, Integer>> mapped = env.socketTextStream("192.168.47.130", 8888).map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {return Tuple2.of(value, 1);}});mapped.keyBy(0).sum(1).print();env.execute("PartitionDemo");}
}

此时WebUI上算子链的数据流转关系如下:
在这里插入图片描述

7. RescalePartitioner
在这里插入图片描述
RescalePartitioner在pointwise模式下会先根据上下游并行度进行匹配,再从匹配后的下游中从0号分区轮询传输数据。

代码示例

java">public class PartitionDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());env.setParallelism(4);// 禁止operator chain 优化env.disableOperatorChaining();DataStreamSource<String> source = env.socketTextStream("192.168.47.130", 8888);source.rescale().print();env.execute("PartitionDemo");}
}

此时WebUI上算子链的数据流转关系如下:
在这里插入图片描述

8. BinaryHashPartitioner
BinaryHashPartitioner是一种针对BinaryRowData的哈希分区器。BinaryRowData是RowData的实现,可以显著减少Java对象的序列化与反序列化。

三. Flink自定义分区策略

如何Flink内置的分区策略不能满足业务需求,可以通过调用DataStream的partitionCustom()方法实现自定义分区策略。

代码实现

java">public class PartitionDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());env.setParallelism(2);// 禁止operator chain 优化env.disableOperatorChaining();DataStreamSource<String> source = env.socketTextStream("192.168.47.130", 8888);source.partitionCustom(new MyPartitioner(), new KeySelector<String, String>() {@Overridepublic String getKey(String value) throws Exception {return value;}}).print();env.execute("PartitionDemo");}
}class MyPartitioner implements Partitioner<String> {@Overridepublic int partition(String key, int numPartitions) {// 偶数分到分区1,奇数分到分区2return Integer.parseInt(key) % numPartitions;}
}

运行结果:

# 向socket发送数据
[root@hadoop3 ~]# nc -lk 8888
1
3
5
2
4
6

奇数和偶数输出到不同的分区中

2> 1
2> 3
2> 5
1> 2
1> 4
1> 6

http://www.ppmy.cn/ops/152756.html

相关文章

Kafka后台启动命令

#保存日志 nohup ./kafka-server-start.sh ../config/server.properties > /path/to/logfile.log 2>&1 &#不保存日志 nohup ./kafka-server-start.sh ../config/server.properties >/dev/null 2>&1 & nohup: 是一个Unix/Linux命令&#xff0c;用于…

MySQL性能分析的“秘密武器”,深度剖析SQL问题

MySQL出现性能问题&#xff0c;该怎样深入分析&#xff0c;有什么工具可以使用&#xff1f;本文将分享两个实用的工具&#xff0c;让你事半功倍&#xff01; profile工具 使用profile可以分析当前查询 SQL 语句执行的资源消耗情况&#xff0c;更清楚地了解SQL执行的过程。默认…

CentOS9 安装Docker+Dpanel+onlyoffice(https、更改字体、字号、去除限制)的避坑笔记

CentOS9 安装Dockeronlyoffice&#xff08;https、更改字体、字号、去除文件大小限制&#xff09;的避坑笔记 一、安装Docker二、更新docker镜像源&#xff1a;三、安装Dpanel四、安装onlyoffice&#xff08;开启https及一些碰到的问题&#xff09;五、更改字体和字号六、去除限…

Spring Boot 启动流程解析及重点源码

文章目录 引言Spring Boot 启动类分析1、SpringBootApplication 注解2、 SpringApplication.run() 方法3、Spring Boot 启动流程详解3.1 创建 SpringApplication 实例3.2 准备环境&#xff08;Environment&#xff09;3.3 执行 ApplicationListeners3.4 刷新应用上下文&#xf…

回归算法、聚类算法、决策树、随机森林、神经网络

这也太全了&#xff01;回归算法、聚类算法、决策树、随机森林、神经网络、贝叶斯算法、支持向量机等十大机器学习算法一口气学完&#xff01;_哔哩哔哩_bilibili 【线性回归、代价函数、损失函数】动画讲解_哔哩哔哩_bilibili 14分钟详解所有机器学习算法&#xff1a;…

T-SQL语言的语法

T-SQL深度解析与应用 T-SQL&#xff08;Transact-SQL&#xff09;是微软SQL Server使用的一种扩展SQL&#xff08;结构化查询语言&#xff09;。它不仅支持标准SQL的所有功能&#xff0c;而且增加了许多实用的扩展和特性&#xff0c;使得数据库的操作更加灵活和强大。本文将对…

Mysql索引(学习自用)

目录 一、索引概述 优缺点 二、索引结构 1、索引数据结构 2、索引支持结构 3、B树 4、B树 5、hash索引 6、为啥采用B树索引 三、索引分类 四、索引语法 五、索引性能分析 5.1查看执行频率 5.2慢查询日志 5.3profiling 5.4explain 六、索引使用规则 6.1验证索…

HOW - 基于master的a分支和基于a的b分支合流问题

目录 背景&问题方案解决方式1. 直接将 master 合并到 b 分支2. 重建 b 分支&#xff08;如果冲突过多&#xff0c;建议此方式&#xff09;3. 使用 Git 的“ours”或“theirs”策略解决冲突 总结 背景&问题 我有一个master分支&#xff0c;然后基于此创建了一个a分支&am…