Flink算子

embedded/2024/11/18 12:51:56/

文章目录

    • map
    • filter
    • flatMap
    • keyBy
    • aggregations
    • reduce
    • 物理分区算子
    • 富函数
    • split
    • side output
    • union(联合)
    • connect(连接)

map

Map 算子会遍历数据流的每一个元素产生一个新的元素。

java"> public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);SingleOutputStreamOperator<Integer> source = env.socketTextStream("192.168.235.130", 8888).map(new MapFunction<String, Integer>() {@Overridepublic Integer map(String s) throws Exception {return Integer.valueOf(s)*10;}});source.print();env.execute();}

在这里插入图片描述

filter

filter算子通过一个布尔表达式对数据流的元素进行过滤,若为true则正常输出该元素,若为false则过滤掉该元素。

java">    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);SingleOutputStreamOperator<String> filter = env.socketTextStream("192.168.235.130", 8888).filter(new FilterFunction<String>() {@Overridepublic boolean filter(String s) throws Exception {String[] data = s.split(",");return "10".equals(data[1]);}});filter.print();env.execute();}

在这里插入图片描述

flatMap

flatMap遍历数据流中的每一个元素产生N(N >= 0)个元素。

java">    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);SingleOutputStreamOperator<String> flatMap = env.socketTextStream("192.168.235.130", 8888).flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String s, Collector<String> collector) throws Exception {String[] data = s.split(",");for (String str : data) {collector.collect(str);}}});flatMap.print();env.execute();}

在这里插入图片描述

keyBy

在使用聚合算子之前通常要经过keyBy分组,keyBy通过指定的key将数据流中的数据划分到不同的分区,那么具有相同key的数据都被发送到同一个分区,但一个分区中可能存在不同key的数据,底层原理是通过计算key的哈希值对分区数取模来实现的,如果key是POJO类型必须重写hashCode()方法。

java"> public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(10);KeyedStream<String, String> keyedStream = env.socketTextStream("192.168.235.130", 8888).keyBy(new KeySelector<String, String>() {@Overridepublic String getKey(String s) throws Exception {String[] data = s.split(",");return data[1];}});keyedStream.print();env.execute();}

在这里插入图片描述

aggregations

aggregations包含以下聚合算子,在数据流中,sum()用于对指定的字段求和,min()对指定的字段求最小值,max()对指定的字段求最大值,maxby()取比较字段的最大值,同时非比较字段 取 最大值这条数据的值,minBy()同理,取比较字段的最小值,同时非比较字段 取 最小值这条数据的值。

java">public class WaterSensor {public String id;public Long ts;public Integer vc;// 要提供一个空参的构造器public WaterSensor() {}public WaterSensor(String id, Long ts, Integer vc) {this.id = id;this.ts = ts;this.vc = vc;}}
java">  public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStreamSource<WaterSensor> sensorDS = env.fromElements(new WaterSensor("s1", 10L, 10),new WaterSensor("s1", 20L, 11),new WaterSensor("s1", 30L, 10),new WaterSensor("s2", 40L, 2),new WaterSensor("s3", 50L, 3));KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(new KeySelector<WaterSensor, String>() {@Overridepublic String getKey(WaterSensor value) throws Exception {return value.getId();}});SingleOutputStreamOperator<WaterSensor> result = sensorKS.maxBy("vc");//        SingleOutputStreamOperator<WaterSensor> result = sensorKS.max("vc");
//        SingleOutputStreamOperator<WaterSensor> result = sensorKS.min("vc");//  SingleOutputStreamOperator<WaterSensor> result = sensorKS.maxBy("vc");
//        SingleOutputStreamOperator<WaterSensor> result = sensorKS.minby("vc");result.print();env.execute();}

在这里插入图片描述

reduce

reduce用于对分组完的数据流进行聚合处理,把新输入的数据和当前已经归约出来的数据进行聚合计算,因此每组的第一个元素不会执行reduce操作,需要等待同组的下一个元素到来后再进行计算。

java">  public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStreamSource<WaterSensor> sensorDS = env.fromElements(new WaterSensor("s1", 10L, 1),new WaterSensor("s1", 20L, 11),new WaterSensor("s1", 30L, 21),new WaterSensor("s2", 40L, 2),new WaterSensor("s3", 50L, 3));KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(new KeySelector<WaterSensor, String>() {@Overridepublic String getKey(WaterSensor value) throws Exception {return value.getId();}});SingleOutputStreamOperator<WaterSensor> reduce = sensorKS.reduce(new ReduceFunction<WaterSensor>() {@Overridepublic WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {System.out.println("value1=" + value1);System.out.println("value2=" + value2);return new WaterSensor(value1.id, value2.ts, value1.vc + value2.vc);}});reduce.print();env.execute();}

