flink:java集成flink实现流数据处理(一)

embedded/2024/12/22 5:13:18/

文章目录

  • 0. 引言
  • 1. flink安装
  • 2. 流数据处理程序
    • 依赖包简介
    • 流处理4个部分
  • 3. 程序调用
  • 4. 总结

0. 引言

Apache Flink作为一款高性能的流处理框架,已成为企业级流数据处理的优选方案。本文将带领读者深入了解如何利用Java语言集成Flink,实现高效、可靠的流数据处理应用。本文将从Flink的基本原理入手,介绍Java与Flink的集成方法

flink_3">1. flink安装

首先java的安装不再单独说明,大家可参考其他文章部署。我们讲解下开发环境的flink安装

1、在官网下载安装包,我这里选择的是1.13.3
版本,大家可以根据自己的需要选择对应版本

下载地址:https://flink.apache.org/zh/downloads/

在这里插入图片描述

2、解压安装包,修改conf文件夹下的flink-conf.yaml配置文件

# 将任务槽调大一点,默认是1,否则运行会因为任务槽不足报错
taskmanager.numberOfTaskSlots: 30 
# 如果要调整默认端口,则修改
rest.port: 8081

3、运行bin目录下的start-cluster.sh脚本

bin/start-cluster.sh 

在这里插入图片描述

4、然后访问 对应ip:8081端口,即可进入管理页面
在这里插入图片描述
5、同时为了方面后续直接调用flink指令,而不用跟上路径名,我们配置一个环境变量,这步不是必要,如果你没配置,调用脚本时使用全路径即可

export FLINK_HOME=/Library/software/flink/flink-1.13.3
export PATH=$PATH:$FLINK_HOME/bin

2. 流数据处理程序

1、首先创建一个springboot项目

依赖包简介

2、引入依赖

对应flink相关的依赖需要单独说明下,其jar版本需要根据flink版本来定,flink 1.11之前版本使用的是scala2.11, 之后加入了对scala2.12的支持,不同的版本引入的jar名称不同,比如flink-streaming-scala包有3个,对应不同的scala版本,flink-streaming-scala则为对多个版本的兼容版
在这里插入图片描述
一般我们根据scala版本来定,比如我这里使用flink1.13, 则对应scala2.12

<properties><java.version>1.8</java.version><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><spring-boot.version>2.6.13</spring-boot.version><flink-verison>1.13.3</flink-verison></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!--        flink start--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>${flink-verison}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>${flink-verison}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>${flink-verison}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.12</artifactId><version>${flink-verison}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.12</artifactId><version>${flink-verison}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-jdbc_2.12</artifactId><version>1.9.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_2.12</artifactId><version>${flink-verison}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.12</artifactId><version>${flink-verison}</version></dependency><!--        flink end--><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.33</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.7.1</version></dependency></dependencies><dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-dependencies</artifactId><version>${spring-boot.version}</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement>

我这里将演示要用的包都添加上了,大家可以根据自己的需求选择,依赖包简介:

  • flink-streaming-java_2.12: 提供java集成flink实现流数据处理的接口
  • flink-clients_2.12:flink客户端与服务端集群交互的接口和工具
  • flink-connector-base:提供flink做各类连接的抽象类、工具等,是flink项目中的一个基础包
  • flink-connector-kafka_2.12:连接kafka的接口和工具
  • flink-connector-jdbc_2.12:连接数据库的jdbc接口和工具
  • flink-jdbc_2.12:与数据库交互的接口,可与flink-connector-jdbc_2.12配合使用
  • flink-runtime-web_2.12:本地模式提供web ui相关功能,不添加不影响运行,但是会报错FlinkException: The module flink-runtime-web could not be found in the class path. Please add this jar in order to enable web based job submission.
  • flink-streaming-scala_2.12:专门为 Scala 2.12 编译的流处理库。这个包提供了 Scala API,允许开发人员使用 Scala 编程语言构建流处理应用程序

3、配置文件application.properties中添加配置项,注意提前启动flink

flink:# flink集群的服务器ip和端口job-manager-host: localhostjob-manager-port: 8081

4、接下来我们实现一个简单的案例,来帮助我们理解java集成flink的过程

