docker-compose安装canal并利用rabbitmq同步多个mysql数据

ops/2025/1/11 19:49:00/

必看:本文默认已经安装好了docker-compose、rabbitmqmysql并且mysql开启了binlog日志,不需要再安装;

流程图

如上图所示,左边是MQ模式流程图,右边则是TCP模式的流程图;

最终的目的是利用canal监听多个MySQL数据变化,然后将数据同步到redis/es...第三方数据库容器中;

原理

大概原理:相对于mysql来说,canal充当的是mysql的从节点,然后canal读取mysql的binlog文件记录数据的变化;

使用MQ模式时:相对于mq来说,canal充当的则是消息生产者的角色,canal将监听到的数据发布到mq的队列中,然后mq的消费者负责对第三方数据库容器的增加、删除、修改的同步;

使用TCP模式时:需要我们的应用不断轮询去canal中不断获取变化数据,然后再去同步redis/es等第三方数据库容器

本文目录格式(只需关注这几个)

canal

   |- conf

       |- canal_test

           |- instance.properties

       |- example

           |- instance.properties

       canal.destinations

   |- logs

// 其中canal、logs都需要创建;canf里面的canal_test和example根据canal.destinations里面的canal.destinations参数进行配置,如果直接docker虚拟机中导出的conf文件只有example,所以需要将 instance.properties复制到另外一个文件夹里面去...

TCP模式的详细操作步骤

1、查看MySQL是否开启binlog

show variables like 'log_bin';

2、注册MySQL用户

# 新建用户 用户名:canal  密码:canal 
CREATE USER canal IDENTIFIED by 'canal';
# 授权
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
# 刷新MySQL的系统权限相关表
FLUSH PRIVILEGES;# 查看用户
SELECT * FROM mysql.user where User='canal';

3、使用docker下载canal --- 用于复制instance.properties、canal.properties

3.1、下载并运行canal

docker run -p 11111:11111 --name canal -d canal/canal-server:latest

3.2、复制容器内的conf文件到宿主机:主要文件instance.properties、canal.properties

docker cp canal:/home/admin/canal-server/conf ./

4、修改配置

4.1、修改:./conf/canal.properties

canal.destinations = canal_test,example ### 很重要,但是可以默认为“example”;这样的话在springboot工程中也只需要配置“canal.destination=example”即可;这个参数可以为多个,使用“,”隔开,配置多个这个时,需要在conf文件夹下面分开配置“instance.properties”canal.serverMode = tcp### 默认tcp,可选tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ

4.2、修改:./conf/example(canal.destinations中配置的)/instance.properties

canal.instance.mysql.slaveId=1001              ### 模拟从节点,要和mysql的id不一致即可
canal.instance.master.address=127.0.0.1:3006  ### mysql1的 ip和端口
canal.instance.dbUsername=canal                ### 数据库用户名
canal.instance.dbPassword=canal                ###  数据库密码
canal.instance.filter.regex=.*\..*            ### 监听的表...(这里是所有表)
canal.mq.topic=example                        ### 如果要发布到MQ时,发布的topic,可以使用默认

 4.3、修改:./conf/canal_test(canal.destinations中配置的)/instance.properties

java">canal.instance.mysql.slaveId=1002             ### 模拟从节点,要和mysql的id不一致即可
canal.instance.master.address=127.0.0.2:3006  ### mysql2的 ip和端口
canal.instance.dbUsername=canal                ### 数据库用户名
canal.instance.dbPassword=canal                ###  数据库密码
canal.instance.filter.regex=.*\..*                ### 监听的表...(这里是所有表)
canal.mq.topic=这个才是rabbitmq的routing的key                        ### 如果要发布到MQ时,发布的topic

注意:本文是一个canal监听两个mysql,所以我配置的是两个canal.destinations,一个为canal_test,一个为example,所以配置了两个instance.properties,分别是4.2、4.3,目录路径为“conf/canal_test/instance.properties”和“conf/example/instance.properties”

如果需要监听所有表可以采用下面的方式配置regex

  • 同步所有数据库的所有表

