Ape-DTS:开源 DTS 工具,助力自建 MySQL、PostgreSQL 迁移上云

ops/2024/12/17 3:17:10/

Ape-DTS 是一款高效、轻量级且功能强大的开源工具,专注于解决数据迁移、同步、校验、订阅与加工的需求。无论是将自建的 MySQL/PostgreSQL 数据库迁移到云端,还是在不同数据库间进行数据迁移,Ape-DTS 都能为您提供便捷且可靠的解决方案。它特别适合于将自建 MySQL 数据库迁移到其他 MySQL 环境(如云端 MySQL、KubeBlocks MySQL),或者其他分析型数据库(例如 ClickHouse、StarRocks),以及消息队列(例如 Kafka)等场景。

https://github.com/apecloud/ape-dts

欢迎扫码添加小助手,备注「DTS」,进入交流群。

为什么选择 Ape-DTS

Ape-DTS 是一款旨在实现 any-to-any 的数据迁移工具:

  • 功能丰富: 支持多种数据库库表结构迁移数据全量迁移增量迁移数据校验订正复查数据订阅加工 等能力。
  • 支持广泛: 目前已支持数据库包括 MySQL,Postgres,Redis,Mongo,StarRocks,ClickHouse,Kafka 等。
  • 简单轻量: 不依赖第三方组件和额外存储,完整镜像解压后小于 100 MB。
  • 性能突出,使用 Rust 开发。

Ape-DTS 支持 MySQL,Postgres,Redis,Mongo,StarRocks,ClickHouse,Kafka 等数据库间的迁移,具体如下:

MySQL -> MySQLPostgreSQL -> PostgreSQLMongoDB -> MongoDBRedis -> RedisMySQL -> KafkaPostgreSQL -> KafkaMySQL -> StarRocksMySQL -> ClickHouseMySQL -> TiDBPostgreSQL -> StarRocks
全量迁移
增量同步
数据校验/订正/复查
库表结构迁移

功能亮点

  • 支持多种数据库间的同构异构数据迁移和同步。
  • 支持全量、增量任务的断点续传
  • 支持数据校验、订正。
  • 支持库、表、列级别的过滤和路由。
  • 针对不同源、目标、任务类型,实现不同的并发算法,提高性能。
  • 可加载用户 Lua 脚本,加工正在迁移/同步的数据。
  • 支持将数据发送到 Kafka,供用户自主消费。
  • 支持以 HTTP Server 的方式启动 Ape-DTS 并拉取增量数据,用户可使用 HTTP Client 获取数据并自主消费。

Ape-DTS 的性能表现

Ape-DTS 在多种场景下展现了卓越的性能表现。本文使用 sysbench 生成全量和增量数据,分别使用 Ape-DTS 和 Debezium 执行迁移任务,并对比结果。

以下是 MySQL -> MySQL 的测试,源端 MySQL和目标端 MySQL 均在 8C16G BCC(百度智能云云服务器)机器上使用 Docker 部署。

可以看到,Ape-DTS 的全量/增减迁移性能都显著优于 Debezium。在相同的节点规格(4C8G)下,Ape-DTS 的全量迁移性能约为 Debezium 的 31 倍,Ape-DTS 的增量迁移性能约为 Debezium 的 9 倍

测试一:全量数据迁移

同步方式节点规格RPS(Rows per Second)源 MySQL 负荷(CPU/内存)目标 MySQL 负荷(CPU/内存)
Ape-DTS1C2G714288.2% / 5.2%211% / 5.1%
Ape-DTS2C4G9940314.0% / 5.2%359% / 5.1%
Ape-DTS4C8G12658213.8% / 5.2%552% / 5.1%
Debezium4C8G405121.5% / 5.2%51.2% / 5.1%

测试二:增量数据迁移

同步方式节点规格RPS(Rows per Second)源 MySQL 负荷(CPU/内存)目标 MySQL 负荷(CPU/内存)
Ape-DTS1C2G1500218.8% / 5.2%467% / 6.5%
Ape-DTS2C4G2469218.1% / 5.2%687% / 6.5%
Ape-DTS4C8G2628718.2% / 5.2%685% / 6.5%
Debezium4C8G295120.4% / 5.2%98% / 6.5%

镜像对比

工具镜像大小
Ape-DTS86.4 MB
Debezium1.38 GB

如何使用 Ape-DTS 迁移 MySQL?

以下是 Ape-DTS 在 自建 MySQL 数据库迁移到 KubeBlocks MySQL 场景中的实际使用示例。

