Flink-DataStream API

server/2025/2/13 7:38:06/

一、什么样的数据可以用于流式传输

Flink的DataStream API 允许流式传输他们可以序列化的任何内容。Flink自己的序列化程序用于

  • 基本类型:即字符串、长、整数、布尔值、数组
  • 复合类型:元组、POJO和Scala样例类

基本类型我们已经很熟悉了,下面我们看下复合类型。

1、元组

对于java,Flink定义了Tuple0Tuple25类型,例如:

Tuple2<String, Integer> person = Tuple2.apply("Fred",35);String name = person._1;
Integer age = person._2;

2、POJO

如果满足以下条件,Flink将数据类型识别为POJO类型(并允许“按名称”字段引用)

  • 该类是公共且独立的(没有非静态内部类)
  • 该类有一个公共的无参数构造函数
  • 类(以及所有超类)中的所有非静态、非瞬态字段要么是公共的(和非最终的),要么具有遵循getter和setterJavabean命名约定的公共getter和setter方法。

示例:

public class Person {public String name;  public Integer age;  public Person() {}public Person(String name, Integer age) {  . . .}
}  Person person = new Person("Fred Flintstone", 35);

3、样例类

样例类(Case classes)和普通类差不多,只有几点关键差别。样例类非常适合用于不可变的数据,多用于模式匹配。

case class Book(isbn: String)val frankenstein = Book("978-0486282114")

注意在实例化样例类Book时,并没有使用关键字new,这是因为样例类有一个默认的apply方法来负责对象的创建。

二、完整示例

该示例来自官方网站,是将有关人员的记录流作为输入,并对其进行过滤以仅包含成年人

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.api.common.functions.FilterFunction;public class Example {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();DataStream<Person> flintstones = env.fromElements(new Person("Fred", 35),new Person("Wilma", 35),new Person("Pebbles", 2));DataStream<Person> adults = flintstones.filter(new FilterFunction<Person>() {@Overridepublic boolean filter(Person person) throws Exception {return person.age >= 18;}});adults.print();env.execute();}public static class Person {public String name;public Integer age;public Person() {}public Person(String name, Integer age) {this.name = name;this.age = age;}public String toString() {return this.name.toString() + ": age " + this.age.toString();}}
}

1、执行环境

每个Flink应用程序都需要一个执行环境,在该例中为env。流应用程序需要使用StreamExecutionEnvironment

在应用程序中进行的DataStream API调用会构建一个附加到StreamExecutionEnvironment的作业图。当调用env.execute()时,此图会打包并发送到JobManager,JobManager会并行化作业并将其切片分发给TaskManager执行。作业的每个并行切片都将在一个任务槽中执行。

如果不调用execute(), 应用程序则不会执行。

分布式运行时取决于您的应用程序是否可序列化。它还要求集群中的每个节点都可以使用所有依赖项。

2、source

上面的示例使用env.fromElements(...)构造DataStream<Person>。这是一种将简单流组合在一起以用于原型或测试的便捷方法。StreamExecutionEnvironment上还有一个fromCollection(Collection)方法。因此,也可以这样做:

List<Person> people = new ArrayList<Person>();people.add(new Person("Fred", 35));
people.add(new Person("Wilma", 35));
people.add(new Person("Pebbles", 2));DataStream<Person> flintstones = env.fromCollection(people);

在原型设计时将一些数据导入流的另一种方便方法是使用socket或文件

DataStream<String> lines = env.socketTextStream("localhost", 9999);
DataStream<String> lines = env.readTextFile("file:///path");

在实际应用中,最常用的数据源是那些支持低延迟、高吞吐量并行读取以及倒带和重放的数据源——这是高性能和容错的先决条件——例如Apache Kafka、Kinesis和各种文件系统。REST API和数据库也经常使用。

3、sink

上面的示例使用adults.print()将其结果打印到任务管理器日志(在IDE中运行时将显示在IDE的控制台中)。这将在流的每个元素上调用toString()

例如输出如下:

1> Fred: age 35
2> Wilma: age 35

其中1>和2>表示哪个子任务(即线程)产生了输出。

