Flink数据转换方法使用案例总结

news/2024/10/30 13:31:31/

目录

  • Flink数据转换方法使用案例
    • Map
    • FlatMap
    • Filter
    • KeyBy
    • Reduce
    • Aggregations
    • Window
    • Join
    • Union
    • Project
    • Distinct
    • Sort
    • Partition
    • Iterate
    • Fold
    • 使用 Flink 数据转换 Conclusion 的案例
      • 问题描述
      • 解决方案
      • 结论

Flink数据转换方法使用案例

Apache Flink是一个分布式流处理框架,它提供了丰富的数据转换方法,可以帮助我们对数据进行各种各样的转换操作。本文将介绍Flink中常用的数据转换方法,并提供相应的使用案例。

Map

Map方法可以将输入的数据转换成另一种形式,常用于数据清洗、数据格式转换等场景。

DataStream<String> input = env.fromElements("hello world", "flink is awesome");
DataStream<String> output = input.map(new MapFunction<String, String>() {@Overridepublic String map(String value) throws Exception {return value.toUpperCase();}
});

上述代码将输入的字符串转换成大写形式。

FlatMap

FlatMap方法可以将输入的数据转换成多个输出,常用于数据拆分、数据过滤等场景。

DataStream<String> input = env.fromElements("hello world", "flink is awesome");
DataStream<String> output = input.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {String[] words = value.split(" ");for (String word : words) {out.collect(word);}}
});

上述代码将输入的字符串按照空格拆分成多个单词,并输出每个单词。

Filter

Filter方法可以根据条件过滤输入的数据,常用于数据筛选、数据清洗等场景。

DataStream<Integer> input = env.fromElements(1, 2, 3, 4, 5);
DataStream<Integer> output = input.filter(new FilterFunction<Integer>() {@Overridepublic boolean filter(Integer value) throws Exception {return value % 2 == 0;}
});

上述代码将输入的整数中的偶数过滤出来。

KeyBy

KeyBy方法可以将输入的数据按照指定的Key进行分组,常用于数据聚合、数据统计等场景。

DataStream<Tuple2<String, Integer>> input = env.fromElements(new Tuple2<>("a", 1),new Tuple2<>("b", 2),new Tuple2<>("a", 3),new Tuple2<>("b", 4)
);
DataStream<Tuple2<String, Integer>> output = input.keyBy(0).sum(1);

上述代码将输入的Tuple按照第一个元素进行分组,并对每组的第二个元素进行求和。

Reduce

Reduce方法可以对输入的数据进行归约操作,常用于数据聚合、数据统计等场景。

DataStream<Integer> input = env.fromElements(1, 2, 3, 4, 5);
DataStream<Integer> output = input.reduce(new ReduceFunction<Integer>() {@Overridepublic Integer reduce(Integer value1, Integer value2) throws Exception {return value1 + value2;}
});

上述代码将输入的整数进行求和操作。

Aggregations

Aggregations方法可以对输入的数据进行聚合操作,常用于数据统计、数据分析等场景。

DataStream<Tuple2<String, Integer>> input = env.fromElements(new Tuple2<>("a", 1),new Tuple2<>("b", 2),new Tuple2<>("a", 3),new Tuple2<>("b", 4)
);
DataStream<Tuple2<String, Integer>> output = input.keyBy(0).maxBy(1);

上述代码将输入的Tuple按照第一个元素进行分组,并对每组的第二个元素进行最大值计算。

Window

Window方法可以对输入的数据进行窗口操作,常用于数据统计、数据分析等场景。

DataStream<Tuple2<String, Integer>> input = env.fromElements(new Tuple2<>("a", 1),new Tuple2<>("b", 2),new Tuple2<>("a", 3),new Tuple2<>("b", 4)
);
DataStream<Tuple2<String, Integer>> output = input.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))).sum(1);

上述代码将输入的Tuple按照第一个元素进行分组,并对每组的第二个元素进行5秒的滚动窗口求和操作。

Join

