如何在 Java 中使用 Canal 同步 MySQL 数据到 Redis

ops/2024/11/8 12:59:36/

文章目录

  • 一、引言
  • 二、工作原理
    • 1. MySQL主备复制原理
    • 2. canal 工作原理
  • 三、环境准备
    • 1. 安装和配置 MySQL
    • 2. 安装和配置 Canal
    • 3. 安装和配置 Redis
  • 四、开发 Java 应用
    • 1. 添加依赖
    • 2. 编写 Canal 客户端代码
    • 3. 运行和测试
      • 3.1 启动 Canal 服务:
      • 3.2 启动 Redis 服务:
      • 3.3 启动 Java 应用:
      • 3.4 测试数据同步:
  • 五、注意事项
  • 六、结论
  • 七、参考资料

一、引言

在现代微服务架构中,数据同步是一个常见的需求。特别是将 MySQL 数据实时同步到 Redis,可以显著提升应用的性能和响应速度。本文将详细介绍如何使用 Canal 实现这一目标。Canal 是阿里巴巴开源的一个数据库 Binlog 同步工具,可以实时捕获 MySQL 的 Binlog 日志并将其同步到其他存储系统。
项目地址:alibaba/canal

二、工作原理

1. MySQL主备复制原理

  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

2. canal 工作原理

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)

三、环境准备

1. 安装和配置 MySQL

Canal的原理是基于mysql binlog技术,所以这里一定需要开启mysql的binlog写入功能,并且配置binlog模式为row。编辑 MySQL 配置文件 my.cnf 或 my.ini,添加或修改以下内容:

[mysqld]
server-id=1
log-bin=mysql-bin
binlog-format=ROW

授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant:

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

重启 MySQL 服务以使配置生效:

sudo service mysql restart

2. 安装和配置 Canal

下载并解压 Canal 服务端:

wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz
tar -zxvf canal.deployer-1.1.5.tar.gz -C /opt/canal
cd /opt/canal

编辑 Canal 配置文件 conf/example/instance.properties,配置 MySQL 服务器的相关信息:

canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset=UTF-8
canal.instance.filter.regex=.*\\..*

启动 Canal 服务:

sh bin/startup.sh

查看 server 日志

vi logs/canal/canal.log</pre>
2013-02-05 22:45:27.967 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2013-02-05 22:45:28.113 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.29.120:11111]
2013-02-05 22:45:28.210 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......

查看 instance 的日志

vi logs/example/example.log
2013-02-05 22:50:45.636 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2013-02-05 22:50:45.641 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2013-02-05 22:50:45.803 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 
2013-02-05 22:50:45.810 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....

如果启动失败,注意检查配置文件conf/example/instance.properties的内容,还要注意JDK版本及配置。建议使用1.6.25。我用openjdk 21启动报错,改回JDK8u421启动成功。

3. 安装和配置 Redis

确保 Redis 服务已经安装并启动。可以在 Redis 客户端中执行以下命令检查:

redis-cli
ping

四、开发 Java 应用

1. 添加依赖

在你的 pom.xml 文件中添加 Canal 客户端和 Redis 客户端的依赖。以下是一个示例:

<dependencies><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.5</version></dependency><dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>5.1.5</version></dependency>
</dependencies>

2. 编写 Canal 客户端代码

创建一个 Java 类来连接 Canal 服务并处理 Binlog 事件,将数据同步到 Redis:

