docker-compose安装canal并利用rabbitmq同步多个mysql数据

embedded/2025/1/13 4:46:43/

必看:本文默认已经安装好了docker-compose、rabbitmqmysql并且mysql开启了binlog日志,不需要再安装;

流程图

如上图所示,左边是MQ模式流程图,右边则是TCP模式的流程图;

最终的目的是利用canal监听多个MySQL数据变化,然后将数据同步到redis/es...第三方数据库容器中;

原理

大概原理:相对于mysql来说,canal充当的是mysql的从节点,然后canal读取mysql的binlog文件记录数据的变化;

使用MQ模式时:相对于mq来说,canal充当的则是消息生产者的角色,canal将监听到的数据发布到mq的队列中,然后mq的消费者负责对第三方数据库容器的增加、删除、修改的同步;

使用TCP模式时:需要我们的应用不断轮询去canal中不断获取变化数据,然后再去同步redis/es等第三方数据库容器

本文目录格式(只需关注这几个)

canal

   |- conf

       |- canal_test

           |- instance.properties

       |- example

           |- instance.properties

       canal.destinations

   |- logs

// 其中canal、logs都需要创建;canf里面的canal_test和example根据canal.destinations里面的canal.destinations参数进行配置,如果直接docker虚拟机中导出的conf文件只有example,所以需要将 instance.properties复制到另外一个文件夹里面去...

TCP模式的详细操作步骤

1、查看MySQL是否开启binlog

show variables like 'log_bin';

2、注册MySQL用户

# 新建用户 用户名:canal  密码:canal 
CREATE USER canal IDENTIFIED by 'canal';
# 授权
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
# 刷新MySQL的系统权限相关表
FLUSH PRIVILEGES;# 查看用户
SELECT * FROM mysql.user where User='canal';

3、使用docker下载canal --- 用于复制instance.properties、canal.properties

3.1、下载并运行canal

docker run -p 11111:11111 --name canal -d canal/canal-server:latest

3.2、复制容器内的conf文件到宿主机:主要文件instance.properties、canal.properties

docker cp canal:/home/admin/canal-server/conf ./

4、修改配置

4.1、修改:./conf/canal.properties

canal.destinations = canal_test,example ### 很重要,但是可以默认为“example”;这样的话在springboot工程中也只需要配置“canal.destination=example”即可;这个参数可以为多个,使用“,”隔开,配置多个这个时,需要在conf文件夹下面分开配置“instance.properties”canal.serverMode = tcp### 默认tcp,可选tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ

4.2、修改:./conf/example(canal.destinations中配置的)/instance.properties

canal.instance.mysql.slaveId=1001              ### 模拟从节点,要和mysql的id不一致即可
canal.instance.master.address=127.0.0.1:3006  ### mysql1的 ip和端口
canal.instance.dbUsername=canal                ### 数据库用户名
canal.instance.dbPassword=canal                ###  数据库密码
canal.instance.filter.regex=.*\..*            ### 监听的表...(这里是所有表)
canal.mq.topic=example                        ### 如果要发布到MQ时,发布的topic,可以使用默认

 4.3、修改:./conf/canal_test(canal.destinations中配置的)/instance.properties

java">canal.instance.mysql.slaveId=1002             ### 模拟从节点,要和mysql的id不一致即可
canal.instance.master.address=127.0.0.2:3006  ### mysql2的 ip和端口
canal.instance.dbUsername=canal                ### 数据库用户名
canal.instance.dbPassword=canal                ###  数据库密码
canal.instance.filter.regex=.*\..*                ### 监听的表...(这里是所有表)
canal.mq.topic=这个才是rabbitmq的routing的key                        ### 如果要发布到MQ时,发布的topic

注意:本文是一个canal监听两个mysql,所以我配置的是两个canal.destinations,一个为canal_test,一个为example,所以配置了两个instance.properties,分别是4.2、4.3,目录路径为“conf/canal_test/instance.properties”和“conf/example/instance.properties”

