Flink--API 之Transformation-转换算子的使用解析

embedded/2024/11/29 23:28:29/

目录

一、常用转换算子详解

(一)map 算子

(二)flatMap 算子

(三)filter 算子

(四)keyBy 算子

元组类型

POJO

(五)reduce 算子

二、合并与连接操作

(一)union 算子

(二)connect 算子

三、侧输出流(Side Outputs)

四、总结


        在大数据处理领域,Apache Flink 凭借其强大的流处理和批处理能力备受青睐。而转换算子作为 Flink 编程模型中的关键部分,能够对数据进行灵活多样的处理操作,满足各种复杂业务场景需求。本文将深入介绍 Flink 中常见的转换算子,包括 map、flatMap、filter、keyBy、reduce 等,并结合详细代码示例讲解其使用方法,同时探讨 union、connect 等合并连接操作以及侧输出流等特性,帮助读者全面掌握 Flink 转换算子的精髓。

一、常用转换算子详解

(一)map 算子

功能概述
        map 算子主要用于对输入流中的每个元素进行一对一的转换操作,基于用户自定义的映射逻辑将输入元素转换为新的输出元素。

代码示例
假设我们有一份访问日志数据,格式如下:

86.149.9.216 10001 17/05/2015:10:05:30 GET /presentations/logstash-monitorama-2013/images/github-contributions.png
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:06:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:07:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:08:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:09:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:10:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:16:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:26:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css

        我们要将其转换为一个 LogBean 对象,包含访问 ip、用户 userId、访问时间戳 timestamp、访问方法 method、访问路径 path 等字段。

假如需要用到日期工具类,可以导入lang3包:

<dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.12.0</version>
</dependency>

以下是代码实现:


