FLINK SQL时间属性

server/2024/10/18 20:20:20/

Flink三种时间属性简介

在Flink SQL中,时间属性是一个核心概念,它主要用于处理与时间相关的数据流。Flink支持三种时间属性:事件时间(event time)、处理时间(processing time)和摄入时间(ingestion time)。以下是对这三种时间属性的详细解释:

一、事件时间(Event Time)

  • 定义:事件时间指的是数据本身携带的时间,即数据在产生时的时间戳。
  • 特点:
    • 反映了数据实际发生的时间。
    • 需要从数据中提取时间戳,并可能需要生成watermark来处理乱序数据。
    • 在Flink SQL触发计算时,使用数据本身携带的时间。
  • 应用场景:适用于需要基于数据实际发生时间进行计算的场景,如实时日志分析、交易系统等。

二、处理时间(Processing Time)

  • 定义:处理时间指的是具体算子计算数据执行时的机器时间,即在Flink集群中处理数据的节点所在机器的本地时间。
  • 特点:
    • 是最简单的一种时间概念,不需要从数据里获取时间,也不需要生成watermark。
    • 提供了较低的时间精度和确定性,因为不同节点的处理时间可能不同。
  • 应用场景:适用于对时间精度要求不高,或者数据不需要基于事件时间进行处理的场景。
  • 定义方式:
    • 在DataStream转换时直接指定。
    • 在定义Table Schema时指定,使用.proctime后缀。
    • 在创建表的DDL中指定,使用PROCTIME()函数。

三、摄入时间(Ingestion Time)

  • 定义:摄入时间指的是数据从数据源进入Flink的时间。
  • 特点:
    • 反映了数据被Flink系统接收的时间。
    • 适用于数据源与Flink集群之间存在较大时间差的场景。
  • 应用场景:在Flink SQL中,摄入时间的使用相对较少,因为大多数场景更关注数据实际发生的时间(事件时间)或处理时间。

四、时间属性的应用

在Flink SQL中,时间属性主要用于时间窗口的计算、自定义时间语义的计算等。通过定义时间属性,可以方便地实现基于时间的聚合、过滤、连接等操作。

注意事项

  • 在一个Flink任务中,通常只会选择一个时间属性作为全局时间属性。
  • 时间属性的定义方式取决于具体的应用场景和需求。
  • 在使用事件时间时,需要注意处理乱序数据的问题,并合理设置watermark的生成策略。

Flink三种时间属性应用场景

一、事件时间(Event Time)应用场景:

  • 实时日志分析:在实时日志分析中,通常使用事件时间作为分析的基础。例如,需要统计某个时间段内的日志数量或类型,使用事件时间可以确保统计结果基于日志实际发生的时间。
  • 交易系统:在交易系统中,事件时间用于处理交易数据的实时分析。例如,计算某支股票在特定时间段内的价格波动,需要确保时间戳与交易发生的时间一致。
  • 实时推荐系统:在实时推荐系统中,用户行为数据的时间戳是事件时间。通过基于事件时间的分析,可以了解用户在不同时间段的行为模式,从而提供更加个性化的推荐。

二、处理时间(Processing Time)应用场景:

  • 非实时数据分析:对于不需要严格基于事件时间进行分析的场景,可以使用处理时间。例如,进行批处理任务时,不关心数据实际发生的时间,只关注任务开始和结束的时间。
  • 本地开发和测试:在本地开发和测试环境中,由于无法模拟真实的事件时间,可以使用处理时间进行简化处理。

三、摄入时间(Ingestion Time)应用场景:

  • 数据源与Flink集群时间差较大:当数据源与Flink集群之间存在较大的时间差时,使用摄入时间可以确保数据在Flink集群中处理的一致性。然而,在实际应用中,摄入时间的使用相对较少,因为大多数场景更关注数据实际发生的时间(事件时间)或处理时间。

SQL指定时间属性两种方式

在Flink SQL中,指定时间属性主要有两种方式,以下是对这两种方式的详细解释:

