简化数据流:Apache SeaTunnel实现多表同步的高效指南

ops/2024/11/15 4:49:45/

Apache SeaTunnel除了单表之间的数据同步之外,也支持单表同步到多表,多表同步到单表,以及多表同步到多表,下面简单举例说明如何实现这些功能。

单表 to 单表

一个source,一个sink。

从mysql同步到mysql,中间不做区分

env {# You can set flink configuration hereexecution.parallelism = 2job.mode = "BATCH"
}
source{Jdbc {url = "jdbc:mysql://127.0.0.1:3306/test"driver = "com.mysql.cj.jdbc.Driver"connection_check_timeout_sec = 100user = "user"password = "password"query = "select * from base_region"}
}transform {# If you would like to get more information about how to configure seatunnel and see full list of transform plugins,# please go to https://seatunnel.apache.org/docs/transform/sql
}sink {jdbc {url = "jdbc:mysql://127.0.0.1:3306/dw"driver = "com.mysql.cj.jdbc.Driver"connection_check_timeout_sec = 100user = "user"password = "password"query = "insert into base_region(id,region_name) values(?,?)"}
}

执行任务

./bin/seatunnel.sh --config ./config/mysql2mysql_batch.conf

单表 to 多表

一个source,多个sink。

从MySQL同步到MySQL,将一个用户表数据同步过去,中间通过2个sql组件分布将男性用户和女性用户分开,在sink阶段分别插入到不同的表:

env {execution.parallelism = 2job.mode = "BATCH"
}
source {Jdbc {url = "jdbc:mysql://127.0.0.1:3306/test"driver = "com.mysql.cj.jdbc.Driver"connection_check_timeout_sec = 100user = "user"password = "password"result_table_name="t_user"query = "select * from t_user;"}
}transform {Sql {source_table_name = "t_user"result_table_name = "t_user_nan"query = "select id,name,birth,gender from t_user where gender ='男';"}Sql {source_table_name = "t_user"result_table_name = "t_user_nv"query = "select id,name,birth,gender from t_user where gender ='女';"}
}sink {jdbc {url = "jdbc:mysql://127.0.0.1:3306/dw"driver = "com.mysql.cj.jdbc.Driver"connection_check_timeout_sec = 100user = "user"password = "password"source_table_name = "t_user_nan"query =  "insert into t_user_nan(id,name,birth,gender) values(?,?,?,?)"}jdbc {url = "jdbc:mysql://127.0.0.1:3306/dw"driver = "com.mysql.cj.jdbc.Driver"connection_check_timeout_sec = 100user = "user"password = "password"source_table_name = "t_user_nv"query =  "insert into t_user_nv(id,name,birth,gender) values(?,?,?,?)"}
}
./bin/seatunnel.sh --config ./config/mysql2mysql_1n.conf

多表 to 单表

多个source,一个sink。

假如有一张交换器使用情况表,一张路由器使用情况表,目标表是将这种数据合在一起的olap表。

表结构如下:

-- dw 源表1
CREATE TABLE IF NOT EXISTS ads_device_switch_performance (`event_time` timestamp COMMENT '业务时间',`device_id` VARCHAR(32) COMMENT '设备id',`device_type` VARCHAR(32) COMMENT '设备类型',`device_name` VARCHAR(128) COMMENT '设备名称',`cpu_usage` INT COMMENT 'CPU使用率百分比'
) ;INSERT INTO `ads_device_switch_performance` VALUES ('2024-01-15 14:25:11', '2001', '2', '交换器1', 49);
INSERT INTO `ads_device_switch_performance` VALUES ('2024-01-17 22:25:40', '2002', '1', '交换器2', 65);-- dw 源表2
CREATE TABLE IF NOT EXISTS ads_device_router_performance (`event_time` timestamp COMMENT '业务时间',`device_id` VARCHAR(32) COMMENT '设备id',`device_type` VARCHAR(32) COMMENT '设备类型',`device_name` VARCHAR(128) COMMENT '设备名称',`cpu_usage` INT COMMENT 'CPU使用率百分比'
);INSERT INTO `ads_device_router_performance` VALUES ('2024-01-17 21:23:22', '1001', '1', '路由器1', 35);
INSERT INTO `ads_device_router_performance` VALUES ('2024-01-16 17:23:53', '1002', '2', '路由器2', 46);-------------------------------------------------------------------------------
-- olap 目标表
CREATE TABLE `device_performance` (`id` INT NOT NULL AUTO_INCREMENT COMMENT '表主键',`event_time` VARCHAR(32) NOT NULL COMMENT '业务时间',`device_id` VARCHAR(32) COMMENT '设备id',`device_type` VARCHAR(32) COMMENT '设备类型',`device_name` VARCHAR(128) NOT NULL COMMENT '设备名称',`cpu_usage` FLOAT NOT NULL COMMENT 'CPU利用率单位是%',`create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',`update_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '更新时间',PRIMARY KEY (`id`)
) COMMENT='设备状态';