canal.instance.filter.regex=.\\..

  • 只同步名为 db1 和 db2 的数据库下的所有表:

canal.instance.filter.regex=db1\\..*,db2\\..*

  • 只同步 db1 数据库下的 table1 和 table2 表

canal.instance.filter.regex=db1\\.table1,db1\\.table2

  • 同步所有数据库中名称以 user 开头的表

canal.instance.filter.regex=.*\\.user.*

注意:只需要修改以上配置即可,其余参数均可保持也最好保持默认;不然可能不会报错但是rabbitmq收不到消息...

5、启动docker-compose.yml;本文使用的是docker-compose启动方式

version: '3.8'
services:canal-server:image: canal/canal-server:latestcontainer_name: canal-servervolumes:- ./logs/:/home/admin/canal-server/logs- ./conf/:/home/admin/canal-server/confports:- "11111:11111"

按照本文挂载conf的方式的话,instance.properties文件所在位置必须是“conf/canal.destination配置的参数的目录名称”。比如本文“canal.destination=canal_test,example”,意思就是有两个参数“canal_test”和“example” 那么我们要在conf文件夹下面创建两个文件夹名称为“canal_test”和“example” ,然后在这个文件夹下面再创建“instance.properties”文件即可

6、启动canal

docker-compose up -d

7、查看logs日志

docker-compose logs -f
tail -f ./logs/canal_test/canal_test.log
tail -f ./logs/example/example.log

8、自己的应用程序连接canal进行数据同步

8.1、pom.xml

<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.7</version>
</dependency>
<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.protocol</artifactId><version>1.1.7</version>
</dependency>

8.2、SimpleCanalClientExample.java

java">public class SimpleCanalClientExample {public static void main(String[] args) {// 创建链接CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("canal所在服务器ip", 11111(默认为11111端口)),"canal_test(canal.properties里面的canal.destinations配置的参数,只能选择一个)", "canal的username,没有用户名的话为空即可", "canal的password,没有密码的话为空即可");int batchSize = 1000;int emptyCount = 0;try {connector.connect();connector.subscribe(".*\\..*"); // 监听的数据库表,canal.user:代表监听canal库下面的user表...参考instance.properties里面的配置...connector.rollback();while (true) {Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据long batchId = message.getId();int size = message.getEntries().size();Thread.sleep(3000L);printEntry(message.getEntries());connector.ack(batchId); // 提交确认// connector.rollback(batchId); // 处理失败, 回滚数据}} catch (InterruptedException e) {throw new RuntimeException(e);} finally {connector.disconnect();}}private static void printEntry(List<Entry> entrys) {for (Entry entry : entrys) {if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {continue;}RowChange rowChage = null;try {rowChage = RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),e);}EventType eventType = rowChage.getEventType();System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType));for (RowData rowData : rowChage.getRowDatasList()) {if (eventType == EventType.DELETE) {printColumn(rowData.getBeforeColumnsList());} else if (eventType == EventType.INSERT) {printColumn(rowData.getAfterColumnsList());} else {System.out.println("-------&gt; before");printColumn(rowData.getBeforeColumnsList());System.out.println("-------&gt; after");printColumn(rowData.getAfterColumnsList());}}}}private static void printColumn(List<Column> columns) {for (Column column : columns) {System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());}}
}

 以上就是TCP模式...

MQ模式的详细操作步骤

MQ模式的操作步骤和TCP的操作步骤只有第“4、修改配置”、“8、自己的应用程序连接canal进行数据同步”不同,其余是一样的,所以只展示不一样的就行;

4、修改配置

4.1、修改:./conf/canal.properties