更多示例可参考: https://github.com/apecloud/ape-dts/tree/main/docs/en/tutorial。

配置文件差异总结

针对不同的任务,配置文件中的 extract_type, sink_type 等其他配置不同。

以下是一个简略的配置文件差异总结。更多具体配置,可参考官网上教程、模板及配置说明:https://github.com/apecloud/ape-dts。

任务类型extract_typesink_type特殊配置说明
库表结构迁移structstructdo_dbs指定需要迁移的数据库
全量迁移snapshotwriteparallel_type=snapshot全量抓取数据快照
增量同步cdcwriteserver_id, do_events基于 binlog 同步数据变更
数据校验snapshotcheck日志输出比较源库与目标库数据一致性
数据订正check_logwritecheck_log_dir指定校验结果日志的路径,用于订正任务
数据复查check_logcheckcheck_log_dir指定校验结果日志的路径

1. 准备工作

1.1 环境准备
工具准备

该示例中使用的 DTS_IMAGE 版本如下:

APE_DTS_IMAGE="docker.io/apecloud/ape-dts:2.0.12"
源库

用本地 Docker 搭建 MySQL。

docker run -d --name some-mysql-1 \
--platform linux/x86_64 \
-it \
-p 3307:3306 -e MYSQL_ROOT_PASSWORD="123456" \"$MYSQL_IMAGE" --lower_case_table_names=1 --character-set-server=utf8 --collation-server=utf8_general_ci \--datadir=/var/lib/mysql \--user=mysql \--server_id=1 \--log_bin=/var/lib/mysql/mysql-bin.log \--max_binlog_size=100M \--gtid_mode=ON \--enforce_gtid_consistency=ON \--binlog_format=ROW \--default_time_zone=+08:00
  • 将主机的端口 3307 映射到容器的端口 3306
  • 设置 MySQL root 密码为 123456
  • 使用由 $MYSQL_IMAGE 指定的镜像。

记录源端的 URL:

url=mysql://root:123456@127.0.0.1:3307?ssl-mode=disabled
目标库

我们在 ACK 上使用 KubeBlocks 搭建了 MySQL 集群,更多集群运维操作可参考 KubeBlocks MySQL Examples。

  1. 创建集群。

    # 创建 MySQL 集群
    kbcli cluster create mycluster --cluster-definition mysql -n demo
    
  2. 暴露服务。这里我们通过 LoadBalancer 暴露服务地址。

    # 将集群暴露至公网
    kbcli cluster expose mycluster --type internet --enable=true -ndemo
    

查看到公网地址为: 47.xx.xx.xx,记录为你的 目标地址

记录目标端地址:

url=mysql://ape_test:Ape123456789@<目标地址>:3306?ssl-mode=disabled
1.2 数据准备

登录本地 MySQL,创建测试用的数据库和表。

mysql -h127.0.0.1 -uroot -p123456 -P3307CREATE DATABASE test_db;
CREATE TABLE test_db.tb_1(id int, value int, primary key(id));
CREATE TABLE test_db.tb_2(id int, value text, primary key(id));INSERT INTO test_db.tb_1 VALUES(1,1),(2,2),(3,3),(4,4);
INSERT INTO test_db.tb_2 VALUES(5,'a'),(6,'b'),(7,'c'),(8,'d');

2. 库表结构迁移

创建任务配置

请将以下示例配置中的 extractor.url 和 sinker.url 替换为前面记录的源端 URL 和目标端 URL。

核心配置:

  • extract_type=structsink_type=struct:表示迁移的是库表结构。
  • do_dbs:指定需要迁移的数据库
cat <<EOL > /tmp/ape_dts/task_config.ini
[extractor]
extract_type=struct
db_type=mysql
url=mysql://root:123456@127.0.0.1:3307?ssl-mode=disabled[sinker]
sink_type=struct
db_type=mysql
url=mysql://ape_test:Ape123456789@<目标地址>:3306?ssl-mode=disabled[filter]
do_dbs=test_db[parallelizer]
parallel_type=serial[pipeline]
buffer_size=100
checkpoint_interval_secs=1
EOL
启动任务
docker run --rm --network host \
-v "/tmp/ape_dts/task_config.ini:/task_config.ini" \
"$APE_DTS_IMAGE" /task_config.ini 
检查目标库
  1. 登录目标库。

    mysql -h<目标地址> -uape_test -pApe123456789
    
  2. 查看数据。

    mysql> SHOW CREATE TABLE test_db.tb_1;
    mysql> SHOW CREATE TABLE test_db.tb_2;
    
3. 同步全量数据
创建任务配置

