在 pyflink 处理数据流过程中,有时候需要将data_stream转为table,下面是正确的方式,即每一个算子(map,reduce, window)操作之后需要指定输出数据类型。
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import DataTypes, StreamTableEnvironment, Schemaenv = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
ds = env.from_collection([(12, "Alice"), (0, "Bob")], type_info=Types.TUPLE([Types.LONG(), Types.STRING()]))def update_tel(data):return data## 正确用法,每一步操作算子之后都需要加上输出的数据类型 output_type
ds = ds.map(lambda x: update_tel(x), output_type=Types.TUPLE([Types.INT(), Types.STRING()]))
#input_table = t_env.from_data_stream(ds).alias("score", "name")
input_table = t_env.from_data_stream(ds)
input_table.print_schema()
t_env.create_temporary_view("MyView", input_table)
t_env.from_path("MyView").print_schema()# 输出:
#(
# `f0` INT NOT NULL,
# `f1` STRING
#)
"""
## 错误用法,不指定output_type
ds = ds.map(lambda x: update_tel(x))
#input_table = t_env.from_data_stream(ds).alias("score", "name")
input_table = t_env.from_data_stream(ds)
input_table.print_schema()
t_env.create_temporary_view("MyView", input_table)
t_env.from_path("MyView").print_schema()输出:
(`f0` RAW('[B', '...')
)
"""
参考:
https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/dev/python/datastream_tutorial/
https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/dev/table/data_stream_api/
https://github.com/apache/flink/tree/release-1.16/flink-python/pyflink/examples/datastream