浅谈canal实例 在docker里面安装canal镜像 Canal监听MySQL数据库变更并同步更新Redis和Elasticsearch 示例

server/2025/3/22 7:20:39/

目录

1. 环境准备

1.1 MySQL配置

1.2 部署Canal Server

2. Spring Boot项目配置

2.1 添加依赖

2.2 配置参数

3. 实现Canal监听与同步

3.1 Canal客户端监听

3.2 同步到Redis

3.3 同步到Elasticsearch

4. 注意事项


在Spring Boot中通过Canal监听MySQL数据库变更并同步更新Redis和Elasticsearch,可按照以下步骤实现:


1. 环境准备

1.1 MySQL配置
  • 开启Binlog并设置为ROW模式:
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server-id=1
  • 创建Canal用户并授权:
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
1.2 部署Canal Server
  1. 下载Canal Server:Canal Releases
  2. 修改配置 conf/example/instance.properties
canal.instance.master.address=127.0.0.1:3306 
canal.instance.dbUsername=canal 
canal.instance.dbPassword=canal 
canal.instance.filter.regex=.*\\..*  # 监听所有库表,或指定如test.user

2. Spring Boot项目配置

2.1 添加依赖
<!-- Canal客户端 -->
<dependency><groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.6</version>
</dependency>
<!-- Redis -->
<dependency><groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- Elasticsearch -->
<dependency><groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
2.2 配置参数

application.yml

canal:server: 127.0.0.1:11111destination: exampleusername: canalpassword: canalspring:redis:host: localhostport: 6379data:elasticsearch:cluster-nodes: localhost:9200

3. 实现Canal监听与同步

3.1 Canal客户端监听
@Component
public class CanalListener {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;@Autowiredprivate ElasticsearchRestTemplate esTemplate;@PostConstructpublic void init() {CanalConnector connector = CanalConnectors.newSingleConnector( new InetSocketAddress("127.0.0.1", 11111), "example", "canal", "canal");Thread thread = new Thread(() -> {connector.connect(); connector.subscribe(".*\\..*"); while (true) {Message message = connector.getWithoutAck(100); long batchId = message.getId(); if (batchId != -1) {processEntry(message.getEntries()); connector.ack(batchId); }}});thread.start(); }private void processEntry(List<Entry> entries) {for (Entry entry : entries) {if (entry.getEntryType()  == EntryType.ROWDATA) {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); for (RowData rowData : rowChange.getRowDatasList())  {String tableName = entry.getHeader().getTableName(); EventType eventType = rowChange.getEventType(); // 解析变更前后的数据Map<String, String> before = parseColumns(rowData.getBeforeColumnsList()); Map<String, String> after = parseColumns(rowData.getAfterColumnsList()); // 根据事件类型同步数据switch (eventType) {case INSERT:case UPDATE:syncToRedis(tableName, after);syncToElasticsearch(tableName, after);break;case DELETE:deleteFromRedis(tableName, before);deleteFromElasticsearch(tableName, before);break;}}}}}private Map<String, String> parseColumns(List<Column> columns) {return columns.stream() .collect(Collectors.toMap(Column::getName,  Column::getValue));}
}
3.2 同步到Redis
private void syncToRedis(String tableName, Map<String, String> data) {String key = tableName + ":" + data.get("id");  // 假设主键为idredisTemplate.opsForValue().set(key,  data);
}private void deleteFromRedis(String tableName, Map<String, String> data) {String key = tableName + ":" + data.get("id"); redisTemplate.delete(key); 
}
3.3 同步到Elasticsearch
private void syncToElasticsearch(String tableName, Map<String, String> data) {IndexQuery indexQuery = new IndexQueryBuilder().withId(data.get("id")) .withObject(data).build();esTemplate.index(indexQuery,  IndexCoordinates.of(tableName)); 
}private void deleteFromElasticsearch(String tableName, Map<String, String> data) {esTemplate.delete(data.get("id"),  IndexCoordinates.of(tableName)); 
}

4. 注意事项

