使用 Flink CDC 构建 Streaming ETL

devtools/2025/1/8 13:40:49/

安装并配置 Flink

1.下载 Flink 1.20.0
curl -k -O https://archive.apache.org/dist/flink/flink-1.20.0/flink-1.20.0-bin-scala_2.12.tgz
2.解压文件
tar -xzvf flink-1.20.0-bin-scala_2.12.tgz
3.移动到目标目录(可选)

将解压后的 Flink 目录移动到 /opt 或其他目标位置(可选):

sudo mv flink-1.20.0 /opt/flink
4. 配置环境变量

为了方便使用,可以将 Flink 的 bin 目录添加到系统的 PATH 环境变量中。编辑 ~/.bashrc 文件:

vi ~/.bashrc

添加以下内容:

export FLINK_HOME=/opt/flink
export PATH=$FLINK_HOME/bin:$PATH

保存并退出文件后,运行以下命令使修改生效:

source ~/.bashrc
5. 配置 Flink

Flink 默认已经配置好了一些基本设置,不需要集群配置,可以跳过 masters 和 workers 文件的配置。只需要调整一些参数,比如内存配置,或者其他作业配置。

可以根据需要修改 Flink 配置文件 config.yaml,文件位于 /opt/flink/conf 目录下:

目前配置中的 bind-host 设置为 localhost,这意味着 Flink 只能绑定到本地接口,无法接收来自其他机器的请求。需要将其改为 0.0.0.0,使 Flink 能够绑定所有网络接口。

cd /opt/flink/conf
vi config.yaml
jobmanager:bind-host:0.0.0.0
rpc:address:0.0.0.0port:6123memory:process:size:1600mexecution:failover-strategy:regiontaskmanager:bind-host:0.0.0.0host:0.0.0.0numberOfTaskSlots:1memory:process:size:1728mparallelism:default:1rest:address:0.0.0.0bind-address:0.0.0.0
6. 启动 Flink
cd /opt/flink
./bin/start-cluster.sh

关闭Flink

./bin/stop-cluster.sh
7. 访问 Flink Web UI

Flink 会启动一个 Web UI,默认地址为http://<your_server_ip>:8081 例如:http://192.168.173.67:8081,可以在浏览器中访问它来查看 Flink 集群的状态、提交作业等。

830acf27fac8b1665693da6fe478734b.png

使用 Flink CDC 构建 Streaming ETL

假设,商品和订单的数据存储在 MySQL 中,订单对应的物流信息存储在 Postgres 中。 对于订单表,为了方便进行分析,我们希望让它关联上其对应的商品和物流信息,构成一张宽表,并且实时把它写到 ElasticSearch 中。

接下来的内容将介绍如何使用 Flink Mysql/Postgres CDC 来实现这个需求,系统的整体架构如下图所示:

92bd35a864146054a10317d951bf05f5.png

下载 Flink 和所需要的依赖包

下载下面列出的依赖包,并将它们放到目录 lib/ 下:

  • • flink-sql-connector-elasticsearch7-3.0.1-1.17.jar

  • • flink-sql-connector-mysql-cdc-3.2.1.jar

  • • flink-sql-connector-postgres-cdc-3.2.1.jar

准备数据

在 MySQL 数据库中准备数据

创建数据库和表productsorders 并插入数据