核心配置

  • extract_type=snapshot:表示全量迁移,抓取源数据库的快照。
  • sink_type=write:表示将数据写入目标数据库
  • parallel_type=snapshotparallel_size:控制快照并发级别,提高全量迁移效率。
cat <<EOL > /tmp/ape_dts/task_config.ini
[extractor]
db_type=mysql
extract_type=snapshot
url=mysql://root:123456@127.0.0.1:3307?ssl-mode=disabled[sinker]
db_type=mysql
sink_type=write
url=mysql://ape_test:Ape123456789@<目标地址>:3306?ssl-mode=disabled[filter]
do_dbs=test_db
do_events=insert[parallelizer]
parallel_type=snapshot
parallel_size=8[pipeline]
buffer_size=16000
checkpoint_interval_secs=1
EOL
启动任务
docker run --rm --network host \
-v "/tmp/ape_dts/task_config.ini:/task_config.ini" \
"$APE_DTS_IMAGE" /task_config.ini 
检查目标库
  1. 登录目标库。

    mysql -h<目标地址> -uape_test -pApe123456789
    
  2. 检查数据。

    mysql> SELECT * FROM test_db.tb_1;
    +----+-------+
    | id | value |
    +----+-------+
    |  1 |     1 |
    |  2 |     2 |
    |  3 |     3 |
    |  4 |     4 |
    +----+-------+mysql> SELECT * FROM test_db.tb_2;
    +----+-------+
    | id | value |
    +----+-------+
    |  5 | a     |
    |  6 | b     |
    |  7 | c     |
    |  8 | d     |
    +----+-------+
    
4. 增量任务
创建任务配置

核心配置:

  • extract_type=cdc:表示增量同步,基于源库的 binlog 或 WAL 日志抓取数据变更。
  • sink_type=write:表示将数据写入目标数据库
  • server_id:Ape-DTS 在该 MySQL 复制组中的标识,由用户指定,取值 [1-2^32 - 1],不得与该复制组中其他 server_id 相同。
  • do_events:指定需同步的事件类型(如 insert, update, delete)。
cat <<EOL > /tmp/ape_dts/task_config.ini
[extractor]
db_type=mysql
extract_type=cdc
server_id=2000
url=mysql://root:123456@127.0.0.1:3307?ssl-mode=disabled[filter]
do_dbs=test_db
do_events=insert,update,delete[sinker]
db_type=mysql
sink_type=write
batch_size=200
url=mysql://ape_test:Ape123456789@<目标地址>:3306?ssl-mode=disabled[parallelizer]
parallel_type=rdb_merge
parallel_size=8[pipeline]
buffer_size=16000
checkpoint_interval_secs=1
EOL
启动任务
docker run --rm --network host \
-v "/tmp/ape_dts/task_config.ini:/task_config.ini" \
"$APE_DTS_IMAGE" /task_config.ini 
修改源库数据
  1. 登录本地 MySQL。

    mysql -h127.0.0.1 -uroot -p123456 -uroot -P3307
    
  2. 修改数据。

    DELETE FROM test_db.tb_1 WHERE id=1;UPDATE test_db.tb_1 SET value=2000000 WHERE id=2;INSERT INTO test_db.tb_2 VALUES(9, 'f');
    
检查目标库
  1. 登录目标 MySQL。

    mysql -h<目标地址> -uape_test -pApe123456789
    
  2. 查看目标端数据。

    mysql> SELECT * FROM test_db.tb_1;
    +----+---------+
    | id | value   |
    +----+---------+
    |  2 | 2000000 |
    |  3 |       3 |
    |  4 |       4 |
    +----+---------+mysql> SELECT * FROM test_db.tb_2;
    +----+-------+
    | id | value |
    +----+-------+
    |  5 | a     |
    |  6 | b     |
    |  7 | c     |
    |  8 | d     |
    |  9 | f     |
    +----+-------+
    

    可以看到增量数据都已经同步了。

5. 数据校验
在目标端修改数据
  1. 登录目标 MySQL。

    mysql -h<目标地址> -uape_test -pApe123456789
    
  2. 在目标端修改数据,构造和源库的差异。

    DELETE FROM test_db.tb_1 WHERE id=4;         # 删除tb_1数据
    UPDATE test_db.tb_1 SET value=1 WHERE id=2;  # 修改tb_1数据DELETE FROM test_db.tb_2 WHERE id=5;         # 删除tb_2数据
    
创建任务配置

