Flink学习之Flink SQL

server/2024/10/21 7:37:55/

Flink SQL

1、SQL客户端

1.1 基本使用
  • 启动yarn-session

    yarn-session.sh -d
    
  • 启动Flink SQL客户端

    sql-client.sh--退出客户端
    exit;
    
  • 测试

    重启SQL客户端之后,需要重新建表

    sql">-- 构建Kafka Source
    -- 无界流
    drop table if exists students_kafka_source;
    CREATE TABLE if not exists students_kafka_source (`id` BIGINT,`name` STRING,`age` INT,`gender` STRING,`clazz` STRING
    ) WITH ('connector' = 'kafka','topic' = 'students1000','properties.bootstrap.servers' = 'master:9092','properties.group.id' = 'grp1','scan.startup.mode' = 'earliest-offset','format' = 'csv', -- 以 ,分隔的数据-- 是否忽略脏数据'csv.ignore-parse-errors' = 'true'
    );-- 执行查询,基于KafkaSource是无界流,所以查询时连续变化的
    select * from students_kafka_source;
    select clazz,count(*) as cnt from students_kafka_source group by clazz;-- 向Kafka生产数据
    kafka-console-producer.sh --broker-list master:9092 --topic students1000
    
1.2 三种显示模式
  • 表格模式

    SQL客户端默认的结果显示模式

    在内存中实体化结果,并将结果用规则的分页表格可视化展示出来

    sql">SET 'sql-client.execution.result-mode' = 'table';
    
  • 变更日志模式

    不会实体化和可视化结果,而是由插入(+)和撤销(-)组成的持续查询产生结果流

    sql">SET 'sql-client.execution.result-mode' = 'changelog';
    
  • Tableau模式

    更接近传统的数据库,会将执行的结果(类似变更日志模式,由插入(+)和撤销(-)组成的持续查询产生结果流)以制表的形式直接打在屏幕之上

    sql">SET 'sql-client.execution.result-mode' = 'tableau';
    
1.3 不同的执行模式
  • 批处理

    只能处理有界流

    结果是固定的

    底层是基于MR模型

    不会出现由插入(+)和撤销(-)组成的持续查询产生结果流这种结果,只会出现最终结果

    sql">SET 'execution.runtime-mode' = 'batch';
    
  • 流处理

    默认的方式

    既可以处理无界流,也可以处理有界流

    结果是连续变化的

    底层是基于持续流模型

    sql">SET 'execution.runtime-mode' = 'streaming';
    

2、常用的connector

2.1 Kafka
  • 准备工作

    # 下载依赖
    https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.15.4/flink-sql-connector-kafka-1.15.4.jar# 长传至${FLINK_HOME}/lib# 重启yarn-session.sh
    # 先找到yarn-session的application id
    yarn application -list# kill掉yarn-session在Yarn上的进程
    yarn application -kill application_1722331927004_0007# 再启动yarn-session
    yarn-session.sh -d# 再启动sql-client
    sql-client.sh
    
  • Source

    sql">-- 构建Kafka Source
    -- 无界流
    drop table if exists students_kafka_source;
    CREATE TABLE if not exists students_kafka_source (`id` BIGINT,`name` STRING,`age` INT,`gender` STRING,`clazz` STRING,-- Kafka Source提供的数据之外的数据(元数据)`event_time` TIMESTAMP(3) METADATA FROM 'timestamp',`pt` BIGINT METADATA FROM 'partition',`offset` BIGINT METADATA FROM 'offset',`topic` STRING METADATA FROM 'topic'
    ) WITH ('connector' = 'kafka','topic' = 'students1000','properties.bootstrap.servers' = 'master:9092','properties.group.id' = 'grp1','scan.startup.mode' = 'earliest-offset','format' = 'csv',-- 是否忽略脏数据'csv.ignore-parse-errors' = 'true'
    );-- 执行查询
    select id,name,event_time,pt,`offset`,`topic` from students_kafka_source limit 10;
    
  • Sink

    • 结果不带更新的Sink

      sql">drop table if exists students_lksb_sink;
      CREATE TABLE if not exists students_lksb_sink (`id` BIGINT,`name` STRING,`age` INT,`gender` STRING,`clazz` STRING
      ) WITH ('connector' = 'kafka','topic' = 'students_lksb_sink','properties.bootstrap.servers' = 'master:9092','properties.group.id' = 'grp1','scan.startup.mode' = 'earliest-offset','format' = 'csv',-- 是否忽略脏数据'csv.ignore-parse-errors' = 'true'
      );-- 执行不带更新的查询
      insert into students_lksb_sink
      select id,name,age,gender,clazz from students_kafka_source where clazz='理科四班';
      
    • 结果带更新的Sink

      Kafka只支持追加的写入,不支持更新数据

      故有更新的查询结果无法直接编码,写入Kafka

      虽然Kafka支支持append,但是可以将更新流编码成“ +、-”不断追加到Kafka中

      如果有更新,那么往Kafka写两条记录即可表示更新,即:先“-”再“+”

      但是csv这种格式无法表达“-”或“+”操作,故无法在有更新的结果写Kafka时使用

      需要使用:canal-json或者是debezium-json

      canal-json:{“data”:[{“clazz”:“文科六班”,“cnt”:104}],“type”:“DELETE”}

      debezium-json:{“before”:null,“after”:{“clazz”:“理科四班”,“cnt”:94},“op”:“c”}

      sql">-- 基于Kafka Source 统计班级人数 最终结果写入Kafka
      drop table if exists clazz_cnt_sink;
      CREATE TABLE if not exists clazz_cnt_sink (`clazz` String,`cnt` BIGINT
      ) WITH ('connector' = 'kafka','topic' = 'clazz_cnt_sink_02','properties.bootstrap.servers' = 'master:9092','properties.group.id' = 'grp1','scan.startup.mode' = 'earliest-offset','format' = 'canal-json' -- 或者是指定为debezium-json
      );-- 执行查询并且将查询结果插入到Sink表中
      insert into clazz_cnt_sink
      select clazz,count(*) as cnt from students_kafka_source group by clazz;
      