如果需要监听所有表可以采用下面的方式配置regex

  • 同步所有数据库的所有表

canal.instance.filter.regex=.\\..

  • 只同步名为 db1 和 db2 的数据库下的所有表:

canal.instance.filter.regex=db1\\..*,db2\\..*

  • 只同步 db1 数据库下的 table1 和 table2 表

canal.instance.filter.regex=db1\\.table1,db1\\.table2

  • 同步所有数据库中名称以 user 开头的表

canal.instance.filter.regex=.*\\.user.*

注意:只需要修改以上配置即可,其余参数均可保持也最好保持默认;不然可能不会报错但是rabbitmq收不到消息...

5、启动docker-compose.yml;本文使用的是docker-compose启动方式

version: '3.8'
services:canal-server:image: canal/canal-server:latestcontainer_name: canal-servervolumes:- ./logs/:/home/admin/canal-server/logs- ./conf/:/home/admin/canal-server/confports:- "11111:11111"

按照本文挂载conf的方式的话,instance.properties文件所在位置必须是“conf/canal.destination配置的参数的目录名称”。比如本文“canal.destination=canal_test,example”,意思就是有两个参数“canal_test”和“example” 那么我们要在conf文件夹下面创建两个文件夹名称为“canal_test”和“example” ,然后在这个文件夹下面再创建“instance.properties”文件即可

6、启动canal

docker-compose up -d

7、查看logs日志

docker-compose logs -f
tail -f ./logs/canal_test/canal_test.log
tail -f ./logs/example/example.log

8、自己的应用程序连接canal进行数据同步

8.1、pom.xml

<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.7</version>
</dependency>
<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.protocol</artifactId><version>1.1.7</version>
</dependency>

8.2、SimpleCanalClientExample.java

java">public class SimpleCanalClientExample {public static void main(String[] args) {// 创建链接CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("canal所在服务器ip", 11111(默认为11111端口)),"canal_test(canal.properties里面的canal.destinations配置的参数,只能选择一个)", "canal的username,没有用户名的话为空即可", "canal的password,没有密码的话为空即可");int batchSize = 1000;int emptyCount = 0;try {connector.connect();connector.subscribe(".*\\..*"); // 监听的数据库表,canal.user:代表监听canal库下面的user表...参考instance.properties里面的配置...connector.rollback();while (true) {Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据long batchId = message.getId();int size = message.getEntries().size();Thread.sleep(3000L);printEntry(message.getEntries());connector.ack(batchId); // 提交确认// connector.rollback(batchId); // 处理失败, 回滚数据}} catch (InterruptedException e) {throw new RuntimeException(e);} finally {connector.disconnect();}}private static void printEntry(List<Entry> entrys) {for (Entry entry : entrys) {if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {continue;}RowChange rowChage = null;try {rowChage = RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),e);}EventType eventType = rowChage.getEventType();System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType));for (RowData rowData : rowChage.getRowDatasList()) {if (eventType == EventType.DELETE) {printColumn(rowData.getBeforeColumnsList());} else if (eventType == EventType.INSERT) {printColumn(rowData.getAfterColumnsList());} else {System.out.println("-------&gt; before");printColumn(rowData.getBeforeColumnsList());System.out.println("-------&gt; after");printColumn(rowData.getAfterColumnsList());}}}}private static void printColumn(List<Column> columns) {for (Column column : columns) {System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());}}
}

 以上就是TCP模式...

MQ模式的详细操作步骤

MQ模式的操作步骤和TCP的操作步骤只有第“4、修改配置”、“8、自己的应用程序连接canal进行数据同步”不同,其余是一样的,所以只展示不一样的就行;

4、修改配置

4.1、修改:./conf/canal.properties