import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.text.SimpleDateFormat;
import java.util.Date;public class MapDemo {@Data@AllArgsConstructor@NoArgsConstructorstatic class LogBean{String ip;      // 访问ipint userId;     // 用户idlong timestamp; // 访问时间戳String method;  // 访问方法String path;    // 访问路径}public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据DataStream<String> fileStream = env.readTextFile("F:\\BD230801\\FlinkDemo\\datas\\a.log");//3. transformation-数据处理转换// 此处也可以将数据放入到tuple中,tuple可以支持到tuple25DataStream<LogBean> mapStream = fileStream.map(new MapFunction<String, LogBean>() {@Overridepublic LogBean map(String line) throws Exception {String[] arr = line.split(" ");String ip = arr[0];int userId = Integer.valueOf(arr[1]);String createTime = arr[2];// 如何将一个时间字符串变为时间戳// 17/05/2015:10:05:30/*SimpleDateFormat simpleDateFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss");Date date = simpleDateFormat.parse(createTime);long timeStamp = date.getTime();*/// 要想使用这个common.lang3 下的工具类,需要导入包Date date = DateUtils.parseDate(createTime, "dd/MM/yyyy:HH:mm:ss");long timeStamp = date.getTime();String method = arr[3];String path = arr[4];LogBean logBean = new LogBean(ip, userId, timeStamp, method, path);return logBean;}});//4. sink-数据输出mapStream.print();//5. execute-执行env.execute();}
}

        在上述代码中,通过 map 函数的自定义逻辑,将每行日志字符串按空格拆分后,进行相应字段提取与时间戳转换,最终封装成 LogBean 对象输出。

第二个版本:

import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.text.SimpleDateFormat;
import java.util.Date;@Data
@AllArgsConstructor
class LogBean{private String ip;      // 访问ipprivate int userId;     // 用户idprivate long timestamp; // 访问时间戳private String method;  // 访问方法private String path;    // 访问路径
}
public class Demo04 {// 将数据转换为javaBeanpublic static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据DataStreamSource<String> streamSource = env.readTextFile("datas/a.log");//3. transformation-数据处理转换SingleOutputStreamOperator<LogBean> map = streamSource.map(new MapFunction<String, LogBean>() {@Overridepublic LogBean map(String line) throws Exception {String[] arr = line.split("\\s+");//时间戳转换  17/05/2015:10:06:53String time = arr[2];SimpleDateFormat format = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss");Date date = format.parse(time);long timeStamp = date.getTime();return new LogBean(arr[0],Integer.parseInt(arr[1]),timeStamp,arr[3],arr[4]);}});//4. sink-数据输出map.print();//5. execute-执行env.execute();}
}

(二)flatMap 算子

功能概述
        flatMap 算子可以将输入流中的每个元素转换为零个、一个或多个输出元素。它适用于需要对输入元素进行展开、拆分等操作的场景。

代码示例
        假设有数据格式为“张三,苹果手机,联想电脑,华为平板”这样的文本文件,我们要将其转换为“张三有苹果手机”“张三有联想电脑”“张三有华为平板”等形式。

flatmap.log文件如:

张三,苹果手机,联想电脑,华为平板
李四,华为手机,苹果电脑,小米平板

代码如下:

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class FlatMapDemo {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据//2. source-加载数据DataStream<String> fileStream = env.readTextFile("F:\\BD230801\\FlinkDemo\\datas\\flatmap.log");//3. transformation-数据处理转换DataStream<String> flatMapStream = fileStream.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String line, Collector<String> collector) throws Exception {//张三,苹果手机,联想电脑,华为平板String[] arr = line.split(",");String name = arr[0];for (int i = 1; i < arr.length; i++) {String goods = arr[i];collector.collect(name+"有"+goods);}}});//4. sink-数据输出flatMapStream.print();//5. execute-执行env.execute();}
}

        这里在 flatMap 函数内部,按逗号拆分每行数据,遍历拆分后的数组(除第一个元素作为名称外),通过 collector 将新组合的字符串收集输出。

(三)filter 算子

功能概述
        filter 算子依据用户定义的过滤条件,对输入流元素进行筛选,满足条件的元素继续向下游传递,不满足的则被过滤掉。

代码示例
        读取map算子中的访问日志数据,过滤出访问 IP 是 83.149.9.216 的访问日志,代码实现如下:

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.Date;public class FilterDemo {@Data@AllArgsConstructor@NoArgsConstructorstatic class LogBean{String ip;      // 访问ipint userId;     // 用户idlong timestamp; // 访问时间戳String method;  // 访问方法String path;    // 访问路径}public static void main(String[] args) throws Exception {// 准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 加载数据DataStream<String> fileStream = env.readTextFile("F:\\BD230801\\FlinkDemo\\datas\\a.log");// 数据处理转换DataStream<String> filterStream = fileStream.filter(new FilterFunction<String>() {@Overridepublic boolean filter(String line) throws Exception {String ip = line.split(" ")[0];return ip.equals("83.149.9.216");}});// 数据输出filterStream.print();// 执行env.execute();}
}

        在 filter 函数中,通过拆分每行日志获取 IP 地址,并与目标 IP 进行比较决定是否保留该条日志数据。

(四)keyBy 算子

功能概述
        keyBy 算子在流处理中用于对数据按照指定的键进行分组,类似于 SQL 中的 group by,后续可基于分组进行聚合等操作,支持对元组类型 POJO 类型数据按不同方式指定分组键。

流处理中没有groupBy,而是keyBy

KeySelector对象可以支持元组类型,也可以支持POJO[Entry、JavaBean]

元组类型

单个字段keyBy

//用字段位置(已经被废弃)
wordAndOne.keyBy(0)//用字段表达式
wordAndOne.keyBy(v -> v.f0)

多个字段keyBy

//用字段位置
wordAndOne.keyBy(0, 1);//用KeySelector
wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> getKey(Tuple2<String, Integer> value) throws Exception {return Tuple2.of(value.f0, value.f1);}
});

类似于sql中的group by

select sex,count(1) from student group by sex;

group by 后面也可以跟多个字段进行分组,同样 keyBy 也支持使用多个列进行分组 

POJO

public class PeopleCount {private String province;private String city;private Integer counts;public PeopleCount() {}//省略其他代码。。。
}

单个字段keyBy

source.keyBy(a -> a.getProvince());

多个字段keyBy

source.keyBy(new KeySelector<PeopleCount, Tuple2<String, String>>() {@Overridepublic Tuple2<String, String> getKey(PeopleCount value) throws Exception {return Tuple2.of(value.getProvince(), value.getCity());}
});

