详解 Flink Table API 和 Flink SQL 之时间特性

news/2024/9/25 11:17:42/

一、介绍

  • Table API 和 SQL 进行基于时间的操作(比如时间窗口)时需要定义相关的时间语义和时间数据来源的信息。因此会给表单独提供一个逻辑上的时间字段,专门用来在表处理程序中指示时间
  • 时间属性(time attributes),其实就是每个表模式结构(schema)的一部分。它可以在创建表的 DDL 里直接定义为一个字段,也可以在 DataStream 转换成表时定义。一旦定义了时间属性,就可以作为一个普通字段引用,并且可以在基于时间的操作中使用
  • 时间属性的数据类型为 TIMESTAMP,类似于常规时间戳,可以直接访问并且进行计算。
  • 按照时间语义的不同,可以把时间属性的定义分成事件时间(event time)和处理时间(processing time)

二、处理时间定义

java">/**处理时间既不需要提取时间戳,也不需要生成 watermark
*/
public class TestTableProcessingTime {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);/*方式一:在 DataStream 转化时直接指定注意:1.使用 .proctime,定义处理时间字段2.proctime 属性只能通过附加逻辑字段,来扩展物理 schema。因此,只能在 schema 定义的末尾定义*/DataStream<String> inputStream = env.readTextFile("./sensor.txt");DataStream<SensorReading> dataStream = inputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature, pt.proctime");/*方式二:在定义 Table Schema时指定*/tableEnv.connect(new FileSystem().path("./sensor.txt")) .withFormat(new Csv()) .withSchema(new Schema() .field("id", DataTypes.STRING()) .field("timestamp", DataTypes.BIGINT()).field("temperature", DataTypes.DOUBLE()).field("pt", DataTypes.TIMESTAMP(3)).proctime()).createTemporaryTable("sensorTable");/*方式三:在创建表的 DDL 中指定注意:运行这个 DDL,必须使用 Blink Planner 版本依赖*/String sinkDDL= "create table sensorTable (" +" id varchar(20) not null, " +" ts bigint, " +" temperature double, " +" pt AS PROCTIME() " +") with (" +" 'connector.type' = 'filesystem', " +" 'connector.path' = '/sensor.txt', " +" 'format.type' = 'csv')";tableEnv.sqlUpdate(sinkDDL);env.execute();}
}

三、事件时间定义

java">/**事件时间定义需要从事件数据中,提取时间戳,并设置用于推进事件时间的进展的 watermark
*/
public class TestTableEventTime {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//开启事件时间语义env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);/*方式一:在 DataStream 转化时直接指定注意:1.首先必须在转换的数据流中分配时间戳和 watermark2.使用 .rowtime,定义事件时间字段3.事件时间字段既可以作为新字段追加到 schema,也可以用现有字段替换*/DataStream<String> inputStream = env.readTextFile("./sensor.txt");DataStream<SensorReading> dataStream = inputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));})//提取事件时间戳和设置watermark.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {@Overridepublic long extractTimestamp(SensorReading element) {return element.getTimestamp() * 1000L;}});//Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp.rowtime as ts, temperature"); //将现有字段替换为事件时间字段Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature, rt.rowtime"); //将事件时间字段作为新字段追加/*方式二:在定义 Table Schema时指定*/tableEnv.connect(new FileSystem().path("./sensor.txt")) .withFormat(new Csv()) .withSchema(new Schema() .field("id", DataTypes.STRING()) .field("timestamp", DataTypes.BIGINT()).rowtime( //在事件时间戳字段后调用 rowtime 方法new RowTime().timestampsFromField("timestamp")  // 从字段中提取事件时间戳.watermarksPeriodicBounded(1000)   // 设置watermark 延迟 1 秒).field("temperature", DataTypes.DOUBLE())).createTemporaryTable("sensorTable");/*方式三:在创建表的 DDL 中指定注意:运行这个 DDL,必须使用 Blink Planner 版本依赖说明:FROM_UNIXTIME 是系统内置的时间函数,用来将一个整数(秒数)转换成 “YYYY-MM-DD hh:mm:ss”格式(默认,也可以作为第二个 String 参数传入)的日期时间字符串(date time string);然后再用 TO_TIMESTAMP 将其转换成 Timestamp*/String sinkDDL = "create table dataTable (" +" id varchar(20) not null, " + " ts bigint, " +" temperature double, " +" rt AS TO_TIMESTAMP( FROM_UNIXTIME(ts) ), " +" watermark for rt as rt - interval '1' second" +") with (" +" 'connector.type' = 'filesystem', " +" 'connector.path' = '/sensor.txt', " +" 'format.type' = 'csv')";tableEnv.sqlUpdate(sinkDDL);env.execute();}
}