将交换器数据和路由器数据一起同步到olap目标表,总结通过sql组件处理:

env {job.mode="BATCH"job.name="device_performance"
}source {Jdbc {url="jdbc:mysql://127.0.0.1:3306/dw?allowMultiQueries=true&characterEncoding=utf-8"driver="com.mysql.cj.jdbc.Driver"user = "user"password = "password"result_table_name="switch_src"query="SELECT `event_time`, `device_id`, `device_type`, `device_name`, `cpu_usage` FROM ads_device_switch_performance;"}Jdbc {url="jdbc:mysql://127.0.0.1:3306/dw?allowMultiQueries=true&characterEncoding=utf-8"driver="com.mysql.cj.jdbc.Driver"user = "user"password = "password"result_table_name="router_src"query="SELECT `event_time`, `device_id`, `device_type`, `device_name`, `cpu_usage` FROM ads_device_router_performance;"}
}transform {Sql {source_table_name = "switch_src"result_table_name = "switch_dst"query = "SELECT  event_time , device_id, device_type, device_name, cpu_usage, NOW() AS create_time, NOW() AS update_time  FROM switch_src;"}Sql {source_table_name = "router_src"result_table_name = "router_dst"query = "SELECT event_time, device_id, device_type, device_name, cpu_usage, NOW() AS create_time, NOW() AS update_time FROM router_src;"}
}sink {Jdbc {url="jdbc:mysql://127.0.0.1:3306/olap?allowMultiQueries=true&characterEncoding=utf-8"driver="com.mysql.cj.jdbc.Driver"user = "user"password = "password"source_table_name = "switch_dst"query="INSERT INTO device_performance  VALUES(null,?, ?, ?, ?, ?, ?, ?) ;"}Jdbc {url="jdbc:mysql://127.0.0.1:3306/olap?allowMultiQueries=true&characterEncoding=utf-8"driver="com.mysql.cj.jdbc.Driver"user = "user"password = "password"source_table_name = "router_dst"query="INSERT INTO device_performance  VALUES(null,?, ?, ?, ?, ?, ?, ?) ;"}
}

执行任务:

./bin/seatunnel.sh --config ./syn_job/mysql2mysql_n1_batch.conf

作业成功!

多表 to 多表

多个source,多个sink。

将交换器使用情况数据和路由器使用情况数据分别同步到对应的目标表,中间sql组件处理

env {job.mode="BATCH"job.name="device_performance"
}source {Jdbc {url="jdbc:mysql://127.0.0.1:3306/dw?allowMultiQueries=true&characterEncoding=utf-8"driver="com.mysql.cj.jdbc.Driver"user = "user"password = "password"result_table_name="switch_src"query="SELECT `event_time`, `device_id`, `device_type`, `device_name`, `cpu_usage` FROM ads_device_switch_performance;"}Jdbc {url="jdbc:mysql://127.0.0.1:3306/dw?allowMultiQueries=true&characterEncoding=utf-8"driver="com.mysql.cj.jdbc.Driver"user = "user"password = "password"result_table_name="router_src"query="SELECT `event_time`, `device_id`, `device_type`, `device_name`, `cpu_usage` FROM ads_device_router_performance;"}
}transform {Sql {source_table_name = "switch_src"result_table_name = "switch_dst"query = "SELECT  event_time , device_id, device_type, device_name, cpu_usage, NOW() AS create_time, NOW() AS update_time  FROM switch_src;"}Sql {source_table_name = "router_src"result_table_name = "router_dst"query = "SELECT event_time, device_id, device_type, device_name, cpu_usage, NOW() AS create_time, NOW() AS update_time FROM router_src;"}
}sink {Jdbc {url="jdbc:mysql://127.0.0.1:3306/olap?allowMultiQueries=true&characterEncoding=utf-8"driver="com.mysql.cj.jdbc.Driver"user = "user"password = "password"source_table_name = "switch_dst"query="INSERT INTO device_performance_switch  VALUES(null,?, ?, ?, ?, ?, ?, ?) ;"}Jdbc {url="jdbc:mysql://127.0.0.1:3306/olap?allowMultiQueries=true&characterEncoding=utf-8"driver="com.mysql.cj.jdbc.Driver"user = "user"password = "password"source_table_name = "router_dst"query="INSERT INTO device_performance_router  VALUES(null,?, ?, ?, ?, ?, ?, ?) ;"}
}

