使用flink编写WordCount

server/2024/11/27 22:18:23/

1. env-准备环境

2. source-加载数据

3. transformation-数据处理转换

4. sink-数据输出

5. execute-执行

流程图:

DataStream API开发

//nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/overview/

 添加依赖

java"><properties><flink.version>1.13.6</flink.version>
</properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-shaded-hadoop-2-uber</artifactId><version>2.7.5-10.0</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.24</version></dependency></dependencies><build><extensions><extension><groupId>org.apache.maven.wagon</groupId><artifactId>wagon-ssh</artifactId><version>2.8</version></extension></extensions><plugins><plugin><groupId>org.codehaus.mojo</groupId><artifactId>wagon-maven-plugin</artifactId><version>1.0</version><configuration><!--上传的本地jar的位置--><fromFile>target/${project.build.finalName}.jar</fromFile><!--远程拷贝的地址--><url>scp://root:root@bigdata01:/opt/app</url></configuration></plugin></plugins></build>

 

编写代码

java">package com.bigdata.day01;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class WordCount01 {/*** 1. env-准备环境* 2. source-加载数据* 3. transformation-数据处理转换* 4. sink-数据输出* 5. execute-执行*/public static void main(String[] args) throws Exception {// 导入常用类时要注意   不管是在本地开发运行还是在集群上运行,都这么写,非常方便StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 这个是 自动 ,根据流的性质,决定是批处理还是流处理//env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 批处理流, 一口气把数据算出来// env.setRuntimeMode(RuntimeExecutionMode.BATCH);// 流处理,默认是这个  可以通过打印批和流的处理结果,体会流和批的含义env.setRuntimeMode(RuntimeExecutionMode.STREAMING);// 获取数据  多态的写法 DataStreamSource 它是 DataStream 的子类DataStream<String> dataStream01 = env.fromElements("spark flink kafka", "spark sqoop flink", "kakfa hadoop flink");DataStream<String> flatMapStream = dataStream01.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String line, Collector<String> collector) throws Exception {String[] arr = line.split(" ");for (String word : arr) {// 循环遍历每一个切割完的数据,放入到收集器中,就可以形成一个新的DataStreamcollector.collect(word);}}});//flatMapStream.print();// Tuple2 指的是2元组DataStream<Tuple2<String, Integer>> mapStream = flatMapStream.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String word) throws Exception {return Tuple2.of(word, 1); // ("hello",1)}});DataStream<Tuple2<String, Integer>> sumResult = mapStream.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> tuple2) throws Exception {return tuple2.f0;}// 此处的1 指的是元组的第二个元素,进行相加的意思}).sum(1);sumResult.print();// 执行env.execute();}
}

批处理结果:前面的序号代表分区

流处理结果:

也可以通过如下方式修改分区数量:

java"> env.setParallelism(2);

关于并行度的代码演示:

系统以及算子都可以设置并行度,或者获取并行度

java">package com.bigdata.day01;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class WordCount01 {/*** 1. env-准备环境* 2. source-加载数据* 3. transformation-数据处理转换* 4. sink-数据输出* 5. execute-执行*/public static void main(String[] args) throws Exception {// 导入常用类时要注意   不管是在本地开发运行还是在集群上运行,都这么写,非常方便StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 这个是 自动 ,根据流的性质,决定是批处理还是流处理//env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 批处理流, 一口气把数据算出来// env.setRuntimeMode(RuntimeExecutionMode.BATCH);// 流处理,默认是这个  可以通过打印批和流的处理结果,体会流和批的含义env.setRuntimeMode(RuntimeExecutionMode.STREAMING);// 将任务的并行度设置为2// env.setParallelism(2);// 通过这个获取系统的并行度int parallelism = env.getParallelism();System.out.println(parallelism);// 获取数据  多态的写法 DataStreamSource 它是 DataStream 的子类DataStream<String> dataStream01 = env.fromElements("spark flink kafka", "spark sqoop flink", "kakfa hadoop flink");DataStream<String> flatMapStream = dataStream01.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String line, Collector<String> collector) throws Exception {String[] arr = line.split(" ");for (String word : arr) {// 循环遍历每一个切割完的数据,放入到收集器中,就可以形成一个新的DataStreamcollector.collect(word);}}});// 每一个算子也有自己的并行度,一般跟系统保持一致System.out.println("flatMap的并行度:"+flatMapStream.getParallelism());//flatMapStream.print();// Tuple2 指的是2元组DataStream<Tuple2<String, Integer>> mapStream = flatMapStream.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String word) throws Exception {return Tuple2.of(word, 1); // ("hello",1)}});DataStream<Tuple2<String, Integer>> sumResult = mapStream.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> tuple2) throws Exception {return tuple2.f0;}// 此处的1 指的是元组的第二个元组,进行相加的意思}).sum(1);sumResult.print();// 执行env.execute();}
}
  1. 打包、上传

 文件夹需要提前准备好