java">canal.destinations = canal_test,example ### 很重要,但是可以默认为“example”;这样的话在springboot工程中也只需要配置“canal.destination=example”即可;这个参数可以为多个,使用“,”隔开,配置多个这个时,需要在conf文件夹下面分开配置“instance.properties”canal.serverMode = rabbitmq     ### 默认tcp,可选tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
rabbitmq.host = 127.0.0.1(RabbitMQ服务器的IP)  ### rabbitmq配置 ,如果是其他MQ修改其他配置即可;RabbitMQ是采用的默认端口5672,好像不能修改端口rabbitmq.virtual.host = /            ### MQ的virtual.host
rabbitmq.exchange = canal.direct    ### 交换机
rabbitmq.username = username        ### 用户名
rabbitmq.password = password        ### 密码
rabbitmq.queue =                ### 可以为null,因为使用的是direct,所以只需交换机即可
rabbitmq.routingKey = routing_key11    ### 这一个貌似没用,canal是根据instance.properties里面的canal.mq.topic=配置来确定的key
rabbitmq.deliveryMode = direct    ### 交换机模式,topic、fanout、direct;

注意:本文的交换机名称“canal.direct”,交换机模式为“direct”;rabbitmq的routingKey好像不能再canal.properties中配置,要在4.2步中的topic中配置...

4.2、修改:./conf/example(canal.destinations中配置的)/instance.properties

java">canal.instance.mysql.slaveId=1001              ### 模拟从节点,要和mysql的id不一致即可
canal.instance.master.address=127.0.0.1:3006  ### mysql1的 ip和端口
canal.instance.dbUsername=canal                ### 数据库用户名
canal.instance.dbPassword=canal                ###  数据库密码
canal.instance.filter.regex=.*\..*                ### 监听的表...(这里是所有表)
canal.mq.topic=user2redis_example(这个才是rabbitmq的routing的key) ### 如果要发布到MQ时,发布的topic
#### canal.mq.dynamicTopic=  这个参数我没有做过测试...不知具体效果。..  

4.3、修改:./conf/canal_test(canal.destinations中配置的)/instance.properties

java">canal.instance.mysql.slaveId=1002             ### 模拟从节点,要和mysql的id不一致即可
canal.instance.master.address=127.0.0.2:3006  ### mysql2的 ip和端口
canal.instance.dbUsername=canal                ### 数据库用户名
canal.instance.dbPassword=canal                ###  数据库密码
canal.instance.filter.regex=.*\..*            ### 监听的表...(这里是所有表)
canal.mq.topic=user2redis_canal_test(这个才是rabbitmq的routing的key) ### 如果要发布到MQ时,发布的topic/key
#### canal.mq.dynamicTopic=  这个参数我没有做过测试...不知具体效果。..  

注意:本文是一个canal监听两个mysql,所以我配置的是两个canal.destinations,一个为canal_test,一个为example,所以配置了两个instance.properties,分别是4.2、4.3,目录路径为“conf/canal_test/instance.properties”和“conf/example/instance.properties”

.....省略其他步骤

注意:本文以及默认配置了RabbitMQ的交换机和队列...,要在第6步启动canal之前需要先配置rabbitmq的交换机队列;本文配置的交换机名称为“canal.direct ”、模式采用“direct”,绑定的key为“user2redis”;所以此处省略配置rabbitmq的步骤;

如果没有在webui中配置MQ的交换机队列,也可以先启动第8步中的消费者自动创建队列和交换机...

8、自己的应用程序(消费端)连接canal进行数据同步

8.1、pom.xml

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency><!-- 主要是为了解决RabbitMQ传递对象时使用的是JDK默认序列化方法 -->
<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId>
</dependency>

8.2、application.yml

spring:rabbitmq:host: 127.0.0.1(rabbitmq的ip)port: 5672username: 用户名password: 密码listener:simple:prefetch: 1 # 默认MQ依次轮询投递给绑定的消费者,但是没有考虑到消费者是否已经处理完消息,此时可能出现消息堆积;设置此参数,每次只能获取一条消息,处理完成才能获取下一个消息;acknowledge-mode: auto # 确认机制;none:关闭;manual:手动ack;auto:自动ackretry:enabled: true # 重试机制;initial-interval: 1000ms # 初始的失败等待时长为1s;multiplier: 1 # 下次等待的时长倍数;下次等待时长=multiplier * initial-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态;如果在业务中包含事务,这里修改为false;

