flinkSQL Table转DataStream

news/2024/10/23 7:20:15/

flink版本1.14

flinksql 来源于kafka json格式数据

变化的表

业务中sql可能不完全满足使用,需要转换成DataStream 更灵活一些,所以需要互相转换,发挥各自的优势。

 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.enableCheckpointing(10000);env.setParallelism(1);final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);//两个来自kafka的表tEnv.executeSql(CreateTableSQL.kafkaTableInfo);tEnv.executeSql(CreateTableSQL.kafkaTablePerson);//join两个表Table tableResult = tEnv.sqlQuery(SelectSQL.selectPerMaxScore);tEnv.toDataStream(tableResult, Row.class);dataStream.flatMap(new FlatMapFunction<Row, Object>() {@Overridepublic void flatMap(Row value, Collector<Object> out) throws Exception {String s = value.getField(0) + ":" + value.getField(1);out.collect(s);}}).print();env.execute();

会发现报错

Table sink 'Unregistered_DataStream_Sink_1' doesn't support consuming update changes [...].

那是因为在table-to-stream中,数据在发生变化,因此需要用toChangelogStream来转换

修改成如下内容:

 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.enableCheckpointing(10000);env.setParallelism(1);final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);//两个来自kafka的表tEnv.executeSql(CreateTableSQL.kafkaTableInfo);tEnv.executeSql(CreateTableSQL.kafkaTablePerson);//join两个表Table tableResult = tEnv.sqlQuery(SelectSQL.selectPerMaxScore);tEnv.toChangelogStream(tableResult).flatMap(new FlatMapFunction<Row, Object>() {@Overridepublic void flatMap(Row value, Collector<Object> out) throws Exception {String s = value.getField(0) + ":" + value.getField(1);out.collect(s);}}).print();env.execute();

在网上找了一圈,没有类似的文章,自己做个记录。
这里虽然也可以用createTemporaryView来把表转换成DataStream,但是这种方式的表是固定的,在实际应用的使用场景中,还是toChangelogStream的应用更广一些。

executeSql() 与 sqlQuery()

你可能会发现 有的地方用的是executeSql() 有的地方用的是 sqlQuery() ,这两者是什么不同呢,对此我特意去看了一下源码里的注释。

sqlQuery(String query)

针对sqlQuery()他是这么说的

评估对已注册表的 SQL 查询并返回一个 Table 对象,该对象描述了进一步转换的管道。查询引用的所有表和其他对象都必须在 TableEnvironment 中注册。例如,使用 createTemporaryView(String, Table)) 来引用 Table 对象或使用 createTemporarySystemFunction(String, Class) 来引用函数。
或者,当调用 Table.toString() 方法时,会自动注册 Table 对象,例如当它被嵌入到字符串中时。因此,SQL 查询可以直接内联(即匿名)引用 Table 对象,如下所示:
Table table = ...; 
String tableName = table.toString(); 
// 表没有注册到表环境 
tEnv.sqlQuery("SELECT * FROM " + tableName + " WHERE a > 12");
请注意,返回的 Table 是一个 API 对象,仅包含管道描述。它实际上对应于 SQL 术语中的视图。调用 Table.execute() 触发执行或直接使用 executeSql(String)。

executeSql(String statement)

执行给定的单个语句并返回执行结果。
语句可以是 DDL/DML/DQL/SHOW/DESCRIBE/EXPLAIN/USE。对于 DML 和 DQL,此方法在提交作业后返回 TableResult。对于 DDL 和 DCL 语句,操作完成后将返回 TableResult。
如果多个管道应将数据作为单个执行的一部分插入到一个或多个接收器表中,请使用 StatementSet(请参阅 createStatementSet())。
默认情况下,所有 DML 操作都是异步执行的。使用 TableResult.await() 或 TableResult.getJobClient() 来监控执行。设置 TableConfigOptions.TABLE_DML_SYNC 以始终同步执行。

需要强调的是,这两个是不能强行互相转换的,出现以下报错

java.lang.ClassCastException:org.apache.flink.table.api.internal.TableResultImpl cannot be cast to org.apache.flink.table.api.Table

还有一点, sqlQuery(String query)不能执行复杂的语句,会出现以下报错

