基于canal监听MySQL binlog实现数据增量同步

ops/2024/9/23 14:29:21/

一、背景

业务反馈客服消息列表查询速度慢,有时候甚至要差不多20秒,急需优化提升速度。

二、方案

引入

首先,体验系统,发现查询慢的正是消息列表查询接口。

接着去看代码的设计,流程比较长,但从代码逻辑上设计没有问题什么大问题。

接着拿到查库的主SQL,发现连接的表比较多,然后在测试库看索引,索引缺了一些。加上索引,然后确定确实走了预期的索引之后就给正式库加上了索引。速度确实有了较大提升。能够进入十秒内,但因为数据量大,还是需要4~5秒左右。

不过,业务对这个已经比较满意了,但还是提出能否速度更快。优化完索引之后,可以试试上缓存,但细看了一下数据,变化频率不低,不太适合缓存。

接着就想到了本文的主角-----Elastic Search。支持高性能检索,倒排索引的机制,也更适合大数据量的场景。于是就以测试库来尝试,做引入ES的探索。

选型

ES和数据库作为不同的系统,要查询效果一致,首先就要考虑数据的同步问题。

首先对于数据库已有的数据,做全量同步。这个是没有异议的。

但对于增量数据如何处理就引出了几种常用的方案。

方案一

数据库更新的时候,直接同步更新ES。这种方式实现简单,数据同步也及时,但每处更新数据库都要加上更新ES的操作,在后续开发中,很容易会遗漏。(某些时候可能需要用SQL改数据库数据,这种操作无法更新ES)

方案二

数据库更新的时候,就将更新的操作发送到MQ,让MQ异步去更新ES。这种方案相比第一种复用性更高,可维护性更强一些,但还是有缺漏的风险。(某些时候可能需要用SQL改数据库数据,这种操作无法更新ES)

方案三

监听数据库binlog日志,只要有变更记录的操作,就同步更新ES。这种方案实现起来较复杂,但基本写完一套之后,后续基本不需要再变动。

综合考虑下来,我觉得第三种方案是最好的。在之前的学习中,我尝试了给项目引入Easy ES框架来实现ES的引入。

springboot整合easy-es实现数据的增删改查_easy es-CSDN博客

接下来就差数据的全量同步和增量同步了。全量同步可以直接通过工具来实现,而增量同步就需要工具结合代码来实现了。本文就是打算用阿里的canal来实现监听MySQL数据库的binlog日志。

三、原理

MySQL的分布式是基于主从架构实现的。一般情况下是一主多从,其中一个数据库作为主节点,其他数据库作为从节点,主从节点之间通过订阅binlog的方式实现数据同步。

canal的数据增量同步底层就是利用MySQL的主从同步机制实现的。将canal伪装成master的一个slave节点,向master节点发起dump协议,master节点在接收到dump协议之后,就会将binlog日志推送给canal,canal拿到binlog日志之后执行相应的操作从而实现数据同步。

四、配置流程

官方教程

4.1、配置MySQL

查看源MySQL的binlog配置,确保MySQL开启了binlog日志。

SHOW VARIABLES LIKE "%bin%"

日志的格式为ROW

查看源MySQL的server_id

准备好一个拥有slave权限的MySQL账号。

4.2、配置canal

1.下载canalicon-default.png?t=N7T8https://github.com/alibaba/canal/releases

2.解压,配置canal,修改文件conf/example/instance.properties

配置canal的server_id,注意要和上面查看的源MySQL的不一样。

配置源MySQL的ip+端口

配置源MySQL的账号密码

3.启动项目

在bin目录下找到对应系统的启动文件,双击启动。

/bin/startup.bat(window)

/bin/startup.sh(linux)

查看日志文件,检查是否启动成功。logs/canal/canal.log和logs/example/example.log

服务启动成功

4.3、canal集成到springboot

添加依赖

		<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.4</version></dependency>

直接用官方的测试代码验证(不需要更改,如果canal安装在本地的话)