8.3、CanalDirectExchange.java

java">@Component
@Slf4j
public class CanalDirectExchange {@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "canal.queue", durable = "true"),exchange = @Exchange(value = "canal.direct", type = ExchangeTypes.DIRECT, durable = "true"),key = {"user2redis_canal_test"}))public void receive(String message) {log.info("routing key = user2redis队列canal.queue监听到的数据: {}", message);}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "canal.queue4", durable = "true"),exchange = @Exchange(value = "canal.direct", type = ExchangeTypes.DIRECT, durable = "true"),key = {"user2redis_example"}))public void directQueue2(String message) {log.info("routing key = example 队列canal.queue监听到的数据: {}", message);}
}

8.4、main.java

java">@Slf4j
@SpringBootApplication
public class ComsumerMain {public static void main(String[] args) throws UnknownHostException {SpringApplication.run(ComsumerMain.class, args);}@Beanpublic MessageConverter messageConverter() {Jackson2JsonMessageConverter json = new Jackson2JsonMessageConverter();json.setCreateMessageIds(true); // 生产UUID,用于消息的唯一id,解决幂等性;return json;}
}

错误踩坑

错误一:这个错误的异常原因是,tcp模式连接不上canal;可能原因如下:

在使用TCP连接时

  1. 检查“canal.destination”配置是否错误,是否和canal.properties里面的canal.destinations参数保持一致;
  2. 查看日志是否启动成功;
  3. 检查canal.properties中的canal.serverMode的模式是否是tcp模式;
Exception in thread "main" com.alibaba.otter.canal.protocol.exception.CanalClientException: java.net.ConnectException: Connection refused: connectat com.alibaba.otter.canal.client.impl.SimpleCanalConnector.doConnect(SimpleCanalConnector.java:200)at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.connect(SimpleCanalConnector.java:116)at com.lting.canal.client.SimpleCanalClientExample.main(SimpleCanalClientExample.java:21)
Caused by: java.net.ConnectException: Connection refused: connectat java.base/sun.nio.ch.Net.connect0(Native Method)at java.base/sun.nio.ch.Net.connect(Net.java:579)at java.base/sun.nio.ch.Net.connect(Net.java:586)at java.base/sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:853)at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.doConnect(SimpleCanalConnector.java:152)

java">      CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("canal的IP", 11111),"canal_test(此时检查这个参数是否和canal.properties里面的canal.destinations参数保持一致)", "", "");

错误二:使用MQ模式时,监听到的数据是一堆二进制binlog.....,如下类似

java">*y7
.
binlog.000002Ǭ� *UTF-80�ӽ��28BJPQ �<
.
binlog.000002��� *UTF-80�ӽ��28BJP114362

这个原因,大概率是你删除了MQ中的某些配置,没有配置成功...解决方法是,按照本文的第4步将“canal.properties、instance.properties”多余参数全部保持默认即可...;

注意:网上很多文章写的是解析canal数据格式错误,需要按照“Protobuf 格式”格式解析,其实不然,就是单纯的“canal.properties、instance.properties”配置错误,如果配置正常,那么解析处理的String就是一个JSON数据格式;

错误三:java.lang.ClassNotFoundException: com.alibaba.druid.pool.DruidDataSource

java">Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'sqlSessionFactory' defined in class path resource [spring/tsdb/h2-tsdb.xml]: Cannot resolve reference to bean 'dataSource' while setting bean property 'dataSource'; nested exception is org.springframework.beans.factory.CannotLoadBeanClassException: Cannot find class [com.alibaba.druid.pool.DruidDataSource] for bean with name 'dataSource' defined in class path resource [spring/tsdb/h2-tsdb.xml]; nested exception is java.lang.ClassNotFoundException: com.alibaba.druid.pool.DruidDataSource

这个原因就是,你的lib包下面缺少Druid依赖包;大概率就是你使用的是物理文件“canal.deployer-1.1.8-SNAPSHOTtar.gz”进行安装的,但是新版本的lib里面没有Druid相关依赖--