2.2 JDBC

用于连接数据库,例如:MySQL、Oracle、PG、Derby

  • 准备工作

    # 下载依赖
    https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc/1.15.4/flink-connector-jdbc-1.15.4.jar# 上传依赖至FLINK的lib目录下,还需要将Linu中MySQL的驱动拷贝一份到lib目录下,可以从Hadoop中进行拷贝# 重启yarn-session以及sql客户端
    
  • Source

    有界流,只会查询一次,查询完后直接结束(从jdbc中读取数据是有界流

    sql">drop table if exists students_mysql_source;
    CREATE TABLE if not exists students_mysql_source (`id` BIGINT,`name` STRING,`age` INT,`gender` STRING,`clazz` STRING,PRIMARY KEY (id) NOT ENFORCED
    ) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://master:3306/bigdata30?useSSL=false','table-name' = 'students','username' = 'root','password' = '123456'
    );-- 执行查询
    select * from students_mysql_source;
    -- 将模式换成tableau 看结果变化的全过程
    SET 'sql-client.execution.result-mode' = 'tableau';
    -- 默认会以 流处理的方式 执行,所以可以看到结果连续变化的过程
    select gender,count(*) as cnt from students_mysql_source group by gender;
    -- 将运行模式切换成批处理
    SET 'execution.runtime-mode' = 'batch';
    -- 再试执行,只会看到最终的一个结果,没有变化的过程(这是与流处理的区别之处)
    select gender,count(*) as cnt from students_mysql_source group by gender;
    
  • Sink

    从Kafka接收无界流的学生数据,统计班级人数,将最终的结果写入MySQL

    sql">-- 创建MySQL的结果表
    -- 查询库中已有表的建表语句
    show create table xxx;-- 无主键的MySQL建表语句
    -- 最终发现写入的结果是有连续变换的过程,并不是直接写入最终的结果
    drop table if exists `clazz_cnt`;
    CREATE TABLE if not exists `clazz_cnt`(`clazz` varchar(255) DEFAULT NULL,`cnt` bigint DEFAULT NULL
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;-- 将班级设置为主键
    -- 最终写入的结果是可以通过主键进行更新,所以可以展示最终的结果,并且可以实时更新
    drop table if exists `clazz_cnt`;
    CREATE TABLE if not exists `clazz_cnt`(`clazz` varchar(255) NOT NULL,`cnt` bigint(20) DEFAULT NULL,PRIMARY KEY (`clazz`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;-- 创建MySQL的Sink表
    drop table if exists clazz_cnt_mysql_sink;
    CREATE TABLE if not exists clazz_cnt_mysql_sink (`clazz` STRING,`cnt`	BIGINT,-- 如果查询的结果有更新,则需要设置主键PRIMARY KEY (clazz) NOT ENFORCED
    ) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://master:3306/bigdata30?useSSL=false','table-name' = 'clazz_cnt','username' = 'root','password' = '123456'
    );-- 记得将执行模式切换成流处理,因为Kafka是无界流
    SET 'execution.runtime-mode' = 'streaming';
    -- 执行查询:实时统计班级人数,将结果写入MySQL
    insert into clazz_cnt_mysql_sink
    select clazz,count(*) as cnt from students_kafka_source where clazz is not null group by clazz;
    
2.3 HDFS
  • Source

    • 有界流

      默认的方式

      sql">drop table if exists students_hdfs_source;
      CREATE TABLE if not exists students_hdfs_source (`id` BIGINT,`name` STRING,`age` INT,`gender` STRING,`clazz` STRING,`file.path` STRING NOT NULL METADATA
      ) WITH ('connector' = 'filesystem','path' = 'hdfs://master:9000/bigdata30/students.txt','format' = 'csv',-- 是否忽略脏数据'csv.ignore-parse-errors' = 'true'
      );-- 查询表中是否有数据
      select * from students_hdfs_source limit 100;
      
    • 无界流

      同DataStream的FileSource一致

      可以通过设置source.monitor-interval参数,来指定一个监控的间隔时间,例如:5s

      FLink就会定时监控目录的一个变换,有新的文件就可以实时进行读取

      最终得到一个无界流

      sql">-- 创建HDFS目录
      hdfs dfs -mkdir /bigdata30/flink-- 创建Source表
      drop table if exists students_hdfs_unbounded_source;
      CREATE TABLE if not exists students_hdfs_unbounded_source (`id` BIGINT,`name` STRING,`age` INT,`gender` STRING,`clazz` STRING,`file.path` STRING NOT NULL METADATA
      ) WITH ('connector' = 'filesystem','path' = 'hdfs://master:9000/bigdata30/flink','source.monitor-interval' = '5s','format' = 'csv',-- 是否忽略脏数据'csv.ignore-parse-errors' = 'true'
      );-- 执行查询
      select * from students_hdfs_unbounded_source;-- 向目录上传文件
      hdfs dfs -cp /bigdata30/students.txt /bigdata30/flink/students.txt1
      hdfs dfs -cp /bigdata30/students.txt /bigdata30/flink/students.txt2
      
  • Sink

    • 查询结果没有更新,写入数据

      sql">drop table if exists students_hdfs_sink;
      CREATE TABLE if not exists students_hdfs_sink (`id` BIGINT,`name` STRING,`age` INT,`gender` STRING,`clazz` STRING,`file_path` STRING
      ) WITH ('connector' = 'filesystem','path' = 'hdfs://master:9000/bigdata30/sink/','format' = 'csv',-- 是否忽略脏数据'csv.ignore-parse-errors' = 'true'
      );insert into students_hdfs_sink
      select * from students_hdfs_source;
      
    • 查询结果有更新,写入数据

      同Kafka类似,HDFS不支持更新数据,故需要将变换的结果编码成canal-json或者是debezium-json的格式才能进行insert

      sql">drop table if exists clazz_cnt_hdfs_sink;
      CREATE TABLE if not exists clazz_cnt_hdfs_sink (`clazz` STRING,`cnt` BIGINT
      ) WITH ('connector' = 'filesystem','path' = 'hdfs://master:9000/bigdata30/clazz_cnt/','format' = 'canal-json'
      );-- 使用有界的数据源来写入待更新的计算结果
      insert into clazz_cnt_hdfs_sink
      select clazz,count(*) as cnt from students_hdfs_source group by clazz;
      
2.4 HBase
hbase启动顺序:
zk(三台虚拟机都启动)-->hadoop(主从复制:在master端启动即可)-->hbase(在master端启动即可)hbase关闭顺序:
hbase-->hadoop-->zk# 启动
start-hbase.sh#关闭
stop-hbase.sh# 进入HBase的客户端
hbase shell
  • 准备工作

    sql"># 下载依赖
    https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hbase-2.2/1.15.4/flink-sql-connector-hbase-2.2-1.15.4.jar# 上传依赖并重启yarn-session及sql客户端
    
  • Source

    同MySQL类似,得到是一个有界流

    sql">drop table if exists students_hbase_source;
    CREATE TABLE if not exists students_hbase_source (rowkey STRING,info ROW<name STRING, age STRING,gender STRING,clazz STRING>,PRIMARY KEY (rowkey) NOT ENFORCED
    ) WITH ('connector' = 'hbase-2.2','table-name' = 'students','zookeeper.quorum' = 'master:2181'
    );select rowkey,info.name,info.age,info.gender,info.clazz from students_hbase_source;
    
  • Sink

    同MySQL类似

    sql">-- 在HBase中建表
    create 'stu01','info'-- 构建HBase Sink表
    drop table if exists stu_hbase_sink;
    CREATE TABLE if not exists stu_hbase_sink (id STRING,info ROW<name STRING,clazz STRING>,PRIMARY KEY (id) NOT ENFORCED
    ) WITH ('connector' = 'hbase-2.2','table-name' = 'stu01','zookeeper.quorum' = 'master:2181'
    );-- 丢弃null的数据
    set 'table.exec.sink.not-null-enforcer'='DROP';-- 仅追加的结果写入,由于HBase有rk存在,相同的RK会进行覆盖
    insert into stu_hbase_sink
    select cast(id as STRING) as id,ROW(name,clazz) as info 
    from students_kafka_source
    ;-- hbase中遍历数据
    scan "stu01",LIMIT => 50-- 在HBase中建表
    create 'clazz_cnt_01','info'-- 构建HBase Sink表
    drop table if exists clazz_cnt_hbase_sink;
    CREATE TABLE if not exists clazz_cnt_hbase_sink (clazz STRING,info ROW<cnt BIGINT>,PRIMARY KEY (clazz) NOT ENFORCED
    ) WITH ('connector' = 'hbase-2.2','table-name' = 'clazz_cnt_01','zookeeper.quorum' = 'master:2181'
    );-- 带更新的查询结果可以实时在HBase中通过RK进行更新
    insert into clazz_cnt_hbase_sink
    select clazz,ROW(count(*)) as info
    from students_kafka_source
    group by clazz
    ;-- hbase中遍历数据
    scan "clazz_cnt_01",LIMIT => 50
    
2.5 DataGen

用于按照指定的规则生成数据,一般用于性能测试

sql">drop table if exists datagen;
CREATE TABLE if not exists datagen (id BIGINT,random_id BIGINT,name STRING
) WITH ('connector' = 'datagen',-- optional options --'rows-per-second'='20', -- 设置每秒钟生成的数据量'fields.id.kind' = 'random','fields.id.min'='10000000','fields.id.max'='99999999','fields.random_id.kind' = 'random','fields.random_id.min'='10000000','fields.random_id.max'='99999999','fields.name.length'='5'
);
2.6 Blackhole

用于性能测试,可以作为Sink端

sql">drop table if exists blackhole_table;
CREATE TABLE if not exists  blackhole_table
WITH ('connector' = 'blackhole')
LIKE datagen (EXCLUDING ALL);insert into blackhole_table
select * from datagen group by name;drop table if exists blackhole_table;
CREATE TABLE if not exists  blackhole_table(name String,cnt BIGINT
)
WITH ('connector' = 'blackhole')
;insert into blackhole_table
select name,count(*) as cnt from datagen group by name;
2.7 Print

将结果数据在TaskManager中输出

sql">drop table if exists print_table;
CREATE TABLE if not exists print_table (name STRING,cnt BIGINT
) WITH ('connector' = 'print'
);insert into print_table
select name,count(*) as cnt from datagen group by name;

3、常用的格式

3.1 CSV

逗号分隔符文件,并非一定是.csv文件

在作为Sink时的format,仅支持写入不带更新的结果

解析每条数据是通过顺序匹配

常用参数:

csv.ignore-parse-errors 默认false,忽略解析错误,不会导致程序直接停止

csv.field-delimiter 默认 逗号,指定数据的列分隔符

3.2 JSON
3.2.1 json

普通的json格式,解析数据是通过列名进行匹配

同csv类似,只支持写入不带更新的结果

sql">drop table if exists cars_json_source;
CREATE TABLE if not exists cars_json_source (car String,county_code INT,city_code INT,card BIGINT,camera_id String,orientation String,road_id BIGINT,`time` BIGINT,speed Double
) WITH ('connector' = 'kafka','topic' = 'cars_json','properties.bootstrap.servers' = 'master:9092','properties.group.id' = 'grp1','scan.startup.mode' = 'earliest-offset','format' = 'json'
);
3.2.2 canal-json

一种特殊的JSON格式

支持写入更新的结果

{“data”:[{“clazz”:“文科六班”,“cnt”:104}],“type”:“DELETE”}

3.2.3 debezium-json

同canal-json,只是数据格式有些许差异

{“before”:null,“after”:{“clazz”:“理科四班”,“cnt”:94},“op”:“c”}

3.3 ORC

一般不用

3.4 PARQUET

一般不用

4、时间属性

4.1 处理时间

基于系统的时间

sql">drop table if exists students_kafka_source;
CREATE TABLE if not exists students_kafka_source (`id` BIGINT,`name` STRING,`age` INT,`gender` STRING,`clazz` STRING,-- 通过系统时间给表增加一列,即:处理时间proc_time as PROCTIME()
) WITH ('connector' = 'kafka','topic' = 'students1000','properties.bootstrap.servers' = 'master:9092','properties.group.id' = 'grp1','scan.startup.mode' = 'earliest-offset','format' = 'csv',-- 是否忽略脏数据'csv.ignore-parse-errors' = 'true'
);select  clazz,count(*) as cnt,tumble_start(proc_time,INTERVAL '5' SECONDS) as window_start,tumble_end(proc_time,INTERVAL '5' SECONDS) as window_end
from students_kafka_source 
group by clazz,tumble(proc_time,INTERVAL '5' SECONDS)
;-- 向Topic中生产数据
kafka-console-producer.sh --broker-list master:9092 --topic students1000
4.2 事件时间

基于数据自带的时间

java,2024-08-03 10:41:50

java,2024-08-03 10:41:51

java,2024-08-03 10:41:52

sql">drop table if exists words_kafka_source;
CREATE TABLE if not exists words_kafka_source (`word` STRING,-- 从数据中过来的一列,作为事件时间event_time TIMESTAMP(3),-- 指定水位线前移策略,并同时声明数据中的哪一列是事件时间WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH ('connector' = 'kafka','topic' = 'words_event_time','properties.bootstrap.servers' = 'master:9092','properties.group.id' = 'grp1','scan.startup.mode' = 'latest-offset','format' = 'csv',-- 是否忽略脏数据'csv.ignore-parse-errors' = 'true'
);-- 创建topic
kafka-topics.sh --zookeeper master:2181/kafka --create --replication-factor 1 --partitions 1 --topic words_event_time-- 执行查询,使用滚动的事件时间窗口进行word count,每5s统计一次
select  word,count(*) as cnt,tumble_start(event_time,INTERVAL '5' SECONDS) as window_start,tumble_end(event_time,INTERVAL '5' SECONDS) as window_end
from words_kafka_source 
group by word,tumble(event_time,INTERVAL '5' SECONDS)
;-- 向Topic中生产数据
kafka-console-producer.sh --broker-list master:9092 --topic words_event_time

5、SQL语法

5.1 Hints

在SQL查询时动态修改表的参数配置

sql">-- words_kafka_source 默认从最后开始消费
select * from words_kafka_source; // 只能查询到最新的数据,不会从头开始消费-- 假设现在需要从头开始消费
-- 第一种方案,将words_kafka_source删除重建
-- 第二种方案,通过alter table 对表进行修改
-- 第三种方案,通过hints动态调整表的配置
select * from words_kafka_source /*+OPTIONS('scan.startup.mode' = 'earliest-offset') */ ;
5.2 With

用于将多次执行的同一查询通过with先定义,后面可以进行多次使用,避免重复的SQL

应用场景:1、多次使用的SQL查询可以缓存提高性能 2、将多级嵌套解开来,降低主SQL的复杂度

sql">drop table if exists students_mysql_source;
CREATE TABLE if not exists students_mysql_source (`id` BIGINT,`name` STRING,`age` INT,`gender` STRING,`clazz` STRING,PRIMARY KEY (id) NOT ENFORCED
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://master:3306/bigdata30?useSSL=false','table-name' = 'students','username' = 'root','password' = '123456'
);select id,name from students_mysql_source where clazz = '理科一班'
union all
select id,name from students_mysql_source where clazz = '理科一班'
;-- 通过with可以将多次使用的SQL进行定义
with stu_lkyb as (select id,name from students_mysql_source where clazz = '理科一班'
)
select * from stu_lkyb
union all
select * from stu_lkyb
union all
select * from stu_lkyb
;
5.3 Where

可以进行过滤

sql">select id,name,clazz,age from students_mysql_source where clazz = '理科一班' and age > 20;-- 找到重复数据并进行过滤
select	id,name,age,gender,clazz
from (
select id,name,age,gender,clazz,count(*) as cnt from students_mysql_source group by id,name,age,gender,clazz
) t1 where t1.cnt = 1;-- 聚合后的过滤可以使用Having
select id,name,age,gender,clazz,count(*) as cnt from students_mysql_source group by id,name,age,gender,clazz
having count(*) = 1;
5.4 Distinct

用于去重

需要对每条不同的数据维护一个状态,状态会无限制的增大,最终任务可能会失败

无界流是正常可以去重的

有界流必须在分组之后带上聚合操作才能去重,如果直接distinct或者是groupby不聚合,最终任务里不会产生shuffle,即不会分组,也就无法去重

sql">-- 去重
select id,name,age,gender,clazz from students_mysql_source group by id,name,age,gender,clazz;-- 等价于distinct
select distinct id,name,age,gender,clazz from students_mysql_source;select distinct id from students_mysql_source;
5.5 Windowing TVFs

目前提供了三类TVFs窗口操作:TUMBLE、HOP、CUMULATE

会话SESSION窗口只能通过GROUP WINDOW FUNCTION实现

计数窗口在FLINK SQL中暂未支持

5.5.1 Tumble

需要设置一个滚动时间

每隔一段时间会触发一次窗口的统计

sql">-- 创建Bid订单表
drop table if exists bid_kafka_source;
CREATE TABLE if not exists bid_kafka_source (`item` STRING,`price` DOUBLE,`bidtime` TIMESTAMP(3),`proc_time` as PROCTIME(),-- 指定水位线前移策略,并同时声明数据中的哪一列是事件时间WATERMARK FOR bidtime AS bidtime
) WITH ('connector' = 'kafka','topic' = 'bid','properties.bootstrap.servers' = 'master:9092','properties.group.id' = 'grp1','scan.startup.mode' = 'earliest-offset','format' = 'csv',-- 是否忽略脏数据'csv.ignore-parse-errors' = 'true'
);-- 准备数据
C,4.00,2020-04-15 08:05:00
A,2.00,2020-04-15 08:07:00
D,5.00,2020-04-15 08:09:00
B,3.00,2020-04-15 08:11:00
E,1.00,2020-04-15 08:13:00
F,6.00,2020-04-15 08:17:00-- 创建Kafka Topic
kafka-topics.sh --zookeeper master:2181/kafka --create --replication-factor 1 --partitions 1 --topic bid-- 生产数据
kafka-console-producer.sh --broker-list master:9092 --topic bid-- 基于事件时间的滚动窗口
SELECT window_start,window_end,sum(price) as sum_price
FROM TABLE(-- tumble函数 会给bid表增加三个窗口列:window_start、window_end、window_time-- 如果需要基于窗口的统计则按照窗口列分组即可TUMBLE(TABLE bid_kafka_source, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)
) group by window_start,window_end
;-- 基于处理时间的滚动窗口
SELECT window_start,window_end,sum(price) as sum_price
FROM TABLE(-- tumble函数 会给bid表增加三个窗口列:window_start、window_end、window_time-- 如果需要基于窗口的统计则按照窗口列分组即可TUMBLE(TABLE bid_kafka_source, DESCRIPTOR(proc_time), INTERVAL '10' SECONDS)
) group by window_start,window_end
;
5.5.2 HOP

滑动窗口

需要指定两个时间:滑动的时间、窗口的大小

sql">-- 基于事件时间的滑动窗口
SELECT window_start,window_end,sum(price) as sum_price
FROM TABLE(-- HOP函数 会给bid表增加三个窗口列:window_start、window_end、window_time-- 如果需要基于窗口的统计则按照窗口列分组即可HOP(TABLE bid_kafka_source, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES)
) group by window_start,window_end
;-- 基于处理时间的滑动窗口
SELECT window_start,window_end,sum(price) as sum_price
FROM TABLE(-- HOP函数 会给bid表增加三个窗口列:window_start、window_end、window_time-- 如果需要基于窗口的统计则按照窗口列分组即可HOP(TABLE bid_kafka_source, DESCRIPTOR(proc_time), INTERVAL '5' SECONDS, INTERVAL '10' SECONDS)
) group by window_start,window_end
;
5.5.3 CUMULATE

累积窗口:首先会按照步长初始化一个窗口大小,然后按照步长的间隔时间触发窗口的统计,接下来窗口大小会不断增大,直到达到设置的最大size,然后重复这个过程

需要指定两个时间间隔:步长、最大的size

例如:步长为2分钟,size为10分钟

每隔2分钟会触发一次统计,第一次统计的最近两分钟的数据,第二次统计是最近4分钟的…第5次统计是最近10分钟的数据,第6次统计是最近2分钟的数据…

sql">-- 基于事件时间的累计窗口
SELECT window_start,window_end,sum(price) as sum_price
FROM TABLE(-- CUMULATE函数 会给bid表增加三个窗口列:window_start、window_end、window_time-- 如果需要基于窗口的统计则按照窗口列分组即可CUMULATE(TABLE bid_kafka_source, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES)
) group by window_start,window_end
;-- 基于处理时间的累计窗口
SELECT window_start,window_end,sum(price) as sum_price
FROM TABLE(-- CUMULATE函数 会给bid表增加三个窗口列:window_start、window_end、window_time-- 如果需要基于窗口的统计则按照窗口列分组即可CUMULATE(TABLE bid_kafka_source, DESCRIPTOR(proc_time), INTERVAL '2' SECONDS, INTERVAL '10' SECONDS)
) group by window_start,window_end
;
5.5.4 SESSION

会话窗口,目前版本不支持TVFs写法

需要使用老版本的写法:GROUP WINDOW FUNCTION

间隔一段时间没有数据就会触发窗口的统计

sql">-- 基于事件时间的会话窗口
select session_start(bidtime, INTERVAL '2' MINUTES) as session_start,session_end(bidtime, INTERVAL '2' MINUTES) as session_end,sum(price) as sum_price
from bid_kafka_source
group by session(bidtime, INTERVAL '2' MINUTES)
;-- 基于处理时间的会话窗口
select session_start(proc_time, INTERVAL '2' SECONDS) as session_start,session_end(proc_time, INTERVAL '2' SECONDS) as session_end,sum(price) as sum_price
from bid_kafka_source
group by session(proc_time, INTERVAL '2' SECONDS)
;

6、Over聚合

6.1 聚合类

sum、max、min、count、avg

sum 比较特殊:如果指定了order By,则表示累加求和,不指定则表示整个窗口求和

max、min、count、avg 不需要指定order By

sql">-- 准备数据
item,supply_id,price,bidtime
A,001,4.00,2020-04-15 08:05:00
A,002,2.00,2020-04-15 08:06:00
A,001,5.00,2020-04-15 08:07:00
B,002,3.00,2020-04-15 08:08:00
A,001,1.00,2020-04-15 08:09:00
A,002,6.00,2020-04-15 08:10:00
B,001,6.00,2020-04-15 08:11:00
A,001,6.00,2020-04-15 08:12:00
B,002,6.00,2020-04-15 08:13:00
B,002,6.00,2020-04-15 08:14:00
A,001,66.00,2020-04-15 08:15:00
B,001,6.00,2020-04-15 08:16:00-- 创建order订单表
drop table if exists order_kafka_source;
CREATE TABLE if not exists order_kafka_source (`item` STRING,`supply_id` STRING,`price` DOUBLE,`bidtime` TIMESTAMP(3),-- 指定水位线前移策略,并同时声明数据中的哪一列是事件时间WATERMARK FOR bidtime AS bidtime
) WITH ('connector' = 'kafka','topic' = 'order','properties.bootstrap.servers' = 'master:9092','properties.group.id' = 'grp1','scan.startup.mode' = 'latest-offset','format' = 'csv',-- 是否忽略脏数据'csv.ignore-parse-errors' = 'true'
);-- 创建Kafka Topic
kafka-topics.sh --zookeeper master:2181/kafka --create --replication-factor 1 --partitions 1 --topic order-- 生产数据
kafka-console-producer.sh --broker-list master:9092 --topic order-- 聚合类函数在实时的Over窗口上只会产生追加的数据,没有更新
-- 最终需要维护的状态大小同partition by指定的字段有关
-- 1、统计每种商品的累计成交金额
select item-- 必须指定order by ,而且必须使用时间列升序,-- 如果本条数据的时间小于上一条数据的时间,则本条数据会被忽略,不参与计算-- 相当于sum只能做累加求和,来一条数据累加一次,无法做全局聚合,sum(price) over (partition by item order by bidtime) as sum_price
from order_kafka_source
;-- 2、统计每种商品的最大成交金额
select item-- 必须指定order by ,而且必须使用时间列升序,-- 如果本条数据的时间小于上一条数据的时间,则本条数据会被忽略,不参与统计-- 来一条数据就会输出一条数据,max会将截止到当前时间戳最大的数据中取最大的一个值,max(price) over (partition by item order by bidtime) as max_price
from order_kafka_source
;-- 3、统计每种商品的最小、平均成交金额/交易次数 同上-- 4、统计最近10分钟内每种商品的累计成交金额

http://www.ppmy.cn/server/95819.html

相关文章

Python爬虫技术 第33节 未来趋势和技术发展

网络爬虫&#xff08;Web crawler&#xff09;是一种自动化的程序或脚本&#xff0c;用于遍历互联网上的网页并收集所需的数据。爬虫技术在许多领域都有广泛的应用&#xff0c;从搜索引擎到数据分析、市场研究、竞争情报等。 爬虫技术的基础 基本原理&#xff1a; URL管理&…

ARM 架构硬件新趋势:嵌入式领域的未来

目录 目录 一、ARM 架构概述 二、新趋势一&#xff1a;AI 加速器集成 三、新趋势二&#xff1a;更高效的电源管理 四、新趋势三&#xff1a;安全性增强 五、结语 随着物联网 (IoT) 和边缘计算的发展&#xff0c;ARM 架构在嵌入式系统中的应用越来越广泛。从智能手机到智能…

【转行大模型 01】大数据已死?AI当道!我为何想转战大模型

作为一名经验丰富的大数据开发工程师&#xff0c;我最近决定扩展自己的职业方向&#xff0c;转向大模型应用开发。这个决定源于对技术趋势的观察、对个人发展的思考&#xff0c;以及对我们行业未来的预判。让我从一个大数据工程师的视角&#xff0c;逐步分析这个决定背后的逻辑…

MySQL事务,锁,MVCC总结

mysql中最重要的就是事务&#xff0c;其四大特性让我们维持了数据的平衡&#xff0c;一致。那么事务究竟是什么&#xff0c;与什么相关&#xff0c;他的使用步骤&#xff0c;以及使用过程中我们会遇到什么问题呢&#xff1f;下面我们一起学习交流! 1.MySQL的存储引擎&#xff…

算法【构建前缀信息解决子数组问题】

本文需要对掌握哈希表的用法。 构建某个前缀信息比如最早出现、最晚出现、出现次数等&#xff0c;是很常见的技巧。除此之外&#xff0c;还有很多种类的前缀信息可以构建出来&#xff0c;解决很多子数组相关问题。下面通过几个题目加深对构建前缀信息这个方法的理解。 题目一 …

《深入浅出WPF》学习笔记六.手动实现Mvvm

《深入浅出WPF》学习笔记六.手动实现Mvvm demo的层级结构,Mvvm常用项目结构 依赖属性基类实现 具体底层原理后续学习中再探讨,可以粗浅理解为,有一个全局对象使用list或者dic监听所有依赖属性,当一个依赖属性变化引发通知时,就会遍历查询对应的字典&#xff0c;通知View层进行…

135. 分发糖果【 力扣(LeetCode) 】

一、题目描述 n 个孩子站成一排。给你一个整数数组 ratings 表示每个孩子的评分。 你需要按照以下要求&#xff0c;给这些孩子分发糖果&#xff1a; 每个孩子至少分配到 1 个糖果。相邻两个孩子评分更高的孩子会获得更多的糖果。 请你给每个孩子分发糖果&#xff0c;计算并返回…

hive自动安装脚本

使用该脚本注意事项 安装hive之前确定机子有网络。或者yum 更改为本地源&#xff0c;因为会使用epel仓库下载一个pv的软件使用该脚本前提是自行安装好mysql数据库准备好tomcat软件包&#xff0c;该脚本使用tomcat9.x版本测试过能正常执行安装成功&#xff0c;其他版本没有测试…