【Linux】Canal主从复制

news/2024/10/29 2:40:08/

canal.deployer-1.1.7.tar.gzcanal.adapter-1.1.7.tar.gzcanal.admin-1.1.7.tar.gzcanal.example-1.1.7.tar.gz

Canal_2">什么是Canal

阿⾥巴巴 B2B 公司,因为业务的特性,卖家主要集中在国内,买家主要集中在国外,所以衍⽣出了同步杭州和美国异地机房的需求,从 2010 年开始,阿⾥系公司开始逐步的尝试 基于数据库的⽇志解析, 获取增量变更进⾏同步,由此衍⽣出了增量订阅&消费的业务。Canal 是⽤ Java 开发的基于数据库增量⽇志解析,提供增量数据订阅&消费的中间件。 ⽬前。Canal 主要⽀持了 MySQL 的 Binlog 解析,解析完成后才利⽤ Canal Client 来处理获得 的相关数据。(数据库同步需要阿⾥的 Otter 中间件,基于 Canal)。

MySQL的binlog

binlog是什么?

MySQL 的⼆进制⽇志可以说 MySQL 最重要的⽇志了,它记录了所有的 DDL 和 DML(除 了数据查询语句)语句,以事件形式记录,还包含语句所执⾏的消耗的时间,MySQL 的⼆进 制⽇志是事务安全型的。

⼀般来说开启⼆进制⽇志⼤概会有 1%的性能损耗。⼆进制有两个最重要的使⽤场景:

其⼀:MySQL Replication 在 Master 端开启 Binlog,Master 把它的⼆进制⽇志传递给 Slaves 来达到 Master-Slave 数据⼀致的⽬的。

其⼆:⾃然就是数据恢复了,通过使⽤ MySQL Binlog ⼯具来使恢复数据。 ⼆进制⽇志包括两类⽂件: ⼆进制⽇志索引⽂件(⽂件名后缀为.index)⽤于记录所有 的⼆进制⽂件,⼆进制⽇志⽂件(⽂件名后 缀为.00000*)记录数据库所有的 DDL 和 DML(除 了数据查询语句)语句事件。

binlog的分类

MySQL Binlog 的格式有三种,分别是 STATEMENT,MIXED,ROW。在配置⽂件中可以选择配置
binlog_format= statement|mixed|row。三种格式的区别:

  1. statement:语句级,binlog 会记录每次⼀执⾏写操作的语句。相对 row 模式节省空 间,但是可能
    产⽣不⼀致性,⽐如“update tt set create_date=now()”,如果⽤ binlog ⽇志 进⾏恢复,由于执
    ⾏时间不同可能产⽣的数据就不同。
    优点:节省空间。

缺点:有可能造成数据不⼀致。

  1. row:⾏级, binlog 会记录每次操作后每⾏记录的变化。
    优点:保持数据的绝对⼀致性。因为不管 sql 是什么,引⽤了什么函数,他只记录 执⾏后的效果。
    缺点:占⽤较⼤空间。
  2. mixed:statement 的升级版,⼀定程度上解决了,因为⼀些情况⽽造成的 statement 模式不⼀致问题,默认还是 statement,在某些情况下譬如:当函数中包含 UUID() 时;包含AUTO_INCREMENT 字段的表被更新时;执⾏ INSERT DELAYED 语句时;⽤ UDF 时,会按照ROW 的⽅式进⾏处理
    优点:节省空间,同时兼顾了⼀定的⼀致性。
    缺点:还有些极个别情况依旧会造成不⼀致,另外 statement 和 mixed 对于需要对 binlog 的监控
    的情况都不⽅便。

综合上⾯对比,Canal 想做监控分析,选择 row 格式比较合适。

Canal_41">Canal的工作原理

MySQL的主从复制

  • Master 主库将改变记录,写到⼆进制⽇志(Binary Log)中;
  • Slave 从库向 MySQL Master 发送 dump 协议,将 Master 主库的 binary log events 拷贝到它的中继⽇志(relay log);
  • Slave 从库读取并重做中继⽇志中的事件,将改变的数据同步到⾃⼰的数据库。

Canal_51">Canal的工作原理

就是把自己伪装成 Slave,从 Master 复制数据。

使用场景