DataStream<Tuple2<String, Integer>> stream1 = ...;
DataStream<Tuple2<String, Integer>> stream2 = ...;// 使用Key进行连接
stream1.join(stream2).where(tuple -> tuple.f0).equalTo(tuple -> tuple.f0).window(TumblingEventTimeWindows.of(Time.seconds(10))).apply(new JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Integer, Integer>>() {@Overridepublic Tuple3<String, Integer, Integer> join(Tuple2<String, Integer> first, Tuple2<String, Integer> second) throws Exception {return new Tuple3<>(first.f0, first.f1, second.f1);}});

Union

DataStream<String> stream1 = ...;
DataStream<String> stream2 = ...;// 将两个流合并成一个流
DataStream<String> unionStream = stream1.union(stream2);

Project

DataStream<Tuple2<String, Integer>> stream = ...;// 只保留Tuple中的第一个元素
DataStream<String> projectedStream = stream.map(tuple -> tuple.f0);

Distinct

DataStream<String> stream = ...;// 去重
DataStream<String> distinctStream = stream.distinct();

Sort

DataStream<Tuple2<String, Integer>> stream = ...;// 按照Tuple中的第二个元素进行升序排序
DataStream<Tuple2<String, Integer>> sortedStream = stream.keyBy(tuple -> tuple.f0).window(TumblingEventTimeWindows.of(Time.seconds(10))).sortBy(tuple -> tuple.f1, true);

Partition

DataStream<String> stream = ...;// 按照Hash值进行分区
DataStream<String> partitionedStream = stream.keyBy(str -> str.hashCode() % 10).window(TumblingEventTimeWindows.of(Time.seconds(10))).reduce((value1, value2) -> value1 + value2);

Iterate

DataStream<Long> initialStream = ...;// 迭代计算
IterativeStream<Long> iterativeStream = initialStream.iterate();
DataStream<Long> iterationBody = iterativeStream.map(value -> value + 1).filter(value -> value < 100);
iterativeStream.closeWith(iterationBody);DataStream<Long> resultStream = iterativeStream.map(value -> value * 2);

Fold

DataStream<Integer> stream = ...;// 对流中的元素进行累加
DataStream<Integer> foldedStream = stream.keyBy(value -> value % 2).window(TumblingEventTimeWindows.of(Time.seconds(10))).fold(0, (value1, value2) -> value1 + value2);

以下是一个使用 Flink 数据转换 Conclusion 的案例:

使用 Flink 数据转换 Conclusion 的案例

问题描述

我们有一个数据集,其中包含了用户的浏览记录,每条记录包含了用户 ID、浏览时间、浏览的网页 URL 等信息。我们希望对这个数据集进行分析,找出每个用户最近浏览的 3 个网页。

解决方案

我们可以使用 Flink 的数据转换 Conclusion 来解决这个问题。具体步骤如下:

  1. 读取数据集,并将每条记录转换成一个元组,其中包含了用户 ID、浏览时间和网页 URL。
  2. 按照用户 ID 进行分组,然后对每个用户的浏览记录按照浏览时间进行排序。
  3. 对于每个用户,使用滑动窗口来获取最近浏览的 3 个网页。具体来说,我们可以使用 window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(1))) 来定义一个大小为 10 分钟、滑动步长为 1 分钟的滑动窗口。
  4. 对于每个窗口,使用 reduceGroup() 函数来对窗口内的数据进行处理。具体来说,我们可以使用一个自定义的 TopNFunction 函数来实现对每个用户最近浏览的 3 个网页的查找和输出。

下面是完整的代码示例:

