使用flink编写WordCount

ops/2024/11/29 3:22:46/

1. env-准备环境

2. source-加载数据

3. transformation-数据处理转换

4. sink-数据输出

5. execute-执行

流程图:

DataStream API开发

//nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/overview/

 添加依赖

java"><properties><flink.version>1.13.6</flink.version>
</properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-shaded-hadoop-2-uber</artifactId><version>2.7.5-10.0</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.24</version></dependency></dependencies><build><extensions><extension><groupId>org.apache.maven.wagon</groupId><artifactId>wagon-ssh</artifactId><version>2.8</version></extension></extensions><plugins><plugin><groupId>org.codehaus.mojo</groupId><artifactId>wagon-maven-plugin</artifactId><version>1.0</version><configuration><!--上传的本地jar的位置--><fromFile>target/${project.build.finalName}.jar</fromFile><!--远程拷贝的地址--><url>scp://root:root@bigdata01:/opt/app</url></configuration></plugin></plugins></build>

 

编写代码

java">package com.bigdata.day01;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class WordCount01 {/*** 1. env-准备环境* 2. source-加载数据* 3. transformation-数据处理转换* 4. sink-数据输出* 5. execute-执行*/public static void main(String[] args) throws Exception {// 导入常用类时要注意   不管是在本地开发运行还是在集群上运行,都这么写,非常方便StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 这个是 自动 ,根据流的性质,决定是批处理还是流处理//env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 批处理流, 一口气把数据算出来// env.setRuntimeMode(RuntimeExecutionMode.BATCH);// 流处理,默认是这个  可以通过打印批和流的处理结果,体会流和批的含义env.setRuntimeMode(RuntimeExecutionMode.STREAMING);// 获取数据  多态的写法 DataStreamSource 它是 DataStream 的子类DataStream<String> dataStream01 = env.fromElements("spark flink kafka", "spark sqoop flink", "kakfa hadoop flink");DataStream<String> flatMapStream = dataStream01.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String line, Collector<String> collector) throws Exception {String[] arr = line.split(" ");for (String word : arr) {// 循环遍历每一个切割完的数据,放入到收集器中,就可以形成一个新的DataStreamcollector.collect(word);}}});//flatMapStream.print();// Tuple2 指的是2元组DataStream<Tuple2<String, Integer>> mapStream = flatMapStream.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String word) throws Exception {return Tuple2.of(word, 1); // ("hello",1)}});DataStream<Tuple2<String, Integer>> sumResult = mapStream.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> tuple2) throws Exception {return tuple2.f0;}// 此处的1 指的是元组的第二个元素,进行相加的意思}).sum(1);sumResult.print();// 执行env.execute();}
}

批处理结果:前面的序号代表分区

流处理结果:

也可以通过如下方式修改分区数量:

java"> env.setParallelism(2);

关于并行度的代码演示:

系统以及算子都可以设置并行度,或者获取并行度

java">package com.bigdata.day01;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class WordCount01 {/*** 1. env-准备环境* 2. source-加载数据* 3. transformation-数据处理转换* 4. sink-数据输出* 5. execute-执行*/public static void main(String[] args) throws Exception {// 导入常用类时要注意   不管是在本地开发运行还是在集群上运行,都这么写,非常方便StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 这个是 自动 ,根据流的性质,决定是批处理还是流处理//env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 批处理流, 一口气把数据算出来// env.setRuntimeMode(RuntimeExecutionMode.BATCH);// 流处理,默认是这个  可以通过打印批和流的处理结果,体会流和批的含义env.setRuntimeMode(RuntimeExecutionMode.STREAMING);// 将任务的并行度设置为2// env.setParallelism(2);// 通过这个获取系统的并行度int parallelism = env.getParallelism();System.out.println(parallelism);// 获取数据  多态的写法 DataStreamSource 它是 DataStream 的子类DataStream<String> dataStream01 = env.fromElements("spark flink kafka", "spark sqoop flink", "kakfa hadoop flink");DataStream<String> flatMapStream = dataStream01.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String line, Collector<String> collector) throws Exception {String[] arr = line.split(" ");for (String word : arr) {// 循环遍历每一个切割完的数据,放入到收集器中,就可以形成一个新的DataStreamcollector.collect(word);}}});// 每一个算子也有自己的并行度,一般跟系统保持一致System.out.println("flatMap的并行度:"+flatMapStream.getParallelism());//flatMapStream.print();// Tuple2 指的是2元组DataStream<Tuple2<String, Integer>> mapStream = flatMapStream.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String word) throws Exception {return Tuple2.of(word, 1); // ("hello",1)}});DataStream<Tuple2<String, Integer>> sumResult = mapStream.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> tuple2) throws Exception {return tuple2.f0;}// 此处的1 指的是元组的第二个元组,进行相加的意思}).sum(1);sumResult.print();// 执行env.execute();}
}
  1. 打包、上传

 文件夹需要提前准备好

