浅谈canal实例 在docker里面安装canal镜像 Canal监听MySQL数据库变更并同步更新Redis和Elasticsearch 示例

embedded/2025/3/25 20:30:44/

目录

1. 环境准备

1.1 MySQL配置

1.2 部署Canal Server

2. Spring Boot项目配置

2.1 添加依赖

2.2 配置参数

3. 实现Canal监听与同步

3.1 Canal客户端监听

3.2 同步到Redis

3.3 同步到Elasticsearch

4. 注意事项


在Spring Boot中通过Canal监听MySQL数据库变更并同步更新Redis和Elasticsearch,可按照以下步骤实现:


1. 环境准备

1.1 MySQL配置
  • 开启Binlog并设置为ROW模式:
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server-id=1
  • 创建Canal用户并授权:
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
1.2 部署Canal Server
  1. 下载Canal Server:Canal Releases
  2. 修改配置 conf/example/instance.properties
canal.instance.master.address=127.0.0.1:3306 
canal.instance.dbUsername=canal 
canal.instance.dbPassword=canal 
canal.instance.filter.regex=.*\\..*  # 监听所有库表,或指定如test.user

2. Spring Boot项目配置

2.1 添加依赖
<!-- Canal客户端 -->
<dependency><groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.6</version>
</dependency>
<!-- Redis -->
<dependency><groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- Elasticsearch -->
<dependency><groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
2.2 配置参数

application.yml

canal:server: 127.0.0.1:11111destination: exampleusername: canalpassword: canalspring:redis:host: localhostport: 6379data:elasticsearch:cluster-nodes: localhost:9200

3. 实现Canal监听与同步

3.1 Canal客户端监听
@Component
public class CanalListener {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;@Autowiredprivate ElasticsearchRestTemplate esTemplate;@PostConstructpublic void init() {CanalConnector connector = CanalConnectors.newSingleConnector( new InetSocketAddress("127.0.0.1", 11111), "example", "canal", "canal");Thread thread = new Thread(() -> {connector.connect(); connector.subscribe(".*\\..*"); while (true) {Message message = connector.getWithoutAck(100); long batchId = message.getId(); if (batchId != -1) {processEntry(message.getEntries()); connector.ack(batchId); }}});thread.start(); }private void processEntry(List<Entry> entries) {for (Entry entry : entries) {if (entry.getEntryType()  == EntryType.ROWDATA) {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); for (RowData rowData : rowChange.getRowDatasList())  {String tableName = entry.getHeader().getTableName(); EventType eventType = rowChange.getEventType(); // 解析变更前后的数据Map<String, String> before = parseColumns(rowData.getBeforeColumnsList()); Map<String, String> after = parseColumns(rowData.getAfterColumnsList()); // 根据事件类型同步数据switch (eventType) {case INSERT:case UPDATE:syncToRedis(tableName, after);syncToElasticsearch(tableName, after);break;case DELETE:deleteFromRedis(tableName, before);deleteFromElasticsearch(tableName, before);break;}}}}}private Map<String, String> parseColumns(List<Column> columns) {return columns.stream() .collect(Collectors.toMap(Column::getName,  Column::getValue));}
}
3.2 同步到Redis
private void syncToRedis(String tableName, Map<String, String> data) {String key = tableName + ":" + data.get("id");  // 假设主键为idredisTemplate.opsForValue().set(key,  data);
}private void deleteFromRedis(String tableName, Map<String, String> data) {String key = tableName + ":" + data.get("id"); redisTemplate.delete(key); 
}
3.3 同步到Elasticsearch
private void syncToElasticsearch(String tableName, Map<String, String> data) {IndexQuery indexQuery = new IndexQueryBuilder().withId(data.get("id")) .withObject(data).build();esTemplate.index(indexQuery,  IndexCoordinates.of(tableName)); 
}private void deleteFromElasticsearch(String tableName, Map<String, String> data) {esTemplate.delete(data.get("id"),  IndexCoordinates.of(tableName)); 
}

4. 注意事项

