canal详解及demo

news/2024/12/19 23:53:56/

提示:如何保证Redis中的数据与数据库中的数据一致性?数据同步canal的介绍和demo、大型企业如何实现mysql到redis的同步?使用binlog实时更新redis缓存、canal的接入教程、win下canal的服务器端、canal客户端的创建、连接、测试教程、数据同步方式canal

文章目录

  • 前言
  • 一、canal是什么?
    • 1.1、工作原理
  • 二、canal服务端demo
    • 1.先看一下mysql的配置
    • 2.修改配置
    • 3.编辑my.ini 文件
    • 4.创建canal用户
    • 5.启动canal服务
    • 6.修改canal.properties
    • 7.修改instance.properties
    • 8.启动canal
    • 9.创建canal客户端
  • 三、canal客户端demo
    • 1.maven依赖
    • 2.demo代码
  • 三、测试
    • 1.启动服务器端
    • 2.启动测试端
    • 3.修改数据库
  • 总结


前言

很多时候我们需要数据库和redis(或者其他中间件,mq、es等)有交互。数据量少、并发量少的时候还好,那一旦并发上来了,怎么保证redis和数据库的数据一致性呢?这时就用到数据库和redis的同步工具-canal了。


一、canal是什么?

canal是阿里中基于java写的一个组件,他的官网是:canal官网
。他的作用是读取mysql数据的binlog日志,然后将其转换为对应的数据(数据的变化或者变化后的数据,跟配置有关),并且同步到相关中间件(本次demo中是用的reids中间件)

1.1、工作原理

原理是(从官网截的图)
在这里插入图片描述

二、canal服务端demo

1.先看一下mysql的配置

  • show variables like ‘%log_bin%’; – 判断binlong是on还是off,默认是off,需要打开
  • show variables like ‘binlog_format’; – 判断binlog的记录方式,有row和statement、fix,这里是用row,row是记录数据数据的变化,变成了啥,statement是记录执行的sql,看不到数据的变化以及最终的结果,fix是俩的混合
  • show variables like ‘%server_id%’; 是配置主从的,一般主节点设为1
  • show master status; 表示当前mysql中的binlog日志记录在哪里了,现在记录的多大内存了

2.修改配置

如果配置不满足,则需要修改mysql的配置文件: my.ini
win下的查找配置文件的方式: 服务 -》 属性 -》 可执行路径。 本次我这里的是:
“C:\Program Files\MySQL\MySQL Server 5.7\bin\mysqld.exe” --defaults-file=“C:\ProgramData\MySQL\MySQL Server 5.7\my.ini”
在这里插入图片描述

3.编辑my.ini 文件

搜索log-bin,在这个log-bin下面新建两条:
log-bin=mysql-bin
binlog-format=ROW
在这里插入图片描述
然后重新启动
在这里插入图片描述
再次执行下看结果:
在这里插入图片描述

4.创建canal用户

创建用户,专门给canal使用

-- 创建一个canal用户(专门去给canal这个同步数据软件使用)
create user canal IDENTIFIED by 'canal';
-- 给canal赋予select、repl权限
grant select ,REPLICATION SLAVE,REPLICATION CLIENT on *.* to 'canal'@'%';
-- 刷新权限
flush privileges;

5.启动canal服务

此时拿到一个canal的服务包。 我的百度云盘:云盘

6.修改canal.properties

解压后,进入到conf目录,找到 canal.properties 文件。这里需要注意的其实就一行:
在这里插入图片描述
新增这个:

canal.destinations = example
//如果有多个的话,就用逗号分隔,例如:
//canal.destinations = promption,seckill

这里是一个,就需要在conf中有一个文件夹,文件夹的名字与这个配置的名字一致,如果是两个,就需要两个文件夹了
在这里插入图片描述

7.修改instance.properties

进入这个example文件夹里面,还有一个配置文件(instance.properties),这个就是我们要改的配置文件了
在这里插入图片描述
这个配置文件中,比较重要的几个是:

# 要监听的数据库的地址
canal.instance.master.address=127.0.0.1:3306
# 当前binlog记录的位置
canal.instance.master.journal.name=mysql-bin.000002
# 当前binlog记录到哪了
canal.instance.master.position=154
# canal连接的账号和密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# 要监听的数据库表,例如: micromall.sms_home,miscromall.sms_brand
canal.instance.filter.regex=.*\\..*
# 不需要监听的数据库表,例如:mysql\\.test_.*
canal.instance.filter.black.regex=

其中canal.instance.master.journal.name对应的是file。
canal.instance.master.position 对应的是position, 这俩只需要配置一次,后续就会有一个缓存文件(meta.dat)去自己记录最大缓存到哪了
在这里插入图片描述

8.启动canal

到这里以后,就可以启动了,直接双击bin文件夹下的 start.bat
在这里插入图片描述

9.创建canal客户端

然后创建一个canal客户端去测试一下。

三、canal客户端demo

可以参考官网: https://github.com/alibaba/canal/wiki/ClientExample

在这里插入图片描述

1.maven依赖

<!-- canan 的依赖 这个最好是和服务端的版本一致 -->
<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.4</version>
</dependency>

2.demo代码

默认是127.0.0

