【Flink系列】2. Flink快速上手

server/2025/1/19 19:45:37/

2. Flink快速上手

2.1 创建项目

在准备好所有的开发环境之后,我们就可以开始开发自己的第一个Flink程序了。首先我们要做的,就是在IDEA中搭建一个Flink项目的骨架。我们会使用Java项目中常见的Maven来进行依赖管理。

1)创建工程

(1)打开IntelliJ IDEA,创建一个Maven工程。

在这里插入图片描述

####(2)将这个Maven工程命名为FlinkTutorial。
在这里插入图片描述

####(3)选定这个Maven工程所在存储路径,并点击Finish,Maven工程即创建成功。
在这里插入图片描述

2)添加项目依赖

在项目的pom文件中,添加Flink的依赖,包括flink-java、flink-streaming-java,以及flink-clients(客户端,也可以省略)。

<properties><flink.version>1.17.0</flink.version>
</properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency>
</dependencies>

2.2 WordCount代码编写

需求:统计一段文字中,每个单词出现的频次。
环境准备:在src/main/java目录下,新建一个包,命名为com.atguigu.wc。

2.2.1 批处理

批处理基本思路:先逐行读入文件数据,然后将每一行文字拆分成单词;接着按照单词分组,统计每组数据的个数,就是对应单词的频次。

1)数据准备
(1)在工程根目录下新建一个input文件夹,并在下面创建文本文件words.txt
(2)在words.txt中输入一些文字,例如:
hello flink
hello world
hello java

2)代码编写
(1)在com.atguigu.wc包下新建Java类BatchWordCount,在静态main方法中编写代码。具体代码实现如下:


import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;public class BatchWordCount {public static void main(String[] args) throws Exception {// 1. 创建执行环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// 2. 从文件读取数据  按行读取(存储的元素就是每行的文本)DataSource<String> lineDS = env.readTextFile("input/words.txt");// 3. 转换数据格式FlatMapOperator<String, Tuple2<String, Long>> wordAndOne = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {@Overridepublic void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {String[] words = line.split(" ");for (String word : words) {out.collect(Tuple2.of(word,1L));}}});// 4. 按照 word 进行分组UnsortedGrouping<Tuple2<String, Long>> wordAndOneUG = wordAndOne.groupBy(0);// 5. 分组内聚合统计AggregateOperator<Tuple2<String, Long>> sum = wordAndOneUG.sum(1);// 6. 打印结果sum.print();}
}
(2)输出
(flink,1)
(world,1)
(hello,3)
(java,1)

需要注意的是,这种代码的实现方式,是基于DataSet API的,也就是我们对数据的处理转换,是看作数据集来进行操作的。事实上Flink本身是流批统一的处理架构,批量的数据集本质上也是流,没有必要用两套不同的API来实现。所以从Flink 1.12开始,官方推荐的做法是直接使用DataStream API,在提交任务时通过将执行模式设为BATCH来进行批处理:

$ bin/flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar

这样,DataSet API就没什么用了,在实际应用中我们只要维护一套DataStream API就可以。这里只是为了方便大家理解,我们依然用DataSet API做了批处理的实现。

2.2.2 流处理

对于Flink而言,流才是整个处理逻辑的底层核心,所以流批统一之后的DataStream API更加强大,可以直接处理批处理和流处理的所有场景。
下面我们就针对不同类型的输入数据源,用具体的代码来实现流处理。

1)读取文件

我们同样试图读取文档words.txt中的数据,并统计每个单词出现的频次。整体思路与之前的批处理非常类似,代码模式也基本一致。
在com.atguigu.wc包下新建Java类StreamWordCount,在静态main方法中编写代码。具体代码实现如下:


import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;import java.util.Arrays;public class StreamWordCount {public static void main(String[] args) throws Exception {// 1. 创建流式执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 读取文件DataStreamSource<String> lineStream = env.readTextFile("input/words.txt");// 3. 转换、分组、求和,得到统计结果SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {@Overridepublic void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {String[] words = line.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1L));}}}).keyBy(data -> data.f0).sum(1);// 4. 打印sum.print();// 5. 执行env.execute();}
}

输出:

3> (java,1)
5> (hello,1)
5> (hello,2)
5> (hello,3)
13> (flink,1)
9> (world,1)

主要观察与批处理程序BatchWordCount的不同:

List item

  • 创建执行环境的不同,流处理程序使用的是StreamExecutionEnvironment.

  • 转换处理之后,得到的数据对象类型不同。

  • 分组操作调用的是keyBy方法,可以传入一个匿名函数作为键选择器(KeySelector),指定当前分组的key是什么。

  • 代码末尾需要调用env的execute方法,开始执行任务。

2)读取socket文本流

在实际的生产环境中,真正的数据流其实是无界的,有开始却没有结束,这就要求我们需要持续地处理捕获的数据。为了模拟这种场景,可以监听socket端口,然后向该端口不断的发送数据。
(1)将StreamWordCount代码中读取文件数据的readTextFile方法,替换成读取socket文本流的方法socketTextStream。具体代码实现如下:


import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;import java.util.Arrays;public class SocketStreamWordCount {public static void main(String[] args) throws Exception {// 1. 创建流式执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 读取文本流:hadoop102表示发送端主机名、7777表示端口号DataStreamSource<String> lineStream = env.socketTextStream("hadoop102", 7777);// 3. 转换、分组、求和,得到统计结果SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {String[] words = line.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1L));}}).returns(Types.TUPLE(Types.STRING, Types.LONG)).keyBy(data -> data.f0).sum(1);// 4. 打印sum.print();// 5. 执行env.execute();}
}

(2)在Linux环境的主机hadoop102上,执行下列命令,发送数据进行测试

[atguigu@hadoop102 ~]$ nc -lk 7777

注意:要先启动端口,后启动StreamWordCount程序,否则会报超时连接异常。
(3)启动StreamWordCount程序
我们会发现程序启动之后没有任何输出、也不会退出。这是正常的,因为Flink的流处理是事件驱动的,当前程序会一直处于监听状态,只有接收到数据才会执行任务、输出统计结果。
(4)从hadoop102发送数据
①在hadoop102主机中,输入“hello flink”,输出如下内容

13> (flink,1)
5> (hello,1)

②再输入“hello world”,输出如下内容

2> (world,1)
5> (hello,2)

说明:
Flink还具有一个类型提取系统,可以分析函数的输入和返回类型,自动获取类型信息,从而获得对应的序列化器和反序列化器。但是,由于Java中泛型擦除的存在,在某些特殊情况下(比如Lambda表达式中),自动提取的信息是不够精细的——只告诉Flink当前的元素由“船头、船身、船尾”构成,根本无法重建出“大船”的模样;这时就需要显式地提供类型信息,才能使应用程序正常工作或提高其性能。
因为对于flatMap里传入的Lambda表达式,系统只能推断出返回的是Tuple2类型,而无法得到Tuple2<String, Long>。只有显式地告诉系统当前的返回类型,才能正确地解析出完整数据。


http://www.ppmy.cn/server/159702.html

相关文章

Windows 通过 openssh 连接 Ubuntu

Ubuntu 配置 sudo apt update sudo apt install openssh-server sudo systemctl start ssh sudo systemctl enable ssh sudo systemctl status ssh sudo ufw status sudo ufw allow ssh sudo ufw reload sudo ufw status安装 OpenSSH 服务器 首先&#xff0c;您需要安装 Open…

Web开发(二)CSS3基础与进阶

Web开发&#xff08;二&#xff09;CSS3基础与进阶 写在前面 参考黑马程序员前端Web教程做的笔记&#xff0c;主要是想后面自己搭建网页玩。 这部分是前端HTML5CSS3移动web视频教程的CSS3基础与进阶部分&#xff0c;包括CSS3的选择器、文字控制属性、背景属性、显示模式等CS…

浅谈云计算13 | 网络虚拟化

网络虚拟化 一、网络虚拟化技术原理剖析1.1 网络虚拟化基础概念2.2 关键技术原理2.2.1 虚拟交换机原理2.2.2 虚拟机网络连接原理2.2.3 网络功能虚拟化&#xff08;NFV&#xff09;原理 三、网络虚拟化的应用场景3.1 数据中心中的应用3.1.1 资源隔离与多租户支持3.1.2 网络流量优…

基于SSM实现的乡村振兴文化平台系统功能实现六

一、前言介绍&#xff1a; 1.1 项目摘要 农耕文明是广大群众在几千年的农业生产生活中智慧的结晶&#xff0c;不仅是乡土文化的核心和精髓&#xff0c;还是中华文明的起源和基因。因此&#xff0c;传承和发扬优秀乡村文化&#xff0c;是传承农耕文明的必然要求。 文化振兴是乡…

springboot基于微信小程序的智慧小区管理系统

Spring Boot基于微信小程序的智慧小区管理系统是一种集信息化、智能化于一体的现代小区管理工具。它充分利用Spring Boot框架的高效性和微信小程序的便捷性&#xff0c;为小区居民、物业管理人员以及服务提供者提供了一个全面、高效、易用的管理平台。 一、系统背景与意义 随…

Ubuntu 22.04.5 修改IP

Ubuntu22.04.5使用的是netplan管理网络&#xff0c;因此需要在文件夹/etc/netplan下的01-network-manager-all.yaml中修改&#xff0c;需要权限&#xff0c;使用sudo vim或者其他编辑器&#xff0c;修改后的内容如下&#xff1a; # Let NetworkManager manage all devices on …

通过视觉语言模型蒸馏进行 3D 形状零件分割

大家读完觉得有帮助记得关注和点赞&#xff01;&#xff01;&#xff01;对应英文要求比较高&#xff0c;特此说明&#xff01; Abstract This paper proposes a cross-modal distillation framework, PartDistill, which transfers 2D knowledge from vision-language models …

SSM课设-酒店管理系统功能设计

【课设者】SSM课设-酒店管理系统 分为用户端管理员端 技术栈: 后端: Spring Spring MVC MyBatis Mysql JSP 前端: HtmlCssJavaScriptAjax 功能: 用户端主要功能包括&#xff1a; 登录注册 客房预订 客房评论 首页 管理员端主要功能包括&#xff1a; 会员信息管理 客房信息…