提交我们自己开发打包的任务

java">flink run -c com.bigdata.day01.WordCount01 /opt/app/FlinkDemo-1.0-SNAPSHOT.jar

 

去界面中查看运行结果:

因为你这个是集群运行的,所以标准输出流中查看,假如第一台没有,去第二台查看,一直点。


http://www.ppmy.cn/ops/137545.html

相关文章

C语言超详细教程

系列文章目录 文章目录 系列文章目录1 运算符1.1 算术运算符:2 控制语句2.1 条件语句:2.2 循环语句:3 函数3.1 函数的定义与声明:3.2 递归函数:4 指针4.1 指针的定义与使用函数指针:5. 数组与字符串5.1 数组一维数组:相同类型元素的集合(如:多维数组:数组的数组(如:…

【多线程-第一天-多线程的技术方案-pthread演示 Objective-C语言】

一、多线程的技术方案 1.我们来看一下多线程的技术方案 技术方案 pthread:一套通用的多线程API、适用于Unix\Linux Windows等系统、跨平台、可移植、使用难度大、C语言、线程的生命周期由程序员管理、使用频率:几乎不用 NSThread:使用更加面向对象、简单易用、可直接操作…

macos 14.0 Monoma 修改顶部菜单栏颜色

macos 14.0 设置暗色后顶部菜单栏还维持浅色&#xff0c;与整体不协调。 修改方式如下&#xff1a;

小程序-基于java+SpringBoot+Vue的铁路订票平台小程序设计与实现

项目运行 1.运行环境&#xff1a;最好是java jdk 1.8&#xff0c;我们在这个平台上运行的。其他版本理论上也可以。 2.IDE环境&#xff1a;IDEA&#xff0c;Eclipse,Myeclipse都可以。推荐IDEA; 3.tomcat环境&#xff1a;Tomcat 7.x,8.x,9.x版本均可 4.硬件环境&#xff1a…

树莓派3:64位系统串口(UART)使用问题的解决方法

前言 当我们要使用串口进行zigbee的短距离通信时,发现无法使用串口. 原因 树莓派3bCPU内部有两个串口,一个硬件串口(就是我们平时使用的UART),还有一个迷你串口(mini-uart),在老版本的树莓派中把硬件串口分配在GPIO上,可以单独使用.但是在新的树莓派中官方把硬件串口给了蓝牙…

学习HTML第三十三天

学习文章目录 一.fieldset 与 legend 的使用&#xff08;了解&#xff09;二.表单总结三.框架标签 一.fieldset 与 legend 的使用&#xff08;了解&#xff09; fieldset 可以为表单控件分组、 legend 标签是分组的标题 二.表单总结 form表单&#xff1a; action 属性&#…

Python 网络爬虫进阶:动态网页爬取与反爬机制应对

在上一篇文章中&#xff0c;我们学习了如何使用 Python 构建一个基本的网络爬虫。然而&#xff0c;在实际应用中&#xff0c;许多网站使用动态内容加载或实现反爬机制来阻止未经授权的抓取。因此&#xff0c;本篇文章将深入探讨以下进阶主题&#xff1a; 如何处理动态加载的网…

【入门篇】小游戏——多语言求解版

题目链接 样例说明 样例中&#xff1a; 小明的第一次操作是查询&#xff0c;我们可以得到字符串 “010011” 中第一次出现 ‘1’ 的位置为 2 。 小明的第二次操作将下标为 2的字符反转&#xff0c;此时字符串变为 “000011” 。 小明的第三次操作是查询&#xff0c;我们可…