使用 Canal 实时从 MySql 向其它库同步数据

server/2024/11/29 16:53:27/

目前绝大多数项目还是采用 mysql 作为数据存储,对于用户访问量较高的网站来说,mysql 读写性能有限,我们通常会把 mysql 中的数据实时同步到 Redis、mongodb、elastic search 等中间件中,应对高并发访问场景,减轻 mysql 压力,防止数据库宕机。在项目开发中,为了不会原有代码进行侵入,采用 canal 中间件实现 mysql 向其它库的实时同步,是一种很不错的方案。

canal 译意为水道/管道/沟渠,主要用途是基于 mysql 数据库增量日志解析,提供增量数据订阅和消费,其工作原理是:模拟 mysql slave 的交互协议,伪装自己为 mysql slave ,向 mysql master 发送 dump 协议,mysql master 收到 dump 请求,开始推送 binary log 给 canal,canal 解析 binary log 提供出对具体表数据的增删改操作内容。

本篇博客将采用 docker-compose 搭建 mysql 和 canal,并采用代码方式演示如果使用 canal 从mysql 中同步数据,监听对数据表的增删改操作。在本篇博客的最后会提供源代码下载。

Canal 的 gitHub 地址为:GitHub - alibaba/canal: 阿里巴巴 MySQL binlog 增量订阅&消费组件

mysql-和-canal">一、部署 mysql 和 canal

本篇博客使用虚拟机进行部署,我的虚拟机操作系统是 CentOS7(ip 地址是 192.168.136.128),已经安装好了 docker 和 docker-compose ,首先我们先创建好相关的目录,我创建的主目录是 /app/canal,具体结构如下:

image

在 /app/canal 创建一个 mysql 目录,在 mysql 下创建一个 data 目录,并创建了 mysql 的配置文件 my.cnf

首先列出 mysql 的配置文件 my.cnf 的内容,主要配置是开启 binlog 日志:

 
[mysqld]
# 开启 binlog
log-bin=mysql-bin
# canal 需要使用 ROW 模式
binlog-format=ROW
# 如果你只想同步部分数据库的话,可以进行如下配置
# 如果同步多个数据库的话,可以配置多行。
# 如果不配置的话,默认是 mysql 所有库都进行同步
# binlog-do-db=canaldb
# binlog-do-db=mytestdb
# 不要和 canal 的 slaveId 重复,mysql 默认是 1
server-id=1
# 数据目录
datadir=/var/lib/mysql
symbolic-links=0
socket=/var/lib/mysql/mysql.sock
log-error=/var/log/mysqld.log
pid-file=/var/run/mysqld/mysqld.pid

然后在 /app/canal 下创建 docker-compose.yml 文件,内容如下:

 
version: "3.5"
services:
mysql-server:
image: mysql:5.7.42
container_name: mysql-server
restart: always
ports:
- 3306:3306
volumes:
# mysql的数据存放目录映射
- /app/canal/mysql/data:/var/lib/mysql
# mysql的配置文件映射
- /app/canal/mysql/my.cnf:/etc/mysql/my.cnf
environment:
# mysql的root密码设置
- MYSQL_ROOT_PASSWORD=root
networks:
- canal_net
canal-server:
image: canal/canal-server:v1.1.7
container_name: canal-server
restart: always
ports:
- 11111:11111
environment:
# 设置连接 canal 服务的用户名和密码
- CANAL_ADMIN_USER=admin
- CANAL_ADMIN_PASSWORD=password
# 设置 canal 实例的名称
- canal.destinations=jobs
# 设置 canal 实例作为 mysql 从库的 server_id
- canal.instance.mysql.slaveId=100
# 设置连接 mysql 的地址
- canal.instance.master.address=mysql-server:3306
# 设置连接 mysql 的账号密码
- canal.instance.dbUsername=root
- canal.instance.dbPassword=root
# 设置要解析的表(可以使用正则表达式),多个之间配置用英文逗号分隔
- canal.instance.filter.regex=canaldb.t_employee
networks:
- canal_net
depends_on:
- mysql-server
# 网络配置
networks:
canal_net:
driver: bridge