  1. 异常处理:增加重试机制或记录错误日志,确保网络波动时的数据一致性。
  2. 性能优化:批量处理Canal消息,减少Redis/ES的频繁写入。
  3. 数据结构:确保Elasticsearch的索引Mapping与MySQL表结构兼容。
  4. 事务管理:如需强一致性,可结合本地事务表或消息队列(如RocketMQ)做可靠投递。

通过以上步骤,Spring Boot应用能够实时监听MySQL变更,并自动同步到Redis和Elasticsearch,保障数据一致性。


http://www.ppmy.cn/server/176985.html

相关文章

数据结构-----队列

顺序队列&#xff08;Queue&#xff09; 一、队列核心概念 1. 基本特性 先进先出&#xff08;FIFO&#xff09;&#xff1a;最早入队的元素最先出队操作限制&#xff1a; 队尾&#xff08;Rear&#xff09;&#xff1a;唯一允许插入的位置队头&#xff08;Front&#xff09;&…

第四周日志-用网络请求理解bp(2)

python网络请求库实现数据抓取、API调用还是后端服务的交互 以urllib3库为例 请求&#xff1a; import urllib3 http urllib3.PoolManager() # 创建连接池管理对象url1"" r1 http.request(GET,url1) #request print(r1.status) request&…

鸿蒙NEXT项目实战-百得知识库05

代码仓地址&#xff0c;大家记得点个star IbestKnowTeach: 百得知识库基于鸿蒙NEXT稳定版实现的一款企业级开发项目案例。 本案例涉及到多个鸿蒙相关技术知识点&#xff1a; 1、布局 2、配置文件 3、组件的封装和使用 4、路由的使用 5、请求响应拦截器的封装 6、位置服务 7、三…

neo4j-如何让外部设备访问wsl中的neo4j

WSL 运行在一个虚拟网络环境中&#xff0c;它的 IP 只能被宿主 Windows 访问&#xff0c;外部设备无法直接访问 WSL 的端口。你需要在 Windows 上转发端口&#xff0c;让外部设备可以访问 Windows 并映射到 WSL。 1. 获取 WSL 的 IP 地址 在 WSL 中运行以下命令获取其 IP 地址…

Doris性能优化建议

1、jdbc连接中添加参数rewriteBatchedStatementstrue,将 JDBC 单条插入优化为批量操作 2、将单条插入攒成批后再插入,可先使用redis的zset存储&#xff0c;&#xff0c;每3秒后取出写入表中&#xff0c;写入失败再写回redis的zset 3、fe.conf中添加 按照机器可用内存的10/7赋…

第二天 流程控制(if/for/while) - 列表/元组/字典操作

前言 在IT运维和系统管理领域&#xff0c;资源监控是至关重要的基础技能。本教程将带领Python初学者&#xff0c;通过编写一个实用的系统资源监控脚本&#xff0c;掌握Python基础语法中的流程控制、数据结构操作等核心知识。即使您完全没有编程经验&#xff0c;只要跟着本文一…

Go 1.24.1 编译错误:`can‘t find export data (bufio: buffer full)` 的解决之旅

一、前言 最近在用 Go 1.24.1 开发时&#xff0c;我遇到了一个让人头疼的编译错误。错误信息如下&#xff1a; # internal/runtime/math C:\Program Files\Go\src\internal\runtime\math\math.go:7:8: could not import internal/goarch (cant find export data (bufio: buff…

⭐算法OJ⭐二叉树的后序遍历【树的遍历】(C++实现)Binary Tree Postorder Traversal

⭐算法OJ⭐二叉树的中序遍历【树的遍历】&#xff08;C实现&#xff09;Binary Tree Inorder Traversal ⭐算法OJ⭐二叉树的前序遍历【树的遍历】&#xff08;C实现&#xff09;Binary Tree Preorder Traversal Given the root of a binary tree, return the postorder traver…