java">package org.hbin.canal;import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import redis.clients.jedis.Jedis;import java.net.InetSocketAddress;
import java.util.List;public class CanalToRedisSync {public static void main(String[] args) {// 创建 Canal 连接InetSocketAddress address = new InetSocketAddress("127.0.0.1", 11111);CanalConnector connector = CanalConnectors.newSingleConnector(address, "example", "", "");// 连接到 Canal 服务connector.connect();connector.subscribe(".*\\..*");connector.rollback();// 创建 Redis 客户端Jedis jedis = new Jedis("127.0.0.1", 6379);while (true) {Message message = connector.getWithoutAck(100); // 获取最多 100 条记录long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}} else {handleEntry(message.getEntries(), jedis);}connector.ack(batchId); // 提交确认// connector.rollback(batchId); // 处理失败, 回滚数据}}private static void handleEntry(List<CanalEntry.Entry> entries, Jedis jedis) {for (CanalEntry.Entry entry : entries) {if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {continue;}CanalEntry.RowChange rowChange = null;try {rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);}CanalEntry.EventType eventType = rowChange.getEventType();System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType));for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {if (eventType == CanalEntry.EventType.DELETE) {syncDelete(rowData.getBeforeColumnsList(), jedis, entry.getHeader().getSchemaName(), entry.getHeader().getTableName());} else if (eventType == CanalEntry.EventType.INSERT) {syncInsert(rowData.getAfterColumnsList(), jedis, entry.getHeader().getSchemaName(), entry.getHeader().getTableName());} else {System.out.println("-------> before");syncUpdate(rowData.getBeforeColumnsList(), jedis, entry.getHeader().getSchemaName(), entry.getHeader().getTableName());System.out.println("-------> after");syncUpdate(rowData.getAfterColumnsList(), jedis, entry.getHeader().getSchemaName(), entry.getHeader().getTableName());}}}}private static void syncInsert(List<CanalEntry.Column> columns, Jedis jedis, String schema, String table) {StringBuilder key = new StringBuilder();StringBuilder value = new StringBuilder();for (CanalEntry.Column column : columns) {if (column.getName().equals("id")) {key.append(column.getValue());} else {value.append(column.getName()).append(":").append(column.getValue()).append(",");}}System.out.println("Insert: " + key.toString() + " -> " + value.toString());jedis.hset(schema + ":" + table, key.toString(), value.toString());}private static void syncUpdate(List<CanalEntry.Column> columns, Jedis jedis, String schema, String table) {StringBuilder key = new StringBuilder();StringBuilder value = new StringBuilder();for (CanalEntry.Column column : columns) {if (column.getName().equals("id")) {key.append(column.getValue());} else {value.append(column.getName()).append(":").append(column.getValue()).append(",");}}System.out.println("Update: " + key.toString() + " -> " + value.toString());jedis.hset(schema + ":" + table, key.toString(), value.toString());}private static void syncDelete(List<CanalEntry.Column> columns, Jedis jedis, String schema, String table) {StringBuilder key = new StringBuilder();for (CanalEntry.Column column : columns) {if (column.getName().equals("id")) {key.append(column.getValue());}}System.out.println("Delete: " + key.toString());jedis.hdel(schema + ":" + table, key.toString());}
}

3. 运行和测试

3.1 启动 Canal 服务:

sh /opt/canal/bin/startup.sh

3.2 启动 Redis 服务:

确保 Redis 服务已经启动,可以在 Redis 客户端中执行以下命令检查:

redis-cli
ping

3.3 启动 Java 应用:

编译并运行上述 Java 应用,确保 Canal 服务和 MySQL 服务器正常运行。

3.4 测试数据同步:

在 MySQL 中插入、更新或删除数据,观察 Java 应用是否能够实时捕获这些变化并将数据同步到 Redis。
相关SQL如下:

drop database if exists canal;
create database canal;
use canal;drop table if exists user;
create table user(`id` bigint AUTO_INCREMENT primary key,`name` varchar(20) NOT NULL,`age` tinyint DEFAULT 0,`detail` varchar(100) DEFAULT '',`create_time` date,`update_time` date
);insert into user value(1, 'Tom1', 25, 'canal', '2024-11-07', '2024-11-07');
insert into user value(2, 'Tom2', 25, 'canal', '2024-11-07', '2024-11-07');
insert into user value(3, 'Tom3', 25, 'canal', '2024-11-07', '2024-11-07');
update user set age=26 where id=2;
delete from user where id=3;

输出信息:

================> binlog[binlog.000008:6390] , name[canal,user] , eventType : CREATE
================> binlog[binlog.000008:6899] , name[canal,user] , eventType : INSERT
Insert: 1 -> name:Tom1,age:25,detail:canal,create_time:2024-11-07,update_time:2024-11-07,
================> binlog[binlog.000008:7213] , name[canal,user] , eventType : INSERT
Insert: 2 -> name:Tom2,age:25,detail:canal,create_time:2024-11-07,update_time:2024-11-07,
================> binlog[binlog.000008:7527] , name[canal,user] , eventType : INSERT
Insert: 3 -> name:Tom3,age:25,detail:canal,create_time:2024-11-07,update_time:2024-11-07,
================> binlog[binlog.000008:7850] , name[canal,user] , eventType : UPDATE
-------> before
Update: 2 -> name:Tom2,age:25,detail:canal,create_time:2024-11-07,update_time:2024-11-07,
-------> after
Update: 2 -> name:Tom2,age:26,detail:canal,create_time:2024-11-07,update_time:2024-11-07,
================> binlog[binlog.000008:8193] , name[canal,user] , eventType : DELETE
Delete: 3

五、注意事项

  • 性能优化:根据实际需求调整 Canal 和 Redis 的配置,以优化性能。
  • 错误处理:在生产环境中,需要增加错误处理和重试机制,确保数据同步的可靠性。
  • 安全性:确保 Canal 和 Redis 的连接是安全的,使用适当的认证和授权机制。

六、结论

通过使用 Canal,我们可以轻松地将 MySQL 数据实时同步到 Redis、Kafka 或其他系统。这不仅提高了数据的一致性和实时性,还为应用提供了更高的性能和响应速度。希望本文对你有所帮助。

七、参考资料

  • canal QuickStart
  • canal ClientExample

http://www.ppmy.cn/ops/131932.html

相关文章

全星魅-物联网定位终端-北斗定位便携终端-北斗有源终端

在当今快速发展的物流运输行业中&#xff0c;精准定位与实时监控已成为确保货物安全与高效运输的关键因素。为了满足这一需求&#xff0c;QMCZ10作为一款集4G&#xff08;LTE Cat1&#xff09;通讯技术与智能定位功能于一体的终端产品&#xff0c;应运而生。它不仅具备普通定位…

【算法与数据结构】【链表篇】【题1-题5】

题1.从尾到头打印链表 题目&#xff1a;输入一个链表的头结点&#xff0c;从尾到头反过来打印出每个节点的值。链表的定义如下&#xff1a; struct ListNode {int mValue;ListNode *mNext;ListNode *mPrev; }; 1.1 方法一&#xff1a;栈 思路&#xff1a;要反过来打印&…

爬虫-------字体反爬

目录 一、了解什么是字体加密 二. 定位字体位置 三. python处理字体 1. 工具库 2. 字体读取 3. 处理字体 案例1:起点 案例2:字符偏移: 5请求数据 - 发现偏移量 5.4 多套字体替换 套用模板 版本1 版本2 四.项目实战 1. 采集目标 2. 逆向结果 一、了解什么是…

qt QTextCursor详解

1、概述 QTextCursor是Qt框架中用于在QTextDocument或QTextEdit中编辑和导航文本的类。它提供了对文本选择和编辑操作的低级控制&#xff0c;允许插入、删除、修改文本以及改变文本的格式。QTextCursor可以看作是一个在文本中移动的插入点或选择区域&#xff0c;通过它可以执行…

微信小程序uniapp基于Android的流浪动物管理系统 70c3u

文章目录 项目介绍具体实现截图技术介绍mvc设计模式小程序框架以及目录结构介绍错误处理和异常处理java类核心代码部分展示详细视频演示源码获取 项目介绍 以往流浪猫狗的救助网站相关信息的管理&#xff0c;都是工作人员手工统计。这种方式不但时效性低&#xff0c;而且需要查…

G1垃圾回收器日志详解

新生代收集 GC pause (G1 Evacuation Pause) (young) -- gc前堆内存分布情况 {Heap before GC invocations1592 (full 4):garbage-first heap total 6291456K, used 5011297K [0x0000000640000000, 0x0000000640206000, 0x00000007c0000000) --表示使用了G1,堆大小&…

25国考照片处理器使用流程图解❗

1、打开“国家公务员局”网站&#xff0c;进入2025公务员专题&#xff0c;找到考生考务入口 2、点击下载地址 3、这几个下载链接都可以 4、下载压缩包 5、解压后先看“使用说明”&#xff0c;再找到“照片处理工具”双击。 6、双击后会进入这样的界面&#xff0c;点击&…

oracle 9i 使用dbms_obfuscation_toolkit加密解密

加密(encrypt)解密(decrypt)采用 Oracle DBMS_OBFUSCATION_TOOLKIT package. 利用这个包,我们可以对数据进行DES,Triple DES或者MD5加密. DESGETKEY --产生密钥,用于DES算法 DES3GETKEY -- 产生密钥,用于Triple DES算法 DESENCRYPT -- 用DES算法加密数据 DESDECRYP…