然后在 /app/canal 目录下,运行 docker-compose up -d 即可启动 mysql 和 canal 服务。

image

在 canal 服务配置中 canal.instance.filter.regex 用来配置要同步的数据库表,可以配置正则表达式:

 
多个正则之间以英文逗号分隔,转义符需要使用双斜杠(\\)。常见例子如下:
1. 所有表: .* or .\\..
2. canal 数据库下所有表: canal\\..*
3. canal 数据库下的所有以 canal 开头的表:canal\\.canal.*
4. canal 数据库下的一张表:canal.test1
5. 多个正则表达式组合使用,然后以英文逗号隔开:canal\\..*,mysql.test1,mysql.test2

我们上面配置的是 canaldb.t_employee ,也就是仅同步 canaldb 数据库下的 t_employee 表,因此在部署完 mysql 之后,需要运行以下 sql 脚本在数据库中简单出该表,并添加一些示例数据:

 
DROP TABLE IF EXISTS `t_employee`;
CREATE TABLE `t_employee` (
`e_id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '员工id',
`e_name` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '员工姓名',
`e_age` int(11) NOT NULL DEFAULT 0 COMMENT '员工年龄',
PRIMARY KEY (`e_id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 4 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;
INSERT INTO `t_employee` VALUES (1, '任肥肥', 41);
INSERT INTO `t_employee` VALUES (2, '侯胖胖', 42);
INSERT INTO `t_employee` VALUES (3, '乔豆豆', 40);

二、实时同步数据

新建一个 springboot 工程,具体结构如下所示:

image

由于工程代码非常简单,这里就直接进行介绍,先看 pom 文件引用的依赖:

 
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.jobs</groupId>
<artifactId>springboot_canal</artifactId>
<version>1.0</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.5</version>
</parent>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.26</version>
</dependency>
<!--引入 canal 的 springboot 依赖-->
<dependency>
<groupId>top.javatool</groupId>
<artifactId>canal-spring-boot-starter</artifactId>
<version>1.2.1-RELEASE</version>
</dependency>
</dependencies>
</project>

这里最主要就是引入了 canal-spring-boot-starter 这个依赖,

该依赖包是第三方爱好者提供,github 网址为:GitHub - NormanGyllenhaal/canal-client: spring boot canal starter 易用的canal 客户端 canal client

我们需要创建一个实体类,其字段需要与要同步数据的 mysql 数据库表保持一致,具体细节如下:

 
package com.jobs.pojo;
import lombok.Data;
//注意:
//这里的属性名称,需要与数据库表中的字段名称,保持一致
@Data
public class Employee {
private Long e_id;
private String e_name;
private Integer e_age;
}

然后再开发一个 handler 用于处理从 canal 服务获取到的解析记录(对数据的增删改记录)即可:

 
package com.jobs.handler;
import com.jobs.pojo.Employee;
import org.springframework.stereotype.Component;
import top.javatool.canal.client.annotation.CanalTable;
import top.javatool.canal.client.handler.EntryHandler;
@CanalTable("t_employee")
@Component
public class EmployeeHandler implements EntryHandler<Employee> {
@Override
public void insert(Employee employee) {
System.out.println("添加了:" + employee);
//这里可以将数据添加到同步的目标库中,比如 redis 缓存
}
@Override
public void update(Employee before, Employee after) {
System.out.println("更新前:" + before);
System.out.println("更新后:" + after);
//这里可以将数据更新到同步的目标库中,比如 redis 缓存
}
@Override
public void delete(Employee employee) {
System.out.println("删除了:" + employee);
//这里可以将数据从同步的目标库中删除掉,比如 redis 缓存
}
}

最后在工程的 application.yml 文件中配置好连接 canal 服务的信息:

 
canal:
# canal 服务部署时,配置的 destination 值,此处要保持一直
destination: jobs
# canal服务地址
server: 192.168.136.128:11111
# 连接 canal 服务的用户名和密码
user-name: admin
password: password

三、验证成果

启动 springboot 工程,然后使用 navcat 连接到 mysql 数据库,对 canaldb 下的表 t_employee 中的记录进行增删改:

首先我们在 mysql 中添加一条新纪录:李墩墩38 岁,然后程序就监听到数据添加事件,控制台打印结果如下:

image

然后我们在 mysql 中修改李墩墩的数据,修改为蔺赞赞36岁,然后程序就监听到数据修改事件,控制台打印结果如下:

image

最后我们在 mysql 中删除任肥肥这条记录,然后程序就监听到数据修改事件,控制台打印结果如下:

image

本篇博客的 Demo 代码,仅仅是把数据打印出来,在实际开发中,大家可以根据自己的业务,进行相应的操作。

到此为止使用 Canal 服务从 mysql 实时同步数据的内容已经介绍完毕,大家可以下载源代码自行体验:

本篇博客的源代码下载地址为:https://files.cnblogs.com/files/blogs/699532/springboot_canal.zip


http://www.ppmy.cn/server/145945.html

相关文章

知识图谱嵌入评估的常用任务

知识图谱嵌入&#xff08;KGE&#xff09;是通过将图中的实体和关系表示为低维向量&#xff0c;从而使得原本复杂的图结构可以被机器学习模型处理&#xff0c;并用于后续任务。有效的评估方法能够帮助研究者和工程师了解模型在不同任务中的表现&#xff0c;并优化模型以提升其在…

卷积神经网络:图像特征提取与分类的全面指南

目录 引言 卷积层&#xff1a;图像特征的初步提取 局部连接与权重共享 多个卷积核与特征图 激活函数 池化层&#xff1a;降低维度与增强不变性 最大池化与平均池化 空间不变性 全连接层&#xff1a;特征整合与分类决策 特征整合 分类器 Dropout与正则化 训练与优化…

【C++贪心 数论】991. 坏了的计算器|1909

本文涉及知识点 C贪心 数论&#xff1a;质数、最大公约数、菲蜀定理 LeetCode991. 坏了的计算器 在显示着数字 startValue 的坏计算器上&#xff0c;我们可以执行以下两种操作&#xff1a; 双倍&#xff08;Double&#xff09;&#xff1a;将显示屏上的数字乘 2&#xff1b…

快速排序 C++

题目一 解题思路 快排思路 首先设定一个分界值(基准值)&#xff0c;通过该分界值将数组分成左右两部分。将大于或等于分界值的数据集中到数组右边&#xff0c;小于分界值的数据集中到数组的左边。此时&#xff0c;左边部分中各元素都小于分界值&#xff0c;而右边部分中各元素…

【Maven Helper】分析依赖冲突案例

目录 Maven Helper实际案例java文件pom.xml文件运行抛出异常分析 参考资料 Maven Helper A must have plugin for working with Maven. easy way for analyzing and excluding conflicting dependenciesactions to run/debug maven goals for a module that contains the cur…

通过DBUA升级 Oracle 11g到Oracle12c版本

Oracle 11g升级到Oracle12c Oracle11g数据库环境准备与数据备份 环境&#xff1a; oracle11.2.0.4 to oralce12.2.0.1 升级方案&#xff1a; 升级方案很多种&#xff0c;我们ORACLE培训课程第8阶段有所讲所有的升级方案&#xff0c;我们这里采用DBUA官方建议的方法 1、手…

ctfshow -web 89-115-wp

89. 显然&#xff0c;这里是需要绕过preg_match&#xff0c;绕过preg_match有三种方法 CTF 总结02&#xff1a;preg_match()绕过_pregmatch函数绕过-CSDN博客 90. 考intval。 这个与赣ctf有道题差不多&#xff0c;我是直接传入num4476a&#xff0c;intval&#xff08;4476a&a…

Vue 开发中为什么要使用穿透符::deep()

在 Vue 开发中&#xff0c;有时候样式需要穿透才能生效&#xff0c;通常是因为使用了作用域样式 (scoped styles) 的缘故。 1. 什么是作用域样式 (scoped styles)? 在 Vue 单文件组件 (SFC) 中&#xff0c;使用 <style scoped> 声明的样式只会作用于当前组件的元素。Vu…