大数据 flink 01 | 从零环境搭建 简单Demo 运行

devtools/2024/9/29 16:26:15/

什么是Flink

Flink是一个开源的流处理批处理框架,它能够处理无界和有界的数据流,具有高吞吐量、低延迟和容错性等特点

Flink 可以应用于多个领域如:实时数据处理、数据分析、机器学习、事件驱动等

什么是流式处理?什么是批处理

流处理是一种针对实时数据流进行连续处理的技术。它的数据通常是无界,数据以持续不断的流的形式到达。

批处理是一种将大量数据集合在一起进行统一处理的技术。在批处理中,首先要收集存储数据,批处理通常用于处理历史数据或离线数据

下载与安装

flink 依赖jdk ,版本推荐 Java 8 or 11

flink 下载与安装

本文使用的是 flink-1.17.2-bin-scala_2.12.tgz

tar -xzf flink-*.tgz

web UI 配置

vim ./conf/flink-conf.yaml

rest.bind-address: 0.0.0.0

启动与停止

 ./bin/start-cluster.sh

输入 ip:8081 进入UI 管理页面

图片

Flink WebUI 页面

一个简单的例子

新建Maven 项目

添加maven 依赖
 <properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><flink.version>1.14.4</flink.version><target.java.version>1.8</target.java.version><scala.binary.version>2.11</scala.binary.version><maven.compiler.source>${target.java.version}</maven.compiler.source><maven.compiler.target>${target.java.version}</maven.compiler.target><log4j.version>2.17.1</log4j.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><!-- This dependency is provided, because it should not be packaged into the JAR file. --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency></dependencies>
官方文档一个简单的Demo
package com.codetonight;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.api.common.functions.FilterFunction;public class Example {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();DataStream<Person> flintstones = env.fromElements(new Person("Fred", 35),new Person("Wilma", 35),new Person("Pebbles", 2));DataStream<Person> adults = flintstones.filter(new FilterFunction<Person>() {@Overridepublic boolean filter(Person person) throws Exception {return person.age >= 18;}});adults.print();env.execute();}public static class Person {public String name;public Integer age;public Person() {}public Person(String name, Integer age) {this.name = name;this.age = age;}public String toString() {return this.name.toString() + ": age " + this.age.toString();}}
}
本地 idea 运行

本地启动报java.lang.NoClassDefFoundError: org/apache/flink/api/common/functions/FlatMapFunction时,

idea 需要勾选 add dependencies with provided scope to classpath

操作路径  Edit Configurations

图片

提交任务到集群

通过UI页面提交Flink 任务,操作路径 Submit New Job -> Add New

图片

任务提交

上传jar,填写处理任务类(包含main 方法)的类全路径

图片

Jobs菜单下可以查看 运行中 和 已完成的 任务

图片

查看任务的日志

图片

图片


http://www.ppmy.cn/devtools/118767.html

相关文章

AI大火的这几年,微软做了什么有效的工作

在过去几年中&#xff0c;人工智能&#xff08;AI&#xff09;领域蓬勃发展&#xff0c;微软作为全球领先的科技公司&#xff0c;在这一领域进行了大量的投入和创新。微软不仅在AI技术研究和开发上做出了显著贡献&#xff0c;还通过其产品和服务推动了AI技术的实际应用。下面从…

jenkins汉化一部分问题

安装插件 Manage Jenkins - plugin Locale pluginLocalization: Chinese (Simplified): 默认已经安装需要确认 配置语言 Manage Jenkins - Appearance(和以前不一样) 将语言设定为zh_US&#xff0c;Jenkins切换为英文。调用restart重启Jenkins&#xff1a;http://jenkisn网…

数据结构:树的定义及其性质

树的定义 树是一种重要的非线性数据结构&#xff0c;树作为一种逻辑结构&#xff0c;同时也是一种分层结构。具有以下两个特点&#xff1a; 1.树的根结点没有前驱&#xff0c;除根结点意外的节点只有一个前驱 2.树中所有结点都可以有0个或多个后继 树结构在多个领域都有广泛…

828华为云征文|华为云Flexus X实例Windows Server 2019安装护卫神防火墙——为企业运维安全发挥重要作用!!!

前言 公司最近需要选购一台华为云Windows服务器部署产品应用&#xff0c;但是考虑到Windows的安全性至关重要。护卫神防火墙无疑是守护Windows系统安全的得力助手。 华为云以其强大的性能和稳定的服务&#xff0c;为众多企业和开发者提供了可靠的云端基础设施。在网络环境日益复…

android13 系统默认设置静态IP

android11系统的时候&#xff0c;默认静态IP设置很简单&#xff0c;修改frameworks\base\core\res\res\values\config.xml中的config_ethernet_interfaces字符数组&#xff0c;在里面添加静态IP的参数就可以了。 <string-array translatable"false" name"c…

水波荡漾效果+渲染顺序+简单UI绘制

创建场景及布置 创建新场景Main,在Main场景中创建一个plane物体&#xff0c;命名为WaterWavePla,具体数值及层级面板排布如下&#xff1a; 编写脚本 创建一个文件夹&#xff0c;用于存放脚本&#xff0c;命名Scripts,创建一个子文件夹Effect,存放特效相关脚本&#xff0c;创建…

python学习记录4

目录 (1)位运算 (2)运算的优先级 (1)位运算 位运算是将数字看做二进制数来运算的&#xff0c;位运算分为按位与&#xff08;&&#xff09;、按位或&#xff08;|&#xff09;、按位异或&#xff08;^&#xff09;、按位取反(~)。还有移位运算&#xff08;左移位<<、…

【分布式微服务云原生】K8s(Kubernetes)基本概念和使用方法

Kubernetes简称K8S,是一个强大的开源容器编排平台&#xff0c;用于自动化部署、扩展和管理容器化应用程序。它最初由Google设计&#xff0c;并由Cloud Native Computing Foundation&#xff08;CNCF&#xff09;维护。以下是Kubernetes的一些基本概念和使用方法。 基本概念 集…