在这里插入图片描述

物理分区算子

常见的物理分区策略包含以下几种:随机分区、轮询分区、重缩放,广播,全局分区和自定义分区。

java">    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());env.setParallelism(8);DataStreamSource<String> socketDS = env.socketTextStream("192.168.235.130", 8888);// shuffle随机分区socketDS.shuffle().print();// rebalance轮询// 如果是数据源倾斜的场景,调用rebalance,就可以解决数据源的数据倾斜//  socketDS.rebalance().print();//rescale缩放:实现轮询,比rebalance更高效// socketDS.rescale().print();// broadcast广播:发送给下游所有的子任务//   socketDS.broadcast().print();// global全局:全部发往第一个子任务// socketDS.global().print();// keyby: 按指定key去发送,相同key发往同一个子任务// one-to-one: Forward分区器env.execute();}

在这里插入图片描述

富函数

Flink函数类都有对应的Rich版本,例如RichMapFunction、RichFilterFunction、RichReduceFunction等,富函数类与常规函数类的主要区别在于,富函数类可以获取运行环境的上下文,并且拥有生命周期的方法,所以富函数类能够实现更复杂的功能。

java">   public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());env.setParallelism(3);DataStreamSource<String> source = env.socketTextStream("192.168.235.130", 8888);SingleOutputStreamOperator<Integer> map = source.map(new RichMapFunction<String, Integer>() {@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);System.out.println("子任务编号=" + getRuntimeContext().getIndexOfThisSubtask()+ ",子任务名称=" + getRuntimeContext().getTaskNameWithSubtasks()+ ",调用open()");}@Overridepublic void close() throws Exception {super.close();System.out.println("子任务编号=" + getRuntimeContext().getIndexOfThisSubtask()+ ",子任务名称=" + getRuntimeContext().getTaskNameWithSubtasks()+ ",调用close()");}@Overridepublic Integer map(String value) throws Exception {return Integer.parseInt(value) + 1;}});map.print();env.execute();}

注: 富函数在启动时,open()调用一次,结束时,close()调用一次。

在这里插入图片描述

split

split与side output都是分流算子,分流就是定义一些筛选条件,将一条数据流拆分成多条数据流。

java">    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStreamSource<String> source = env.socketTextStream("192.168.235.130", 8888);SingleOutputStreamOperator<String> even = source.filter(value -> Integer.valueOf(value) % 2 == 0);SingleOutputStreamOperator<String> odd = source.filter(value -> Integer.valueOf(value) % 2 == 1);even.print("偶数流");odd.print("奇数流");env.execute();}

在这里插入图片描述

split的缺点:每一个数据都要调用两次filter处理,效率低,一般不用。

side output

side output在处理数据流时,可以将数据流中的元素根据条件发送到额外的输出流中,而不需要复制整个数据流。

java">    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStreamSource<String> source = env.socketTextStream("192.168.235.130", 8888);SingleOutputStreamOperator<WaterSensor> map = source.map(new MapFunction<String, WaterSensor>() {@Overridepublic WaterSensor map(String s) throws Exception {String[] data = s.split(",");return new WaterSensor(data[0], Long.valueOf(data[1]), Integer.valueOf(data[1]));}});OutputTag<WaterSensor> tag1 = new OutputTag<>("s1", Types.POJO(WaterSensor.class));OutputTag<WaterSensor> tag2 = new OutputTag<>("s2", Types.POJO(WaterSensor.class));SingleOutputStreamOperator<WaterSensor> process = map.process(new ProcessFunction<WaterSensor, WaterSensor>() {@Overridepublic void processElement(WaterSensor value, Context ctx, Collector<WaterSensor> out) throws Exception {String id = value.getId();if ("s1".equals(id)) {ctx.output(tag1, value);} else if ("s2".equals(id)) {ctx.output(tag2, value);} else {out.collect(value);}}});SideOutputDataStream<WaterSensor> sideOutput1 = process.getSideOutput(tag1);SideOutputDataStream<WaterSensor> sideOutput2 = process.getSideOutput(tag2);process.print("主流");sideOutput1.printToErr("s1");sideOutput2.printToErr("s2");env.execute();}

在这里插入图片描述

union(联合)

union是最简单的合流操作,可以直接将多条数据流合在一起,但要求流中的数据类型必须相同,

