Flink SQL 从一个SOURCE 写入多个Sink端实例

ops/2024/12/24 4:09:08/

一. 背景

FLINK 任务从一个数据源读取数据, 写入多个sink端.

二. 官方实例

写入多个Sink语句时,需要以BEGIN STATEMENT SET;开头,以END;结尾。
sql">
--源表
CREATE TEMPORARY TABLE datagen_source (name VARCHAR,score BIGINT
) WITH ('connector' = 'datagen'
);--结果表A
CREATE TEMPORARY TABLE blackhole_sinkA(name VARCHAR,score BIGINT
) WITH ('connector' = 'blackhole' 
);--结果表B
CREATE TEMPORARY TABLE blackhole_sinkB(name VARCHAR,score BIGINT
) WITH ('connector' = 'blackhole' 
);--DML
BEGIN STATEMENT SET;      --写入多个Sink时,必填。
INSERT INTO blackhole_sinkA SELECT UPPER(name), sum(score) FROM datagen_source GROUP BY UPPER(name);
INSERT INTO blackhole_sinkB SELECT LOWER(name), max(score) FROM datagen_source GROUP BY LOWER(name);
END;      --写入多个Sink时,必填。

三. 实操

3.1. 启动Standlone集群

进入到flink引擎包目录, 启动Standlone模式.

./bin/start-cluster.sh

flink__sqlclient_54">3.2. 启动flink sql-client.

./bin/sql-client.sh embedded

sql_57">3.3. 执行sql

sql">Flink SQL> CREATE TEMPORARY TABLE datagen_source (
>   name VARCHAR,
>   score BIGINT
> ) WITH (
>   'connector' = 'datagen'
> );
[INFO] Execute statement succeed.Flink SQL> CREATE TEMPORARY TABLE blackhole_sinkA(
>   name VARCHAR,
>   score BIGINT
> ) WITH (
>   'connector' = 'blackhole'
> );
>
[INFO] Execute statement succeed.Flink SQL> CREATE TEMPORARY TABLE blackhole_sinkB(
>   name VARCHAR,
>   score BIGINT
> ) WITH (
>   'connector' = 'blackhole'
> );
>
[INFO] Execute statement succeed.Flink SQL> BEGIN STATEMENT SET;
[INFO] Begin a statement set.Flink SQL> INSERT INTO blackhole_sinkA
>   SELECT UPPER(name), sum(score)
>   FROM datagen_source
>   GROUP BY UPPER(name);
[INFO] Add SQL update statement to the statement set.Flink SQL> INSERT INTO blackhole_sinkB
>   SELECT LOWER(name), max(score)
>   FROM datagen_source
>   GROUP BY LOWER(name);
[INFO] Add SQL update statement to the statement set.Flink SQL> END;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 37a1390129c356374601a267cb8080b6

flink_ui_107">3.4. 查看flink ui

查看flink ui页面,验证结论.

http://master01:8081/#/job/37a1390129c356374601a267cb8080b6/overview

在这里插入图片描述


http://www.ppmy.cn/ops/144471.html

相关文章

linux zip unzip 命令的使用

在 Linux 系统中,zip 和 unzip 命令用于创建和解压缩 ZIP 文件。这是非常常见的文件管理操作。以下是它们的具体用法: 1. zip 命令 zip 用于压缩文件或目录为 .zip 格式。 基本语法 zip [选项] 压缩包名称.zip 文件或目录常见示例 1.1 压缩单个文件…

数据分析和AI丨知识图谱,AI革命中数据集成和模型构建的关键推动者

人工智能(AI)已经吸引了数据科学家、技术领导者以及任何使用数据进行商业决策者的兴趣。绝大多数企业都希望利用人工智能技术来增强洞察力和生产力,而对于这些企业而言,数据集的质量差成为了最主要的障碍。 数据源需要进行清洗且明…

电脑使用CDR时弹出错误“计算机丢失mfc140u.dll”是什么原因?“计算机丢失mfc140u.dll”要怎么解决?

电脑使用CDR时弹出“计算机丢失mfc140u.dll”错误:原因与解决方案 在日常电脑使用中,我们时常会遇到各种系统报错和文件丢失问题。特别是当我们使用某些特定软件,如CorelDRAW(简称CDR)时,可能会遇到“计算…

模拟法简介(蓝桥杯)

模拟法,顾名思义,就是利用计算机模拟问题的求解过程,从而得到问题的解。模拟法由于简单,因此又被称为“不是算法的算法”! 模拟法是学习算法的基础,通过模拟可以学习编程的各类技巧,提升初学者建…

微服务-02

在微服务-01中,我们复习了微服务的拆分,由于每个微服务都有不同的地址或端口,入口不同,相信大家在与前端联调的时候发现了一些问题: 请求不同数据时要访问不同的入口,需要维护多个入口地址,麻烦…

vue3+vite 引入动画组件库 Inspira UI

关于Inspira UI Inspira UI不是传统的组件库。相反,它是精选的优雅组件集合,您可以轻松将其集成到您的应用程序中。只需选择所需的组件,复制代码,然后自定义以适合您的项目即可。您可以随意使用和修改代码! 官网地址…

华为、华三交换机纯Web下如何创关键VLANIF、操作STP参数

华为交换机WEB操作 使用的是真机S5735,目前主流的版本都适用(V1R5~V2R1的就不在列了,版本太老了,界面完全不一样,这里调试线接的console口,电脑的网络接在ETH口) 「模拟器、工具合集」复制整段内…

C++如何处理对象的生命周期管理?

在 C 中,对象的生命周期管理至关重要,尤其是涉及动态内存分配的情况下。管理对象生命周期的核心是确保对象在需要时被创建,不再需要时被销毁,并避免资源泄漏或悬空指针问题。以下是常见的对象生命周期管理方法和技巧: …