Flink实战教程从入门到精通(基础篇)(二)Flink快速上手

embedded/2025/3/25 21:04:17/

目录

前言:

一、环境准备

二、创建项目

1.创建工程

2、添加项目依赖

 三、WordCount代码编写(有界流)

1、批处理和流处理

2、数据准备

3、编写代码       

1、DataSet API (不推荐)(批处理)

2、DataStreaming(流处理)

总结:

四、WordCount代码编写(无界流) 

1、代码编写

2、测试数据


前言:

        对 Flink有了基本的了解后,接下来就要理论联系实际,真正上手写代码了。Flink底层是以Java编写的,并为开发人员同时提供了完整的Java和ScalaAPI。在本书中,代码示例将全部用Java 实现;而在具体项目应用中,可以根据需要选择合适语言的API进行开发。在这一章,我们将会以大家最熟悉的 IntelliJIDEA 作为开发工具,用实际项目中最常见的Maven 作为包管理工具,在开发环境中编写一个简单的 Flink项目,实现零基础快速上手。

一、环境准备

1、小编的本地测试系统环境为 Windows 11。

2、需提前安装 Java 8。

3、集成开发环境(IDE)使用IntelliJIDEA,具体的安装流程参见IntelliJ官网。

4、安装 IntelliJIDEA之后,还需要安装一些插件--Maven和Git。Maven 用来管理项目依赖;通过Git 可以轻松获取我们的示例代码,并进行本地代码的版本控制。

二、创建项目

1.创建工程

打开IntellJIDEA,创建一个 Maven 工程,如图 1-1 所示

图:1-1

导入Maven仓库,File-> Settings->Maven,如图 1-2 所示

图:1-2

2、添加项目依赖

        在项目的 pom 文件中,增加<properties>标签设置属性,然后增加<denpendencies>标签引入需要的依赖。我们需要添加的依赖最重要的就是Flink的相关组件,包括fink-java、flink-streaming-java,以及 ink-clients(客户端,也可以省略)。另外,为了方便查看运行日志,我们引入 slf4i 和 log4i 进行日志管理。 

将下面依赖导入刚新建项目的pom.xml中

    <properties><flink.version>1.17.0</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency></dependencies>

如下图 1-3:

图 1-3 

 三、WordCount代码编写(有界流)

需求:统计一段文字中,每个单词出现的频次。

环境准备:在 src/main/java目录下,新建一个包,命名为cn.konne.wc。

1、批处理和流处理

        批处理基本思路:先逐行读入文件数据,然后将每一行文字拆分成单词;接着按照单词分组,统计每组数据的个数,就是对应单词的频次。

        流处理基本思路:一条一条的读取数据,每读到一条数据后计算结果,保存本地内存的状态,下一条数据进来后,从内存中读取之前状态,并进行计算。

2、数据准备

        在工程根目录下新建一个input 文件夹,并在下面创建文本文件 words.txte。

        在words.txt中输入一些文字,例如:

hello flink
hello worlde
hello iava

3、编写代码       

        在cn.konne.wc下面新建WordCountTest类,在类中编写main方法。并且将待会需要操作的每个步骤进行梳理,见图1-4:

图 1-4 

具体代码如下:

1、DataSet API (不推荐)(批处理)
package cn.konne.wc;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;/*** @author MJ* @date 2025/3/19*/
public class WordCountTest {public static void main(String[] args) throws Exception {// TODO 1.创建执行环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// TODO 2.读取数据:从文件中读取DataSource<String> lonDs = env.readTextFile("/input/words.txt");// TODO 3.切分、转换(word,1)FlatMapOperator<String, Tuple2<String, Integer>> wordAndOne = lonDs.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {// TODO 按照空格 切分单词、String[] words = value.split(" ");// TODO 将单词转为为(word,1)for (String word : words) {Tuple2<String, Integer> stringIntegerTuple2 = Tuple2.of(word, 1);// TODO 使用 Collector 向下游发送消息out.collect(stringIntegerTuple2);}}});// TODO 4.按照 word 分组UnsortedGrouping<Tuple2<String, Integer>> tuple2UnsortedGrouping = wordAndOne.groupBy(0);// TODO 5.各分组内聚台AggregateOperator<Tuple2<String, Integer>> sum = tuple2UnsortedGrouping.sum(1);// TODO 6.输出/sum.print();}
}

 输出结构如下:

(java,1)
(flink,1)
(world,1)
(he11o,3)

        需要注意的是,这种代码的实现方式,是基于DataSetAPI的,也就是我们对数据的处理转换,是看作数据集来进行操作的。事实上Flink本身是流批统一的处理架构,批量的数据集本质上也是流,没有必要用两套不同的API来实现。所以从Flink1.12开始,官方推荐的做法是直接使用 DataStreamAP,在提交任务时通过将执行模式设为 BATCH来进行批处理:←S bin/flink run -Dexecution,runtime-mode=BATCH BatchWordCount.jar。

        这样,DataSet API就没什么用了,在实际应用中我们只要维护一套 DataStream API就可以。这里只是为了方便大家理解,我们依然用 DataSetAPI做了批处理的实现。

2、DataStreaming(流处理)
package cn.konne.wc;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*** DataStreaming*/
public class WordCountStream {public static void main(String[] args) throws Exception {// TODO 1、创建环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// TODO 2、读取数据DataStreamSource<String> lineDs = env.readTextFile("D:\\JAVA\\konne\\words.txt");// TODO 3、处理数据SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lineDs.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] s = value.split(" ");for (String string : s) {out.collect(new Tuple2<>(string, 1));}}});// TODO 4、分组KeyedStream<Tuple2<String, Integer>, String> wordAndOneKS = wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> value) throws Exception {return value.f0;}});// TODO 5、聚合SingleOutputStreamOperator<Tuple2<String, Integer>> sum = wordAndOneKS.sum(1);// TODO 6、输出数据sum.print();// TODO 7、执行env.execute();}
}

输出:

7> (worlde,1)
7> (flink,1)
3> (hello,1)
3> (hello,2)
3> (hello,3)
8> (iava,1)
总结:

        对于批处理流处理输出的结果,可以发现批处理的hello是一次性计算出的结果,这就是批处理的特点。而流处理的hello输出了三次,体现了流处理数据一条一条计算的结果。

四、WordCount代码编写(无界流) 

        在实际的生产环境中,真正的数据流其实是无界的,有开始却没有结束,这就要求我们需要持续地处理捕获的数据。为了模拟这种场景,可以监听 socket 端口,然后向该端口不断的发送数据

        将 StreamWordCount 代码中读取文件数据的 readTextFile 方法,替换成读取 socket文本流的方法socketTextStream。具体代码实现如下:

1、代码编写

package cn.konne.un;import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class WordCountUn {public static void main(String[] args) throws Exception {// TODO 1、创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// TODO 2、读取数据 从socket读取数据DataStreamSource<String> socketDs = env.socketTextStream("hadoop102", 9999);// TODO 3、处理数据SingleOutputStreamOperator<Tuple2<String, Integer>> tuple2SingleOutputStreamOperator = socketDs.flatMap((String value, Collector<Tuple2<String, Integer>> out) -> {String[] words = value.split(" ");for (String word : words) {out.collect(new Tuple2<>(word, 1));}}).returns(Types.TUPLE(Types.STRING, Types.INT));// TODO 4、聚合KeyedStream<Tuple2<String, Integer>, String> tuple2StringKeyedStream = tuple2SingleOutputStreamOperator.keyBy((Tuple2<String, Integer> value) -> {return value.f0;});// TODO 5、输出数据SingleOutputStreamOperator<Tuple2<String, Integer>> sum = tuple2StringKeyedStream.sum(1);sum.print();// TODO 6、执行env.execute();}
}

2、测试数据


http://www.ppmy.cn/embedded/174634.html

相关文章

第二章 EXI协议原理与实现--7.5 Efficient XML库和OpenEXI.jar编解码交叉测试

7.5 Efficient XML库和OpenEXI.jar编解码交叉测试 本节对Efficient XML库和OpenEXI.jar库进行编解码交叉测试&#xff0c;目的是验证Efficient XML库的兼容性。 7.5.1 测试方案 目标文件&#xff1a; flightdata.xml、flightdata.xsd、flightdata.cxs 由于efficientXML库默…

ManiWAV:通过野外的音频-视频数据学习机器人操作

24年6月来自斯坦福大学、哥伦比亚大学和 TRI 的论文“ManiWAV: Learning Robot Manipulation from In-the-Wild Audio-Visual Data”。 音频信号通过接触为机器人交互和物体属性提供丰富的信息。这些信息可以简化接触丰富的机器人操作技能学习&#xff0c;尤其是当视觉信息本身…

kotlin 内联函数 inline

高阶函数实现的原理&#xff1a;函数类型其实是生成了一个对象 。 inline翻译成中文的意思就是内联&#xff0c;在kotlin里面inline被用来修饰函数&#xff0c;表明当前函数在编译时是以内嵌的形式进行编译的&#xff0c;从而减少了一层函数调用栈&#xff1a; inline fun fun…

手机电脑如何通过跨平台远程控制工具来实现无缝互联

在如今数字化办公和生活的场景里&#xff0c;远程控制工具已经成了连接各种设备的关键桥梁。不管是跨系统协作、远程技术支持&#xff0c;还是让移动端和电脑端高效联动&#xff0c;用户对这些工具的要求早就从“能用就行”变成了“得用得顺手”。 接下来&#xff0c;我就从跨平…

Python 在自然语言处理(NLP)领域的应用场景和技术实现方式

Python 在自然语言处理&#xff08;NLP&#xff09;领域拥有丰富的应用场景和技术实现方式&#xff0c;涵盖从基础文本处理到复杂的深度学习模型构建。以下是基于搜索结果的详细分类总结&#xff1a; 一、基础文本处理任务 文本预处理 功能&#xff1a;清洗噪音&#xff08;如H…

LabVIEW运动控制(二):EtherCAT运动控制器的多轴示教加工应用(下)

前面两节课程分别给大家介绍了“控制器连接、定时获取轴状态、轴坐标、控制器型号、轴参数设置、IO控制、Basic文件下载”&#xff08;详情点击→LabVIEW运动控制&#xff08;二&#xff09;&#xff1a;EtherCAT运动控制器的多轴示教加工应用&#xff08;上&#xff09;&#…

存储过程在高并发环境下的重要性

在高并发系统中&#xff0c;数据库的性能和稳定性至关重要。随着系统并发请求的增加&#xff0c;SQL 语句的执行效率、事务管理以及锁机制的优化成为核心问题。存储过程&#xff08;Stored Procedure&#xff09;作为数据库内部的执行逻辑&#xff0c;能够有效提升高并发环境下…

在 ARM 嵌入式 Linux 下使用 C/C++ 实现 MQTT

在 ARM 嵌入式 Linux 下使用 C/C 实现 MQTT 通信是一个常见的需求&#xff0c;尤其是在资源受限的环境中。以下是一个详细的教程&#xff0c;使用 Eclipse Paho C Client 库来实现 MQTT 客户端。 1. 安装 Eclipse Paho C Client 库 Eclipse Paho C Client 是一个轻量级的 MQTT…