核心配置

  • extract_type=snapshot:校验基于源库的全量数据快照。
  • sink_type=check:表示校验目标库与源库数据是否一致。
  • 输出日志:校验的差异会记录在日志文件中(如缺失数据和不一致的数据)。
cat <<EOL > /tmp/ape_dts/task_config.ini
[extractor]
db_type=mysql
extract_type=snapshot
url=mysql://root:123456@127.0.0.1:3307?ssl-mode=disabled[sinker]
db_type=mysql
sink_type=check
url=mysql://ape_test:Ape123456789@<目标地址>:3306?ssl-mode=disabled[filter]
do_dbs=test_db
do_events=insert[parallelizer]
parallel_type=rdb_check
parallel_size=8[pipeline]
buffer_size=16000
checkpoint_interval_secs=1
EOL
启动任务
docker run --rm --network host \
-v "/tmp/ape_dts/task_config.ini:/task_config.ini" \
-v "/tmp/ape_dts/check_data_task_log/:/logs/" \
"$APE_DTS_IMAGE" /task_config.ini 
查看校验结果

源库为基准,检查目标库的数据缺失和不同,校验结果以日志文件输出。

  1. 检查数据缺失。

    cat /tmp/ape_dts/check_data_task_log/check/miss.log
    

    可以看到具体的缺失数据信息。

    {"log_type":"Miss","schema":"test_db","tb":"tb_1","id_col_values":{"id":"4"},"diff_col_values":{}}
    {"log_type":"Miss","schema":"test_db","tb":"tb_2","id_col_values":{"id":"5"},"diff_col_values":{}}
    
  2. 检查数据差异。

    cat /tmp/ape_dts/check_data_task_log/check/diff.log
    

    可以看到输出如下,diff_col_values展示了差异的具体内容。

    {"log_type":"Diff","schema":"test_db","tb":"tb_1","id_col_values":{"id":"2"},"diff_col_values":{"value":{"src":"2000000","dst":"1"}}}
    
6. 数据订正
  • 根据校验日志,反查源库,订正目标库。
创建任务配置

核心配置

  • extract_type=check_log:表示基于校验日志执行数据订正任务。
  • sink_type=write:将订正后的数据写回目标库。
  • check_log_dir:指定校验日志的路径,用于订正任务。
cat <<EOL > /tmp/ape_dts/task_config.ini
[extractor]
db_type=mysql
extract_type=check_log
url=mysql://root:123456@127.0.0.1:3307?ssl-mode=disabled
check_log_dir=./check_data_task_log[sinker]
db_type=mysql
sink_type=write
url=mysql://ape_test:Ape123456789@<目标地址>:3306?ssl-mode=disabled[filter]
do_events=*[parallelizer]
parallel_type=rdb_check
parallel_size=8[pipeline]
buffer_size=16000
checkpoint_interval_secs=1
EOL
启动任务
docker run --rm --network host \
-v "/tmp/ape_dts/task_config.ini:/task_config.ini" \
-v "/tmp/ape_dts/check_data_task_log/check/:/check_data_task_log/" \
"$APE_DTS_IMAGE" /task_config.ini 
查看订正后的结果
  1. 登录目标库。

    mysql -h<目标地址> -uape_test -pApe123456789
    
  2. 查看数据。

    mysql> SELECT * FROM test_db.tb_1;
    +----+---------+
    | id | value   |
    +----+---------+
    |  2 | 2000000 |
    |  3 |       3 |
    |  4 |       4 |
    +----+---------+mysql> SELECT * FROM test_db.tb_2;
    +----+-------+
    | id | value |
    +----+-------+
    |  5 | a     |
    |  6 | b     |
    |  7 | c     |
    |  8 | d     |
    |  9 | f     |
    +----+-------+
    

    可以看到目标端被删除和更新的数据,都已经被订正了。

7. 数据复查
  • 根据校验日志,反查源库,再次校验目标库
  • 和全量校验的区别在于校验数据的范围:数据复查限定在校验出的 缺失/不同 的数据
修改目标库数据,构造和源库的差异
  1. 登录目标库。

    mysql -h<目标地址> -uape_test -pApe123456789
    
  2. 修改数据。

    DELETE FROM test_db.tb_1 WHERE id=4;   # 删除tb_1数据
    
创建任务配置

核心配置

  • extract_type=check_log:基于校验日志。
  • sink_type=check 用于复查目标库数据。
  • check_log_dir:指定校验日志的路径。