一、在创建表的DDL中指定时间属性

  1. 事件时间(Event Time):
    • 在创建表的DDL语句中,可以通过增加一个时间戳字段并使用WATERMARK语句来定义事件时间属性。
    • 事件时间列的字段类型必须是TIMESTAMP或TIMESTAMP_LTZ类型。
    • WATERMARK语句用于生成水印(watermark),以处理乱序数据。水印是一个延迟阈值,表示在该时间戳之前的所有数据都已经到达。
      示例代码:
sql">CREATE TABLE user_actions (  user_name STRING,  data STRING,  user_action_time TIMESTAMP(3),  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND  
) WITH (...);

在这个例子中,user_action_time被声明为事件时间,并且设置了5秒的水印延迟。
2. 处理时间(Processing Time):
- 在创建表的DDL语句中,可以通过增加一个字段并使用PROCTIME()函数来定义处理时间属性。
- PROCTIME()函数是Flink SQL内置的函数,用于获取当前处理时间。
示例代码:

sql">CREATE TABLE EventTable (  user STRING,  url STRING,  ts AS PROCTIME()  
) WITH (...);

在这个例子中,ts字段被定义为处理时间属性。

二、在DataStream转换时指定时间属性

  1. 事件时间(Event Time):
    • 在DataStream API中,可以通过assignTimestampsAndWatermarks方法来为数据流分配时间戳和水印。
    • 这种方法通常用于从外部数据源(如Kafka)读取数据时,为数据分配事件时间。
      示例代码(伪代码):