结语

综上所述,Apache SeaTunnel多表同步技术具有高效、实时、可靠和灵活的特点,在企业的数据同步领域发挥着重要作用。借助Apache SeaTunnel多表同步功能,企业能够更好地实现不同系统和数据库之间数据的无缝流转,提升数据管理和利用的效率,为业务发展提供有力支持。希望本文能够帮助读者更好地了解和应用Apache SeaTunnel多表同步,从而为企业数据同步带来更多可能性。

原文链接:https://blog.csdn.net/weixin_44586883/article/details/136049897

本文由 白鲸开源科技 提供发布支持!


http://www.ppmy.cn/ops/85872.html

相关文章

GitHub 详解教程

1. 引言 GitHub 是一个用于版本控制和协作的代码托管平台,基于 Git 构建。它提供了强大的功能,使开发者可以轻松管理代码、追踪问题、进行代码审查和协作开发。 2. Git 与 GitHub 的区别 Git 是一个分布式版本控制系统,用于跟踪文件的更改…

国科大作业考试资料-人工智能原理与算法-2024新编-第十一次作业整理

1、有一位教授想知道学生是否睡眠充足。每天,教授观察学生在课堂 上是否睡觉,并观察他们是否有红眼。教授获得如下的领域知识: (1)没有观察数据时,学生睡眠充足的先验概率为0.7。 (2)给定学生前一天睡眠充足为条件,学生在晚上睡眠充足的概率是0.8;如果前一天睡眠不…

Linux 常用命令详解:从基础操作到进阶应用

Linux 常用命令详解:从基础操作到进阶应用 简介 Linux 是一个强大且灵活的操作系统,它在服务器、开发环境和个人计算机中得到了广泛的应用。Linux 的命令行界面提供了丰富的工具和命令,可以帮助用户高效地管理系统、处理文件、监控性能和进…

[HTML]一文掌握

背景知识 主流浏览器 浏览器是展示和运行网页的平台, 常见的五大浏览器有 IE浏览器、火狐浏览器(Firefox)、谷歌浏览器(Chrome)、Safari浏览器、欧朋浏览器(Opera) 渲染引擎 浏览器解析代码渲…

阿里云服务器 篇六:GitHub镜像网站

文章目录 系列文章搭建镜像网站的2种方式使用 Web 抓取工具 (Spider 技术)使用 Web 代理服务器使用 nginx 搭建GitHub镜像网站基础环境搭建添加对 github.com 的转发配置添加对 raw.githubusercontent.com 的转发配置配置更改注意事项(可选)缓存优化为新增设的二级域名配置DN…

【区块链】JavaScript连接web3钱包,实现测试网络中的 Sepolia ETH余额查询、转账功能

审核看清楚了 ! 这是以太坊测试网络!用于学习的测试网络!!! 有关web3 和区块链的内容为什么要给我审核不通过? 别人凭什么可以发! 目标成果: 实现功能分析: 显示账户信…

k8s学习——安装istio之dns卡壳

我准备使用istio来替代原来的traefic网关和consul服务注册发现的方案,但在安装istio过程中遇到了一些问题,把解决的过程记录下来,便于今后遇到类似问题做个参考。 istio安装的中文文档地址:Istio Prelim 1.23 / 文档 参照Istio …

Mybatis——动态SQL

本文是在上一篇博客的代码基础上进行的,不清楚文件结构的请先回顾上一篇博客Mybatis——快速入门-CSDN博客 xml映射文件 介绍 MyBatis 的 XML 映射文件是 MyBatis 框架中用于定义 SQL 语句、映射规则等配置的重要文件。它允许开发者将 SQL 语句与 Java 对象映射关…