02. Flink 快速上手

news/2024/9/22 22:54:30/

02. Flink 快速上手

1、创建项目导入依赖

pom文件:

<properties><flink.version>1.17.0</flink.version>
</properties><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version>
</dependency>

2、需求

批处理基本思路:先逐行读取文本,在根据空格进行单词拆分,最后再去统计每个单词出现的频率。

(1)数据准备

在工程目录下新建文件夹input,新建文本words.txt。

文件输入:

hello world
hello flink
hello java

2.1 批处理

代码编写(使用DataSet API实现)

java">package com.company.onedayflink.demo;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;public class FlinkBatchWords {public static void main(String[] args) throws Exception {// 1、创建执行环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// 2、从文件中读取数据DataSource<String> lineDS = env.readTextFile("one-day-flink/input/words.txt");// 3、切分、转换FlatMapOperator<String, Tuple2<String, Integer>> wordAndOne = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {/**** @param value     读取到的输入* @param out       返回的内容,Tuple2是一个二元分组,(字符串,个数)。* @throws Exception*/@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {// 3.1 切分for (String s : value.split(" ")) {// 3.2 将单组转为二元组Tuple2<String, Integer> tuple = Tuple2.of(s, 1);// 3.3 将二元组发送给下游out.collect(tuple);}}});// 4、按照 word 分组UnsortedGrouping<Tuple2<String, Integer>> wordAndOneGroup = wordAndOne.groupBy(0); // 0 表示下标为0的参数,也就是二元组的String单词// 5、各分组聚合AggregateOperator<Tuple2<String, Integer>> sum = wordAndOneGroup.sum(1);//1 表示下标1的元素,即单词个数// 6、输出sum.print();}
}

运行结果:

image-20240519130034466

2.2 流处理

2.2.1 有界流

代码编写(使用DataStream API实现,读取文件属于有界流)

java">package com.company.onedayflink.demo;import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;@Slf4j
public class FlinkStreamWords {public static void main(String[] args) throws Exception {// 1、创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2、从文件中读取数据DataStreamSource<String> lineDS = env.readTextFile("one-day-flink/input/words.txt");// 3、处理数据(切换、转换、分组、聚合)// 3.1 切换、转换SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {for (String s : value.split(" ")) {// 构建二元组Tuple2<String, Integer> tuple = Tuple2.of(s, 1);// 通过采集器向下游发送数据out.collect(tuple);}}});// 3.2 分组, KeySelector<IN, KEY> 中 IN 表示输入的类型,KEY 表示分组key的类型KeyedStream<Tuple2<String, Integer>, String> wordAndOneKS = wordAndOne.keyBy((KeySelector<Tuple2<String, Integer>, String>) value -> value.f0); // value.f0 表示二元组的第一个元素// 3.3 聚合SingleOutputStreamOperator<Tuple2<String, Integer>> sum = wordAndOneKS.sum(1);  // 1 表示二元组的第二个元素// 4、输出数据sum.print();// 5、执行env.execute();}
}

执行结果:

2> (java,1)
3> (hello,1)
3> (hello,2)
3> (hello,3)
6> (world,1)
8> (flink,1)

前面的编号是并行度,线程数。

2.2.2 无界流

(1)使用 netcat 监听7777端口,建立stream流

安装 netcat

brew install netcat

监听 7777 端口

nc -lk 7777

(2)代码编写(使用DataStream API实现,读取stream流属于无界流)

java">package com.company.onedayflink.demo;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class FlinkSteamSocketWords {public static void main(String[] args) throws Exception {// 1、创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2、读取数据(其中hostname 是需要监听的主机名称,mac电脑可以在终端使用hostname命令查看)DataStreamSource<String> socketDS = env.socketTextStream("zgyMacBook-Pro.local", 7777);// 3、数据处理(切割、转换、分组、聚合)SingleOutputStreamOperator<Tuple2<String, Integer>> sum = socketDS.flatMap((String value, Collector<Tuple2<String, Integer>> out) -> {// 3.1 切分for (String s : value.split(" ")) {// 3.2 将单组转为二元组Tuple2<String, Integer> tuple = Tuple2.of(s, 1);// 3.3 将二元组发送给下游out.collect(tuple);}}).returns(Types.TUPLE(Types.STRING, Types.INT)).keyBy(value -> value.f0).sum(1);// 4、输出sum.print();// 5、执行env.execute();}
}

(3)测试

在终端发送消息

hello flink
hello world

观察程序控制台打印

8> (flink,1)
3> (hello,1)
6> (world,1)
3> (hello,2)

http://www.ppmy.cn/news/1462932.html

相关文章

海康威视硬盘录像机NVR连接公网视频监控平台,注册失败,抓包发现有403 forbidden的问题解决

目录 一、问题描述 二、问题定位 1、查看DVR的配置 2、查看需要使用的端口是否开放 3、查看日志 4、抓包 &#xff08;1&#xff09;找出错误 &#xff08;2&#xff09;查看数据包内容 三、问题分析 1、国标28181中的域的概念 2、域应该如何定义 &#xff08;1&am…

诺兰电影欣赏笔记

2012&#xff1a;蝙蝠侠&#xff1a;黑暗骑士崛起&#xff08;Batman 3: The Dark Knight Rises&#xff09; 播放平台&#xff1a;优酷

民国漫画杂志《时代漫画》第18期.PDF

时代漫画18.PDF: https://url03.ctfile.com/f/1779803-1248612707-27e56b?p9586 (访问密码: 9586) 《时代漫画》的杂志在1934年诞生了&#xff0c;截止1937年6月战争来临被迫停刊共发行了39期。 ps:资源来源网络&#xff01;

持续总结中!2024年面试必问 20 道 Redis面试题(五)

上一篇地址&#xff1a;持续总结中&#xff01;2024年面试必问 20 道 Redis面试题&#xff08;四&#xff09;-CSDN博客 九、Redis的同步机制了解么&#xff1f; Redis 的同步机制是其复制策略的核心部分&#xff0c;确保数据在主节点&#xff08;master&#xff09;和从节点…

C++ socket epoll IO多路复用

IO多路复用通常用于处理单进程高并发&#xff0c;在Linux中&#xff0c;一切皆文件&#xff0c;一个socket连接会对应一个文件描述符&#xff0c;在监听多个文件描述符的状态应用中epoll相对于select和poll效率更高 epoll本质是系统在内核维护了一颗红黑树&#xff0c;监听的文…

软件构造复习1

一、软件构造的多维度视图&#xff1a; 共有三个维度&#xff1a;1.按阶段划分&#xff1a;构造时/运行时视图&#xff0c;2.按动态性划分&#xff1a;时刻/阶段视图&#xff0c;3.按构造对象层次划分&#xff1a;代码/构件视图 具体可如图所示&#xff08;图片来自PPT&#…

两种单例模式的区别

文章目录 看两个栗子传统指针版单例模式现代静态变量版单例模式 分析结论 看两个栗子 传统指针版单例模式 class Singleton { private:// 私有化构造函数Singleton() {}// 禁止拷贝构造函数Singleton(const Singleton&) delete;// 禁止拷贝赋值操作Singleton& opera…

什么是高精度定位平板?

高精度定位平板是一种结合了高精度定位技术和强大计算能力的平板电脑&#xff0c;广泛应用于测绘、地理信息系统&#xff08;GIS&#xff09;、精准农业、工程建设和公共安全等领域。其核心特点在于能够提供亚米级甚至厘米级的定位精度&#xff0c;远超普通GPS设备的精度。 高…