DataStream<MyEvent> stream = env.addSource(new FlinkKafkaConsumer<MyEvent>(...))  .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(5)) {  @Override  public long extractTimestamp(MyEvent event) {  return event.getTimestamp(); // 从事件中提取时间戳  }  });

在这个例子中,使用BoundedOutOfOrdernessTimestampExtractor为数据流分配了事件时间,并设置了5秒的最大乱序时间。
2. 处理时间(Processing Time):
- 在DataStream API中,处理时间是默认的时间属性,不需要显式指定。
- 但是,如果需要在后续操作中引用处理时间,可以通过在Table API中使用.proctime后缀来访问。
示例代码(伪代码):

sql">Table table = tableEnv.fromDataStream(stream, "user, temperature, timestamp, pt.proctime as processingTime");

在这个例子中,pt.proctime被用作处理时间属性,并在Table API中进行了访问。

需要注意的是,在实际应用中,选择哪种方式指定时间属性取决于具体的应用场景和需求。在Flink SQL中,通常更倾向于在创建表的DDL中指定时间属性,因为这样可以更直观地定义表的模式结构(schema),并且方便后续的时间相关操作。而在DataStream API中指定时间属性则更灵活,适用于需要从外部数据源读取数据并为其分配时间戳的场景。

SQL事件时间案例

以下是一个关于Flink SQL事件时间的案例,用于展示如何在Flink SQL中使用事件时间属性进行窗口聚合操作。

案例背景

假设有一个数据流,其中包含了用户的点击事件。每个事件都有一个事件时间戳,表示用户点击的时间。任务是计算每个用户在每10分钟窗口内的点击次数。

步骤一:创建数据源表

首先,需要创建一个数据源表,并声明事件时间属性。在这个例子中,假设数据源是一个Kafka主题,并且事件时间戳存储在名为eventTime的字段中。

sql">CREATE TABLE clicks (  userId STRING,  eventTime TIMESTAMP(3),  -- 事件时间戳  WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND  -- 声明水印,用于处理乱序数据  
) WITH (  'connector' = 'kafka',  'topic' = 'clicks_topic',  'properties.bootstrap.servers' = 'localhost:9092',  'format' = 'json',  'scan.startup.mode' = 'earliest-offset'  
);

步骤二:使用窗口聚合操作

接下来,可以使用Flink SQL的窗口聚合操作来计算每个用户在每10分钟窗口内的点击次数。在这个例子中,将使用滚动窗口(TUMBLE)进行聚合。

sql">SELECT  userId,  TUMBLE_START(eventTime, INTERVAL '10' MINUTE) AS windowStart,  -- 窗口开始时间  TUMBLE_END(eventTime, INTERVAL '10' MINUTE) AS windowEnd,      -- 窗口结束时间  COUNT(*) AS clickCount                                         -- 点击次数  
FROM  clicks  
GROUP BY  userId,  TUMBLE(eventTime, INTERVAL '10' MINUTE);

解释

  1. 创建数据源表:在创建表时,指定了eventTime字段为事件时间属性,并设置了5秒的水印延迟。这意味着Flink将等待最多5秒以处理可能到达的乱序数据。
  2. 窗口聚合操作:使用TUMBLE函数定义了一个滚动窗口,窗口大小为10分钟。然后,按用户ID和窗口进行分组,并计算每个分组中的点击次数。
  3. 结果:查询结果将包含用户ID、窗口开始时间、窗口结束时间和点击次数。

注意事项

  1. 水印:在处理事件时间时,水印是非常重要的。它们允许Flink处理乱序数据,并确保在窗口聚合时不会遗漏任何数据。
  2. 时间属性类型:事件时间列的字段类型必须是TIMESTAMP或TIMESTAMP_LTZ类型。如果数据源中的时间戳是BIGINT类型(表示毫秒或秒),则需要在创建表时将其转换为TIMESTAMP类型。
  3. 窗口类型:Flink SQL支持多种类型的窗口,如滚动窗口(TUMBLE)、滑动窗口(SLIDE)和会话窗口(SESSION)等。根据具体需求选择合适的窗口类型。

SQL处理时间案例

在Flink SQL中,处理时间(Processing Time)是指数据被具体算子处理时的系统时间。以下是一个基于处理时间的Flink SQL案例,用于展示如何使用处理时间属性进行窗口聚合操作。

案例背景

假设有一个数据流,其中包含了传感器读取的数据。每个数据都有一个读取时间戳,但这个时间戳不是事件发生时的时间,而是数据被读取到系统的时间。任务是计算每5分钟内读取的数据量。

步骤一:创建数据源表

首先,需要创建一个数据源表,并声明处理时间属性(在Flink SQL中,处理时间属性是隐式的,不需要显式声明,但可以通过特定的函数来引用)。在这个例子中,假设数据源是一个Socket流。

sql">-- 假设有一个Socket数据源,数据格式为:id,value,timestamp(这里的timestamp是读取时间戳)  
CREATE TABLE sensor_data (  id STRING,  value DOUBLE,  timestamp BIGINT  -- 读取时间戳,单位为毫秒  
) WITH (  'connector' = 'socket',  'hostname' = 'localhost',  'port' = '9999',  'format' = 'csv'  
);

步骤二:转换时间戳并创建处理时间窗口

由于Flink SQL中的处理时间属性是隐式的,不能直接对其进行操作。但是,可以通过将读取时间戳转换为TIMESTAMP类型(尽管这不是必要的,因为处理时间窗口不需要显式的时间戳字段),然后使用Flink SQL提供的窗口函数来创建处理时间窗口。不过,在这个例子中,将直接使用处理时间窗口函数,而不进行显式的转换。

sql">-- 使用处理时间滚动窗口计算每5分钟内的数据量  
SELECT  TUMBLE_START(PROCTIME()) AS window_start,  -- 窗口开始时间(处理时间)  TUMBLE_END(PROCTIME()) AS window_end,      -- 窗口结束时间(处理时间)  COUNT(*) AS data_count                     -- 数据量  
FROM  sensor_data  
GROUP BY  TUMBLE(PROCTIME(), INTERVAL '5' MINUTE);   -- 使用处理时间滚动窗口,窗口大小为5分钟

解释

  1. 创建数据源表:创建了一个名为sensor_data的数据源表,它接收来自Socket流的数据。数据包含id、value和timestamp字段,其中timestamp是数据被读取到系统的时间戳(以毫秒为单位)。
  2. 转换时间戳并创建窗口:在这个例子中,实际上没有显式地将timestamp字段转换为TIMESTAMP类型,因为处理时间窗口不需要这样做。相反,直接使用了PROCTIME()函数来获取处理时间,并使用TUMBLE函数创建了一个滚动窗口。窗口大小为5分钟,意味着每5分钟将计算一次窗口内的数据量。
  3. 结果:查询结果将包含窗口开始时间、窗口结束时间和窗口内的数据量。

注意事项

  1. 处理时间属性:在Flink SQL中,处理时间属性是隐式的,不需要显式声明。它表示数据被具体算子处理时的系统时间。
  2. 窗口函数:Flink SQL提供了多种窗口函数,如TUMBLE(滚动窗口)、SLIDE(滑动窗口)和SESSION(会话窗口)等。根据具体需求选择合适的窗口函数。
  3. 数据源:在这个例子中,使用了Socket作为数据源。在实际应用中,数据源可能是Kafka、文件系统或其他数据源。

http://www.ppmy.cn/server/131216.html

相关文章

回溯法与迭代法详解:如何从手机数字键盘生成字母组合

在这篇文章中&#xff0c;我们将详细介绍如何基于手机数字键盘的映射&#xff0c;给定一个仅包含数字 2-9 的字符串&#xff0c;输出它能够表示的所有字母组合。这是一个经典的回溯算法问题&#xff0c;适合初学者理解和掌握。 问题描述 给定一个数字字符串&#xff0c;比如 …

云原生、云计算、虚拟化概念概述

&#xff08;带着批评阅读&#xff0c;不对的请评论区补充&#xff09; 1、出现年代前后顺序 虚拟化------>云计算------>云原生 2、虚拟化 虚拟化侧重描述实现&#xff0c;最开始的技术是模拟、hook指令执行软件程序&#xff0c;后续出现了半虚拟化、CPU硬件提供虚拟化…

常见的负载均衡

1.常见的负载均衡服务 负载均衡服务是分布式系统中用于分配网络流量和请求的关键组件&#xff0c;它可以帮助提高应用程序的可用性、可扩展性和响应速度。以下是一些常用的负载均衡服务&#xff1a; Nginx&#xff1a;一个高性能的Web服务器和反向代理&#xff0c;广泛用于实现…

Spark第一天

MapReduce过程复习 Spark由五部分组成 RDD五大特征 1、 Spark -- 代替MapReduce <<<<< scala是单机的&#xff0c;spark是分布式的。>>>>> 开源的分布式计算引擎 可以快速做计算 -- 因为可以利用内存来做一些计算 (1) 分为5个库(模块) : 1、…

【OceanBase诊断调优】—— 错误码 5065 和 5066 的区别

适用版本&#xff1a;V2.1.x、V2.2.x、V3.1.x、V3.2.x 5065 与 5066 是两个近似的报错。 OB_ERR_QUERY_INTERRUPTED(-5065): Message: Query execution was interrupted。 含义为执行中断, 例如终端执行 SQL 过程中按 ctrlc 终止 SQL 执行会报 -5065。 OB_ERR_SESSION_INTER…

原来!给我一首歌的时间,真的可以搞定Vue的计算属性computed ,以及如何区分watch监视属性

目录 1. 什么是Vue的计算属性呢&#xff1f; 2. 如何区分computed 和 watch&#xff1f; 3. computed 完整示例1 4. computed 完整示例2 1. 什么是Vue的计算属性呢&#xff1f; Vue的计算属性&#xff08;Computed Properties&#xff09;是Vue框架中一个非常重要的特性&a…

git merge啥意思

git merge 是 Git 中的一个命令&#xff0c;用于将一个分支的更改合并到另一个分支中。当你在一个项目中有多个开发人员同时工作&#xff0c;或者你在不同的特性分支上开发新功能时&#xff0c;git merge 命令就非常有用。它可以帮助你将不同分支上的更改整合在一起。 git mer…

处理“navicat premium 2003 - 无法在 192.168.10.140 上连接到 MySQL 服务器(10060“未知错误“)”的问题:

以下是一些可能的解决方法来处理“navicat premium 2003 - 无法在 192.168.10.140 上连接到 MySQL 服务器&#xff08;10060"未知错误"&#xff09;”的问题&#xff1a; **一、检查 MySQL 服务状态** 1. 确认 MySQL 服务是否正在运行。你可以在服务器上通过任务管…