Flink Transformation - 转换算子全面解析

ops/2024/11/24 20:44:17/

Flink Transformation - 转换算子全面解析

一、引言

在Flink的数据流处理中,转换算子(Transformation Operators)扮演着极为关键的角色。它们能够对输入的数据流进行各种处理和转换操作,以满足不同的业务需求。本文将详细介绍Flink中常见的转换算子,包括mapflatMapfilterkeyByreduceunionconnect以及各种物理分区算子,并结合代码示例进行深入讲解。

二、常用转换算子

(一)map算子

map算子用于将一个数据流中的每个元素进行一对一的转换。例如,假设有如下数据,我们可以将其转换为一个LogBean对象并输出。首先,读取本地文件的方式如下:

DataStream<String> lines = env.readTextFile("./data/input/flatmap.log");

假设LogBean类有相应的字段定义(例如String field1; String field2;等),map算子的使用示例如下:

DataStream<LogBean> logBeanStream = lines.map(new MapFunction<String, LogBean>() {@Overridepublic LogBean map(String line) throws Exception {// 解析line并创建LogBean对象LogBean logBean = new LogBean();// 设置LogBean的各个字段值return logBean;}
});

(二)FlatMap算子

FlatMap算子将数据流中的每个元素转换为零个、一个或多个元素。例如,读取flatmap.log文件中的数据,如“张三,苹果手机,联想电脑,华为平板”,可以转换为“张三有苹果手机”“张三有联想电脑”“张三有华为平板”等。代码演示如下:

DataStream<String> lines = env.readTextFile("./data/input/flatmap.log");
DataStream<String> resultStream = lines.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String line, Collector<String> collector) throws Exception {String[] items = line.split(",");for (int i = 1; i < items.length; i++) {collector.collect(items[0] + "有" + items[i]);}}
});

(三)Filter算子

Filter算子用于根据指定的条件过滤数据流中的元素。例如,读取a.log文件中的访问日志数据,过滤出访问IP是83.149.9.216的访问日志:

DataStream<String> lines = env.readTextFile("./data/input/a.log");
DataStream<String> filteredStream = lines.filter(new FilterFunction<String>() {@Overridepublic boolean filter(String line) throws Exception {// 解析line获取IP并判断是否为目标IPString ip = parseIPFromLine(line);return "83.149.9.216".equals(ip);}
});

(四)KeyBy算子

在流处理中,KeyBy算子类似于批处理中的groupBy,用于按照指定的键对数据进行分组。KeySelector对象可以支持元组类型,也可以支持POJO(如EntryJavaBean)。

  • 元组类型
    • 单个字段keyBy:例如,对于一个包含Tuple2<String, Integer>类型的数据流,如果要按照第一个字段(String类型)进行分组,可以这样写:
DataStream<Tuple2<String, Integer>> tupleStream =...;
KeyedStream<Tuple2<String, Integer>, String> keyedStream = tupleStream.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> tuple) throws Exception {return tuple.f0;}
});
- **多个字段`keyBy`**:类似于SQL中的`group by`多个字段,例如对于`Tuple3<String, Integer, Double>`类型的数据流,按照第一个和第二个字段进行分组:
DataStream<Tuple3<String, Integer, Double>> tuple3Stream =...;
KeyedStream<Tuple3<String, Integer, Double>, Tuple2<String, Integer>> keyedStream = tuple3Stream.keyBy(new KeySelector<Tuple3<String, Integer, Double>, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> getKey(Tuple3<String, Integer, Double> tuple) throws Exception {return Tuple2.of(tuple.f0, tuple.f1);}
});
  • POJO类型
    • 单个字段keyBy:假设User类有idname等字段,要按照id字段进行分组:
DataStream<User> userStream =...;
KeyedStream<User, String> keyedStream = userStream.keyBy(new KeySelector<User, String>() {@Overridepublic String getKey(User user) throws Exception {return user.getId();}
});
- **多个字段`keyBy`**:例如按照`User`类的`id`和`age`字段进行分组:
DataStream<User> userStream =...;
KeyedStream<User, Tuple2<String, Integer>> keyedStream = userStream.keyBy(new KeySelector<User, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> getKey(User user) throws Exception {return Tuple2.of(user.getId(), user.getAge());}
});

(五)Reduce算子

Reduce算子可以对一个数据集或一个分组来进行聚合计算,最终聚合成一个元素。例如,读取a.log日志,统计ip地址访问pv数量,使用reduce操作聚合成一个最终结果:

DataStream<String> lines = env.readTextFile("./data/input/a.log");
DataStream<Tuple2<String, Integer>> ipCountStream = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String line) throws Exception {// 解析line获取IP并设置初始计数为1String ip = parseIPFromLine(line);return Tuple2.of(ip, 1);}
}).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> tuple) throws Exception {return tuple.f0;}
}).reduce(new ReduceFunction<Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {return Tuple2.of(value1.f0, value1.f1 + value2.f1);}
});

(六)flatMap/map/filter/keyby/reduce综合练习

需求是对流数据中的单词进行统计,排除敏感词“TMD”(腾讯美团滴滴)。首先启动netcat服务端(在Windows上解压相关软件后,在路径中输入cmd,然后启动服务端),客户端双击nc.exe即可。代码示例如下:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> socketStream = env.socketTextStream("localhost", 8888);
DataStream<String> filteredWords = socketStream.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String line, Collector<String> collector) throws Exception {String[] words = line.split(" ");for (String word : words) {if (!"TMD".equals(word)) {collector.collect(word);}}}
}).map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String word) throws Exception {return Tuple2.of(word, 1);}
}).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> tuple) throws Exception {return tuple.f0;}
}).reduce(new ReduceFunction<Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {return Tuple2.of(value1.f0, value1.f1 + value2.f1);}
});
filteredWords.print();
env.execute();

三、合并和连接算子

(一)Union算子

Union算子可以合并多个同类型的流,将多个DataStream合并成一个DataStream。需要注意的是,union合并的DataStream的类型必须是一致的,并且union可以取并集,但是不会去重。例如:

DataStream<String> stream1 = env.fromElements("a", "b", "c");
DataStream<String> stream2 = env.fromElements("c", "d", "e");
DataStream<String> unionStream = stream1.union(stream2);

(二)Connect算子

Connect算子可以连接2个不同类型的流(最后需要处理后再输出)。DataStreamDataStream连接后得到ConnectedStreams:连接两个保持它们类型的数据流,两个数据流被Connect之后,只是被放在了同一个流中,内部依然保持各自的数据和形式不发生任何变化(类似“一国两制”),两个流相互独立,作为对比Union后是真的变成一个流了。和union类似,但是connect只能连接两个流,两个流之间的数据类型可以不同,对两个流的数据可以分别应用不同的处理逻辑。例如:

DataStream<Integer> streamA = env.fromElements(1, 2, 3);
DataStream<String> streamB = env.fromElements("a", "b", "c");
ConnectedStreams<Integer, String> connectedStreams = streamA.connect(streamB);

四、Side Outputs侧道输出(侧输出流)

侧输出流可以对流中的数据按照特定规则进行分流。例如,对流中的数据按照奇数和偶数进行分流,并获取分流后的数据。具体实现时,可以在ProcessFunction中使用OutputTag来定义侧输出流,并根据条件将数据发送到不同的侧输出流中。

五、物理分区算子

Flink提供了以下方法让用户根据需要在数据转换完成后对数据分区进行更细粒度的配置。

(一)Global Partitioner

该分区器会将所有的数据都发送到下游的某个算子实例(subtask id = 0)。例如:

DataStream<Long> stream =...;
stream.global();

(二)Shuffle Partitioner

根据均匀分布随机划分元素。使用示例:

DataStream<Long> stream =...;
stream.shuffle();

(三)Broadcast Partitioner

发送到下游所有的算子实例,是将上游的所有数据,都给下游的每一个分区一份。例如:

DataStream<Long> stream =...;
stream.broadcast();

(四)Rebalance Partitioner(重分区)

通过循环的方式依次发送到下游的task,用于解决数据倾斜问题(当某一个分区数据量过大时)。可以通过人为制造数据不平衡,然后使用rebalance方法让其平衡,并通过观察每一个分区的总数来观察效果。例如:

DataStream<Long> stream =...;
DataStream<Long> rebalancedStream = stream.rebalance();

(五)Forward Partitioner

发送到下游对应的第一个task,保证上下游算子并行度一致,即上下游算子与下游算子是1:1的关系。在上下游的算子没有指定分区器的情况下,如果上下游的算子并行度一致,则使用ForwardPartitioner,否则使用RebalancePartitioner。对于ForwardPartitioner,必须保证上下游算子并行度一致,否则会抛出异常。例如:

DataStream<Long> stream =...;
stream.forward();