提交我们自己开发打包的任务

java">flink run -c com.bigdata.day01.WordCount01 /opt/app/FlinkDemo-1.0-SNAPSHOT.jar

 

去界面中查看运行结果:

因为你这个是集群运行的,所以标准输出流中查看,假如第一台没有,去第二台查看,一直点。


http://www.ppmy.cn/server/145446.html

相关文章

智能工厂的设计软件 所有设计“方面”(Respect/ Aspect /Facet)的三种类(基/源/根)及设计用意 之2 鲁棒性/完整性/健壮性

Q&A Q9、今天我们从三个词&#xff08;鲁棒性/完整性/健壮性&#xff09;继续讨论智能工厂的设计软件中对应于软件架构&#xff0c;程序框架和编码实现的全部设计“方面”&#xff08;respect/ aspect /facet&#xff09;以及每个“方面”都具有的三种类&#xff08;基/源…

力扣 LRU缓存-146

LRU缓存-146 /* 定义双向链表节点&#xff0c;用于存储缓存中的每个键值对。 成员变量&#xff1a;key和value存储键值对。preb和next指向前一个和后一个节点&#xff0c;形成双向链表。 构造函数&#xff1a;默认构造函数&#xff1a;初始化空节点。参数化构造函数&#xff1…

Python实战 | 使用 Python 的日志库(logging)和 pandas 库对日志数据进行分析

专栏集锦,大佬们可以收藏以备不时之需 Spring Cloud实战专栏:https://blog.csdn.net/superdangbo/category_9270827.html Python 实战专栏:https://blog.csdn.net/superdangbo/category_9271194.html Logback 详解专栏:https://blog.csdn.net/superdangbo/category_9271…

linux学习:VM虚拟机ext4磁盘扩容操作

先把虚拟机关掉&#xff0c;在VM上面配置&#xff1a; 开机进入系统&#xff0c;查看磁盘信息 用df -h看磁盘挂载和使用情况&#xff1a; 使用fdisk -l可以看全部磁盘&#xff1a; 可以看到&#xff0c;我们磁盘已经是65G了&#xff0c;但只挂载了45G出来。 用lsblk命令&…

Python爬虫:如何优雅地获取1688商品详情接口

在当今这个信息爆炸的时代&#xff0c;数据已经成为了一种宝贵的资源。尤其是在电商领域&#xff0c;获取商品数据对于市场分析、价格比较、库存管理等业务至关重要。1688作为中国领先的B2B电商平台&#xff0c;拥有海量的商品信息。本文将详细介绍如何使用Python爬虫技术&…

文件系统的作用

在一个完整的嵌入式系统中&#xff0c;进行一个简单的操作&#xff08;如读取传感器数据并保存到文件&#xff09;通常会涉及多个步骤。这些步骤包括硬件初始化、数据采集、处理、存储以及与外部系统交互。以下是一个通用的操作流程及文件系统在其中的作用。 嵌入式系统的操作流…

零基础学安全--shell脚本学习(1)脚本创建执行及变量使用

目录 学习连接 什么是shell shell的分类 查看当前系统支持shell 学习前提 开始学习 第一种执行脚本方法 ​编辑 第二种执行脚本方法 第三种执行脚本方法 变量声明和定义 ​编辑 查看变量 删除变量 学习连接 声明&#xff01; 学习视频来自B站up主 **泷羽sec** 有兴趣…

Spring Boot 3启动加载器详解(含源码解析)

一、引言 Spring Boot 3启动加载器是提升开发效率和应用程序启动速度的关键组件。本文将详细介绍Spring Boot 3的启动加载器&#xff0c;包括其实现方式、应用场景及工作原理等。 说明&#xff1a;本文分析使用的Spring Boot源码版本为3.3.5 二、启动加载器简介 启动加载器…