原始场景: 阿⾥ Otter 中间件的⼀部分 Otter 是阿⾥⽤于进行异地数据库之间的同步框架,Canal 是其中⼀部分。

常⻅场景 1:更新缓存

常⻅场景 2:抓取业务表的新增变化数据,⽤于制作实时统计

Canal_67">Canal的组件用途

** canal-deploy **

可以直接监听MySQL的binlog,把⾃⼰伪装成MySQL的从库,只负责接收数据,并不做处理。

** canal-adapter **

相当于canal的客户端,会从canal-deploy中获取数据,然后对数据进⾏同步。

** canal-admin **

为canal提供整体配置管理、节点运维等⾯向运维的功能,提供相对友好的Web-ui操作界⾯,⽅便更多⽤户快速和安全的操作。

Canal_81">Canal的搭建

首先搭建SQLServer

MySQL安装:

Linux安装MySQL

配置文件

[mysqld]
port=3306
basedir=/usr/local/mysql/mysql8
datadir=/usr/local/mysql/data
log-error=/usr/local/mysql/log/mysql_error.log
general_log=ON
general_log_file=/usr/local/mysql/log/mysql_general.log
log-bin=mysql-bin
binlog_format=row
# binlog-do-db=dbname
character-set-server=utf8mb4
default-storage-engine=INNODB
default_authentication_plugin=mysql_native_password
lower_case_table_names=1
plugin_dir=/usr/local/mysql/mysql8/lib/plugin
server_id=1

注意的是log-bin必须开启,binlog_format必须设置为row, binlog-do-db根据情况设置要同步的数据库,如果不设置那就是所有的库都会在binlog中

cd /usr/local/mysql/mysql8/bin
./mysqld --initialize --user=mysql --datadir=/usr/local/mysql/data/ 
./mysqld_safe --user=mysql &
./mysql -uroot -p

配置权限:

alter user 'root'@'localhost' identified by "123456";
create user root@'%' identified by '123456';
grant all privileges on *.* to root@'%';
CREATE USER canal IDENTIFIED BY 'canal'; 
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
flush privileges;

查看binlog日志状态

show variables like '%log_bin%'
show variables like 'binlog_format%'; 

Canal_142">安装Canal

Canal_144">下载Canal

Releases · alibaba/canal

mkdir /usr/local/canal
cd /usr/local/canal
# 下载很慢
wget https://github.com/alibaba/canal/releases/download/canal-1.1.8-alpha2/canal.deployer-1.1.8-SNAPSHOT.tar.gz
tar -xzvf canal.deployer-1.1.8-SNAPSHOT.tar.gz

canal部署deployer

解压安装包

解压压缩包到/usr/local/canal/deployer

mkdir deployer
tar -zxvf canal.deployer-1.1.7.tar.gz -C ./deployer/
修改配置⽂件deployer/conf/example/instance.properties
# 需要同步数据的MySQL地址
canal.instance.master.address=192.168.187.88:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# ⽤于同步数据的数据库账号
canal.instance.dbUsername=canal
# ⽤于同步数据的数据库密码
canal.instance.dbPassword=canal
# 数据库连接编码
canal.instance.connectionCharset = UTF-8
# 需要订阅binlog的表过滤正则表达式
canal.instance.filter.regex=.*\\..*
启动
./deployer/bin/startup.sh 
使⽤Java测试canal-deployer

创建maven项⽬

 <dependencies><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.2</version></dependency></dependencies>

编写代码测试:

package com.stirngzhua;import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;import java.net.InetSocketAddress;
import java.util.List;/*** @Author Stringzhua* @Date 2024/10/25 16:28* description:*/
public class Test {public static void main(String[] args) throws InvalidProtocolBufferException {// 1.获取 canal 连接对象CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("120.26.240.11", 11111), "example", "", "");System.out.println(canalConnector);while (true) {// 2.获取连接canalConnector.connect();// 3.指定要监控的数据库canalConnector.subscribe("test.*");// 4.获取 MessageMessage message = canalConnector.get(100);List<CanalEntry.Entry> entries = message.getEntries();if (entries.size() <= 0) {System.out.println("没有数据,休息一会");try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}} else {for (CanalEntry.Entry entry : entries) {// TODO 获取表名String tableName = entry.getHeader().getTableName();// TODO Entry 类型CanalEntry.EntryType entryType = entry.getEntryType();// TODO 判断 entryType 是否为 ROWDATAif (CanalEntry.EntryType.ROWDATA.equals(entryType)) {// TODO 序列化数据ByteString storeValue = entry.getStoreValue();// TODO 反序列化CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);// TODO 获取事件类型CanalEntry.EventType eventType = rowChange.getEventType();// TODO 获取具体的数据List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();// TODO 遍历并打印数据for (CanalEntry.RowData rowData : rowDatasList) {List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();JSONObject beforeData = new JSONObject();for (CanalEntry.Column column : beforeColumnsList) {beforeData.put(column.getName(), column.getValue());}JSONObject afterData = new JSONObject();List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();for (CanalEntry.Column column : afterColumnsList) {afterData.put(column.getName(), column.getValue());}System.out.println("TableName:" + tableName + ",EventType:" + eventType + ",After:"+ beforeData + ",After:" + afterData);}}}}}}
}

http://www.ppmy.cn/news/1542754.html

相关文章

微调大模型-4-合并基座模型

合并模型 新建文件夹,默认参数,开始导出,但是发现报错: 报错显示磁盘空间不足,这是由于AutoDL云,当前默认空间用的是系统盘,空间只有30G。 参考:https://blog.csdn.net/lwd19981223/article/details/130740905/ 将合并路径存储到50个G的数据盘,数据库路径是autodl-t…

标准数字隔离器主要特性和应用---腾恩科技

在现代电子系统中&#xff0c;不同电路部分之间需要可靠的隔离&#xff0c;尤其是在高压环境或必须保持敏感信号完整性的情况下。一种这样的解决方案是使用标准数字隔离器。这些组件在电路的不同部分之间提供电气隔离&#xff0c;确保安全、降噪和可靠的信号传输。本文深入探讨…

【c语言测试】

1. C语言中&#xff0c;逻辑“真”等价于&#xff08; &#xff09; 题目分析&#xff1a; “逻辑真”在C语言中通常指的是非零数。 A. 大于零的数B. 大于零的整数C. 非零的数 (正确答案)D. 非零的整数 正确答案&#xff1a;C 2. 若定义了数组 int a[3][4];&#xff0c;则对…

【LeetCode】每日一题 2024_10_27 冗余连接(并查集)

前言 每天和你一起刷 LeetCode 每日一题~ 今年过去 300 天了呀 LeetCode 启动&#xff01; 题目&#xff1a;冗余连接 代码与解题思路 题目翻译&#xff1a;找到一条边删除之后&#xff0c;所有节点依旧是连通的 看到连通块&#xff0c;就不由自主的先把并查集的思路套进去…

李宇皓现身第十届“文荣奖”,allblack造型帅气绅士引关注

近日&#xff0c;第十届“文荣奖”在众人的期待中拉开帷幕&#xff0c;与众多影视奖项不同&#xff0c;“文荣奖”始终关注年轻群体需求&#xff0c;致力于发掘和扶植影视新人新作&#xff0c;为热爱影视行业的新人提供宝贵机会与激励。今年的文荣奖评委阵容十分强大&#xff0…

在HBuilder中利用Moment.js插件实现时间戳的转换

1.登录网站Moment.js 中文网根据需要进行下载并运行到终端 2.在mine.js中配置&#xff1a; Vue.filter(formatDate, function(value, format) { if (!value) return ; // 如果没有提供格式&#xff0c;则使用默认格式 format format || YYYY-MM-DD HH:mm:ss; // 使…

鸿蒙网络编程系列33-TLS回声服务器示例

1. 网络通讯的安全性问题 在本系列的第1、2、3、25篇文章&#xff0c;分别讲解了使用UDP以及TCP进行通讯的方式&#xff0c;并且以回声服务器的形式分别演示了客户端与服务端之间的通讯。这种通讯方式一般来说没什么问题&#xff0c;但是在需要对内容保密的情况下就不可取了&a…

使用 docker 的方式部署 NFS server 提供文件共享能力

文章目录 [toc]构建 NFS server 镜像准备 Dockerfile准备 .bashrc 文件准备 nfsd.sh 构建镜像 特权模式dockerdocker-compose docker run 的方式环境变量方式配置文件挂载方式 docker-compose 的方式本地挂载 NFS题外话什么是 Capabilities常见的 Capabilities 列表获取 Capabi…