目录
- Flink数据转换方法使用案例
- Map
- FlatMap
- Filter
- KeyBy
- Reduce
- Aggregations
- Window
- Join
- Union
- Project
- Distinct
- Sort
- Partition
- Iterate
- Fold
- 使用 Flink 数据转换 Conclusion 的案例
- 问题描述
- 解决方案
- 结论
Flink数据转换方法使用案例
Apache Flink是一个分布式流处理框架,它提供了丰富的数据转换方法,可以帮助我们对数据进行各种各样的转换操作。本文将介绍Flink中常用的数据转换方法,并提供相应的使用案例。
Map
Map方法可以将输入的数据转换成另一种形式,常用于数据清洗、数据格式转换等场景。
DataStream<String> input = env.fromElements("hello world", "flink is awesome");
DataStream<String> output = input.map(new MapFunction<String, String>() {@Overridepublic String map(String value) throws Exception {return value.toUpperCase();}
});
上述代码将输入的字符串转换成大写形式。
FlatMap
FlatMap方法可以将输入的数据转换成多个输出,常用于数据拆分、数据过滤等场景。
DataStream<String> input = env.fromElements("hello world", "flink is awesome");
DataStream<String> output = input.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {String[] words = value.split(" ");for (String word : words) {out.collect(word);}}
});
上述代码将输入的字符串按照空格拆分成多个单词,并输出每个单词。
Filter
Filter方法可以根据条件过滤输入的数据,常用于数据筛选、数据清洗等场景。
DataStream<Integer> input = env.fromElements(1, 2, 3, 4, 5);
DataStream<Integer> output = input.filter(new FilterFunction<Integer>() {@Overridepublic boolean filter(Integer value) throws Exception {return value % 2 == 0;}
});
上述代码将输入的整数中的偶数过滤出来。
KeyBy
KeyBy方法可以将输入的数据按照指定的Key进行分组,常用于数据聚合、数据统计等场景。
DataStream<Tuple2<String, Integer>> input = env.fromElements(new Tuple2<>("a", 1),new Tuple2<>("b", 2),new Tuple2<>("a", 3),new Tuple2<>("b", 4)
);
DataStream<Tuple2<String, Integer>> output = input.keyBy(0).sum(1);
上述代码将输入的Tuple按照第一个元素进行分组,并对每组的第二个元素进行求和。
Reduce
Reduce方法可以对输入的数据进行归约操作,常用于数据聚合、数据统计等场景。
DataStream<Integer> input = env.fromElements(1, 2, 3, 4, 5);
DataStream<Integer> output = input.reduce(new ReduceFunction<Integer>() {@Overridepublic Integer reduce(Integer value1, Integer value2) throws Exception {return value1 + value2;}
});
上述代码将输入的整数进行求和操作。
Aggregations
Aggregations方法可以对输入的数据进行聚合操作,常用于数据统计、数据分析等场景。
DataStream<Tuple2<String, Integer>> input = env.fromElements(new Tuple2<>("a", 1),new Tuple2<>("b", 2),new Tuple2<>("a", 3),new Tuple2<>("b", 4)
);
DataStream<Tuple2<String, Integer>> output = input.keyBy(0).maxBy(1);
上述代码将输入的Tuple按照第一个元素进行分组,并对每组的第二个元素进行最大值计算。
Window
Window方法可以对输入的数据进行窗口操作,常用于数据统计、数据分析等场景。
DataStream<Tuple2<String, Integer>> input = env.fromElements(new Tuple2<>("a", 1),new Tuple2<>("b", 2),new Tuple2<>("a", 3),new Tuple2<>("b", 4)
);
DataStream<Tuple2<String, Integer>> output = input.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))).sum(1);
上述代码将输入的Tuple按照第一个元素进行分组,并对每组的第二个元素进行5秒的滚动窗口求和操作。
Join
DataStream<Tuple2<String, Integer>> stream1 = ...;
DataStream<Tuple2<String, Integer>> stream2 = ...;// 使用Key进行连接
stream1.join(stream2).where(tuple -> tuple.f0).equalTo(tuple -> tuple.f0).window(TumblingEventTimeWindows.of(Time.seconds(10))).apply(new JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Integer, Integer>>() {@Overridepublic Tuple3<String, Integer, Integer> join(Tuple2<String, Integer> first, Tuple2<String, Integer> second) throws Exception {return new Tuple3<>(first.f0, first.f1, second.f1);}});
Union
DataStream<String> stream1 = ...;
DataStream<String> stream2 = ...;// 将两个流合并成一个流
DataStream<String> unionStream = stream1.union(stream2);
Project
DataStream<Tuple2<String, Integer>> stream = ...;// 只保留Tuple中的第一个元素
DataStream<String> projectedStream = stream.map(tuple -> tuple.f0);
Distinct
DataStream<String> stream = ...;// 去重
DataStream<String> distinctStream = stream.distinct();
Sort
DataStream<Tuple2<String, Integer>> stream = ...;// 按照Tuple中的第二个元素进行升序排序
DataStream<Tuple2<String, Integer>> sortedStream = stream.keyBy(tuple -> tuple.f0).window(TumblingEventTimeWindows.of(Time.seconds(10))).sortBy(tuple -> tuple.f1, true);
Partition
DataStream<String> stream = ...;// 按照Hash值进行分区
DataStream<String> partitionedStream = stream.keyBy(str -> str.hashCode() % 10).window(TumblingEventTimeWindows.of(Time.seconds(10))).reduce((value1, value2) -> value1 + value2);
Iterate
DataStream<Long> initialStream = ...;// 迭代计算
IterativeStream<Long> iterativeStream = initialStream.iterate();
DataStream<Long> iterationBody = iterativeStream.map(value -> value + 1).filter(value -> value < 100);
iterativeStream.closeWith(iterationBody);DataStream<Long> resultStream = iterativeStream.map(value -> value * 2);
Fold
DataStream<Integer> stream = ...;// 对流中的元素进行累加
DataStream<Integer> foldedStream = stream.keyBy(value -> value % 2).window(TumblingEventTimeWindows.of(Time.seconds(10))).fold(0, (value1, value2) -> value1 + value2);
以下是一个使用 Flink 数据转换 Conclusion 的案例:
使用 Flink 数据转换 Conclusion 的案例
问题描述
我们有一个数据集,其中包含了用户的浏览记录,每条记录包含了用户 ID、浏览时间、浏览的网页 URL 等信息。我们希望对这个数据集进行分析,找出每个用户最近浏览的 3 个网页。
解决方案
我们可以使用 Flink 的数据转换 Conclusion 来解决这个问题。具体步骤如下:
- 读取数据集,并将每条记录转换成一个元组,其中包含了用户 ID、浏览时间和网页 URL。
- 按照用户 ID 进行分组,然后对每个用户的浏览记录按照浏览时间进行排序。
- 对于每个用户,使用滑动窗口来获取最近浏览的 3 个网页。具体来说,我们可以使用
window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(1)))
来定义一个大小为 10 分钟、滑动步长为 1 分钟的滑动窗口。 - 对于每个窗口,使用
reduceGroup()
函数来对窗口内的数据进行处理。具体来说,我们可以使用一个自定义的TopNFunction
函数来实现对每个用户最近浏览的 3 个网页的查找和输出。
下面是完整的代码示例:
DataStream<Tuple3<String, Long, String>> data = env.readTextFile("input.txt").map(line -> {String[] fields = line.split(",");return new Tuple3<>(fields[0], Long.parseLong(fields[1]), fields[2]);});DataStream<Tuple3<String, Long, String>> result = data.keyBy(0).window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(1))).reduceGroup(new TopNFunction(3));result.print();public static class TopNFunction implements GroupReduceFunction<Tuple3<String, Long, String>, Tuple3<String, Long, String>> {private final int n;public TopNFunction(int n) {this.n = n;}@Overridepublic void reduce(Iterable<Tuple3<String, Long, String>> iterable, Collector<Tuple3<String, Long, String>> collector) throws Exception {PriorityQueue<Tuple3<String, Long, String>> queue = new PriorityQueue<>(n, Comparator.comparingLong(t -> -t.f1));for (Tuple3<String, Long, String> t : iterable) {queue.offer(t);if (queue.size() > n) {queue.poll();}}for (Tuple3<String, Long, String> t : queue) {collector.collect(t);}}
}
结论
使用 Flink 的数据转换 Conclusion,我们可以方便地对数据集进行分析和处理,实现各种复杂的数据处理任务。在本例中,我们使用 Flink 的滑动窗口和自定义函数来查找每个用户最近浏览的 3 个网页,从而实现了对数据集的分析和处理。