Flink DataSet API


Table of Contents

      • DataSet Sources
      • DataSet Transformation
      • DataSet Sink
      • Serializers
      • Example 1: read a CSV file, write a CSV file
      • Example 2: read from StarRocks, write to StarRocks
      • Example 3: write to StarRocks after DataSet / Table SQL processing
      • Iterating over a `DataSet<Row>`
      • Pitfalls

Categories:

  • Source: a data source creates the initial data set, e.g. from a file or a Java collection.
  • Transformation: a transformation turns one or more DataSets into a new DataSet.
  • Sink: stores or returns the computed result.
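The three stages can be tied together in a minimal word-count sketch (a sketch only, assuming `flink-java` 1.9.x on the classpath; class and method names here are made up for illustration):

```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

import java.util.List;

public class WordCountSketch {

    static List<Tuple2<String, Integer>> count(String... lines) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Source: an in-memory collection
        DataSet<String> input = env.fromElements(lines);

        // Transformation: split each line into words, emit (word, 1), sum per word
        DataSet<Tuple2<String, Integer>> counts = input
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
                        for (String w : line.split("\\s+")) {
                            out.collect(Tuple2.of(w, 1));
                        }
                    }
                })
                .groupBy(0)
                .sum(1);

        // Sink: collect() pulls the result back to the client
        // (print() or writeAsCsv() are the usual alternatives)
        return counts.collect();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(count("hello world", "hello flink"));
    }
}
```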

DataSet Sources

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// read from the local file system
DataSet<String> localLines = env.readTextFile("file:///path/to/my/textfile");

// read an HDFS file
DataSet<String> hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile");

// read a CSV file with three fields
DataSet<Tuple3<Integer, String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
        .types(Integer.class, String.class, Double.class);

// read only some of the fields of a CSV file
DataSet<Tuple2<String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
        .includeFields("10010")
        .types(String.class, Double.class);

// read a CSV file and map it onto a Java class
DataSet<Person> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
        .pojoType(Person.class, "name", "age", "zipcode");

// read a serialized file (Hadoop SequenceFile) from the given location
DataSet<Tuple2<IntWritable, Text>> tuples = env.readSequenceFile(IntWritable.class, Text.class, "hdfs://nnHost:nnPort/path/to/file");

// create from input elements
DataSet<String> value = env.fromElements("Foo", "bar", "foobar", "fubar");

// generate a sequence of numbers
DataSet<Long> numbers = env.generateSequence(1, 10000000);

// read from a relational database
DataSet<Tuple2<String, Integer>> dbData =
        env.createInput(JDBCInputFormat.buildJDBCInputFormat()
                .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
                .setDBUrl("jdbc:derby:memory:persons")
                .setQuery("select name, age from persons")
                .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO))
                .finish());

DataSet Transformation

See also:
Flink from beginner to giving up (intro part 3) - the DataSet API
A summary of Flink's basic DataSet operators
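As a concrete illustration of the operators those articles cover, here is a small sketch of `filter` and `map` chained on a DataSet (assuming only `flink-java` on the classpath; the method name is made up):

```java
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

import java.util.List;

public class TransformationSketch {

    static List<Integer> squaresOfEvens(Integer... values) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Integer> input = env.fromElements(values);

        DataSet<Integer> result = input
                // filter: keep only the even numbers
                .filter(new FilterFunction<Integer>() {
                    @Override
                    public boolean filter(Integer v) {
                        return v % 2 == 0;
                    }
                })
                // map: one-to-one transformation of each element
                .map(new MapFunction<Integer, Integer>() {
                    @Override
                    public Integer map(Integer v) {
                        return v * v;
                    }
                });

        return result.collect();
    }

    public static void main(String[] args) throws Exception {
        // order of the collected list may vary with parallelism
        System.out.println(squaresOfEvens(1, 2, 3, 4));
    }
}
```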

DataSet Sink

// text data
DataSet<String> textData = // [...]

// write DataSet to a file on the local file system
textData.writeAsText("file:///my/result/on/localFS");

// write DataSet to a file on HDFS with a namenode running at nnHost:nnPort
textData.writeAsText("hdfs://nnHost:nnPort/my/result/on/localFS");

// write DataSet to a file and overwrite the file if it exists
textData.writeAsText("file:///my/result/on/localFS", WriteMode.OVERWRITE);

// tuples as lines with pipe as the separator "a|b|c"
DataSet<Tuple3<String, Integer, Double>> values = // [...]
values.writeAsCsv("file:///path/to/the/result/file", "\n", "|");

// this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines
values.writeAsText("file:///path/to/the/result/file");

// this writes values as strings using a user-defined TextFormatter object
values.writeAsFormattedText("file:///path/to/the/result/file",
        new TextFormatter<Tuple2<Integer, Integer>>() {
            public String format(Tuple2<Integer, Integer> value) {
                return value.f1 + " - " + value.f0;
            }
        });

  Using a custom output format:

DataSet<Tuple3<String, Integer, Double>> myResult = [...]

// write Tuple DataSet to a relational database
myResult.output(
        // build and configure the OutputFormat
        JDBCOutputFormat.buildJDBCOutputFormat()
                .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
                .setDBUrl("jdbc:derby:memory:persons")
                .setQuery("insert into persons (name, age, height) values (?,?,?)")
                .finish());

Serializers

  • Flink ships with serializers for standard types such as int, long, and String.
  • Types that Flink cannot serialize itself can be handed off to Avro or Kryo.

  Usage:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// force Avro serialization:
env.getConfig().enableForceAvro();

// force Kryo serialization:
env.getConfig().enableForceKryo();

// register a custom serializer:
env.getConfig().addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass)
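A hedged sketch of the custom-serializer route: the `LocalDate` target type and the `LocalDateSerializer` below are illustrative choices, not from the original text; any Kryo `Serializer` subclass can be registered the same way.

```java
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.ExecutionEnvironment;

import java.time.LocalDate;

public class KryoRegistrationSketch {

    // A custom Kryo serializer that stores a LocalDate as its epoch-day long
    public static class LocalDateSerializer extends Serializer<LocalDate> {
        @Override
        public void write(Kryo kryo, Output output, LocalDate date) {
            output.writeLong(date.toEpochDay());
        }

        @Override
        public LocalDate read(Kryo kryo, Input input, Class<LocalDate> type) {
            return LocalDate.ofEpochDay(input.readLong());
        }
    }

    static ExecutionConfig configure() {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // route all LocalDate instances through the custom serializer
        env.getConfig().addDefaultKryoSerializer(LocalDate.class, LocalDateSerializer.class);
        return env.getConfig();
    }

    public static void main(String[] args) {
        configure();
    }
}
```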

Example 1: read a CSV file, write a CSV file

  Reference: (3) Learning Flink - Table API & SQL programming

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.9.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.9.1</version>
</dependency>
<!-- Table & SQL API support for the DataStream / DataSet APIs in Java -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.11</artifactId>
    <version>1.9.1</version>
    <!--<scope>provided</scope>-->
</dependency>
<!-- table program planner and runtime -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.11</artifactId>
    <version>1.9.1</version>
    <!--<scope>provided</scope>-->
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-jdbc_2.11</artifactId>
    <version>1.9.1</version>
    <!--<scope>provided</scope>-->
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.16.18</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.49</version>
</dependency>
import lombok.Data;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.sinks.CsvTableSink;

public class SQLWordCount {
    public static void main(String[] args) throws Exception {
        // 1. get the batch execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment bTableEnv = BatchTableEnvironment.create(env);

//        DataSet<WC> input = env.fromElements(
//                WC.of("hello", 1),
//                WC.of("hqs", 1),
//                WC.of("world", 1),
//                WC.of("hello", 1)
//        );
//        // register the data set
//        tEnv.registerDataSet("WordCount", input, "word, frequency");

        // 2. load the data source into a DataSet
        DataSet<Student> csv = env.readCsvFile("D:\\tmp\\data.csv")
                .ignoreFirstLine()
                .pojoType(Student.class, "name", "age");

        // 3. convert the DataSet into a Table
        Table students = bTableEnv.fromDataSet(csv);

        // 4. register the student table
        bTableEnv.registerTable("student", students);
        Table result = bTableEnv.sqlQuery("select name, age from student");
        result.printSchema();

        DataSet<Student> dset = bTableEnv.toDataSet(result, Student.class);
        // DataSet<Row> dset = bTableEnv.toDataSet(result, Row.class);
        System.out.println("count-->" + dset.count());
        dset.print();

        // 5. sink the result to a CSV file
        CsvTableSink sink1 = new CsvTableSink("D:\\tmp\\result.csv", ",", 1, FileSystem.WriteMode.OVERWRITE);
        String[] fieldNames = {"name", "age"};
        TypeInformation[] fieldTypes = {Types.STRING, Types.INT};
        bTableEnv.registerTableSink("CsvOutPutTable", fieldNames, fieldTypes, sink1);
        result.insertInto("CsvOutPutTable");

        env.execute("SQL-Batch");
    }

    @Data
    public static class Student {
        private String name;
        private int age;
    }
}

  Prepare the test file data.csv:

name,age
zhangsan,23
lisi,43
wangwu,12

  After running the program, the file D:\tmp\result.csv is generated.

Example 2: read from StarRocks, write to StarRocks

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;

public class SQLWordCount {
    public static void main(String[] args) throws Exception {
        TypeInformation[] fieldTypes = {Types.STRING, Types.INT};
        RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);

        JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
                .setDrivername("com.mysql.jdbc.Driver")
                .setDBUrl("jdbc:mysql://192.168.xx.xx:9030/dwd?characterEncoding=utf8")
                .setUsername("root").setPassword("")
                .setQuery("select * from student")
                .setRowTypeInfo(rowTypeInfo)
                .finish();

        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // variant 1
        DataSource s = env.createInput(jdbcInputFormat);
        s.output(JDBCOutputFormat.buildJDBCOutputFormat()
                .setDrivername("com.mysql.jdbc.Driver")
                .setDBUrl("jdbc:mysql://192.168.xx.xx:9030/dwd?characterEncoding=utf8")
                .setUsername("root").setPassword("")
                .setQuery("insert into student values(?, ?)")
                .finish());

        // variant 2
//        DataSet<Row> dataSource = env.createInput(jdbcInputFormat);
//
//        dataSource.output(JDBCOutputFormat.buildJDBCOutputFormat()
//                .setDrivername("com.mysql.jdbc.Driver")
//                .setDBUrl("jdbc:mysql://192.168.xx.xx:9030/dwd?characterEncoding=utf8")
//                .setUsername("root").setPassword("")
//                .setQuery("insert into student values(?, ?)")
//                .finish()
//        );

        env.execute("SQL-Batch");
    }
}

  Data preparation:

CREATE TABLE student (
    name STRING,
    age INT
) ENGINE=OLAP
DUPLICATE KEY(`name`)
DISTRIBUTED BY RANDOM
PROPERTIES (
    "compression" = "LZ4",
    "fast_schema_evolution" = "false",
    "replicated_storage" = "true",
    "replication_num" = "1"
);

insert into student values('zhangsan', 23);

References:
Reading a MySQL source in Flink: JDBCInputFormat and custom data sources
Notes on the three data-processing connectors in Flink 1.10
Two ways to read and write MySQL from Flink

Note: running java -cp flink-app-1.0-SNAPSHOT-jar-with-dependencies.jar com.xiaoqiang.app.SQLWordCount may fail with: Exception in thread "main" com.typesafe.config.ConfigException$UnresolvedSubstitution: reference.conf @ jar:file:flink-app-1.0-SNAPSHOT-jar-with-dependencies.jar!/reference.conf: 875: Could not resolve substitution to a value: ${akka.stream.materializer}

  Fix: merge the reference.conf resources when building the fat jar, using the maven-shade-plugin AppendingTransformer:

<build>
    <plugins>
        <!-- Java Compiler -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <transformers>
                            <!--<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>flink.KafkaDemo1</mainClass>
                            </transformer>-->
                            <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                <resource>reference.conf</resource>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

Example 3: write to StarRocks after DataSet / Table SQL processing

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.types.Row;

public class SQLWordCount {
    public static void main(String[] args) throws Exception {
        TypeInformation[] fieldTypes = {Types.STRING, Types.INT};
        RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);

        JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
                .setDrivername("com.mysql.jdbc.Driver")
                .setDBUrl("jdbc:mysql://192.168.xx.xx:9030/dwd?characterEncoding=utf8")
                .setUsername("root").setPassword("")
                .setQuery("select * from student")
                .setRowTypeInfo(rowTypeInfo)
                .finish();

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment bTableEnv = BatchTableEnvironment.create(env);

        DataSet<Row> dataSource = env.createInput(jdbcInputFormat);
        dataSource.print();

        Table students = bTableEnv.fromDataSet(dataSource);
        bTableEnv.registerTable("student", students);

        Table result = bTableEnv.sqlQuery(
                "select name, age from (select f0 as name, f1 as age from student) group by name, age");
        result.printSchema();

        DataSet<Row> dset = bTableEnv.toDataSet(result, Row.class);
        dset.output(JDBCOutputFormat.buildJDBCOutputFormat()
                .setDrivername("com.mysql.jdbc.Driver")
                .setDBUrl("jdbc:mysql://192.168.xx.xx:9030/dwd?characterEncoding=utf8")
                .setUsername("root").setPassword("")
                .setQuery("insert into student values(?, ?)")
                .finish());

        env.execute("SQL-Batch");
    }
}

Iterating over a DataSet<Row>

try {
    dataSet.map(new MapFunction<Row, String>() {
        @Override
        public String map(Row value) throws Exception {
            // process each row here
            float dateNum = (float) value.getField(2);
            float dateAge = (float) value.getField(3);
            return "dataSet traversal done";
        }
    }).print();
} catch (Exception e) {
    throw new RuntimeException(e);
}

  If you need to turn each input row into multiple output rows, use a FlatMapFunction:

rows.flatMap(new FlatMapFunction<Row, Row>() {
    @Override
    public void flatMap(Row value, Collector<Row> out) throws Exception {
        // transformation logic
        // e.g. copy the original row to the output
        out.collect(value);
        // call out.collect(...) again to emit additional rows
    }
}).collect();

  Note that .collect() forces execution and pulls the result back to the client. In production code you usually want to write the result to a file or an external system rather than just collecting it on the client.

Pitfalls

  Pitfall 1: Bang equal '!=' is not allowed under the current SQL conformance level
  Fix: replace != with <> in the SQL.

  Pitfall 2: java.lang.RuntimeException: No new data sinks have been defined since the last execution. The last execution refers to the latest call to 'execute()', 'count()', 'collect()', or 'print()'.
  Explanation: by the time the final env.execute() runs, no new data sink has been defined. In Flink batch jobs the earlier result.print() already triggered execution and produced the output, so the extra env.execute() has nothing left to run and throws the exception above.
  Fix: remove the trailing env.execute(); line.
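To make pitfall 2 concrete, a batch job can end in one of two shapes: an eager sink (print()/count()/collect()) that triggers execution by itself, or a lazy sink (writeAsText()/output()) that needs an explicit execute(). A minimal sketch (the output path in the comment is hypothetical):

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class SinkTriggerSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> result = env.fromElements("a", "b");

        // Shape 1: an eager sink, which triggers execution itself
        // -- do NOT call env.execute() afterwards
        result.print();

        // Shape 2: a lazy sink, which needs an explicit execute() call
        // result.writeAsText("file:///tmp/out",
        //         org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE);
        // env.execute("my-batch-job");
    }
}
```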

