Canal CDC

news/2025/2/16 3:22:57/

文章目录

      • 1.Canal介绍
        • 1.1 Mysql 的binlog介绍
        • 1.2 Canal 的运行原理
        • 1.3 Canal使用场景
      • 2.Mysql 的配置准备
      • 3.Canal 的准备
      • 4.Canal 数据结构分析
      • 5.Java 代码
      • 6.Kafka 测试

1.Canal介绍

阿里巴巴 B2B 公司,因为业务的特性,卖家主要集中在国内,买家主要集中在国外,所以衍生出了同步杭州和美国异地机房的需求,从 2010 年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务。
Canal 是用 Java 开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件。目前。Canal 主要支持了 MySQL 的 Binlog 解析,解析完成后才利用 Canal Client 来处理获得的相关数据。(数据库同步需要阿里的 Otter 中间件,基于 Canal)。

1.1 Mysql 的binlog介绍

MySQL 的二进制日志可以说 MySQL 最重要的日志了,它记录了所有的 DDL 和 DML(除了数据查询语句)语句,以事件形式记录,还包含语句所执行的消耗的时间,MySQL 的二进制日志是事务安全型的。

binlog可分为STATEMENT, MIXED, ROW

  • statement:语句级,binlog 会记录每次一执行写操作的语句。相对 row 模式节省空间,但是可能产生不一致性,比如“update tt set create_date=now()”,如果用 binlog 日志进行恢复,由于执行时间不同可能产生的数据就不同。

  • row:行级, binlog 会记录每次操作后每行记录的变化。

  • mixed:statement 的升级版,一定程度上解决了,因为一些情况而造成的 statement模式不一致问题,默认还是 statement,在某些情况下譬如:当函数中包含 UUID() 时;包含AUTO_INCREMENT 字段的表被更新时;执行 INSERT DELAYED 语句时;用 UDF 时;会按照ROW 的方式进行处理

Canal 想做监控分析,选择 row 格式比较合适。

1.2 Canal 的运行原理

在这里插入图片描述
Mysql的主从复制:

  1. Master改变数据, 写入到二进制文件中
  2. slave 从master 发送dump协议, 读取二进制文件到自己的relay log
  3. slave读取relay log到自己的数据库

canal就是将自己伪装为slave

1.3 Canal使用场景

1> 进行异地数据库之间的同步框架

在这里插入图片描述
2> 更新缓存, 实现缓存和数据库的一致性
在这里插入图片描述
3> 抓取业务表的新增变化数据,用于制作实时统计(我们就是这种场景)

2.Mysql 的配置准备

CREATE TABLE user_info(
`id` VARCHAR(255),
`name` VARCHAR(255),
`sex` VARCHAR(255)
);
vim /etc/my.cnf

在这里插入图片描述

systemctl restart mysqld

3.Canal 的准备

在这里插入图片描述
修改配置
在这里插入图片描述

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
防火墙关闭

在这里插入图片描述

4.Canal 数据结构分析

在这里插入图片描述
发送的是Message, 由很多Entry组成, 一个Entry对应一个Sql命令

Entry: TableName, EntryTyple, StoreValue, RowChange

RowChange为反序列化后的数据, 如果要使用的话必须通过StoreValue反序列化为RowChange后才可以使用

5.Java 代码

maven

       <dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.0.25</version></dependency>