(六)Custom(自定义)Partitioning

使用用户定义的Partitioner为每个元素选择目标任务。例如:

class CustomPartitioner implements Partitioner<Long>{@Overridepublic int partition(Long key, int numPartitions) {System.out.println(numPartitions);if(key < 10000){return 0;}return 1;}
}
DataStreamSource<Long> streamSource = env.fromSequence(1, 15000);
DataStream<Long> dataStream = streamSource.partitionCustom(new CustomPartitioner(), new KeySelector<Long, Long>() {@Overridepublic Long getKey(Long value) throws Exception {return value;}
});

六、总结

Flink的转换算子为数据流的处理提供了丰富而强大的功能。通过合理地组合和运用这些算子,可以构建出复杂而高效的数据流处理逻辑,以满足各种大数据处理场景下的业务需求。在实际应用中,需要根据数据的特点、业务逻辑以及性能要求等因素,灵活选择和配置合适的转换算子,从而充分发挥Flink在大数据处理领域的优势。


http://www.ppmy.cn/ops/136396.html

相关文章

Arduino UNO驱动ADS1220模数转换模块

文章目录 简介ADS1220特性参数ADS1220模数转换模块原理图Arduino UNO与ADS1220接线简单测试K型热电偶测试 简介 ADS1220 是一款精密的 24 位模数转换器 (ADC)&#xff0c;它提供许多集成功能&#xff0c;可降低测量小传感器信号的应用中的系统成本和元件数量。该器件具有两个差…

webkit浏览器内核编译(2024年11月份版本)

webkit浏览器内核编译 本文详细介绍了如何安装和配置Webkit的编译环境和工具的安装&#xff0c;以及在Windows上编译和运行WebKit浏览器引擎的过程&#xff0c;包括安装依赖、设置环境变量、生成解决方案并最终运行附带的MiniBrowser示例。 一、WebKit简介 WebKit 是一个开源的…

AR智能眼镜|AR眼镜定制开发|工业AR眼镜方案

AR眼镜的设计与制造成本主要受到芯片、显示屏和光学方案的影响&#xff0c;因此选择合适的芯片至关重要。一款优秀的芯片平台能够有效提升设备性能&#xff0c;并解决多种技术挑战。例如&#xff0c;采用联发科八核2.0GHz处理器&#xff0c;结合12nm制程工艺&#xff0c;这种低…

Linux:进程

✨✨所属专栏&#xff1a;Linux✨✨ ✨✨作者主页&#xff1a;嶔某✨✨ 冯诺依曼体系结构 现代计算机发展所遵循的基本结构形式始终是冯诺依曼结构。这种结构特点是“程序存储&#xff0c;共享数据&#xff0c;顺序执行”&#xff0c;需要CPU从存储器取出指令和数据进行相应的…

秋招面试基础总结,Java八股文基础(串联知识),四万字大全

目录 值传递和引用传递 静态变量和静态代码块的执行顺序 Java​​​​​​​集合的框架&#xff0c;Set,HashSet,LinkedHashSet这三个底层是什么 多线程篇 Java实现多线程的方式 假设一个线程池&#xff0c;核心线程数是2&#xff0c;最大线程数是3&#xff0c;阻塞队列是4…

一种构建网络安全知识图谱的实用方法

文章主要工作 论述了构建网络安全知识库的三个步骤&#xff0c;并提出了一个构建网络安全知识库的框架;讨论网络安全知识的推演 1.框架设计 总体知识图谱框架如图1所示&#xff0c;其包括数据源&#xff08;结构化数据和非结构化数据&#xff09;、信息抽取及本体构建、网络…

STL之哈希

STL之哈希 unordered_set/map&哈希之介绍unordered系列哈希哈希表的模拟实现 unordered_set&/map的模拟实现哈希的应用位图&#xff08;bitmap/bitset&#xff09;布隆过滤器&#xff08;Bloom Filter&#xff09;海量数据处理 unordered_set/map&哈希之介绍 unor…

联邦学习安全聚合算法综述(论文解析)以及如何确定自己研究方向的方法

自己写相关论文的方法&#xff1a; 可以重点看看综述类论文的未来研究方向和引言中前人已经做过的内容 联邦学习安全聚合算法综述 auth:江萍 1 通讯作者 李芯蕊 1 赵晓阳 2 杭永凯 摘要 摘要&#xff1a;随着深度学习技术的发展&#xff0c;人工智能在社会的各个方面有着重要…