4.2、Flink任务怎样读取文件中的数据

news/2024/10/20 6:41:21/

目录

1、前言

2、readTextFile(已过时,不推荐使用)

3、readFile(已过时,不推荐使用)

4、fromSource(FileSource) 推荐使用


1、前言

思考: 读取文件时可以设置哪些规则呢?

         1. 文件的格式(txt、csv、二进制...)        

         2. 文件的分隔符(按\n 分割)

         3. 是否需要监控文件变化(一次读取、持续读取)

基于以上规则,Flink为我们提供了非常灵活的 读取文件的方法


2、readTextFile(已过时,不推荐使用)

语法说明:

定义:def readTextFile(filePath: String): DataStream[String]def readTextFile(filePath: String, charsetName: String)功能:1.读取文本格式的文件2.按行读取(\n为分隔符),每行数据被封装为 DataStream 的一个元素3.可以指定字符集(默认为UDF-8)4.文件只会读取一次源码分析:public DataStreamSource<String> readTextFile(String filePath, String charsetName) {// 初始化 TextInputFormat对象TextInputFormat format = new TextInputFormat(new Path(filePath));  // 指定路径过滤器(使用默认过滤器)format.setFilesFilter(FilePathFilter.createDefaultFilter());  // 指定Flink中的数据类型    TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO; // 指定字符集format.setCharsetName(charsetName);     // 调用 readFile 方法return readFile(format, filePath, FileProcessingMode.PROCESS_ONCE, -1, typeInfo); }

代码示例:

    public static void readTextFile() throws Exception {/** TODO 功能说明*   readTextFile(path) - 读取文本文件(一次读取),例如遵守 TextInputFormat 规范的文件,逐行读取并将它们作为字符串返回。* */// 1.获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2.将文本文件作为数据源env.readTextFile("data/1.txt").setParallelism(4).print();// 3.触发程序执行env.execute();}

3、readFile(已过时,不推荐使用)

语法说明:

定义:def readFile[T: TypeInformation](inputFormat: FileInputFormat[T],filePath: String,watchType: FileProcessingMode,interval: Long): DataStream[T] = {val typeInfo = implicitly[TypeInformation[T]] // 隐私转换(将java 数据类型 转换为 Flink数据类型)asScalaStream(javaEnv.readFile(inputFormat, filePath, watchType, interval, typeInfo))}参数:inputFormat : 指定 FileInputFormat 实现类(根据文件类型 选择相适应的实例)filePath    : 指定 文件路径watchType   : 指定 读取模式(提供了2个枚举值)PROCESS_ONCE :只读取一次PROCESS_CONTINUOUSLY :按照指定周期扫描文件interval    : 指定 扫描文件的周期(单位为毫秒)功能:按照 指定的 文件格式 和 读取方式 读取数据
FileInputFormat 的实现类
FileInputFormat 的实现类

代码示例:

    public static void readFile() throws Exception {/** TODO 功能说明*    readFile(fileInputFormat, path) - 按照指定的文件输入格式读取(一次)文件。*    readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)*       按照指定的文件输入格式读取(持续的读取)文件* */// 1.获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2.将文本文件作为数据源String filePath = "data/1.txt";TextInputFormat textInputFormat = new TextInputFormat(new Path(filePath));textInputFormat.setFilesFilter(FilePathFilter.createDefaultFilter()); // 指定过滤器textInputFormat.setCharsetName("UTF-8"); // 指定编码格式/** readFile(inputFormat: FileInputFormat[OUT], filePath: String, watchType: FileProcessingMode, interval: Long)* 参数说明:*      @inputFormat : 指定文件输入格式*      @filePath    : 指定文件路径*      @watchType   : 指定监控类型,提供了两种读取策略*            PROCESS_ONCE : 只读取一次*            PROCESS_CONTINUOUSLY :持续读取,监控新增数据*      @interval : 指定连续扫描文件的周期(毫秒)* 重点提示:*      1.如果watchType设置为PROCESS_CONTINUOUSLY时,当一个文件被修改时,将会导致重新读取该*           文件的全部内容,这将会打破`精确一次`的语义* */env.readFile(textInputFormat, filePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000).print();// 3.触发程序执行env.execute();}

4、fromSource(FileSource) 推荐使用

    public static void FileSource() throws Exception {// 1.获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2.将文本文件作为数据源FileSource<String> fileSource = FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("data/1.txt")).build();env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "read fileSource").print();// 3.触发程序执行env.execute();}


http://www.ppmy.cn/news/1026293.html

相关文章

win10安装mysql和c++读取调用举例

一、下载mysql8.rar解压到C盘(也可以解压到其他位置) 在系统环境变量添加JAVA_HOMEC:\myslq8&#xff0c;并在path中添加%JAVA_HOME%\bin; 二、以管理员身份进入命令窗口 三、修改配置文件指定安装路径和数据库的存放路径 四、键入如下命令初始化并启动mysql服务,然后修改登录…

【编程指南】ES2016到ES2023新特性解析一网打尽

ES2016 Array.prototype.includes() Array.prototype.includes 方法&#xff1a; 这个方法用于检查数组是否包含特定元素&#xff0c;如果包含则返回 true&#xff0c;否则返回 false // 我有一个水果篮子 const fruitBasket [apple, banana, orange, grape];// 我要检查篮…

在Linux虚拟机内配置nginx以及docker

目录 1、nginx源码包编译以及安装依赖 1、配置安装所需的编译环境 2、安装函数库&#xff08;pcre、zlib、openssl&#xff09; 2、安装nginx 1、获取源码包 2、解压编译 3、启动nginx服务 1、关闭防火墙 2、运行nginx 3、使用本地浏览器进行验证 3、安装docker 1、…

Ubuntu上硬盘挂载及卸载

1、连接硬盘 将新硬盘通过SATA接口&#xff08;或其他适当的接口&#xff09;连接到计算机上 2、检查硬盘 在Ubuntu系统中打开终端&#xff0c;使用lsblk或fdisk -l命令来查看新硬盘是否被系统识别 lsblk 或 fdisk -l 3、分区和格式化 如果新硬盘还没有分区和文件系统&…

MySQL单表查询操作(SELECT语句)

目录 步骤 1、创建表单 2、添加数据 3、显示所有职工的基本信息 4、查询所有职工所属部门的部门号&#xff0c;不显示重复的部门号 5、 求出所有职工的人数 6、 列出最高工资和最低工资 7、 列出职工的平均工资和总工资 8、 创建一个只有职工号、姓名和参加工作的新表…

24届近3年南京信息工程大学自动化考研院校分析

今天给大家带来的是南京信息工程大学控制考研分析 满满干货&#xff5e;还不快快点赞收藏 一、南京信息工程大学 学校简介 南京信息工程大学位于南京江北新区&#xff0c;是一所以大气科学为特色的全国重点大学&#xff0c;由江苏省人民政府、中华人民共和国教育部、中国气…

AI绘图(4)stable diffusion如何写好prompt 二

在进行绘制时&#xff0c;会添加很多的风格或细节来进行描述土拍你&#xff0c;当然也会由一些专业的词汇&#xff0c;如广角、相片、3d模型等&#xff0c;下面我们就会具体进行介绍。 1、「风格」的提示词 相片(photshop)、3D建模(3d model)、装饰艺术(art deco)、石像(ston…

【muduo】关于自动增长的缓冲区

目录 为什么需要缓冲区自动增长的缓冲区buffer数据结构buffer类 写详细比较费时间&#xff0c;就简单总结下。 总结自Linux 多线程服务端编程&#xff1a;使用 muduo C 网络库 Muduo网络编程&#xff1a; IO-multiplexnon-blocking 为什么需要缓冲区 Non-blocking IO 的核心…