  1. 异常处理:增加重试机制或记录错误日志,确保网络波动时的数据一致性。
  2. 性能优化:批量处理Canal消息,减少Redis/ES的频繁写入。
  3. 数据结构:确保Elasticsearch的索引Mapping与MySQL表结构兼容。
  4. 事务管理:如需强一致性,可结合本地事务表或消息队列(如RocketMQ)做可靠投递。

通过以上步骤,Spring Boot应用能够实时监听MySQL变更,并自动同步到Redis和Elasticsearch,保障数据一致性。


http://www.ppmy.cn/embedded/176617.html

相关文章

「0基础学爬虫」爬虫基础之抓包工具的使用

抓包工具概述 抓包工具&#xff0c;顾名思义&#xff0c;就是抓取网络数据包信息的工具。抓包工具最初主要应用于测试工作中&#xff0c;通过抓包工具查看网络数据包&#xff0c;并进行分析&#xff0c;来定位数据传输中的问题。随着不断发展&#xff0c;抓包工具的功能不断拓…

【论文阅读】大型语言模型能否实现软件漏洞的检测与修复?

这篇文章翻译自 CAN LARGE LANGUAGE MODELS FIND AND FIX VULNERABLE SOFTWARE? 大型语言模型能否实现软件漏洞的检测与修复&#xff1f; 先说说结论和一些有意思的发现&#xff0c;以及这篇文章最重要的一个点&#xff1a; 那肯定是可以的&#xff0c; 此前实验已证实GPT-…

水星(MERCURY)监控初始化的恢复和转码方法

水星(MERCURY)的安防监控恢复了很多&#xff0c;其嵌入式文件系统也一直迭代更新。做为数据恢复从业者每天处理最多的就是恢复数据&#xff0c;但是有的时候业务的需要我们不仅仅恢复出数据&#xff0c;还需要能够转码成通用的MP4类文件并要求画面和声音实现“同步”。 故障存…

Ubuntu22.04通过DKMS包安装Intel WiFi系列适配器(网卡驱动)

下载驱动包 访问 backport-iwlwifi-dkmshttps://launchpad.net/ubuntu/source/backport-iwlwifi-dkms 网站&#xff0c;找到适用于Ubuntu 22.04的update版本&#xff08;如backport-iwlwifi-dkms_xxxx_all.deb&#xff09;&#xff0c;下载至本地。 安装驱动 在下载目录中执行以…

除了setup的表达方法,vue3还有什么表达方法

在 Vue 3 中&#xff0c;除了使用 setup 函数的组合式 API 这种表达方法外&#xff0c;还可以使用选项式 API 和 <script setup> 语法糖&#xff0c;下面分别介绍这几种方式&#xff1a; 1. 选项式 API&#xff08;Options API&#xff09; 选项式 API 是 Vue 2 中就有…

JVM的组成--运行时数据区

JVM的组成 1、类加载器&#xff08;ClassLoader&#xff09; 类加载器负责将字节码文件从文件系统中加载到JVM中&#xff0c;分为&#xff1a;加载、链接&#xff08;验证、准备、解析&#xff09;、和初始化三个阶段 2、运行时数据区 运行时数据区包括&#xff1a;程序计数…

【算法】十大排序算法(含时间复杂度、核心思想)

以下是 **十大经典排序算法** 的时间复杂度、空间复杂度及稳定性总结&#xff0c;适用于面试快速回顾&#xff1a;排序算法对比表 排序算法最佳时间复杂度平均时间复杂度最差时间复杂度空间复杂度稳定性核心思想冒泡排序O(n)O(n)O(n)O(1)稳定相邻元素交换&#xff0c;大数沉底…

Typora安装使用教程 简单易用的Markdown编辑器

Typora markdown 编辑器下&#xff0c;最后一个免费版本 0.11.18&#xff0c;但可能会提示过期无法使用, 建议大家可以使用 0.9.96 Windows 版&#xff0c;下载 Windows X64 版。 Typora简介 Typora 是一款由 Abner Lee 开发的轻量级 Markdown 编辑器&#xff0c;与其他 Mark…