java">canal.destinations = canal_test,example ### 很重要,但是可以默认为“example”;这样的话在springboot工程中也只需要配置“canal.destination=example”即可;这个参数可以为多个,使用“,”隔开,配置多个这个时,需要在conf文件夹下面分开配置“instance.properties”canal.serverMode = rabbitmq     ### 默认tcp,可选tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
rabbitmq.host = 127.0.0.1(RabbitMQ服务器的IP)  ### rabbitmq配置 ,如果是其他MQ修改其他配置即可;RabbitMQ是采用的默认端口5672,好像不能修改端口rabbitmq.virtual.host = /            ### MQ的virtual.host
rabbitmq.exchange = canal.direct    ### 交换机
rabbitmq.username = username        ### 用户名
rabbitmq.password = password        ### 密码
rabbitmq.queue =                ### 可以为null,因为使用的是direct,所以只需交换机即可
rabbitmq.routingKey = routing_key11    ### 这一个貌似没用,canal是根据instance.properties里面的canal.mq.topic=配置来确定的key
rabbitmq.deliveryMode = direct    ### 交换机模式,topic、fanout、direct;

注意:本文的交换机名称“canal.direct”,交换机模式为“direct”;rabbitmq的routingKey好像不能再canal.properties中配置,要在4.2步中的topic中配置...

4.2、修改:./conf/example(canal.destinations中配置的)/instance.properties

java">canal.instance.mysql.slaveId=1001              ### 模拟从节点,要和mysql的id不一致即可
canal.instance.master.address=127.0.0.1:3006  ### mysql1的 ip和端口
canal.instance.dbUsername=canal                ### 数据库用户名
canal.instance.dbPassword=canal                ###  数据库密码
canal.instance.filter.regex=.*\..*                ### 监听的表...(这里是所有表)
canal.mq.topic=user2redis_example(这个才是rabbitmq的routing的key) ### 如果要发布到MQ时,发布的topic
#### canal.mq.dynamicTopic=  这个参数我没有做过测试...不知具体效果。..  

4.3、修改:./conf/canal_test(canal.destinations中配置的)/instance.properties

java">canal.instance.mysql.slaveId=1002             ### 模拟从节点,要和mysql的id不一致即可
canal.instance.master.address=127.0.0.2:3006  ### mysql2的 ip和端口
canal.instance.dbUsername=canal                ### 数据库用户名
canal.instance.dbPassword=canal                ###  数据库密码
canal.instance.filter.regex=.*\..*            ### 监听的表...(这里是所有表)
canal.mq.topic=user2redis_canal_test(这个才是rabbitmq的routing的key) ### 如果要发布到MQ时,发布的topic/key
#### canal.mq.dynamicTopic=  这个参数我没有做过测试...不知具体效果。..  

注意:本文是一个canal监听两个mysql,所以我配置的是两个canal.destinations,一个为canal_test,一个为example,所以配置了两个instance.properties,分别是4.2、4.3,目录路径为“conf/canal_test/instance.properties”和“conf/example/instance.properties”

.....省略其他步骤

注意:本文以及默认配置了RabbitMQ的交换机和队列...,要在第6步启动canal之前需要先配置rabbitmq的交换机队列;本文配置的交换机名称为“canal.direct ”、模式采用“direct”,绑定的key为“user2redis”;所以此处省略配置rabbitmq的步骤;

如果没有在webui中配置MQ的交换机队列,也可以先启动第8步中的消费者自动创建队列和交换机...

8、自己的应用程序(消费端)连接canal进行数据同步

8.1、pom.xml

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency><!-- 主要是为了解决RabbitMQ传递对象时使用的是JDK默认序列化方法 -->
<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId>
</dependency>

8.2、application.yml

spring:rabbitmq:host: 127.0.0.1(rabbitmq的ip)port: 5672username: 用户名password: 密码listener:simple:prefetch: 1 # 默认MQ依次轮询投递给绑定的消费者,但是没有考虑到消费者是否已经处理完消息,此时可能出现消息堆积;设置此参数,每次只能获取一条消息,处理完成才能获取下一个消息;acknowledge-mode: auto # 确认机制;none:关闭;manual:手动ack;auto:自动ackretry:enabled: true # 重试机制;initial-interval: 1000ms # 初始的失败等待时长为1s;multiplier: 1 # 下次等待的时长倍数;下次等待时长=multiplier * initial-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态;如果在业务中包含事务,这里修改为false;