cat <<EOL > /tmp/ape_dts/task_config.ini
[extractor]
db_type=mysql
extract_type=check_log
url=mysql://root:123456@127.0.0.1:3307?ssl-mode=disabled
check_log_dir=./check_data_task_log[sinker]
db_type=mysql
sink_type=check
url=mysql://ape_test:Ape123456789@<目标地址>:3306?ssl-mode=disabled[filter]
do_events=*[parallelizer]
parallel_type=rdb_check
parallel_size=8[pipeline]
buffer_size=16000
checkpoint_interval_secs=1
EOL
启动任务
docker run --rm --network host \
-v "/tmp/ape_dts/task_config.ini:/task_config.ini" \
-v "/tmp/ape_dts/check_data_task_log/check/:/check_data_task_log/" \
-v "/tmp/ape_dts/review_data_task_log/:/logs/" \
"$APE_DTS_IMAGE" /task_config.ini 
查看复查结果
  1. 查看数据缺失。

    cat /tmp/ape_dts/review_data_task_log/check/miss.log
    

    可以看到输入日志显示 {“id”:“4”} 缺失。

    {"log_type":"Miss","schema":"test_db","tb":"tb_1","id_col_values":{"id":"4"},"diff_col_values":{}}
    
  2. 查看数据差异。

  • /tmp/ape_dts/review_data_task_log/check/diff.log 为空,符合预期。

更多教程

  • 更多任务类型、教程、任务配置,请参考 Ape-DTS 主页。

http://www.ppmy.cn/ops/142528.html

相关文章

DBApi-相关事宜记录

1、源码的版本是2.3.1 2、小问题记录 2.1、根pom文件报错 <configuration><source>1.8</source><target>1.8</target><encoding>utf-8</encoding> </configuration> 解决办法&#xff1a; <plugin><groupId>o…

Scala符号使用大全

Scala 中的符号&#xff08;Symbol&#xff09;是一种用于表示名称或标识符的类型&#xff0c;通常用于反射&#xff08;reflection&#xff09;。在 Scala 2.10 引入了 scala.reflect.runtime.universe.Symbol 类型。 以下是 Scala 中使用符号的一些常见示例&#xff1a; 获取…

关于Python程序消费Kafka消息不稳定问题的处理方法

在使用Python程序消费Kafka消息的过程中&#xff0c;有时会遇到各种不稳定的情况&#xff0c;如自动提交偏移量无效、CommitFailedError错误等。这些问题不仅影响了数据处理的可靠性&#xff0c;还可能导致重复消费或丢失消息。本文将针对这两个常见问题提供详细的解决方案和最…

【大语言模型】ACL2024论文-26 在支持数据存在的情况下进行框架构建:以美国经济新闻为例研究

【大语言模型】ACL2024论文-26 在支持数据存在的情况下进行框架构建&#xff1a;以美国经济新闻为例研究 目录 文章目录 【大语言模型】ACL2024论文-26 在支持数据存在的情况下进行框架构建&#xff1a;以美国经济新闻为例研究目录摘要研究背景问题与挑战如何解决创新点算法模型…

工业大数据分析算法实战-day05

文章目录 day05分而治之中的MARS算法神经网络逼近能力解释 day05 今天是第5天&#xff0c;昨日从统计分析开始利用统计学的知识判断当前样本的分布以及估计总体的参数和假设检验的情况&#xff0c;以及介绍了线性回归算法的相关优化点&#xff0c;但是毕竟线性回归是线性划分的…

Nginx 缓存那些事儿:原理、配置和最佳实践

Nginx 缓存那些事儿&#xff1a;原理、配置和最佳实践 在当今的互联网世界&#xff0c;网站的访问量和数据处理量不断攀升&#xff0c;如何确保用户能够快速、稳定地访问我们的网站&#xff0c;已经成为每个运维工程师面临的挑战。幸运的是&#xff0c;Nginx 作为一款高性能的…

phpSpider如何实现登录态保持的数据爬取

在使用 PHP 实现一个名为 phpSpider 的爬虫程序时&#xff0c;如果需要保持登录态以进行数据爬取&#xff0c;通常需要模拟用户登录过程&#xff0c;并在后续请求中携带登录后产生的认证信息&#xff08;如 Cookies、Session ID 或 Token&#xff09;。以下是一个实现登录态保持…

第二部分:进阶主题 12 . 用户管理 --[MySQL轻松入门教程]

第二部分&#xff1a;进阶主题 12 . 用户管理 --[MySQL轻松入门教程] MySQL 用户管理涉及到创建用户、授予和撤销权限、修改密码以及删除用户等操作。以下是一些常用的 MySQL 用户管理命令&#xff1a; 创建用户 下面是两个创建 MySQL 用户的示例&#xff0c;包括授予用户权…