Flink 流处理API

news/2024/12/29 18:38:28/

目录

一、环境

1.1getExecutionEnvironment

1.2createLocalEnvironment

1.3createRemoteEnvironment

二、从集合中读取数据

三、从文件中读取数据

四、从KafKa中读取数据

1.导入依赖

2.启动KafKa

3.java代码


一、环境

1.1getExecutionEnvironment

创建一个执行环境,表示当前执行程序的上下文。如果程序是独立调用的,则此方法返回本地执行环境;如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境,也就是说,getExecutionEnvironment会根据查询运行的方式决定返回什么样的运行环境,是最常用的一种创建执行环境的方式。

#批处理环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();#流处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

设置并行度:如果没有设置并行度,会以flink-conf.yaml中的配置为准,默认为1

 //设置并行度为8env.setParallelism(8);

1.2createLocalEnvironment

返回本地执行环境,需要在调用时指定默认的并行度

LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1); 

1.3createRemoteEnvironment

返回集群执行环境,将Jar提交到远程服务器。需要在调用时指定JobManager的IP和端口号,并指定要在集群中运行的Jar包

StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("IP",端口号,jar包路径)

二、从集合中读取数据


import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;/*** @author : Ashiamd email: ashiamd@foxmail.com* @date : 2021/1/31 5:13 PM* 测试Flink从集合中获取数据*/
public class SourceTest1_Collection {public static void main(String[] args) throws Exception {// 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置env并行度1,使得整个任务抢占同一个线程执行env.setParallelism(1);// Source: 从集合Collection中获取数据DataStream<SensorReading> dataStream = env.fromCollection(Arrays.asList(new SensorReading("sensor_1", 1547718199L, 35.8),new SensorReading("sensor_6", 1547718201L, 15.4),new SensorReading("sensor_7", 1547718202L, 6.7),new SensorReading("sensor_10", 1547718205L, 38.1)));DataStream<Integer> intStream = env.fromElements(1,2,3,4,5,6,7,8,9);// 打印输出dataStream.print("SENSOR");intStream.print("INT");// 执行env.execute("JobName");}}

三、从文件中读取数据

文件由自己创建一个txt文件

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** @author : Ashiamd email: ashiamd@foxmail.com* @date : 2021/1/31 5:26 PM* Flink从文件中获取数据*/
public class SourceTest2_File {public static void main(String[] args) throws Exception {// 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 使得任务抢占同一个线程env.setParallelism(1);// 从文件中获取数据输出DataStream<String> dataStream = env.readTextFile("/tmp/Flink_Tutorial/src/main/resources/sensor.txt");dataStream.print();env.execute();}
}

四、从KafKa中读取数据

1.导入依赖

<dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.11</version><scope>test</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.10.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.10.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.10.1</version></dependency><!-- kafka --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>1.12.1</version></dependency></dependencies>

2.启动KafKa

启动Zookeeper

./bin/zookeeper-server-start.sh [config/zookeeper.properties]

启动KafKa服务

./bin/kafka-server-start.sh -daemon ./config/server.properties

启动KafKa生产者

./bin/kafka-console-producer.sh --broker-list localhost:9092  --topic sensor

3.java代码

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;/*** @author : Ashiamd email: ashiamd@foxmail.com* @date : 2021/1/31 5:44 PM*/
public class SourceTest3_Kafka {public static void main(String[] args) throws Exception {// 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度1env.setParallelism(1);Properties properties = new Properties();//监听的kafka端口properties.setProperty("bootstrap.servers", "localhost:9092");// 下面这些次要参数properties.setProperty("group.id", "consumer-group");properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");properties.setProperty("auto.offset.reset", "latest");// flink添加外部数据源DataStream<String> dataStream = env.addSource(new FlinkKafkaConsumer<String>("sensor", new SimpleStringSchema(),properties));// 打印输出dataStream.print();env.execute();}
}


http://www.ppmy.cn/news/98346.html

相关文章

mysql中如何使用乐观锁和悲观锁

MySQL中可以使用SELECT ... FOR UPDATE语句来实现悲观锁。这个语句会在查询时锁定被查询的行&#xff0c;在事务结束前都不会释放锁。 例如&#xff0c;我们可以使用以下的 SQL 语句来锁定一个特定的行&#xff1a; BEGIN; SELECT * FROM table WHERE id 1 FOR UPDATE; ... C…

Redis的数据结构

一)SDS 在redis中&#xff0c;保存key的是字符串&#xff0c;value往往是字符串或者是字符串的集合&#xff0c;可见字符串是redis中最常用的一种数据结构: 但是在redis中并没有直接使用C语言的字符串&#xff0c;因为C语言的字符串存在很多问题 1)获取字符串的长度需要通过运算…

Linux:命令tar、zip、unzip对文件或文件夹进行压缩与解压

Linux&#xff1a;命令tar、zip、unzip对文件或文件夹进行压缩与解压 .tar压缩操作&#xff1a; 创建要进行压缩的文件&#xff1a; 对文件进行压缩&#xff1a; 将三个文件压缩成text.tar文件&#xff0c;压缩到当前路径下(默认也是在当前路径) 对比体积&#xff1a; 发现&…

linux命令大全(最简版)

一、基本命令 &#xff08;1&#xff09;查看、进入 ls # 查看当前目录所有文件、目录 cd /data/local/tmp # 进入/data/local/tmp目录 cd .. # 返回上一级目录 cd # 返回根目录&#xff08;2&#xff09;创建、找查、删除 mkdir dir_name # 当前目录创建…

2023全国酒店数据

数据内容字段结构 hotel_id int(11) NOT NULL, name varchar(100) DEFAULT NULL, name_en varchar(100) DEFAULT NULL, short_name varchar(100) DEFAULT NULL, province varchar(20) DEFAULT NULL, city_id int(11) DEFAULT NULL, city varchar(20…

如何在华为OD机试B卷中获得满分?Java实现【食堂供餐】一文详解

✅创作者&#xff1a;陈书予 &#x1f389;个人主页&#xff1a;陈书予的个人主页 &#x1f341;陈书予的个人社区&#xff0c;欢迎你的加入: 陈书予的社区 &#x1f31f;专栏地址: Java华为OD机试真题&#xff08;2022&2023) 文章目录 1. 题目描述2. 输入描述3. 输出描述…

重学Ajax

概述 Ajax&#xff08;Asynchronous JavaScript And XML&#xff09;即异步 JavaScript 和 XML&#xff0c;是一组用于在网页上进行异步数据交换的Web开发技术&#xff0c;可以在不刷新整个页面的情况下向服务器发起请求并获取数据&#xff0c;然后将数据插入到网页中的某个位置…

基于STM32的定时器--定时中断(HAL库)

基于STM32的定时器--定时中断&#xff08;HAL库&#xff09; 介绍引言定时器介绍 实例项目介绍准备设计流程 介绍 引言 本文旨在介绍如何使用STM32CubeMX配置KEIL 5开发一个每10us定时器中断触发一次的项目。帮助初学者入门STM32的定时器使用。 定时器介绍 定时器是STM32微…