Flink将数据写入CSV文件后文件中没有数据

news/2024/10/31 5:30:24/

Flink中有一个过时的sink方法:writeAsCsv,这个方法是将数据写入CSV文件中,有时候我们会发现程序启动后,打开文件查看没有任何数据,日志信息中也没有任何报错,这里我们结合源码分析一下这个原因.

这里先看一下数据处理的代码
代码中我是使用的自定义数据源生产数据的方式,为了方便测试

import lombok.*;
import org.apache.commons.lang3.RandomUtils;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;import java.util.Random;/*** @Author: J* @Version: 1.0* @CreateTime: 2023/6/19* @Description: 自定义数据源测试**/
public class FlinkCustomizeSource {public static void main(String[] args) throws Exception {// 创建流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度env.setParallelism(1); // 这里的并行度设置为几就会生成多少个csv文件// 添加自定义数据源DataStreamSource<CustomizeBean> dataStreamSource = env.addSource(new customizeSource());// 先将数据转换成Tuple类型,这样才能写入csv中SingleOutputStreamOperator<Tuple4<String, Integer, String, String>> tuple4Stream = dataStreamSource.map(bean -> Tuple4.of(bean.getName(), bean.getAge(), bean.getGender(), bean.getHobbit())).returns(new TypeHint<Tuple4<String, Integer, String, String>>() {});// 选择csv类型的sink,模式使用的覆盖tuple4Stream.writeAsCsv("/Users/xxx/data/testData/test.csv", FileSystem.WriteMode.OVERWRITE);env.execute();}
}// 自定义数据源需要实现SourceFunction接口,注意这个接口是单机的数据源,如果是想自定义分布式的数据源需要集成RichParallelSourceFunction类
class customizeSource implements SourceFunction<CustomizeBean> {int flag;// Job执行的线程@Overridepublic void run(SourceContext ctx) throws Exception {/*这个方法里就是具体的数据逻辑,实际内容要根据业务需求编写,这里只是为了演示方便*/CustomizeBean customizeBean = new CustomizeBean();String[] genders = {"M", "W"};String[] hobbits = {"篮球运动爱好者", "钓鱼爱好者", "乒乓球运动爱好者", "美食爱好者", "羽毛球运动爱好者", "天文知识爱好者", "旅游爱好者", "书法爱好者", "非遗文化爱好者", "网吧战神"};while (flag != 100) {// 这里自定义的Bean作为数据源customizeBean.setAge(RandomUtils.nextInt(18, 80)); // 年龄customizeBean.setName("A-" + new Random().nextInt()); // 姓名customizeBean.setGender(genders[RandomUtils.nextInt(0, genders.length)]); // 性别customizeBean.setHobbit(hobbits[RandomUtils.nextInt(0, hobbits.length)]); // 爱好// 将数据收集ctx.collect(customizeBean);// 睡眠时间是为了控制数据生产的速度,演示效果更加明显Thread.sleep(1000);}}// Job取消时就会调用cancel方法@Overridepublic void cancel() {// flag为100时就会停止程序flag = 100;}
}@Getter
@Setter
@ToString
@NoArgsConstructor
@AllArgsConstructor
class CustomizeBean{private String name;private int age;private String gender;private String hobbit;
}

上面的代码中我们使用自定义数据源的方式(java bean[CustomizeBean]),通过设置Thread.sleep(1000)可以固定每秒生成一条数据.这里我们先看一下存储CSV文件的目录
在这里插入图片描述
通过上图可以看到程序没有启动时,目录是空的,这里我们启动一下程序
日志内容如下

[2023-06-19 15:26:37,755]-[INFO] -org.apache.flink.runtime.state.changelog.StateChangelogStorageLoader -3206 -org.apache.flink.runtime.state.changelog.StateChangelogStorageLoader.load(StateChangelogStorageLoader.java:98).load(98) | Creating a changelog storage with name 'memory'.
[2023-06-19 15:26:37,766]-[INFO] -org.apache.flink.runtime.taskexecutor.TaskExecutor -3217 -org.apache.flink.runtime.taskexecutor.TaskExecutor.submitTask(TaskExecutor.java:757).submitTask(757) | Received task Source: Custom Source -> Map -> Sink: Unnamed (1/1)#0 (965035c5eef2b8f28ffcfc309b92e203), deploy into slot with allocation id b691e34573507d585516decbedb36384.
[2023-06-19 15:26:37,768]-[INFO] -org.apache.flink.runtime.taskmanager.Task -3219 -org.apache.flink.runtime.taskmanager.Task.transitionState(Task.java:1080).transitionState(1080) | Source: Custom Source -> Map -> Sink: Unnamed (1/1)#0 (965035c5eef2b8f28ffcfc309b92e203) switched from CREATED to DEPLOYING.
[2023-06-19 15:26:37,769]-[INFO] -org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl -3220 -org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl.markExistingSlotActive(TaskSlotTableImpl.java:388).markExistingSlotActive(388) | Activate slot b691e34573507d585516decbedb36384.
[2023-06-19 15:26:37,773]-[INFO] -org.apache.flink.runtime.taskmanager.Task -3224 -org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:623).doRun(623) | Loading JAR files for task Source: Custom Source -> Map -> Sink: Unnamed (1/1)#0 (965035c5eef2b8f28ffcfc309b92e203) [DEPLOYING].
[2023-06-19 15:26:37,788]-[INFO] -org.apache.flink.streaming.runtime.tasks.StreamTask -3239 -org.apache.flink.runtime.state.StateBackendLoader.loadFromApplicationOrConfigOrDefaultInternal(StateBackendLoader.java:257).loadFromApplicationOrConfigOrDefaultInternal(257) | No state backend has been configured, using default (HashMap) org.apache.flink.runtime.state.hashmap.HashMapStateBackend@4e1fcd2f
[2023-06-19 15:26:37,789]-[INFO] -org.apache.flink.runtime.state.StateBackendLoader -3240 -org.apache.flink.runtime.state.StateBackendLoader.fromApplicationOrConfigOrDefault(StateBackendLoader.java:315).fromApplicationOrConfigOrDefault(315) | State backend loader loads the state backend as HashMapStateBackend
[2023-06-19 15:26:37,789]-[INFO] -org.apache.flink.streaming.runtime.tasks.StreamTask -3240 -org.apache.flink.runtime.state.CheckpointStorageLoader.createJobManagerCheckpointStorage(CheckpointStorageLoader.java:274).createJobManagerCheckpointStorage(274) | Checkpoint storage is set to 'jobmanager'
[2023-06-19 15:26:37,793]-[INFO] -org.apache.flink.runtime.taskmanager.Task -3244 -org.apache.flink.runtime.taskmanager.Task.transitionState(Task.java:1080).transitionState(1080) | Source: Custom Source -> Map -> Sink: Unnamed (1/1)#0 (965035c5eef2b8f28ffcfc309b92e203) switched from DEPLOYING to INITIALIZING.
[2023-06-19 15:26:37,795]-[INFO] -org.apache.flink.runtime.executiongraph.ExecutionGraph -3246 -org.apache.flink.runtime.executiongraph.Execution.transitionState(Execution.java:1416).transitionState(1416) | Source: Custom Source -> Map -> Sink: Unnamed (1/1) (965035c5eef2b8f28ffcfc309b92e203) switched from DEPLOYING to INITIALIZING.
[2023-06-19 15:26:37,836]-[INFO] -org.apache.flink.runtime.taskmanager.Task -3287 -org.apache.flink.runtime.taskmanager.Task.transitionState(Task.java:1080).transitionState(1080) | Source: Custom Source -> Map -> Sink: Unnamed (1/1)#0 (965035c5eef2b8f28ffcfc309b92e203) switched from INITIALIZING to RUNNING.
[2023-06-19 15:26:37,837]-[INFO] -org.apache.flink.runtime.executiongraph.ExecutionGraph -3288 -org.apache.flink.runtime.executiongraph.Execution.transitionState(Execution.java:1416).transitionState(1416) | Source: Custom Source -> Map -> Sink: Unnamed (1/1) (965035c5eef2b8f28ffcfc309b92e203) switched from INITIALIZING to RUNNING.