package org.jeecg.modules.admin.assignImClassTeacher;import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;import java.net.InetSocketAddress;
import java.util.List;public class SimpleCanalClientExample {public static void main(String args[]) {// 创建链接CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),11111), "example", "", "");int batchSize = 1000;int emptyCount = 0;try {connector.connect();connector.subscribe(".*\\..*");connector.rollback();int totalEmptyCount = 120;while (emptyCount < totalEmptyCount) {Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {emptyCount++;System.out.println("empty count : " + emptyCount);try {Thread.sleep(1000);} catch (InterruptedException e) {}} else {emptyCount = 0;// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);printEntry(message.getEntries());}connector.ack(batchId); // 提交确认// connector.rollback(batchId); // 处理失败, 回滚数据}System.out.println("empty too many times, exit");} finally {connector.disconnect();}}/*** 打印canal server解析binlog获得的实体类信息*/private static void printEntry(List<Entry> entrys) {for (Entry entry : entrys) {if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {//开启/关闭事务的实体类型,跳过continue;}//RowChange对象,包含了一行数据变化的所有特征//比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等RowChange rowChage;try {rowChage = RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);}//获取操作类型:insert/update/delete类型EventType eventType = rowChage.getEventType();//打印Header信息System.out.println(String.format("================》; binlog[%s:%s] , name[%s,%s] , eventType : %s",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType));//判断是否是DDL语句if (rowChage.getIsDdl()) {System.out.println("================》;isDdl: true,sql:" + rowChage.getSql());}//获取RowChange对象里的每一行数据,打印出来for (RowData rowData : rowChage.getRowDatasList()) {//如果是删除语句if (eventType == EventType.DELETE) {printColumn(rowData.getBeforeColumnsList());//如果是新增语句} else if (eventType == EventType.INSERT) {printColumn(rowData.getAfterColumnsList());//如果是更新的语句} else {//变更前的数据System.out.println("------->; before");printColumn(rowData.getBeforeColumnsList());//变更后的数据System.out.println("------->; after");printColumn(rowData.getAfterColumnsList());}}}}private static void printColumn(List<Column> columns) {for (Column column : columns) {System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());}}}

运行main方法

在更新后,每个字段都有一个update字段,如果值为true代表这个字段更新了,为false代表没更新。

拿到这些涉及数据库变更的事件之后,就可以根据需要去做数据的增量同步了。


http://www.ppmy.cn/ops/20844.html

相关文章

RTU遥测终端为城市排水安全保驾护航!

近年来&#xff0c;全球气候变迁与城市化进程不断加速&#xff0c;导致强降雨事件频发&#xff0c;道路低洼地带、下穿式立交桥和隧道等区域在暴雨中常易积水&#xff0c;严重阻碍了人民的出行&#xff0c;甚至危及生命与财产安全。而传统的排水管网管理方式已难以适应现代城市…

前端css中keyframes(关键帧)的简单使用

前端css中keyframes的使用 一、前言二、例子&#xff08;一&#xff09;、例子源码1&#xff08;二&#xff09;、源码1运行效果1.视频效果2.截图效果 三、结语四、定位日期 一、前言 关键帧keyframes是css动画的一种&#xff0c;主要用于定义动画过程中某一阶段的样式变化&am…

css阴影效果制作

前言 欢迎来到我的博客 个人主页&#xff1a;北岭敲键盘的荒漠猫-CSDN博客 本文为我整理的为盒子添加阴影效果的方法 阴影效果展示 可以帮我们制作这种立体效果。 也可以用这个阴影作为一个选中效果。 阴影属性 语法: box-shadow: h-shadow v-shadow blur spread color inset;…

Rust学习03:解决了如何更改项目名称的小问题

好不容易跑通了第一个小程序&#xff01; 高兴之余&#xff0c;突然又对小程序的名字不满意了&#xff0c;想改一个更好的。奇怪么&#xff1f;不奇怪&#xff01;对于一位想不开而来自学Rust的人又有什么事情是做不出的呐~~ 开始我是直接改了项目文件夹的名字~捅了马蜂窝了&am…

视频怎么批量压缩?5个好用的电脑软件和在线网站

视频怎么批量压缩&#xff1f;有时候我们需要批量压缩视频来节省存储空间&#xff0c;便于管理文件和空间&#xff0c;快速的传输发送给他人。有些快捷的视频压缩工具却只支持单个视频导入&#xff0c;非常影响压缩效率&#xff0c;那么今天就向大家从软件和在线网站2个角度介绍…

【布客技术评论】大模型开源与闭源:原因、现状与前景

在人工智能领域&#xff0c;大模型的开源与闭源一直是一个备受争议的话题。近期&#xff0c;某大厂厂长说了“开源模型永远超不过闭源模型”&#xff0c;结果&#xff0c;脸书就发布了开源模型Llama3&#xff0c;超过了OpenAI 的闭源模型 GPT4。本文将探讨大模型开源与闭源的原…

开发语言漫谈-ABAP

大多数程序员可能都没有听说过这门语言。ABAP是SAP公司专门用于SAP软件环境的专门语言。这么多专门就能知道这门语言邻域有多么狭窄。这门语言过去据称是一条闷声挣大钱的好途径&#xff0c;非常不卷&#xff0c;简直躺赢的好事。这么说也没毛病&#xff0c;关键在SAP的业务能有…

抖音 小程序 获取手机号 报错 getPhoneNumber:fail auth deny

这是因为 当前小程序没有获取 手机号的 权限 此能力仅支持小程序通过试运营期后可用&#xff0c;默认获取权限&#xff0c;无需申请&#xff1b; https://developer.open-douyin.com/docs/resource/zh-CN/mini-app/develop/guide/open-capabilities/acquire-phone-number-acqu…