java">  public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStreamSource<Integer> source1 = env.fromElements(10, 20, 30, 40);DataStreamSource<Integer> source2 = env.fromElements(5, 6, 7, 8);DataStreamSource<String> source3 = env.fromElements("100", "200", "300", "400");//  DataStream<Integer> union1 = source1.union(source2, source3.map(value -> Integer.valueOf(value)));DataStream<Integer> union2 = source1.union(source3.map(value -> Integer.valueOf(value)));//   union1.print("union1");union2.print("union2");env.execute();}

在这里插入图片描述
union的缺点:要求数据类型必须相同,不能改变,缺少灵活性,所以很少用。

connect(连接)

connect每次能连接2条流,流的数据类型可以不一样,两条流连接后可以各自调用函数map、flatmap、process等处理。

java">    public static void main(String[] args) throws Exception {StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();environment.setParallelism(2);SingleOutputStreamOperator<Integer> source1 = environment.socketTextStream("192.168.235.130", 9999).map(value -> Integer.valueOf(value));DataStreamSource<String> source2 = environment.socketTextStream("192.168.235.130", 8888);ConnectedStreams<Integer, String> connect = source1.connect(source2);SingleOutputStreamOperator<Object> map = connect.map(new CoMapFunction<Integer, String, Object>() {@Overridepublic Object map1(Integer value) throws Exception {value *= 10;return "来源于数字流"+value.toString();}@Overridepublic Object map2(String value) throws Exception {return "来源于字母流"+value;}});map.print();environment.execute();}

在这里插入图片描述


http://www.ppmy.cn/embedded/138542.html

相关文章

STM32 | 空气净化器

空气净化器 一、项目背景 空气净化器又称“空气清洁器”、空气清新机、净化器&#xff0c;是指能够吸附、分解或转化各种空气污染物&#xff08;一般包括PM2.5、粉尘、花粉、异味、甲醛之类的装修污染、细菌、过敏原等&#xff09;&#xff0c;有效提高空气清洁度的产品&…

java判断点是否在多边形内(射线法)

1、我是使用的数组记录点的坐标&#xff0c;索引0为x坐标&#xff0c;1为y坐标 2、也可以使用结构体来记录点x&#xff0c;y&#xff0c;再用List管理点集合。 import java.util.Arrays; import java.util.Objects;/*** Author 宇颀休闲* Date 2024/11/10 11:03* Description…

HTTP/2新型DDoS攻击:技术深度剖析与防御指南

在智能化演进和互联网技术高速发展的背景下&#xff0c;黑客攻击手段不断翻新&#xff0c;DDoS攻击的强度、频率和复杂度也随之持续攀升。金融、政务、互联网等多个领域及其关键基础设施正面临着前所未有的DDoS攻击威胁。 一、大流量攻击&#xff1a;秒级加速的威胁 近年来&a…

Rust 语言学习笔记(五)

终于来到了 Rust 的精髓所在了&#xff0c;那就是使之不依赖于垃圾回收又能保障内存安全且高效运行的所有权系统(Ownership System)。想要用 Rust 做一个稍显规模项目必定绕不过它&#xff0c;所有权系统包括所有权(Ownership), 借用(Borrowing), 生命周期(Lifetimes)。 以下概…

CSS Modules中的 :global

最近写需求遇到如下代码&#xff0c;我们来分析一番&#xff1a; .medicine-bot {:global(.cosd-site-vcard-card) {margin-top: -3px;}:global(.cosd-site-vcard-title-text) {font-size: var(--cos-text-headline-sm);}:global(.cosd-site-vcard-button) {background-color: …

快速上手:Docker 安装详细教程(适用于 Windows、macOS、Linux)

### 快速上手&#xff1a;Docker 安装详细教程&#xff08;适用于 Windows、macOS、Linux&#xff09; --- Docker 是一款开源容器化平台&#xff0c;广泛应用于开发、测试和部署。本文将为您提供分步骤的 Docker 安装教程&#xff0c;涵盖 Windows、macOS 和 Linux 系统。 …

JMeter与大模型融合应用之JMeter日志分析服务化实战应用

JMeter与大模型融合应用之JMeter日志分析服务化 引言 在当今的互联网时代,网站和应用程序的性能直接影响到用户的体验和业务的成功。为了保证系统的稳定性和高效性,性能测试成为了软件开发过程中的一个重要环节。在这其中,Apache JMeter作为一款开源的性能测试工具,凭借其…

ISP是什么?

isp全称为Internet Service Provider&#xff0c;即互联网服务提供商&#xff0c;是一种向用户提供互联网接入服务的公司或组织&#xff0c;它们提供的服务包括互联网接入、域名注册、网站托管等等。 ISP的应用场景非常广泛&#xff0c;几乎所有的互联网用户都需要通过ISP来接…