在官网有关于流数据的介绍和一些简单案例
在这里插入图片描述

流处理4个部分

我们从官网的实例可以知道,flink流数据处理实际上主要分成4个部分:
在这里插入图片描述

  • 1、StreamExecutionEnvironment.getExecutionEnvironment()执行环境创建,这是固定的书写
  • 2、输入数据的声明,这里是固定取的自定义对象来作为输入流,也可以引入固定字符串、socket流、数据库、kafka等作为输入数据源
  • 3、流数据的处理,这里只是作为一个简单的过滤,还有其他更多的方法来做复杂的数据处理操作, 实际上就是定义flink任务的算子,大家具体可以参考官方文档,后续再带大家通过案例来熟悉
    https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/learn-flink/etl/
    在这里插入图片描述
    5、下面我们来实现一个简单的词频统计的程序,数据输入就取固定文本,后续我们再讲解其他输入源

(1)先创建环境,指定并行数,输入流数据指定为固定字符串

java">// 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 指定并行度,默认电脑线程数env.setParallelism(3);DataStream<String> stream = env.fromElements("Flink is a powerful framework","flink 是 一个 强大的 框架");

(2)数据处理上稍微麻烦一旦,我们先对字符串用空格分组,然后作为key, 词频为1放入Tuple2对象中,这里Tuple2的<key, value>类型与我们要放入的要保持一致

然后调用returns方法显示声明结果值类型,否则系统只能得到Tuple2泛型,而无法知道具体的Tuple2<String, Integer>类型

然后使用keyBy进行分组,条件为key,即单词名,然后调用sum做个数汇总,其参数positionToSum是指做合计的字段位置,0 是key, 1就是value,即前面放入的词频数1,或者也可以传入字段名

java">SingleOutputStreamOperator<Tuple2<String, Integer>> result = stream.flatMap((String value, Collector<Tuple2<String, Integer>> out) -> {String[] words = value.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1));}}).returns(new TypeHint<Tuple2<String, Integer>>() {}).keyBy(value -> value.f0).sum(1);

(3)然后我们将其做一个打印输出,其结果输出是通过添加不同的SinkFunction来实现的

java">result.addSink(new RichSinkFunction<Tuple2<String, Integer>>() {@Overridepublic void invoke(Tuple2<String, Integer> value, Context context) throws Exception {String world = value.getField(0);Integer count = value.getField(1);// 打印System.out.println("单词:"+world + ",次数:"+count);}});

(4)最后不要忘记,执行任务

java">// 执行
env.execute("fixed text stream job");

完整代码

java">import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.util.Collector;/*** @author benjamin_5* @Description 固定文本输出:统计词频* @date 2024/9/24*/
public class FixedStringJob {private static final Log logger = LogFactory.getLog(FixedStringJob.class);// 启动本地flink ./bin/start-cluster.shpublic static void main(String[] args) {try {// 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 指定并行度,默认电脑线程数env.setParallelism(3);DataStream<String> stream = env.fromElements("Flink is a powerful framework","flink 是 一个 强大的 框架");// 处理数据: 切换、转换、分组、聚合 得到统计结果SingleOutputStreamOperator<Tuple2<String, Integer>> result = stream.flatMap((String value, Collector<Tuple2<String, Integer>> out) -> {String[] words = value.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1));}})// 显式地提供类型信息:对于flatMap传入Lambda表达式,系统只能推断出返回的是Tuple2类型,而无法得到Tuple2<String, Long>。只有显式设置系统当前返回类型,才能正确解析出完整数据.returns(new TypeHint<Tuple2<String, Integer>>() {}).keyBy(value -> value.f0).sum(1);result.addSink(new RichSinkFunction<Tuple2<String, Integer>>() {@Overridepublic void invoke(Tuple2<String, Integer> value, Context context) throws Exception {String world = value.getField(0);Integer count = value.getField(1);// 打印System.out.println("单词:"+world + ",次数:"+count);}});System.out.println("执行完成");// 执行env.execute("fixed text stream job");}catch (Exception e){e.printStackTrace();logger.error("流任务执行失败:", e);}}
}

3. 程序调用