http://www.ppmy.cn/news/1470365.html

相关文章

“中国人对AI的态度最为积极”说明什么?

#AI技术服务 #AI智能应用 #AI合同 #合同AI服务 斯坦福大学的AI指数报告指出&#xff1a;“中国人对AI的态度最为积极……” 这一观点应是基于一系列的观察和研究得出的结果。如果这一观点成立&#xff0c;那么它或许说明了以下几个层面的意义: 首先&#xff0c;政策支持与…

【计算机网络】[第4章 网络层][自用]

1 概述 (1)因特网使用的TCP/IP协议体系(四层)的网际层,提供的是无连接、不可靠的数据报服务; (2)ATM、帧中继、X.25的OSI体系(七层)中的网络层,提供的是面向连接的、可靠的虚电路服务。 (3)路由选择分两种: 一种是由用户or管理员人工进行配置(只适用于规…

新人学习笔记之(注释和关键字)

一、注释 1.什么是注释 (1)注释是在程序指定位置添加的说明性信息 (2)简单理解&#xff0c;就是对代码的一种解释 2.注释的分类 (1)单行注释 格式&#xff1a;// 注释信息 (2)多行注释 格式&#xff1a;/*注释信息*/ 3.注释的使用 (1)主要作用&#xff1a;增加代码的阅读性 4.注…

C# —— 类

简介 类;就是具有相同属性和方法的对象集合,例如,人类 动物类型 ADC类等 Array数组类 ArrayList类 List类 字符串类等 类包含又什么东西组成 又什么可执行的操作(方法) 对象; 类的实例化.通俗讲就是类中的其中一个, 例如 哈士奇&#xff0c;鲁班等 a1 a2 a3就是ArrayList中的…

面向对象设计模式准则

一&#xff0c;简介 罗伯特C马丁在 21 世纪早期引入了名为「SOLID」的设计原则&#xff0c;指代了面向对象编程和面向对象设计的五个基本原则&#xff08;也有说六个的&#xff09;&#xff0c;即为 SOLIDD. 单一职责原则&#xff08;Single Responsibility Principle&#x…

ios-deploy - Required for installing your app on a physical device with the CLI

ios-deploy 是一个用于在 iOS 设备上安装、调试和运行 iOS 应用的开源工具。如果你正在使用命令行界面&#xff08;CLI&#xff09;来部署 React Native 或其他原生 iOS 应用到物理设备&#xff0c;那么安装 ios-deploy 是必要的。 以下是安装 ios-deploy 的一般步骤&#xff…

如何保证数据库和缓存的一致性

背景&#xff1a;为了提高查询效率&#xff0c;一般会用redis作为缓存。客户端查询数据时&#xff0c;如果能直接命中缓存&#xff0c;就不用再去查数据库&#xff0c;从而减轻数据库的压力&#xff0c;而且redis是基于内存的数据库&#xff0c;读取速度比数据库要快很多。 更新…

【Android】Android Studio版本手动升级到指定版本方法

前言 Android Studio本身存在自动升级的功能&#xff0c;但是这个功能使用的时候基本都是要升级到最新版本&#xff0c;但是有时候我们也不想升级到最新版本&#xff0c;是想升级到某一个版本&#xff0c;这个时候&#xff0c;Android Studio的自动升级版本的功能就无法使用了…