Unsupported SQL query! sqlQuery() only accepts a single SQL query of type SELECT, UNION, INTERSECT, EXCEPT, VALUES, and ORDER_BY.

固定的表

如果需求只是一个固定的表可以通过这种下面的案例

		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.enableCheckpointing(10000);env.setParallelism(1);final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);// connector doris的一个表 PerCountNametEnv.executeSql(CreateTableSQL.PerCountNDoris);Table perCountName = tEnv.from("PerCountName");// PerCountName表对应的实体类 PerCountName.classtEnv.toDataStream(perCountName, PerCountName.class).flatMap(new FlatMapFunction<PerCountName, Object>() {@Overridepublic void flatMap(PerCountName value, Collector<Object> out) throws Exception {String s = value.getName()+"->"+value.getNum();out.collect(s);}}).print();env.execute();

http://www.ppmy.cn/news/52667.html

相关文章

「SQL面试题库」 No_49 产品销售分析 I

&#x1f345; 1、专栏介绍 「SQL面试题库」是由 不是西红柿 发起&#xff0c;全员免费参与的SQL学习活动。我每天发布1道SQL面试真题&#xff0c;从简单到困难&#xff0c;涵盖所有SQL知识点&#xff0c;我敢保证只要做完这100道题&#xff0c;不仅能轻松搞定面试&#xff0…

【社区图书馆】30+危机,最值得读烂的5本书|必读

“年少时总会抱怨读书无用&#xff0c;殊不知那只是你没有用心感受读书带给你的好处。” ㅤ 一直觉得迈入30大门并不可怕&#xff0c;“35岁危机”也离自己甚远。然而&#xff0c;近1年自己身上发生的一些事情&#xff0c;让自己越来越认识到&#xff1a;只有不断丰富内在、提高…

优漫动游APP四类页面样式设计

在设计过程中&#xff0c;在新设计一个页面时&#xff0c;会遇到一个基本的问题&#xff0c;我的页面背景应该是底色&#xff1f;页面的要素怎么组合展现才能达到更好的展现美观度&#xff0c;贴合业务需要&#xff0c;实现更高的转化率&#xff1f;   基于上面的问题&…

java版本电子招标采购系统源码—企业战略布局下的采购

​ 智慧寻源 多策略、多场景寻源&#xff0c;多种看板让寻源过程全程可监控&#xff0c;根据不同采购场景&#xff0c;采取不同寻源策略&#xff0c; 实现采购寻源线上化管控&#xff1b;同时支持公域和私域寻源。 询价比价 全程线上询比价&#xff0c;信息公开透明&#xff0…

ISO-27145故障诊断说明

ISO-27145故障诊断说明 2.1 27145目录说明 ISO27145-1: 这里边介绍的是一般信息和用例定义&#xff1b; ISO27145-2: 这里边介绍的是与排放相关的通用数据规则&#xff0c;用于查询&#xff1b; ISO27145-3: 这里边主要介绍了支持的服务 12服务 14服务 19服务 22服务 31服务&…

StringRedisTemplate-基本使用

StringRedisTemplate继承自RedisTemplate,在这里说明一下&#xff0c;当我们使用RedisTemplate往redis中存储java对象的时候&#xff0c;他会顺带着将该java对象的字节码文件也同时存进了内存中&#xff0c;这是为了实现自动反序列化Autowired private StringRedisTemplate red…

【JavaWeb】JavaScript

1、JavaScript 介绍 Javascript 语言诞生主要是完成页面的数据验证。因此它运行在客户端&#xff0c;需要运行浏览器来解析执行 JavaScript 代码。 JS 是 Netscape 网景公司的产品&#xff0c;最早取名为 LiveScript;为了吸引更多 java 程序员。更名为 JavaScript。 JS 是弱…

OpenCL与Metal API下如何合理地安排线程组大小

我们玩过OpenCL的朋友都知道&#xff0c;我们可以通过clGetDeviceInfo接口来查询当前计算设备的几乎所有属性&#xff0c;包括当前计算单元的个数、最大工作组大小、本地存储器大小等等。但这些属性值都是基于当前计算设备的最大可支持能力&#xff0c;而不是当前内核程序执行上…