Flink_DataStreamAPI_源算子Source

embedded/2024/11/17 0:20:45/

Flink_DataStreamAPI_源算子Source

  • 1从集合中读取数据
  • 2从文件读取数据
  • 3从Socket读取数据
  • 4从Kafka读取数据
  • 5从数据生成器读取数据
  • Flink支持的数据类型
    • 1)Flink的类型系统
    • 2)Flink支持的数据类型
    • 3)类型提示(Type Hints)

1从集合中读取数据

最简单的读取数据的方式,就是在代码中直接创建一个Java集合,然后调用执行环境的fromCollection方法进行读取。这相当于将数据临时存储到内存中,形成特殊的数据结构后,作为数据源使用,一般用于测试。

public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();List<Integer> data = Arrays.asList(1, 22, 3);DataStreamSource<Integer> ds = env.fromCollection(data);stream.print();env.execute();
}

2从文件读取数据

真正的实际应用中,自然不会直接将数据写在代码中。通常情况下,我们会从存储介质中获取数据,一个比较常见的方式就是读取日志文件。这也是批处理中最常见的读取方式。读取文件,需要添加文件连接器依赖:

 <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>${flink.version}</version>
</dependency>
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();FileSource<String> fileSource = FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("input/word.txt")).build();env.fromSource(fileSource,WatermarkStrategy.noWatermarks(),"file").print();env.execute();
}

说明:
参数可以是目录,也可以是文件;还可以从HDFS目录下读取,使用路径hdfs://…;
路径可以是相对路径,也可以是绝对路径;
相对路径是从系统属性user.dir获取路径:idea下是project的根目录,standalone模式下是集群节点根目录;

3从Socket读取数据

不论从集合还是文件,我们读取的其实都是有界数据。在流处理的场景中,数据往往是无界的。
我们之前用到的读取socket文本流,就是流处理场景。但是这种方式由于吞吐量小、稳定性较差,一般也是用于测试。

DataStream<String> stream = env.socketTextStream("localhost", 7777);

4从Kafka读取数据

Flink官方提供了连接工具flink-connector-kafka,直接帮我们实现了一个消费者FlinkKafkaConsumer,它就是用来读取Kafka数据的SourceFunction。
所以想要以Kafka作为数据源获取数据,我们只需要引入Kafka连接器的依赖。Flink官方提供的是一个通用的Kafka连接器,它会自动跟踪最新版本的Kafka客户端。目前最新版本只支持0.10.0版本以上的Kafka。这里我们需要导入的依赖如下。

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>${flink.version}</version>
</dependency>
public class SourceKafka {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();KafkaSource<String> kafkaSource = KafkaSource.<String>builder().setBootstrapServers("hadoop102:9092").setTopics("topic_1").setGroupId("atguigu").setStartingOffsets(OffsetsInitializer.latest()).setValueOnlyDeserializer(new SimpleStringSchema()) .build();DataStreamSource<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka-source");stream.print("Kafka");env.execute();}
}

5从数据生成器读取数据

Flink从1.11开始提供了一个内置的DataGen 连接器,主要是用于生成一些随机数,用于在没有数据源的时候,进行流任务的测试以及性能测试等。1.17提供了新的Source写法,需要导入依赖:

        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-datagen</artifactId><version>${flink.version}</version></dependency>
public class DataGeneratorDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataGeneratorSource<String> dataGeneratorSource =new DataGeneratorSource<>(new GeneratorFunction<Long, String>() {@Overridepublic String map(Long value) throws Exception {return "Number:"+value;}},Long.MAX_VALUE,RateLimiterStrategy.perSecond(10),Types.STRING);env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "datagenerator").print();env.execute();}
}

Flink支持的数据类型

1)Flink的类型系统

Flink使用“类型信息”(TypeInformation)来统一表示数据类型。TypeInformation类是Flink中所有类型描述符的基类。它涵盖了类型的一些基本属性,并为每个数据类型生成特定的序列化器、反序列化器和比较器。

2)Flink支持的数据类型

对于常见的Java和Scala数据类型,Flink都是支持的。Flink在内部,Flink对支持不同的类型进行了划分,这些类型可以在Types工具类中找到:

(1)基本类型
所有Java基本类型及其包装类,再加上Void、String、Date、BigDecimal和BigInteger。

(2)数组类型
包括基本类型数组(PRIMITIVE_ARRAY)和对象数组(OBJECT_ARRAY)。

(3)复合数据类型
-Java元组类型(TUPLE):这是Flink内置的元组类型,是Java API的一部分。最多25个字段,也就是从Tuple0~Tuple25,不支持空字段。
-Scala 样例类及Scala元组:不支持空字段。
-行类型(ROW):可以认为是具有任意个字段的元组,并支持空字段。
-POJO:Flink自定义的类似于Java bean模式的类。

(4)辅助类型
Option、Either、List、Map等。