1、通过flink的官方文档我们可以知道,其脚本的运行方式是通过flink指令调用jar脚本,然后将任务提交到flink服务端集群,在集群中执行对应任务
在这里插入图片描述
2、在idea中,我们可以直接运行类来进行调试调用,工作台可以看到打印信息
在这里插入图片描述
3、如果我们要放到服务器上运行时,就需要将其打包为jar,首先我们要调整下部署方式,并声明这里书写的class为mainClass,否则运行时会找不到主类

<build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.8.1</version><configuration><source>1.8</source><target>1.8</target><encoding>UTF-8</encoding></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>com.example.flink_demo.job.FixedStringJob</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins></build>

4、打包项目

mvn clean package -DskipTests

5、运行指定类

flink run -sae -c com.example.flink_demo.job.FixedStringJob flink_demo-0.0.1.jar

在这里插入图片描述
flink的日志目录log下可以看到打印信息
在这里插入图片描述

在这里插入图片描述

4. 总结

如上我们实现了一个简单的flink处理流数据的案例,下一期我们继续讲解实现数据库输入输出、kafka接入流数据等案例


http://www.ppmy.cn/embedded/121729.html

相关文章

开发微信小程序 基础02

WX模板 1.对比 ①标签名称不同 ②属性节点不同 ③提供类似vue的模板语法 2.模板语法 2.1数据动态绑定 2.1.1在data种定义数据 在页面对应的.js文件中&#xff0c;把数据定义到data对象中即可 例---data &#xff1a; { info : init data , msList : [{msg : hello}, { ms…

【PCL】Ubuntu22.04 安装 PCL 库

文章目录 前言一、更新系统软件包二、安装依赖项三、下载 PCL 源码四、编译和安装 PCL五、测试安装成功1、 pcd_write.cpp2、CMakeLists.txt3、build 前言 PCL&#xff08;Point Cloud Library&#xff09;是一个开源的大型项目&#xff0c;专注于2D/3D图像和点云处理。PCL为点…

C(十)for循环 --- 黑神话情景

前言&#xff1a; "踏过三界宝刹&#xff0c;阅过四洲繁华。笑过五蕴痴缠&#xff0c;舍过六根牵挂。怕什么欲念不休&#xff0c;怕什么浪迹天涯。步履不停&#xff0c;便是得救之法。" 国际惯例&#xff0c;开篇先喝碗鸡汤。 今天&#xff0c;杰哥写的 for 循环相…

负载均衡--相关面试题(六)

在负载均衡的面试中&#xff0c;可能会遇到一系列涉及概念、原理、实践应用以及技术细节的问题。以下是一些常见的负载均衡面试题及其详细解答&#xff1a; 一、什么是负载均衡&#xff1f; 回答&#xff1a;负载均衡是一种将网络请求或数据传输工作分配给多个服务器或网络资源…

计算机前沿技术-人工智能算法-大语言模型-最新研究进展-2024-10-02

计算机前沿技术-人工智能算法-大语言模型-最新研究进展-2024-10-02 1. APM: Large Language Model Agent-based Asset Pricing Models Authors: Junyan Cheng, Peter Chin https://arxiv.org/abs/2409.17266 APM: 基于大型语言模型的代理资产定价模型&#xff08;LLM Agent-b…

版本发布 | IvorySQL 3.4 发版

[发行日期&#xff1a;2024年9月26日] IvorySQL 3.4基于PostgreSQL 16.4&#xff0c;并修复了多个问题。更多信息请参考文档网站。 >>>新版本体验链接&#xff1a; https://docs.ivorysql.org/cn/ivorysql-doc/v3.4/v3.4/1.html 1 增强功能 >>>PostgreSQL…

【Spring】运行Spring Boot项目,请求响应流程分析以及404和500报错

1. 运行项目 import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; SpringBootApplication public class Application { public static void main(String[] args) { SpringApplication.run(Appl…

什么是 HTTP 请求中的 options 请求?

在 Chrome 开发者工具中的 Network 面板看到的 HTTP 方法 OPTIONS&#xff0c;其实是 HTTP 协议的一部分&#xff0c;用于客户端和服务器之间进行“预检”或“协商”。OPTIONS 请求的作用是让客户端能够获取关于服务器支持的 HTTP 方法和其他跨域资源共享 (CORS) 相关的信息&am…