package com.zheng.canal;
import java.net.InetSocketAddress;
import java.util.List;import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
/*** @author: ztl* @date: 2024/11/24 22:31* @desc:*/
public class MyCanal {public static void main(String args[]) {// 创建链接CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),11111), "example", "", "");int batchSize = 1000;int emptyCount = 0;try {connector.connect();connector.subscribe(".*\\..*");connector.rollback();int totalEmptyCount = 120;while (emptyCount < totalEmptyCount) {Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {emptyCount++;System.out.println("empty count : " + emptyCount);try {Thread.sleep(1000);} catch (InterruptedException e) {}} else {emptyCount = 0;// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);printEntry(message.getEntries());}connector.ack(batchId); // 提交确认// connector.rollback(batchId); // 处理失败, 回滚数据}System.out.println("empty too many times, exit");} finally {connector.disconnect();}}private static void printEntry(List<Entry> entrys) {for (Entry entry : entrys) {if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {continue;}RowChange rowChage = null;try {rowChage = RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),e);}EventType eventType = rowChage.getEventType();System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType));for (RowData rowData : rowChage.getRowDatasList()) {if (eventType == EventType.DELETE) {printColumn(rowData.getBeforeColumnsList());} else if (eventType == EventType.INSERT) {printColumn(rowData.getAfterColumnsList());} else {System.out.println("-------&gt; before");printColumn(rowData.getBeforeColumnsList());System.out.println("-------&gt; after");printColumn(rowData.getAfterColumnsList());}}}}private static void printColumn(List<Column> columns) {for (Column column : columns) {System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());}}}

三、测试

1.启动服务器端

启动服务器端,也就是canal中的服务器,我本次是 D:\java\ruanjian\canal\canal.deployer-1.1.4\bin 下的 startup.bat 脚本![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/34deb7542e564e3a9cab0b03e95438cc.png

2.启动测试端

运行步骤2中的main方法: 此时看到的效果是不断的再刷新
在这里插入图片描述

3.修改数据库

修改数据库中的数据,然后看控制台中的变化:
(本次我修改了mytest库中的course_2023表中的第一条数据,控制台的变化如下:)
在这里插入图片描述
到此,我们就能拿到变化后的数据了,后面的操作我不用说大家也知道:将数据转化为实体,然后set进redis、es等各个地方。


总结

以上是一个简单的demo,大家可以基于demo或者官网去在自己项目中做一些个性化的处理。


http://www.ppmy.cn/news/1556508.html

相关文章

idea | maven项目标红解决方案 | 强制刷新所有依赖

场景&#xff1a;父pom多模块&#xff0c;新增时&#xff0c;依赖正常&#xff0c;但是application.yml看起来没被springboot识别&#xff0c;试过rebuild、重开idea清除缓存&#xff0c;重新maven面板reload all maven projects, 试过pom文件的依赖先移除再重新粘贴导入进来&a…

在 Linux 下,将 shell 脚本打包成二进制程序

在 Linux 下&#xff0c;将 shell 脚本打包成二进制程序并不是一个直接的过程&#xff0c;因为 shell 脚本本质上是文本文件&#xff0c;由 shell 解释器执行。不过&#xff0c;你可以使用几种方法来实现类似的目的&#xff1a; ### 1. 使用 shc 工具 shc 是一个可以将 shell…

用python实现滑雪小游戏,附源码

一个简单的基于文本的滑雪小游戏示例代码&#xff0c;在这个游戏中玩家控制一个滑雪者在有障碍物的雪道上滑行&#xff0c;尽量避开障碍物并获取更高的分数。 ● Skier类表示滑雪者&#xff0c;有位置属性和移动、转向方法。 ● Obstacle类表示障碍物&#xff0c;有位置属…

Java连接chatGPT步骤(免费key获取方法)

1.首先&#xff0c;找到了这个网站&#xff0c;介绍了五个免费使用ChatGPT API的开源项目 https://www.51cto.com/article/786796.html2.然后本人选择使用GPT-API-free&#xff0c;如果选择gpt-3.5-turbo模型的话&#xff0c;每天可以请求100条&#xff1b;可以了&#xff0c;…

基于python对网页进行爬虫简单教程

python对网页进行爬虫 基于BeautifulSoup的爬虫—源码 """ 基于BeautifulSoup的爬虫### 一、BeautifulSoup简介1. Beautiful Soup提供一些简单的、python式的函数用来处理导航、搜索、修改分析树等功能。它是一个工具箱&#xff0c;通过解析文档为用户提供需要…

AI监控赋能健身馆与游泳馆全方位守护,提升安全效率

一、AI视频监控技术的崛起 随着人工智能技术的不断发展&#xff0c;AI视频监控正成为各行业保障安全、提升效率的关键工具。相比传统监控系统&#xff0c;AI技术赋予监控系统实时分析、智能识别和精准预警的能力&#xff0c;让“被动监视”转变为“主动防控”。 二、AI监控应用…

《Hive 存储格式详解》

一、引言 在大数据处理中&#xff0c;Hive 是一个广泛使用的数据仓库工具&#xff0c;它提供了一种类似于 SQL 的查询语言&#xff0c;使得用户可以方便地对大规模数据集进行分析和处理。Hive 的存储格式对于数据的存储效率、查询性能和数据压缩等方面都有着重要的影响。本文将…

MyBatis常见面试题总结

#{} 和 ${} 的区别是什么&#xff1f; 注&#xff1a;这道题是面试官面试我同事的。 答&#xff1a; ${}是 Properties 文件中的变量占位符&#xff0c;它可以用于标签属性值和 sql 内部&#xff0c;属于原样文本替换&#xff0c;可以替换任意内容&#xff0c;比如${driver}…