DataStream<Tuple3<String, Long, String>> data = env.readTextFile("input.txt").map(line -> {String[] fields = line.split(",");return new Tuple3<>(fields[0], Long.parseLong(fields[1]), fields[2]);});DataStream<Tuple3<String, Long, String>> result = data.keyBy(0).window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(1))).reduceGroup(new TopNFunction(3));result.print();public static class TopNFunction implements GroupReduceFunction<Tuple3<String, Long, String>, Tuple3<String, Long, String>> {private final int n;public TopNFunction(int n) {this.n = n;}@Overridepublic void reduce(Iterable<Tuple3<String, Long, String>> iterable, Collector<Tuple3<String, Long, String>> collector) throws Exception {PriorityQueue<Tuple3<String, Long, String>> queue = new PriorityQueue<>(n, Comparator.comparingLong(t -> -t.f1));for (Tuple3<String, Long, String> t : iterable) {queue.offer(t);if (queue.size() > n) {queue.poll();}}for (Tuple3<String, Long, String> t : queue) {collector.collect(t);}}
}

结论

使用 Flink 的数据转换 Conclusion,我们可以方便地对数据集进行分析和处理,实现各种复杂的数据处理任务。在本例中,我们使用 Flink 的滑动窗口和自定义函数来查找每个用户最近浏览的 3 个网页,从而实现了对数据集的分析和处理。


http://www.ppmy.cn/news/104119.html

相关文章

Java最新版发送阿里短信教程

一、概述&#xff1a; 为什么现在的企业越来越多使用阿里云短信服务&#xff0c;究其原因是阿里云短信服务是一种可靠、高效、安全的短信发送服务&#xff0c;它具有以下优点&#xff1a; 高可靠性&#xff1a;阿里云短信服务采用全球领先的短信网关进行短信发送&#xff0c;确…

数据库基础——9.聚合函数

这篇文章来讲一下数据库中的聚合函数 目录 1. 聚合函数介绍 1.1 AVG和SUM函数 1.2 MIN和MAX函数 1.3 COUNT函数 2. GROUP BY 2.1 基本使用 2.2 使用多个列分组 2.3 GROUP BY中使用WITH ROLLUP 3. HAVING 3.1 基本使用 3.2 WHERE和HAVING的对比 4. SELECT的执…

【满分】【华为OD机试真题2023B卷 JAVAJS】字符串统计

华为OD2023(B卷)机试题库全覆盖,刷题指南点这里 字符串统计 知识点字符串 时间限制:1s 空间限制:256MB 限定语言:不限 题目描述: 给定两个字符集合,一个为全量字符集,一个为已占用字符集。已占用的字符集中的字符不能再使用,要求输出剩余可用字符集。 输入描述: 1、…

HTML <colgroup> 标签

实例 两个 colgroup 元素为表格中的三列规定了不同的对齐方式和样式(注意第一个 colgroup 元素横跨两列): <table width="100%" border="1"><colgroup span="2" align="left"></colgroup><colgroup align=&…

React面试题和基础

React的特点&#xff1a; JSX它使用虚拟DOM &#xff0c;减少 DOM 操作&#xff0c;提升性能。便于和其他平台集成。它可以进行服务器端渲染。单向数据流。组件化 双向数据绑定和单向数据流区别&#xff1f; 单向绑定的优点在于清晰可控&#xff0c;缺点则在于模板代码过多。…

MATLAB任意位置反转并拼接数组

初始化变量&#xff1a; 定义原始数组 original_array&#xff0c;例如 [1, 2, 3, 4, 5, 6]。定义仿真时间 simulationTime&#xff0c;表示遍历数组的总时间。 定义空的结果数组 result_array&#xff0c;用于存储最终生成的数组。 设置初始时间和索引&#xff1a; 初始化当前…

跨域图像识别

跨域图像识别 跨域图像识别&#xff08;Cross-domain Image Recognition&#xff09;是指在不同的数据集之间进行图像分类或识别的任务。由于不同数据集之间的分布差异&#xff0c;跨域图像识别面临着很大的挑战。 以下是几种代表性的跨域图像识别算法&#xff1a; DDC&#…

正则表达式必知必会 - 匹配单个字符

目录 一、匹配普通文本&#xff08;plain text&#xff09; 1. 匹配多个结果 2. 字母的大小写问题 二、匹配任意字符 三、匹配特殊字符 一、匹配普通文本&#xff08;plain text&#xff09; mysql> set s:Hello, my name is Ben. Please visit> my website at htt…