(5)泛型类型(GENERIC)
Flink支持所有的Java类和Scala类。不过如果没有按照上面POJO类型的要求来定义,就会被Flink当作泛型类来处理。Flink会把泛型类型当作黑盒,无法获取它们内部的属性;它们也不是由Flink本身序列化的,而是由Kryo序列化的。
在这些类型中,元组类型和POJO类型最为灵活,因为它们支持创建复杂类型。而相比之下,POJO还支持在键(key)的定义中直接使用字段名,这会让我们的代码可读性大大增加。所以,在项目实践中,往往会将流处理程序中的元素类型定为Flink的POJO类型。
Flink对POJO类型的要求如下:
-类是公有(public)的
-有一个无参的构造方法
-所有属性都是公有(public)的
-所有属性的类型都是可以序列化的

3)类型提示(Type Hints)

Flink还具有一个类型提取系统,可以分析函数的输入和返回类型,自动获取类型信息,从而获得对应的序列化器和反序列化器。但是,由于Java中泛型擦除的存在,在某些特殊情况下(比如Lambda表达式中),自动提取的信息是不够精细的——只告诉Flink当前的元素由“船头、船身、船尾”构成,根本无法重建出“大船”的模样;这时就需要显式地提供类型信息,才能使应用程序正常工作或提高其性能。
为了解决这类问题,Java API提供了专门的“类型提示”(type hints)。
回忆一下之前的word count流处理程序,我们在将String类型的每个词转换成(word, count)二元组后,就明确地用returns指定了返回的类型。因为对于map里传入的Lambda表达式,系统只能推断出返回的是Tuple2类型,而无法得到Tuple2<String, Long>。只有显式地告诉系统当前的返回类型,才能正确地解析出完整数据。

.map(word -> Tuple2.of(word, 1L))
.returns(Types.TUPLE(Types.STRING, Types.LONG));

Flink还专门提供了TypeHint类,它可以捕获泛型的类型信息,并且一直记录下来,为运行时提供足够的信息。我们同样可以通过.returns()方法,明确地指定转换之后的DataStream里元素的类型。

returns(new TypeHint<Tuple2<Integer, SomeType>>(){})

http://www.ppmy.cn/embedded/138126.html

相关文章

C++单例模式实现

单例模式&#xff08;Singleton Pattern&#xff09;是软件设计模式中的一种&#xff0c;用于确保一个类只有一个实例&#xff0c;并提供一个全局访问点来获取这个实例。 一、初始版本&#xff08;手动创建释放&#xff09; 一个类只有一个实例的实现方法&#xff1a; 隐藏构…

git命令提交项目

此为linux下的命&#xff0c; windows的话&#xff0c;去掉sudo即可 *转载至链接 http://www.eqicode.com/ 1、进入项目代码根目录&#xff0c;执行&#xff1a; sudo git init 把这个目录变成git可以管理的仓库。此时在文件加下&#xff0c;会出现一个 .git的隐藏文件&#…

LeetCode 40-组合总数Ⅱ

题目链接&#xff1a;LeetCode40 欢迎留言交流&#xff0c;每天都会回消息。 class Solution {List<List<Integer>> rs new ArrayList<>();LinkedList<Integer> path new LinkedList<>();public List<List<Integer>> combinatio…

【golang-技巧】-线上死锁问题排查-by pprof

1.背景 由于目前项目使用 cgo golang 本地不能debug, 发生死锁问题&#xff0c;程序运行和期待不一致&#xff0c;通过日志排查可以大概率找到 阻塞范围&#xff0c;但是不能找到具体问题在哪里&#xff0c;同时服务器 通过k8s daemonset 部署没有更好的方式暴露端口 获取ppr…

开发中SQL积累

1.SQL中判断varchar类型是否为空&#xff1f; 检查 NULL 值&#xff1a; WHERE column_name IS NULL 检查空字符串&#xff1a; WHERE column_name 结合 NULL 和空字符串的检查&#xff1a; WHERE column_name IS NULL OR column_name 2.TRIM函数 作用&#xff1a;…

ODC 如何精确呈现SQL耗时 | OceanBase 开发者工具解析

前言 在程序员或DBA的日常工作中&#xff0c;编写并执行SQL语句如同日常饮食中的一餐一饭&#xff0c;再寻常不过。然而&#xff0c;在使用命令行或黑屏客户端处理SQL时&#xff0c;常会遇到编写难、错误排查缓慢以及查询结果可读性不佳等难题&#xff0c;因此&#xff0c;图形…

redis高性能键值数据库技术简介

什么是redis redis是远程字典服务&#xff08;Remote Dictionary Server &#xff09;的简写&#xff0c;是一个完全开源的高性能的Key-Value数据库&#xff0c;提供了丰富的数据结构如string、Hash、List、SetSortedset等等。数据是存在内存中的&#xff0c;同时Redis支持事务…

Verilog HDL学习笔记

Verilog HDL&#xff08;Hardware Description Language&#xff09;是在一种硬件描述语言&#xff0c;类似于计算机的高级编程设计语言&#xff0c;它具有灵活性高&#xff0c;容易学习和使用等特点&#xff0c;同时Verilog能够通过文本的形式来描述数字系统的硬件结构和功能。…