代码示例
        假设有数据表示不同球类的数量,格式为 Tuple2(球类名称,数量),如 Tuple2.of("篮球", 1) 等,需求是统计篮球、足球各自的总数量。代码如下:

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class KeyByDemo {public static void main(String[] args) throws Exception {// 准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 加载数据DataStream<Tuple2<String, Integer>> dataStream = env.fromElements(Tuple2.of("篮球", 1),Tuple2.of("篮球", 2),Tuple2.of("篮球", 3),Tuple2.of("足球", 3),Tuple2.of("足球", 2),Tuple2.of("足球", 3));// 数据处理转换/*KeyedStream<Tuple2<String, Integer>, String> keyedStream = dataStream.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> tuple2) throws Exception {return tuple2.f0;}});*/KeyedStream<Tuple2<String, Integer>, String> keyedStream = dataStream.keyBy(tuple -> tuple.f0);keyedStream.sum(1).print();// 执行env.execute();}
}

这里通过 lambda 表达式指定 Tuple2 的第一个元素(球类名称)作为分组键,对数据分组后使用 sum 聚合统计每种球类的数量总和。

pojo演示

package com.bigdata.day02;import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** @基本功能:* @program:FlinkDemo* @author: 闫哥* @create:2024-05-13 14:32:52**/
public class Demo07 {@Data@AllArgsConstructorstatic class Ball{private String ballName;private int num;}public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据//3. transformation-数据处理转换//4. sink-数据输出// 以下演示数据是pojoDataStreamSource<Ball> ballSource = env.fromElements(new Ball("篮球", 1),new Ball("篮球", 2),new Ball("篮球", 3),new Ball("足球", 3),new Ball("足球", 2),new Ball("足球", 3));ballSource.keyBy(ball -> ball.getBallName()).print();ballSource.keyBy(new KeySelector<Ball, String>() {@Overridepublic String getKey(Ball ball) throws Exception {return ball.getBallName();}});//5. execute-执行env.execute();}
}

(五)reduce 算子

功能概述
        reduce 算子可对一个数据集或一个分组进行聚合计算,将多个元素逐步合并为一个最终元素,常用于求和、求最值等场景,sum 底层其实也是基于 reduce 实现。

代码示例
        读取访问日志,统计每个 IP 地址的访问 PV 数量,代码如下:

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.Date;public class ReduceDemo {@Data@AllArgsConstructor@NoArgsConstructorstatic class LogBean{String ip;      // 访问ipint userId;     // 用户idlong timestamp; // 访问时间戳String method;  // 访问方法String path;    // 访问路径}public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据DataStream<String> fileStream = env.readTextFile("F:\\BD230801\\FlinkDemo\\datas\\a.log");//3. transformation-数据处理转换// 此处也可以将数据放入到tuple中,tuple可以支持到tuple25DataStream<LogBean> mapStream = fileStream.map(new MapFunction<String, LogBean>() {@Overridepublic LogBean map(String line) throws Exception {String[] arr = line.split(" ");String ip = arr[0];int userId = Integer.valueOf(arr[1]);String createTime = arr[2];// 如何将一个时间字符串变为时间戳// 17/05/2015:10:05:30/*SimpleDateFormat simpleDateFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss");Date date = simpleDateFormat.parse(createTime);long timeStamp = date.getTime();*/// 要想使用这个common.lang3 下的工具类,需要导入包Date date = DateUtils.parseDate(createTime, "dd/MM/yyyy:HH:mm:ss");long timeStamp = date.getTime();String method = arr[3];String path = arr[4];LogBean logBean = new LogBean(ip, userId, timeStamp, method, path);return logBean;}});DataStream<Tuple2<String, Integer>> mapStream2 = mapStream.map(new MapFunction<LogBean, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(LogBean logBean) throws Exception {return Tuple2.of(logBean.getIp(), 1);}});//4. sink-数据输出KeyedStream<Tuple2<String,Integer>, String> keyByStream = mapStream2.keyBy(tuple -> tuple.f0);// sum的底层是 reduce// keyByStream.sum(1).print();//  [ ("10.0.0.1",1),("10.0.0.1",1),("10.0.0.1",1) ]keyByStream.reduce(new ReduceFunction<Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {// t1 => ("10.0.0.1",10)// t2 => ("10.0.0.1",1)return Tuple2.of(t1.f0,  t1.f1 + t2.f1);}}).print();//5. execute-执行env.execute();}
}

        先将日志数据处理成包含 IP 和计数 1 的 Tuple2 格式,按 IP 分组后,在 reduce 函数中对相同 IP 的计数进行累加,得到每个 IP 的访问 PV 数。

二、合并与连接操作

(一)union 算子

功能概述
        union 算子能够合并多个同类型的流,将多个 DataStream 合并成一个 DataStream,但注意合并时流的类型必须一致,且不会对数据去重

代码示例

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class UnionConnectDemo {public static void main(String[] args) throws Exception {// 准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 加载数据DataStreamSource<String> stream1 = env.fromElements("hello", "nihao", "吃甘蔗的人");DataStreamSource<String> stream2 = env.fromElements("hello", "kong ni qi wa", "看电子书的人");// 合并流DataStream<String> unionStream = stream1.union(stream2);unionStream.print();// 执行env.execute();}
}

上述代码将两个包含字符串元素的流进行合并输出。

(二)connect 算子

功能概述
        connect 算子可连接 2 个不同类型的流,连接后形成 ConnectedStreams,内部两个流保持各自的数据和形式独立,之后需通过自定义处理逻辑(如 CoMapFunction 等)处理后再输出,且处理后的数据类型需相同。

代码示例

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;public class UnionConnectDemo {public static void main(String[] args) throws Exception {// 准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 加载数据DataStreamSource<String> stream1 = env.fromElements("hello", "nihao", "吃甘蔗的人");DataStreamSource<Long> stream3 = env.fromSequence(1, 10);// 连接流ConnectedStreams<String, Long> connectStream = stream1.connect(stream3);// 处理流DataStream<String> mapStream = connectStream.map(new CoMapFunction<String, Long, String>() {@Overridepublic String map1(String value) throws Exception {return value;}@Overridepublic String map2(Long value) throws Exception {return Long.toString(value);}});// 输出mapStream.print();// 执行env.execute();}
}

        这里连接了一个字符串流和一个长整型序列流,通过 CoMapFunction 分别将字符串按原样、长整型转换为字符串后合并输出。

三、侧输出流(Side Outputs)

功能概述
        侧输出流可根据自定义规则对输入流数据进行分流,将满足不同条件的数据输出到不同的“分支”流中,方便后续针对性处理。

代码示例
以下示例对流中的数据按照奇数和偶数进行分流并获取分流后的数据:

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;public class SideOutputExample {public static void main(String[] args) throws Exception {// 1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 侧道输出流// 从0到100生成一个Long类型的数据流作为示例数据DataStreamSource<Long> streamSource = env.fromSequence(0, 100);// 定义两个标签,用于区分偶数和奇数的侧输出流OutputTag<Long> tag_even = new OutputTag<Long>("偶数", TypeInformation.of(Long.class));OutputTag<Long> tag_odd = new OutputTag<Long>("奇数", TypeInformation.of(Long.class));// 2. source-加载数据// 使用ProcessFunction来处理每个元素,决定将其输出到哪个侧输出流或者主输出流SingleOutputStreamOperator<Long> process = streamSource.process(new ProcessFunction<Long, Long>() {@Overridepublic void processElement(Long value, ProcessFunction<Long, Long>.Context ctx, Collector<Long> out) throws Exception {// value代表每一个数据,判断是否为偶数if (value % 2 == 0) {// 如果是偶数,将其输出到偶数的侧输出流中ctx.output(tag_even, value);} else {// 如果是奇数,将其输出到奇数的侧输出流中ctx.output(tag_odd, value);}}});// 3. 获取奇数的所有数据,从主输出流中获取对应标签(tag_odd)的侧输出流数据DataStream<Long> sideOutput = process.getSideOutput(tag_odd);sideOutput.print("奇数:");// 获取所有偶数数据,同样从主输出流中获取对应标签(tag_even)的侧输出流数据DataStream<Long> sideOutput2 = process.getSideOutput(tag_even);sideOutput2.print("偶数:");// 4. sink-数据输出(这里通过打印展示了侧输出流的数据,实际应用中可对接其他下游操作)// 5. 执行任务env.execute();}
}

在上述代码中:

  • 首先,我们创建了 StreamExecutionEnvironment 来准备 Flink 的执行环境,并设置了运行模式为自动(AUTOMATIC)。
  • 接着,通过 fromSequence 方法生成了一个从 0 到 100 的 Long 类型的数据流作为示例的输入数据。
  • 然后,定义了两个 OutputTag,分别用于标记偶数和奇数的侧输出流,并且指定了对应的类型信息(这里都是 Long 类型)。
  • 在 process 函数中,针对每个输入的元素(value),通过判断其是否能被 2 整除来决定将其输出到对应的侧输出流中。如果是偶数,就通过 ctx.output(tag_even, value) 将其发送到偶数侧输出流;如果是奇数,则通过 ctx.output(tag_odd, value) 发送到奇数侧输出流。
  • 最后,通过 getSideOutput 方法分别获取奇数和偶数侧输出流的数据,并进行打印输出,以此展示了侧输出流的分流及获取数据的完整流程。实际应用场景中,这些侧输出流的数据可以进一步对接不同的业务逻辑进行相应处理,比如奇数流进行一种聚合计算,偶数流进行另一种统计分析等。

        这样,利用侧输出流的特性,我们可以很灵活地根据自定义条件对数据进行分流处理,满足多样化的数据处理需求。

四、总结

        本文围绕 Apache Flink 转换算子展开,旨在助力读者洞悉其核心要点与多样应用,以灵活处理复杂业务数据。

  1. 常用转换算子
    • map:基于自定义逻辑,对输入元素逐一变换,如剖析日志字符串、提取关键信息并转换格式,封装为定制对象输出,契合精细化处理需求。
    • flatMap:将输入元素按需拆分为零个及以上输出元素,凭借拆分、重组操作,挖掘数据深层价值,适配展开、细化数据场景。
    • filter:依设定条件甄别筛选,精准把控数据流向,剔除不符元素,保障下游数据贴合业务关注点。
    • keyBy:类比 SQL 分组,依指定键归拢数据,为聚合奠基,以不同方式适配多元数据类型,实现分类统计。
    • reduce:聚焦数据集或分组,渐进聚合,可求和、求最值等,sum 操作底层亦仰仗于此,高效整合数据得出汇总结果。
  2. 合并与连接操作
    • union:整合同类型多流为一,操作简便,唯需留意类型一致,虽不除重但拓宽数据维度。
    • connect:桥接不同类型流,借自定义逻辑协同处理,统一输出类型,达成跨流数据交互融合。
  3. 侧输出流特性:基于自定义规则巧妙分流,借OutputTag标记、ProcessFunction判定,将数据导向不同 “分支”,按需对接各异业务逻辑,于复杂场景中尽显灵活应变优势,全方位满足数据处理多元化诉求。

http://www.ppmy.cn/embedded/141598.html

相关文章

ES 和Kibana-v2 带用户登录验证

1. 前言 ElasticSearch、可视化操作工具Kibana。如果你是Linux centos系统的话&#xff0c;下面的指令可以一路CV完成服务的部署。 2. 服务搭建 2.1. 部署ElasticSearch 拉取docker镜像 docker pull elasticsearch:7.17.21 创建挂载卷目录 mkdir /**/es-data -p mkdir /**/…

idea中git的将A分支某次提交记录合并到B分支

一 实操案例 1.1 背景描述 在开发过程中&#xff0c;有时候需要将A分支某次提交记录功能合并到B分支上。主要原理用到git的cherry pick功能。 1.2 案例 实现的功能&#xff1a; master分支的11.24提交记录合并到feature_A分支&#xff1b; 1.master分支提交的记录 2.fea…

一款适用于教育行业的免费word插件

适用于Office和WPS的Word 功能如下&#xff1a; 1、批量生成导航编号&#xff0c;方便拖动排序题目&#xff0c;并可以一键修正自动题目编号 2、批量添加和删除题目来源&#xff0c;删除时如果选中一个区域&#xff0c;只删除选中区域内的题目来源&#xff0c;没有选中则删除…

二刷代码随想录第16天

513. 找树左下角的值 找到深度最大的点&#xff0c;遍历方式左边节点在右边节点前面&#xff0c;找到就返回&#xff0c;一定就是最左下角的值了 class Solution { public:int max_depth -1;int result 0;int findBottomLeftValue(TreeNode* root) {traversal(root, 0);ret…

ubuntu 安装proxychains

在Ubuntu上安装Proxychains&#xff0c;你可以按照以下步骤操作&#xff1a; 1、更新列表 sudo apt-update 2、安装Proxychains sudo apt-get install proxychains 3、安装完成后&#xff0c;你可以通过编辑/etc/proxychains.conf文件来配置代理规则 以下是一个简单的配置示例&…

信息技术与数据安全:打造高效、安全的数据处理系统

信息技术与数据安全&#xff1a;打造高效、安全的数据处理系统 在当今这个信息化高速发展的时代&#xff0c;数据已成为企业运营和决策的核心资源。随着大数据、云计算、人工智能等信息技术的飞速发展&#xff0c;数据处理能力得到了前所未有的提升&#xff0c;但同时也对数据…

git的使用(简洁版)

什么是 Git&#xff1f; Git 是一个分布式版本控制系统 (DVCS)&#xff0c;用于跟踪文件的更改并协调多人之间的工作。它由 Linus Torvalds 在 2005 年创建&#xff0c;最初是为了管理 Linux 内核的开发。Git 的主要目标是提供高效、易用的版本控制工具&#xff0c;使得开发者…

reactivex.Observable 超时问题

下面代码测试可知&#xff1a;超时设置需要在map之后才有效&#xff0c;换句话说就是&#xff0c;超时只对超时设置之前的代码有用 import io.reactivex.Observable; import java.util.concurrent.TimeUnit;public class TimeoutTest {public static void main(String[] args…