pyflink datastream数据流ds经过一系列转换后转为table,t_env.from_data_stream(ds)

ops/2024/11/17 19:44:22/

pyflink 处理数据流过程中,有时候需要将data_stream转为table,下面是正确的方式,即每一个算子(map,reduce, window)操作之后需要指定输出数据类型。

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import DataTypes, StreamTableEnvironment, Schemaenv = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
ds = env.from_collection([(12, "Alice"), (0, "Bob")], type_info=Types.TUPLE([Types.LONG(), Types.STRING()]))def update_tel(data):return data## 正确用法,每一步操作算子之后都需要加上输出的数据类型 output_type
ds = ds.map(lambda x: update_tel(x), output_type=Types.TUPLE([Types.INT(), Types.STRING()]))
#input_table = t_env.from_data_stream(ds).alias("score", "name")
input_table = t_env.from_data_stream(ds)
input_table.print_schema()
t_env.create_temporary_view("MyView", input_table)
t_env.from_path("MyView").print_schema()# 输出:
#(
#  `f0` INT NOT NULL,
#  `f1` STRING
#)
"""
## 错误用法,不指定output_type
ds = ds.map(lambda x: update_tel(x))
#input_table = t_env.from_data_stream(ds).alias("score", "name")
input_table = t_env.from_data_stream(ds)
input_table.print_schema()
t_env.create_temporary_view("MyView", input_table)
t_env.from_path("MyView").print_schema()输出:
(`f0` RAW('[B', '...')
)
"""

参考:
https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/dev/python/datastream_tutorial/
https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/dev/table/data_stream_api/
https://github.com/apache/flink/tree/release-1.16/flink-python/pyflink/examples/datastream


http://www.ppmy.cn/ops/134514.html

相关文章

基于图像处理与机器学习的车牌检测识别系统设计与实现

摘要:随着智能交通系统的快速发展,车牌检测识别技术在交通管理、安防监控等领域的应用日益广泛。然而,复杂环境因素如光照变化、遮挡、背景干扰等给车牌检测识别带来诸多挑战。本研究旨在设计并实现一种鲁棒性强、准确率高的车牌检测识别系统…

flink架构 详解

Flink是一个分布式系统,需要有效分配和管理计算资源才能执行流应用程序。Flink同时提供了支撑流计算和批计算的接口,以下是对Flink架构的详细解析: 一、架构分层 Flink的架构体系基本上可以分为三层,由上往下依次是:…

《生成式 AI》课程 第3講 CODE TASK 任务3:自定义任务的机器人

课程 《生成式 AI》课程 第3講:訓練不了人工智慧嗎?你可以訓練你自己-CSDN博客 我们希望你创建一个定制的服务机器人。 您可以想出任何您希望机器人执行的任务,例如,一个可以解决简单的数学问题的机器人0 一个机器人&#xff0c…

django入门【05】模型介绍(二)——字段选项

文章目录 1、null 和 blank示例说明⭐ null 和 blank 结合使用的几种情况总结: 2、choices**choices 在 Django 中有以下几种形式:**(1) **简单的列表或元组形式**(2) **字典映射形式**(3&#…

当使用key-value方式进行参数传递时,若key对应的是一个对象或数组结构,如何利用API Post工具进行模拟操作。

1. 后端服务代码如下 RequestMapping("/handle11")public Person handle11(Person person){System.out.println(person);return person;} 2. 后端入参结构 person是一个对象,对象结构如下: public class Person {private String username …

TypeORM在Node.js中的高级应用

💓 博客主页:瑕疵的CSDN主页 📝 Gitee主页:瑕疵的gitee主页 ⏩ 文章专栏:《热点资讯》 TypeORM在Node.js中的高级应用 TypeORM在Node.js中的高级应用 TypeORM在Node.js中的高级应用 引言 TypeORM 基本概念 1. 实体&am…

【SQL】双层嵌套< exists not exists >

EXISTS 运算符 EXISTS 运算符用于判断查询子句是否有记录,如果有一条或多条记录存在返回 True,否则返回 False。 语法 SELECT column_name(s) FROM table_name WHERE EXISTS (SELECT column_name FROM table_name WHERE condition);实例1 查找总访问量…

SQL集合运算

集合论是SQL语言的根基。 1 集合运算 注意事项: 1)SQL能操作具有重复行的集合,可以通过可选项ALL来支持。 如果直接使用UNION或INTERSECT,结果里不会出现重复的行。如果想在结果里留下重复行,可以加上可选项ALL。写…