8.3、CanalDirectExchange.java

java">@Component
@Slf4j
public class CanalDirectExchange {@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "canal.queue", durable = "true"),exchange = @Exchange(value = "canal.direct", type = ExchangeTypes.DIRECT, durable = "true"),key = {"user2redis_canal_test"}))public void receive(String message) {log.info("routing key = user2redis队列canal.queue监听到的数据: {}", message);}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "canal.queue4", durable = "true"),exchange = @Exchange(value = "canal.direct", type = ExchangeTypes.DIRECT, durable = "true"),key = {"user2redis_example"}))public void directQueue2(String message) {log.info("routing key = example 队列canal.queue监听到的数据: {}", message);}
}

8.4、main.java

java">@Slf4j
@SpringBootApplication
public class ComsumerMain {public static void main(String[] args) throws UnknownHostException {SpringApplication.run(ComsumerMain.class, args);}@Beanpublic MessageConverter messageConverter() {Jackson2JsonMessageConverter json = new Jackson2JsonMessageConverter();json.setCreateMessageIds(true); // 生产UUID,用于消息的唯一id,解决幂等性;return json;}
}

错误踩坑

错误一:这个错误的异常原因是,tcp模式连接不上canal;可能原因如下:

在使用TCP连接时

  1. 检查“canal.destination”配置是否错误,是否和canal.properties里面的canal.destinations参数保持一致;
  2. 查看日志是否启动成功;
  3. 检查canal.properties中的canal.serverMode的模式是否是tcp模式;
Exception in thread "main" com.alibaba.otter.canal.protocol.exception.CanalClientException: java.net.ConnectException: Connection refused: connectat com.alibaba.otter.canal.client.impl.SimpleCanalConnector.doConnect(SimpleCanalConnector.java:200)at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.connect(SimpleCanalConnector.java:116)at com.lting.canal.client.SimpleCanalClientExample.main(SimpleCanalClientExample.java:21)
Caused by: java.net.ConnectException: Connection refused: connectat java.base/sun.nio.ch.Net.connect0(Native Method)at java.base/sun.nio.ch.Net.connect(Net.java:579)at java.base/sun.nio.ch.Net.connect(Net.java:586)at java.base/sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:853)at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.doConnect(SimpleCanalConnector.java:152)

java">      CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("canal的IP", 11111),"canal_test(此时检查这个参数是否和canal.properties里面的canal.destinations参数保持一致)", "", "");

错误二:使用MQ模式时,监听到的数据是一堆二进制binlog.....,如下类似

java">*y7
.
binlog.000002Ǭ� *UTF-80�ӽ��28BJPQ �<
.
binlog.000002��� *UTF-80�ӽ��28BJP114362

这个原因,大概率是你删除了MQ中的某些配置,没有配置成功...解决方法是,按照本文的第4步将“canal.properties、instance.properties”多余参数全部保持默认即可...;

注意:网上很多文章写的是解析canal数据格式错误,需要按照“Protobuf 格式”格式解析,其实不然,就是单纯的“canal.properties、instance.properties”配置错误,如果配置正常,那么解析处理的String就是一个JSON数据格式;

错误三:java.lang.ClassNotFoundException: com.alibaba.druid.pool.DruidDataSource

java">Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'sqlSessionFactory' defined in class path resource [spring/tsdb/h2-tsdb.xml]: Cannot resolve reference to bean 'dataSource' while setting bean property 'dataSource'; nested exception is org.springframework.beans.factory.CannotLoadBeanClassException: Cannot find class [com.alibaba.druid.pool.DruidDataSource] for bean with name 'dataSource' defined in class path resource [spring/tsdb/h2-tsdb.xml]; nested exception is java.lang.ClassNotFoundException: com.alibaba.druid.pool.DruidDataSource

这个原因就是,你的lib包下面缺少Druid依赖包;大概率就是你使用的是物理文件“canal.deployer-1.1.8-SNAPSHOTtar.gz”进行安装的,但是新版本的lib里面没有Druid相关依赖--

