flink学习(4)——方法的使用—对流的处理(keyBy,Reduce)

devtools/2024/11/26 1:33:18/

keyBy案例

package com.bigdata.day02;public class _04_keyBy {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);DataStreamSource<String> dataStreamSource = env.readTextFile("datas/a.log");//2. source-加载数据dataStreamSource.map(new MapFunction<String, LogBean>() {@Overridepublic LogBean map(String s) throws Exception {String[] line = s.split("\\s+");LogBean logBean = new LogBean();logBean.setIp(line[0]);logBean.setUserId(Integer.parseInt(line[1]));logBean.setMethod(line[3]);// 17/05/2015:10:05:30SimpleDateFormat simpleDateFormat = new SimpleDateFormat("dd/MM/yyyy:hh:mm:ss");Date date = simpleDateFormat.parse(line[2]);// 另一种方法Date date1 = DateUtils.parseDate(line[2], "dd/MM/yyyy:hh:mm:ss");logBean.setTimestamp(date1.getTime());logBean.setPath(line[4]);return logBean;}}).map(new MapFunction<LogBean, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(LogBean logBean) throws Exception {return new Tuple2<>(logBean.getIp(), 1);}}).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {return stringIntegerTuple2.f0;}}).print();env.execute();}
}
可以认为是将key值一样的数据放在一个分区中
new KeySelector<Tuple2<String, Integer>, String>()
第一个参数表示传入的值为Tuple2 ,String表示key是该类型的
public String getKey(Tuple2<String, Integer> stringIntegerTuple2)
可以看出返回值是String  类型的,也就是说可以随意指定按照某个key进行group(有点类似)

元组类型

单个字段keyBy
//用字段位置(已经被废弃)
wordAndOne.keyBy(0)//用字段表达式
同上
多个字段keyBy
//用字段位置
wordAndOne.keyBy(0, 1);//用KeySelector
wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> getKey(Tuple2<String, Integer> value) throws Exception {return Tuple2.of(value.f0, value.f1);}
});//也就是说多个字段返回值就为Tuple类型

POJO

单个字段keyBy
source.keyBy(a -> a.getProvince());
多个字段keyBy
source.keyBy(new KeySelector<PeopleCount, Tuple2<String, String>>() {@Overridepublic Tuple2<String, String> getKey(PeopleCount value) throws Exception {return Tuple2.of(value.getProvince(), value.getCity());}
});
//也就是说多个字段返回值就为Tuple类型

Reduce案例

——sum的底层是reduce

package com.bigdata.day02;public class _04_keyBy {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);DataStreamSource<String> dataStreamSource = env.readTextFile("datas/a.log");KeyedStream<Tuple2<String, Integer>, String> tuple2StringKeyedStream = dataStreamSource.map(new MapFunction<String, LogBean>() {@Overridepublic LogBean map(String s) throws Exception {String[] line = s.split("\\s+");LogBean logBean = new LogBean();logBean.setIp(line[0]);logBean.setUserId(Integer.parseInt(line[1]));logBean.setMethod(line[3]);// 17/05/2015:10:05:30Date date = DateUtils.parseDate(line[2], "dd/MM/yyyy:hh:mm:ss");logBean.setTimestamp(date.getTime());logBean.setPath(line[4]);   return logBean;}}).map(new MapFunction<LogBean, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(LogBean logBean) throws Exception {return new Tuple2<>(logBean.getIp(), 1);}}).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {return stringIntegerTuple2.f0;}});//        tuple2StringKeyedStream.sum(1).print();tuple2StringKeyedStream.reduce(new ReduceFunction<Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {return new Tuple2<>(t1.f0,t1.f1+t2.f1);}}).print();env.execute();}
}

new ReduceFunction<Tuple2<String, Integer>>()

public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2)

t1 用于表示每一条数据,t2 用于表示 结果的累计 ,

其中t1 和 t2 的第一个值是一样的,t2 的第二个值 一直再累加,而t1的为1 1 1 1 1 1


http://www.ppmy.cn/devtools/136991.html

相关文章

51c自动驾驶~合集31

我自己的原文哦~ https://blog.51cto.com/whaosoft/12121357 #大语言模型会成为自动驾驶的灵丹妙药吗 人工智能&#xff08;AI&#xff09;在自动驾驶&#xff08;AD&#xff09;研究中起着至关重要的作用&#xff0c;推动其向智能化和高效化发展。目前AD技术的发展主要遵循…

【开源风云】从若依系列脚手架汲取编程之道(八)

&#x1f4d5;开源风云系列 &#x1f34a;本系列将从开源名将若依出发&#xff0c;探究优质开源项目脚手架汲取编程之道。 &#x1f349;从不分离版本开写到前后端分离版&#xff0c;再到微服务版本&#xff0c;乃至其中好玩的一系列增强Plus操作。 &#x1f348;希望你具备如下…

论文阅读 SimpleNet: A Simple Network for Image Anomaly Detection and Localization

SimpleNet: A Simple Network for Image Anomaly Detection and Localization 摘要&#xff1a; 该论文提出了一个简单且应用友好的网络&#xff08;称为 SimpleNet&#xff09;来检测和定位异常。SimpleNet 由四个组件组成&#xff1a;&#xff08;1&#xff09;一个预先训练的…

Spring Boot与MyBatis-Plus的高效集成

Spring Boot与MyBatis-Plus的高效集成 引言 在现代 Java 开发中&#xff0c;MyBatis-Plus 作为 MyBatis 的增强工具&#xff0c;以其简化 CRUD 操作和无需编写 XML 映射文件的特点&#xff0c;受到了开发者的青睐。本篇文章将带你一步步整合 Spring Boot 与 MyBatis-Plus&…

ubuntu增加swap交换空间

论坛_开发者_技术论坛_鲲鹏_昇腾_华为 先关闭 &#xff0c;否则报错 fallocate: fallocate 失败: 文本文件忙 sudo swapoff /swapfile sudo fallocate -l 8G /swapfile sudo chmod 600 /swapfile sudo mkswap /swapfile sudo swapon /swapfile sudo vim /etc/fstab 添加…

城电科技|太阳能折叠灯:点亮你的便捷之光

朋友们&#xff0c;今天要给你们介绍一款能让生活变得更加美好的神器 —— 太阳能折叠灯&#xff01; 【超便捷折叠设计】 它就像一个百变精灵&#xff0c;轻松折叠起来后小巧玲珑。可以随意塞进背包的缝隙&#xff0c;或者放在车载储物箱里&#xff0c;完全不占地方&#xff…

Elasticsearch面试内容整理-安全与权限管理

在 Elasticsearch 中,安全与权限管理至关重要,特别是当系统处理敏感数据时。Elasticsearch 提供了一套全面的安全机制来确保数据的机密性、完整性和可用性。以下是 Elasticsearch 安全与权限管理的详细介绍。 安全组件概述 Elasticsearch 的安全功能由 Elastic Stack 提供的一…

【视觉SLAM】4b-特征点法估计相机运动之PnP 3D-2D

文章目录 0. 前言1. PnP求解1.1 直接线性变换DLT1.2 P3P1.3 光束平差法BA2. 实现0. 前言 透视n点(Perspective-n-Point,PnP)问题是计算机视觉领域的经典问题,用于求解3D-2D的点运动。换句话说,当知道 N N N个世界坐标系中3D空间点的坐标以及它们在图像上的投影点像素坐标…