-- MySQL
CREATE DATABASE mydb;
USE mydb;
CREATETABLE products (id INTEGERNOTNULL AUTO_INCREMENT PRIMARY KEY,name VARCHAR(255)NOTNULL,description VARCHAR(512)
);
ALTERTABLE products AUTO_INCREMENT =101;INSERTINTO products
VALUES(default,"scooter","Small 2-wheel scooter"),
(default,"car battery","12V car battery"),
(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
(default,"hammer","12oz carpenter's hammer"),
(default,"hammer","14oz carpenter's hammer"),
(default,"hammer","16oz carpenter's hammer"),
(default,"rocks","box of assorted rocks"),
(default,"jacket","water resistent black wind breaker"),
(default,"spare tire","24 inch spare tire");CREATETABLE orders (order_id INTEGERNOTNULL AUTO_INCREMENT PRIMARY KEY,order_date DATETIME NOTNULL,customer_name VARCHAR(255)NOTNULL,price DECIMAL(10,5)NOTNULL,product_id INTEGERNOTNULL,order_status BOOLEANNOTNULL-- Whether order has been placed
) AUTO_INCREMENT =10001;INSERTINTO orders
VALUES(default,'2020-07-30 10:08:22','Jark',50.50,102,false),
(default,'2020-07-30 10:11:09','Sally',15.00,105,false),
(default,'2020-07-30 12:00:30','Edward',25.25,106,false);
在 Postgres 数据库中准备数据

创建表shipments,并插入数据

-- PG
CREATETABLE shipments (shipment_id SERIAL NOTNULLPRIMARY KEY,order_id SERIAL NOTNULL,origin VARCHAR(255)NOTNULL,destination VARCHAR(255)NOTNULL,is_arrived BOOLEANNOTNULL
);
ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH1001;
ALTERTABLEpublic.shipments REPLICA IDENTITYFULL;
INSERTINTO shipments
VALUES(default,10001,'Beijing','Shanghai',false),
(default,10002,'Hangzhou','Shanghai',false),
(default,10003,'Shanghai','Hangzhou',false);
启动 Flink SQL CLI

启动 Flink SQL CLI

./bin/sql-client.sh

启动成功后,可以看到如下的页面:

5805911356640be994448a17b5530c00.png

在 Flink SQL CLI 中使用 Flink DDL 创建表

首先,开启 checkpoint,每隔3秒做一次 checkpoint

-- Flink SQL                   
Flink SQL> SET execution.checkpointing.interval = 3s;

然后, 对于数据库中的表 productsordersshipments, 使用 Flink SQL CLI 创建对应的表,用于同步这些底层数据库表的数据

-- Flink SQL
FlinkSQL>CREATETABLE products (id INT,name STRING,description STRING,
PRIMARY KEY (id)NOT ENFORCED
)WITH(
'connector'='mysql-cdc',
'hostname'='localhost',
'port'='3306',
'username'='root',
'password'='123456',
'database-name'='mydb',
'table-name'='products'
);FlinkSQL>CREATETABLE orders (order_id INT,order_date TIMESTAMP(0),customer_name STRING,price DECIMAL(10,5),product_id INT,order_status BOOLEAN,
PRIMARY KEY (order_id)NOT ENFORCED
)WITH(
'connector'='mysql-cdc',
'hostname'='localhost',
'port'='3306',
'username'='root',
'password'='123456',
'database-name'='mydb',
'table-name'='orders'
);FlinkSQL>CREATETABLE shipments (shipment_id INT,order_id INT,origin STRING,destination STRING,is_arrived BOOLEAN,
PRIMARY KEY (shipment_id)NOT ENFORCED
)WITH(
'connector'='postgres-cdc',
'hostname'='localhost',
'port'='5432',
'username'='postgres',
'password'='postgres',
'database-name'='postgres',
'schema-name'='public',
'table-name'='shipments',
'slot.name'='flink');

最后,创建 enriched_orders 表, 用来将关联后的订单数据写入 Elasticsearch 中

-- Flink SQL
FlinkSQL>CREATETABLE enriched_orders (order_id INT,order_date TIMESTAMP(0),customer_name STRING,price DECIMAL(10,5),product_id INT,order_status BOOLEAN,product_name STRING,product_description STRING,shipment_id INT,origin STRING,destination STRING,is_arrived BOOLEAN,
PRIMARY KEY (order_id)NOT ENFORCED
)WITH(
'connector'='elasticsearch-7',
'hosts'='http://localhost:9200',
'index'='enriched_orders');
关联订单数据并且将其写入 Elasticsearch 中

使用 Flink SQL 将订单表 order 与 商品表 products,物流信息表 shipments 关联,并将关联后的订单信息写入 Elasticsearch 中

-- Flink SQL
Flink SQL> INSERT INTO enriched_ordersSELECT o.*, p.name, p.description, s.shipment_id, s.origin, s.destination, s.is_arrivedFROM orders AS oLEFT JOIN products AS p ON o.product_id = p.idLEFT JOIN shipments AS s ON o.order_id = s.order_id;

现在,就可以在 Kibana 中看到包含商品和物流信息的订单数据。

首先访问 http://localhost:5601/app/kibana#/management/kibana/index_pattern 创建 index pattern enriched_orders.

c2a9028e83da08197fd02c0d3795ae02.png

然后就可以在 http://localhost:5601/app/kibana#/discover 看到写入的数据了.

5e926aeab3f02b870b20c3c4c43112df.png

接下来,修改 MySQL 和 Postgres 数据库中表的数据,Kibana中显示的订单数据也将实时更新:

  1. 1. 在 MySQL 的orders表中插入一条数据--MySQL
    INSERT INTO orders
    VALUES (default, '2020-07-30 15:22:00', 'Jark', 29.71, 104, false);

  2. 2. 在 Postgres 的shipment表中插入一条数据--PG
    INSERT INTO shipments
    VALUES (default,10004,'Shanghai','Beijing',false);

  3. 3. 在 MySQL 的orders表中更新订单的状态--MySQL
    UPDATE orders SET order_status = true WHERE order_id = 10004;

  4. 4. 在 Postgres 的shipment表中更新物流的状态--PG
    UPDATE shipments SET is_arrived = true WHERE shipment_id = 1004;

  5. 5. 在 MYSQL 的orders表中删除一条数据--MySQL
    DELETE FROM orders WHERE order_id = 10004;
    每执行一步就刷新一次 Kibana,可以看到 Kibana 中显示的订单数据将实时更新,如下所示:

78d0a0e8758fd6e4fb58ff5ce7cdd84f.gif


http://www.ppmy.cn/devtools/148541.html

相关文章

算命网站源码PHP框架_附2025新版设计书教程

算命网站源码PHP设计书 1. 项目概述 1.1 项目背景 随着互联网的发展&#xff0c;越来越多的人对命理和占卜产生了兴趣。算命网站可以为用户提供个性化的命理分析、运势预测等服务。本项目旨在设计一个基于PHP的算命网站&#xff0c;方便用户在线获取命理服务。 1.2 项目目标…

stm32第一次烧录或者上电运行卡死问题分析

问题描述 单片机烧录代码&#xff08;刚上电&#xff09;无法立即运行&#xff0c;必须要复位一次或多次才能运行&#xff1b;跟踪调试会进入HardFault_Handler中断。 问题分析 烧录配置如下图&#xff0c;首先排除配置问题那么该问题就比较让人头大了&#xff0c;理论上&am…

概率论常用的分布公式

01 常见离散型分布及其概率分布、期望和方差公式 伯努利分布 概率分布&#xff1a;期望&#xff1a; E(X)p方差&#xff1a;D(X)p(1−p) 二项分布 概率分布&#xff1a;期望&#xff1a;E(X)np方差&#xff1a; D(X)np(1−p) 表示方法&#xff1a;X∼B(n,p) 泊松分布 概率分布…

镜舟科技2024年度回顾,坚持长期价值,进一步实现商业突破

2024年&#xff0c;镜舟科技实现了显著的商业增长&#xff0c;ARR&#xff08;年度经常性收入&#xff09;连续三年翻倍&#xff0c;NRR&#xff08;净留存率&#xff09;保持在130%以上&#xff0c;商业化客户数量超过120家。公司持续为客户创造长期价值&#xff0c;通过开源战…

浅谈棋牌游戏开发流程七:反外挂与安全体系——守护游戏公平与玩家体验

一、前言&#xff1a;为什么反外挂与安全这么重要&#xff1f; 对于任何一款线上棋牌游戏而言&#xff0c;公平性和玩家安全都是最重要的核心要素之一。如果游戏环境充斥着各式各样的外挂、作弊方式&#xff0c;不仅会毁坏玩家体验&#xff0c;更会导致游戏生态崩塌、口碑下滑…

PWR-STM32电源控制

一、原理 睡眠模式不响应其他操作&#xff0c;比如烧写程序&#xff0c;烧写时按住复位键松手即可下载&#xff0c;在禁用JTAG也可如此烧写程序。 对于低功耗模式可以通过RTC唤醒、外部中断唤醒、中断唤醒。 1、电源框图&#xff1a; VDDA主要负责模拟部分的供电、Vref和Vref-…

操作系统安全保护

9.1 概述 1&#xff09;概念 一般来说&#xff0c;操作系统的安全是指满足安全策略要求&#xff0c;具有相应的安全机制及安全功能&#xff0c;符合特定的安全标准&#xff0c;在一定约束条件下&#xff0c;能够抵御常见的网络安全威胁&#xff0c;保障自身的安全运行及资源安…

Java中使用JFreeChart生成甘特图

引言 甘特图是一种流行的项目管理工具&#xff0c;用于显示项目的进度和任务分配。它通过条形图显示任务的开始和结束时间&#xff0c;使项目经理能够直观地了解项目的整体情况。在Java开发中&#xff0c;JFreeChart是一个强大的开源图表库&#xff0c;能够生成各种类型的图表…