public class Test {public static void main(String[] args)throws Exception {// Canal中的数据结构: Message - Entry(对应一个Sql) - TableName, EntryType, StoreValue-RowChangeCanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.66.66", 11111), "example", "", "");while (true){// 连接connector.connect();// 订阅connector.subscribe("cdc_test.*");// 获取数据Message message = connector.get(100);// 获取Entry 集合List<CanalEntry.Entry> entries = message.getEntries();if (entries.size() <= 0){System.out.println("稍等一会.........");Thread.sleep(1000);}else {for (CanalEntry.Entry entry : entries) {// 1.获取表名String tableName = entry.getHeader().getTableName();// 2.获取类型CanalEntry.EntryType entryType = entry.getEntryType();// 3.获取序列化后的数据ByteString storeValue = entry.getStoreValue();// 判断当前的类型是否为Rowif (CanalEntry.EntryType.ROWDATA.equals(entryType)){// 5.反序列化数据CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);// 6.获取EventTypeCanalEntry.EventType eventType = rowChange.getEventType();// 7.获取数据集List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();for (CanalEntry.RowData rowData : rowDatasList) {JSONObject beforeJson = new JSONObject();List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();for (CanalEntry.Column column : beforeColumnsList) {beforeJson.put(column.getName(), column.getValue());}JSONObject afterJson = new JSONObject();List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();for (CanalEntry.Column column : afterColumnsList) {afterJson.put(column.getName(), column.getValue());}System.out.println("Table:" + tableName + "eventType: " + eventType + "before: " + beforeJson + "after: " + afterJson);}}else {System.out.println("当前数据类型为:" + entryType);}}}}}
}

在这里插入图片描述

6.Kafka 测试

修改 canal.properties 中 canal 的输出 model,默认 tcp,改为输出到 kafka

在这里插入图片描述
修改 Kafka 集群的地址

canal.mq.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092

修改 instance.properties 输出到 Kafka 的主题以及分区数

# mq config
canal.mq.topic=canal_test
canal.mq.partitionsNum=1
# hash partition config
#canal.mq.partition=0
#canal.mq.partitionHash=mytest.person:id,mytest.role:id
[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic canal_test

向数据库添加数据后

在这里插入图片描述


http://www.ppmy.cn/news/533460.html

相关文章

ComponentScan

一、ComponentScan注解是什么 如果你理解了ComponentScan&#xff0c;你就理解了Spring是一个依赖注入(dependency injection)框架。所有的内容都是关于bean的定义及其依赖关系。 定义Spring Beans的第一步是使用正确的注解Component或Service或Repository或者Controller 但是…

@ComponentScan

ComponentScan 作用 ComponentScan用于批量注册bean。 这个注解会让spring扫描指定包及其子包中所有的类&#xff0c;得到一批类的数组&#xff0c;然后将满足过滤器条件的类作为bean注册到spring容器中。 常用参数 value&#xff1a;指定需要扫描的包 ComponentScan({ “xxx.…

CANopen是个啥?

CANopen是个啥&#xff1f; 一、CANopen协议的诞生和意义二、为什么选择CANopen 一、CANopen协议的诞生和意义 我个人的理解就是基于CAN BUS的上层应用协议&#xff0c;就好像有菜有锅有调料&#xff0c;不同的人做出来的菜是不一样的&#xff0c;CANopen就是希望建立一个应用标…

CAN接口简介

1.1 CAN总线介绍 顾名思义&#xff0c;CAN总线名称如下&#xff1a; 目前世界上绝大多数汽车制造厂商都采用CAN总线来实现汽车内部控制系统之间的数据通信。 CAN总线由CAN_H、CAN_L双绞线组成&#xff0c;通过差分电压传输信号&#xff0c;提高了抗干扰能力&#xff0c;保证…

CANoe (1)

CANifif(表达式&#xff09; 语句&#xff1b; if(表达式&#xff09;语句1&#xff1b; else 语句2&#xff1b;forfor&#xff08;<初始化>;<条件表达式>;<增量>) 语句&#xff1b;whilewhile(表达式&#xff09; 语句&#xff1b;do whiledo 循环体语句…

Canal学习

环境准备 MySQL搭建及binlog开启 MySQL搭建教程&#xff08;Windows10&#xff09;&#xff1a;https://blog.csdn.net/liwenyang1992/article/details/121513620 MySQL搭建教程&#xff08;Linux&#xff0c;包含MySQL5.7&#xff0c;Zookeeper&#xff0c;Kafka等&#xf…

1 CAN

一、起源 CAN总线&#xff0c;全称Controller Area Network&#xff0c;控制器局域网络&#xff0c;是由德国博世BOSCH公司于1986年专门为汽车行业开发的一种串行通信总线&#xff0c;BOSCH公司以研发生产汽车电子产品和提供汽车解决方案著称&#xff0c;直到现在也是汽车领域知…

pecan

https://segmentfault.com/a/1190000003718598 上一篇文章我们了解了一个巨啰嗦的框架&#xff1a;Paste PasteDeploy Routes WebOb。后来OpenStack社区的人受不了这么啰嗦的代码了&#xff0c;决定换一个框架&#xff0c;他们最终选中了Pecan。Pecan框架相比上一篇文章的…