这里的日志我截取了最后的部分,可以看到没有任何报错的,我们在看一下生成的CSV文件
在这里插入图片描述
这里我们再将文件打开,看一下有没有数据
在这里插入图片描述
通过图片可以看到这个文件中是没有任何数据的.
这里我先说一下原因,然后再结合源码看一下,没有数据的原因是数据在内存中还没有达到4k的缓存,没有到这个数据量就不会将数据刷新到磁盘上,代码中我们加入了睡眠时间Thread.sleep(1000)就是为了看到这个效果,接下来我们就结合源码看一下.writeAsCsv这个方法的缓存刷新是不是4k,我们先看一下.writeAsCsv的内容,点击去源码后我们先找到下面这段代码

    @Deprecated@PublicEvolvingpublic <X extends Tuple> DataStreamSink<T> writeAsCsv(String path, WriteMode writeMode, String rowDelimiter, String fieldDelimiter) {Preconditions.checkArgument(getType().isTupleType(),"The writeAsCsv() method can only be used on data streams of tuples.");CsvOutputFormat<X> of = new CsvOutputFormat<>(new Path(path), rowDelimiter, fieldDelimiter);// 着重看这里,我们在看一下CsvOutputFormat里面的内容if (writeMode != null) {of.setWriteMode(writeMode);}return writeUsingOutputFormat((OutputFormat<T>) of);}

这里我们在点击去看CsvOutputFormat这个输出,找到如下内容

 @Overridepublic void writeRecord(T element) throws IOException {int numFields = element.getArity();for (int i = 0; i < numFields; i++) {Object v = element.getField(i);if (v != null) {if (i != 0) {this.wrt.write(this.fieldDelimiter);}if (quoteStrings) {if (v instanceof String || v instanceof StringValue) {this.wrt.write('"'); // 我们要注意到wrt这个变量this.wrt.write(v.toString());this.wrt.write('"');} else {this.wrt.write(v.toString());}} else {this.wrt.write(v.toString());}} else {if (this.allowNullValues) {if (i != 0) {this.wrt.write(this.fieldDelimiter);}} else {throw new RuntimeException("Cannot write tuple with <null> value at position: " + i);}}}// add the record delimiterthis.wrt.write(this.recordDelimiter);}

这里我们先看一下writeRecord(T element)这个方法,实际上在我们调用writeAsCsv的时候底层就是通过writeRecord方法将数据写入csv文件,我们看上面代码的时候要注意到this.wrt这个变量,通过wrt我们就可以找到,对数据刷新到磁盘定义的数据量的大小,看一下对wrt的定义,源码内容如下

    @Overridepublic void open(int taskNumber, int numTasks) throws IOException {super.open(taskNumber, numTasks);this.wrt =this.charsetName == null? new OutputStreamWriter(new BufferedOutputStream(this.stream, 4096)) // 看一下这里: new OutputStreamWriter(new BufferedOutputStream(this.stream, 4096), this.charsetName); // 还有这里}

通过上面的源码我们可以看到BufferedOutputStream的缓冲流定义死了为4096,也就是4k大小,这个参数是写死的,我们改变不了,所以在使用writeAsCsv这个方法时,代码没有报错,并且文件中也没有数据时先不要慌,通过源码先看看具体的实现逻辑,我们就可以很快定位到问题,如果代码中我将Thread.sleep(1000)这行代码删除掉的话CSV文件中很快就会有数据的,代码中我使用的自定义数据源,并且每条数据其实很小,还有睡眠1秒的限制,所以导致很久CSV文件中都没有数据生成.
文章内容写到现在也过了很久了,数据的大小也满足4k的条件了,我们看一下文件内容
在这里插入图片描述
可以看到文件中已经生成了数据,我们在看一下文件的大小
在这里插入图片描述
说到这里我想大家应该都理解了,虽然说了这么多关于writeAsCsv这个方法的内容,但是不建议大家使用这个方法毕竟属于过时的方法,用起来弊端也比较大.


http://www.ppmy.cn/news/470704.html

相关文章

红米note2能刷机没显示无服务器,我的红米note2彻底黑屏变砖了,进不了Fastboot和Recovery模式了,还能救回来吗?...

满意答案 mojsi 2020.03.09 采纳率&#xff1a;44% 等级&#xff1a;12 已帮助&#xff1a;7771人 小米2手机&#xff0c;进入recovery模式和fastboot模式的方法&#xff1a;recovery模式进入方式&#xff1a;关机按住音量上开机键recovery模式进入方式&#xff1a;关机按住…

红米手机TWRP读不了刷机包成功解决记录

我有部没用的红米4A手机&#xff0c;打算刷个魔趣试试&#xff0c;结果问题来了 我的魔趣版本要求TWRP3.2.1以上&#xff0c;好不容易找到红米4A的TWRP3.3.1 结果遇到recovery读不了刷机包问题&#xff0c;百度一下&#xff0c;说是data要格式化&#xff0c; 我用3.3.1版本格式…

红米note手机GPS定位不了

http://jingyan.baidu.com/article/380abd0a71bc061d90192ce2.html 方法/步骤 打开红米的“安全中心” 选择“授权管理” 点击“应用权限管理” 选择“应用管理” 选择地图程序 将“定位”勾选&#xff0c;再重新打开导航&#xff0c;GPS就可以定位了。 来自 “ ITPUB博…

红米除线刷的另外一种救砖方法fastboot

原文来自&#xff1a;https://jingyan.baidu.com/article/48a42057e945bca9242504d7.html &#xff0c; 按照它操做了一下&#xff0c;虽然没有救活我的红米1&#xff0c;但是让我更好的了解了红米的fastboot功能&#xff0c;可以留着作参考。斜体的内容是我加入的&#xff0c;…

红米note9 4G无缘无故不开机 重启后不开机 插充电器无反应 通病维修视频教程

红米note9 4G 不开机 通病 红米note9进9008 红米note9重启&#xff0c;不是电源虚焊&#xff0c;不是cpu和字库虚焊 返修 总结起来大概是换电源IC后可以开机&#xff0c;重新固化电源IC或CPU可以开机&#xff0c;但是用几天后还是会死机没反应&#xff0c;故障依旧。无限返修。…

山东泰安电力学校,华为ensp考试

文章目录 一、考试要求二、作者的拓扑图&#xff0c;作者的x27&#xff0c;y5三、每个设备的代码&#xff08;可直接复制粘贴运行&#xff0c;端口和连线要一样&#xff09;SW1SW2R0R1R2 四、每个部分的有运行截图SW1SW2R0R1R2 五、运行成功截图 一、考试要求 考试初始化文件下…

接口自动化项目持续集成到jenkins

目录 前言 一、jdk安装 二、maven的安装和配置 三、git的安装和配置 四、jenkins下载及运行 五、构建项目 总结&#xff1a; 前言 之前项目将接口自动化项目持续集成到了jenkins上&#xff0c;这次专门写篇文章记录下。 持续集成时&#xff0c;主要分为以下几个步骤&…

【C语言初阶】带你玩转C语言中的数组,并逐步实现冒泡排序,三子棋,扫雷

君兮_的个人主页 勤时当勉励 岁月不待人 C/C 游戏开发 数组的使用 前言一维数组1.一维数组的定义数组的分类 2.数组的初始化第一种越界情况 3.数组的使用数组的下标&#xff1a;第二种越界情况 4.数组在内存中的存储 二维数组1.二维数组的创建2.二维数组的初始化3.二维数组的…