在生产中,常用的接收器包括FileSink、各种数据库和几个发布子系统。

---------------------------------------------------------------------------------------------------------------------------------

大多数高校硕博生毕业要求需要参加学术会议,发表EI或者SCI检索的学术论文会议论文:
可访问艾思科蓝官网,浏览即将召开的学术会议列表。会议如下:

 第八届大数据与应用统计国际学术研讨会(ISBDAS 2025)

https://ais.cn/u/fEzmy2

第二届生成式人工智能与信息安全国际学术会议(GAIIS 2025)

https://ais.cn/u/uAbENn

第四届电子技术与人工智能国际学术会议(ETAI 2025)

https://ais.cn/u/vqM7Nj

第四届网络安全、人工智能与数字经济国际学术会议(CSAIDE 2025)

https://ais.cn/u/ZrERn2


http://www.ppmy.cn/server/167275.html

相关文章

5、《Spring Boot自动配置黑魔法:原理深度剖析》

Spring Boot自动配置黑魔法&#xff1a;原理深度剖析 一、引言&#xff1a;为什么Spring Boot能“开箱即用”&#xff1f; Spring Boot的核心理念是**“约定优于配置”&#xff0c;开发者只需引入一个spring-boot-starter-web依赖&#xff0c;就能直接编写RESTful API&#xf…

github不翻墙就可以访问

目录 简介资料准备windows平台设置下载运行git设置firefox设置 ubuntu平台设置下载启动服务设置系统代理git设置firefox设置证书 注意事项 简介 由于github访问不稳定,严重影响了国内软件开发,在网上搜索并验证了一些方法.现在整理出来一个可以正常使用的方法, 在windows和Lin…

使用STM32F103C8T6和ESP8266链接阿里云

一、项目简介 基于 STM32F103C8T6 单片机和 ESP8266 Wi-Fi 模块&#xff0c;旨在实现通过 Wi-Fi 连接阿里云物联网平台&#xff0c;进行数据上传和远程控制 STM32F103C8T6&#xff1a;作为核心控制单元&#xff0c;负责系统的运算、数据处理和与外设的交互。STM32F103C8T6 具有…

解决珠玑妙算游戏问题:C 语言实现

一、引言 珠玑妙算游戏&#xff08;the game of master mind&#xff09;是一个有趣的逻辑推理游戏。在编程领域&#xff0c;我们可以通过编写代码来模拟游戏中计算猜中与伪猜中次数的过程。本文将详细介绍如何使用 C 语言实现这一功能&#xff0c;并对核心代码进行解析。 二、…

eclipse配置Spring

1、从eclipse下载Spring工具 进入 help – install new software… &#xff0c;如下图&#xff1a; 点击 add &#xff0c;按以下方式输入&#xff1a; Name : Spring Location : http://dist.springsource.com/release/TOOLS/update/e4.10/ 之后点击 add &#xff0c;等待…

AH比价格策略源代码

用python 获取在A股和香港上市的公司和在A股和香港上市的公司股票代码和名称并且选出港股和A股涨幅相差比较大的股票 import akshare as akdef get_ah_stocks():# 获取A股股票列表a_stock_list ak.stock_zh_a_spot_em()print(a_stock_list)a_stock_list a_stock_list[[&quo…

Java 魔法:精准掌控 PDF 合同模板,指定页码与关键字替换签章日期

朋友们&#xff01;在实际业务场景中&#xff0c;经常会碰到处理 PDF 合同模板的需求&#xff0c;要在几十页的合同里对指定页面替换公章、签名和日期&#xff0c;还涉及多人签名以及多个公司盖公章。下面就给大家分享两种用 Java 处理这类问题的方法&#xff0c;一种是通过指定…

Python——批量图片转PDF(GUI版本)

目录 专栏导读1、背景介绍2、库的安装3、核心代码4、完整代码总结专栏导读 🌸 欢迎来到Python办公自动化专栏—Python处理办公问题,解放您的双手 🏳️‍🌈 博客主页:请点击——> 一晌小贪欢的博客主页求关注 👍 该系列文章专栏:请点击——>Python办公自动化专…