PS:--- 不知道是我下载的有问题还是...;反正就没有这个...

错误四:还有一个问题,在使用TCP连接的时候,有类似于“should start canal_test first”之类错误.(无法重现了);意思就是要想链接“canal_test”这个实例,必须先启动;

但是我们查看日志的时候是没有任何错误的时候,但是就是连接不上。

这个原因大概率就是你配置出现问题,在日志文件中是没有展示出来...官方没有答案,也没找到具体是那个参数配置;解决的办法就是保持默认配置,只修改一些核心配置即可。


http://www.ppmy.cn/embedded/153458.html

相关文章

Pulsar客户端如何控制内存使用

Pulsar客户端如何控制内存使用 一、使用场景 在实际应用中&#xff0c;Pulsar客户端的内存使用控制是一个重要的性能优化点。假设有一个搜索类业务需要记录用户搜索请求&#xff0c;以便后续分析搜索热点和优化搜索效果。以下是一个简化的代码示例&#xff1a; PulsarClient…

Unity 3D游戏开发从入门进阶到高级

本文精心整理了Unity3D游戏开发相关的学习资料&#xff0c;涵盖入门、进阶、性能优化、面试和书籍等多个维度&#xff0c;旨在为Unity开发者提供全方位、高含金量的学习指南.欢迎收藏。 学习社区 Unity3D开发者 这是一个专注于Unity引擎的开发者社区&#xff0c;汇聚了众多Un…

mysql根据表的统计信息核算一下表成本

show table status like ‘表名’; eg: show table status like ‘domain’; 在InnoDB引擎下&#xff0c;rows 是估计值。 rows&#xff1a;就是表里的记录数 data_length: 就是表的聚簇索引的字节数大小&#xff0c;此时用data_length 除以 1024 就是kb 为单位的大小&#xf…

Linux 中断下半部,软中断、tasklet 和工作队列

中断下半部概述&#xff1a;中断处理程序分上下两部分&#xff0c;上半部在关闭中断下执行时间敏感、与硬件相关且不能被打断的工作&#xff1b;下半部在开启中断下执行&#xff0c;可被打断&#xff0c;包括软中断、tasklet 和工作队列三种&#xff0c;用于处理相对耗时任务&a…

当Elasticsearch索引数据量过多时,可以采取以下措施进行优化和部署

调整索引分片数量&#xff1a;根据数据量和集群规模&#xff0c;重新分配索引的分片数量。较小的索引分片可以提高查询性能&#xff0c;但过多的分片也会增加管理开销。因此&#xff0c;需要根据具体情况进行权衡。调整副本数量&#xff1a;根据数据量和查询负载&#xff0c;适…

如何配置 CentOS 7 的 Yum 源并切换到国内镜像源

在使用 Linux 系统&#xff08;特别是 CentOS 或 RHEL 系列操作系统&#xff09;时&#xff0c;Yum 是一个非常常用的包管理工具&#xff0c;它能够方便地帮助我们安装、更新和删除软件包。通常&#xff0c;Yum 会连接到默认的官方镜像源下载软件包&#xff0c;但由于海外镜像的…

数组(Array)和链表(Linked List)

1. 基本定义 数组&#xff08;Array&#xff09;&#xff1a; 数组是一个固定大小的连续内存区域&#xff0c;用于存储多个相同类型的数据元素。在数组中&#xff0c;元素按照索引顺序排列&#xff0c;访问时可以通过索引直接访问。链表&#xff08;Linked List&#xff09;&a…

Rust 1.84.0 发布

Cargo 依赖版本选择改进 稳定了最小支持 Rust 版本&#xff08;MSRV&#xff09;感知的解析器&#xff0c;该解析器会优先选择与项目声明的 MSRV 兼容的依赖版本&#xff0c;减少了维护者支持旧工具链的工作量&#xff0c;无需手动为每个依赖选择旧版本。可以通过.cargo/config…