PS:--- 不知道是我下载的有问题还是...;反正就没有这个...

错误四:还有一个问题,在使用TCP连接的时候,有类似于“should start canal_test first”之类错误.(无法重现了);意思就是要想链接“canal_test”这个实例,必须先启动;

但是我们查看日志的时候是没有任何错误的时候,但是就是连接不上。

这个原因大概率就是你配置出现问题,在日志文件中是没有展示出来...官方没有答案,也没找到具体是那个参数配置;解决的办法就是保持默认配置,只修改一些核心配置即可。


http://www.ppmy.cn/ops/149229.html

相关文章

网络分析与监控:阿里云拨测方案解密

作者&#xff1a;俞嵩(榆松) 随着互联网的蓬勃发展&#xff0c;网络和服务的稳定性已成为社会秩序中不可或缺的一部分。一旦网络和服务发生故障&#xff0c;其带来的后果将波及整个社会、企业和民众的生活质量&#xff0c;造成难以估量的损失。 2020 年 12 月&#xff1a; Ak…

音频-扬声器和麦克风

首先&#xff0c;我们先介绍一下音频芯片&#xff1a;开发板上带有一个麦克风&#xff0c;一个扬声器&#xff0c;音频编解码芯片使用ES8311。麦克风直接连接到了ES8311芯片上&#xff0c;ES8311和扬声器之间&#xff0c;还有一个音频驱动放大器。ES8311通过I2S接口与ESP32-C3连…

WebSocket 扩展生态:协议与框架

在前七篇文章中,我们深入探讨了 WebSocket 的基础原理、开发实践和实战案例。今天,让我们把视野扩展到 WebSocket 的生态系统,看看有哪些扩展协议和框架可以帮助我们更好地开发 WebSocket 应用。我曾在一个大型即时通讯项目中,通过合理使用这些工具,将开发效率提升了 50%。 扩…

C++中基类与派生类析构函数的调用次序及重要性

在C++中,类的继承机制允许我们创建层次化的类结构,其中一个类(派生类)可以从另一个类(基类)继承属性和方法。当涉及到对象的构造和析构时,C++遵循特定的顺序规则,以确保资源得到正确管理和释放。本文将详细说明在子类析构时是否需要调用父类的析构函数,以及析构函数的…

lodash-实用库及常用方法

chunk()&#xff1a;将数组&#xff08;array&#xff09;拆分成多个指定长度的区块&#xff0c;并将这些区块组成一个新数组。例如&#xff1a; let array [1, 2, 3, 4, 5, 6]; let chunked _.chunk(array, 2); // [[1, 2], [3, 4], [5, 6]]debounce()&#xff1a;在执行回…

学前端 4 个月想进中厂,该怎么做?

大家好&#xff0c;我是程序员鱼皮。收到一位编程导航鱼友的提问&#xff0c;想要自学前端 4 个月进入中厂工作&#xff0c;让我帮忙给出一份学习计划。 鱼友提问 我刚刚考完研和准备期末考试&#xff0c;大三基本上在备研所以没有专门学习一项技术栈&#xff0c;简单学习过 …

【每日学点鸿蒙知识】调试、网络、缓存、富文本编辑等

1、如何使用发布证书进行调试&#xff1f; 由于部分功能需要校验证书信息&#xff0c;所以需要使用调试证书和发布证书分别进行调试&#xff0c;但是使用发布证书后出现安装错误05/14 19:04:39: Install Failed: error: failed to install bundle.code:9568322error: signatur…

飞凌嵌入式i.MX8M Mini核心板已支持Linux6.1

飞凌嵌入式FETMX8MM-C核心板现已支持Linux6.1系统&#xff0c;此次升级不仅使系统功能更加丰富&#xff0c;还通过全新BSP实现了内存性能的显著提升。 基于NXP i.MX8M Mini处理器设计开发的飞凌嵌入式FETMX8MM-C核心板&#xff0c;拥有4个Cortex-A53高性能核和1个Cortex-M4实时…