Canal实战使用(集群部署)和原理解析

news/2025/2/13 22:33:40/

1.mysql数据同步工作原理

 

MySQL master将数据变更写入二进制日志(binary log,其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)

MySQL slave将master的binary log events拷贝到它的中继日志(relay log)

MySQL slave重放relay log中事件,将数据变更反映它自己的数据

2.canal工作原理

canal模拟mysql数据同步的原理,伪装自己为mysql slave,向mysql master发送dump协议。

mysql master收到dump请求,开始推送binary log给slave(canal)

使用步骤

1.数据库需要开启binlog

show variables like 'binlog_format'
show variables like 'log_bin'

显示未on则开启。

查看binlog日志


show binlog events in 'DJT2TRG3-bin.000433';

可以查找执行操作的position

可以通过修改canal的meta.dat文件 修改位置 从之前的位置开始同步操作。

下载canal 服务端 canal deployer 和服务端 canal adapter。

下载完成解压后,修改配置文件,由于主要的内容就是修改配置文件,以下贴出我使用的配置:

服务端配置文件canal_local.properties:

# register ip
canal.register.ip = 127.0.0.1# canal admin config
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster = canal-cluster-1
canal.admin.register.name = canal-server-2

canal.properties(可以在canal-admin中配置)

#################################################
######### 		common argument		#############
#################################################
# tcp bind ip
canal.ip =
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
#canal.admin.register.auto = true
#canal.admin.register.cluster =
#canal.admin.register.name =canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = tcp
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024 
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true## detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size =  1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
canal.instance.filter.dml.insert = false
canal.instance.filter.dml.update = false
canal.instance.filter.dml.delete = false# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED 
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB# binlog ddl isolation
canal.instance.get.ddl.isolation = false# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256# table meta tsdb info 如果不配置的话 canal使用的数据存在内置数据库H2,重启等会丢失数据
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360#################################################
######### 		destinations		#############
#################################################
canal.destinations = example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
# set this value to 'true' means that when binlog pos not found, skip to latest.
# WARN: pls keep 'false' in production env, or if you know what you want.
canal.auto.reset.latest.pos.mode = falsecanal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xmlcanal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml##################################################
######### 	      MQ Properties      #############
##################################################
# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =
canal.aliyun.uid=canal.mq.flatMessage = true
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = localcanal.mq.database.hash = true
canal.mq.send.thread.size = 30
canal.mq.build.thread.size = 8##################################################
######### 		     Kafka 		     #############
##################################################
kafka.bootstrap.servers = 127.0.0.1:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0kafka.kerberos.enable = false
kafka.kerberos.krb5.file = "../conf/kerberos/krb5.conf"
kafka.kerberos.jaas.file = "../conf/kerberos/jaas.conf"##################################################
######### 		    RocketMQ	     #############
##################################################
rocketmq.producer.group = test
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
rocketmq.namesrv.addr = 127.0.0.1:9876
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false
rocketmq.tag = ##################################################
######### 		    RabbitMQ	     #############
##################################################
rabbitmq.host =
rabbitmq.virtual.host =
rabbitmq.exchange =
rabbitmq.username =
rabbitmq.password =
rabbitmq.deliveryMode =

Instance.properties

# MySQL 集群配置中的 serverId 概念,需要保证和当前 MySQL 集群中 id 唯一 (v1.0.26+版本之后 canal 会自动生成,不需要手工指定)
# canal.instance.mysql.slaveId=0
# 源库地址
canal.instance.master.address=11.17.6.185:4679# 方式一:binlog + postion 如果用方式一 也可以不用填position 和binlog canal会自动获取
# mysql源库起始的binlog文件
# canal.instance.master.journal.name=mysql-bin.000008
# mysql主库链接时起始的binlog偏移量
# canal.instance.master.position=257890708 
# mysql主库链接时起始的binlog的时间戳
# canal.instance.master.timestamp=# 方式二:gtid(推荐)
# 启用 gtid 方式同步
canal.instance.gtidon=true
# gtid
canal.instance.master.gtid=92572381-0b9c-11ec-9544-8cdcd4b157e0:1-1066141# 源库账号密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# MySQL 数据解析编码
canal.instance.connectionCharset = UTF-8# MySQL 数据解析关注的表,Perl 正则表达式
canal.instance.filter.regex=acpcanaldb.*# MySQL 数据解析表的黑名单,过滤掉不同步的表
# canal.instance.filter.black.regex=mysql\\.slave_.*

客户端配置:

my_test.yml(adapter\conf\rdb目录下可以添加多个.yml,都会读取)

dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql1
concurrent: true
#dbMapping:
#  database: account
#  table: demo
#  targetTable: demo
#  targetPk:
#    id: id
#  mapAll: true
#  targetColumns:
#    id:
#    name:
#    test:
#    role_id:
#    c_time:
#    test1:
#  etlCondition: "where c_time>={}"
#  commitBatch: 3000 # 批量提交的大小## Mirror schema synchronize config
#dataSourceKey: defaultDS
#destination: example
#groupId: g1
#outerAdapterKey: mysql1
#concurrent: true
##使用mirror方式 整个库的改动都会同步 但是必须两个库保持一致,不同的已存在表不会处理,另外因为是
##两个同名库,所以必须两个服务器上(两个ip)搭建mysql
##如果不用mirror方式,只会同步一个表,新增表添加yml文件,比较灵活
dbMapping:mirrorDb: truedatabase: account

配置完成后,先启动服务端(省略zk安装,较为简单可以自己百度),然后启动客户端,新建表添加数据进行测试。需要注意如果要同步DDL语句,必须用mirror方式,否则只能同步dml单表记录。

下边是运行截图:

 

 

 


http://www.ppmy.cn/news/62030.html

相关文章

软考A计划-常用公式复习

点击跳转专栏>Unity3D特效百例点击跳转专栏>案例项目实战源码点击跳转专栏>游戏脚本-辅助自动化点击跳转专栏>Android控件全解手册点击跳转专栏>Scratch编程案例 👉关于作者 专注于Android/Unity和各种游戏开发技巧,以及各种资源分享&am…

SQL知识汇总

什么时候用存储过程合适 当一个事务涉及到多个SQL语句时或者涉及到对多个表的操作时就要考虑用存储过程;当在一个事务的完成需要很复杂的商业逻辑时(比如,对多个数据的操作,对多个状态的判断更改等)要考虑&#xff1b…

Nmap安全工具使用手册

Nmap安全工具使用手册 什么是Nmap? Nmap(Network Mapper) 是一款自由、开源的网络探测和安全审核工具。它适用于Windows、Linux、UNIX等各种操作系统平台,可以帮助管理员或安全研究人员判断目标机器的状态和对应的服务程序,同时也能探查隐蔽…

【Linux网络】网络应用层的 http 和 https协议

文章目录 1、http协议1.1 认识URL1.2 http协议格式1.3 http的方法(GET和POST)1.4 状态码1.5 cookie1.6 短连接和长连接 2、https协议2.1 常见的加密方式2.2 探究https协议的加密2.3 CA证书 1、http协议 在之前学习序列化和反序列化的时候,认…

模糊PID模糊控制(清晰化方法梯形图实现)

模糊PID的模糊化请参看下面的博客文章: 博途PLC模糊PID三角隶属度函数指令(含Matlab仿真)_plc 模糊pid_RXXW_Dor的博客-CSDN博客三角隶属度函数FC,我们采用兼容C99标准的函数返回值写法,在FB里调用会更加直观,下面给大家具体讲解代码。常规写法的隶属度函数FC可以参看下…

人大金仓亮相国际金融展,打造“金融+产业+生态”创新模式

4月27日,以“荟萃金融科技成果,展现数字金融力量,谱写金融服务中国式现代化新篇章”为主题的2023中国国际金融展圆满落幕。作为已经举办30年的行业盛会,人大金仓再一次重磅亮相,全方位展示国产数据库前沿应用和创新服务…

php 设置meta标签中的keywords | description | content-type | copyright的方法函数

怎么设置meta标签中的所有值 if(!function_exists(meta)) {/** 从键/值数组生成元标记,生成meta标签的keywords,description,Content-type,author等* param array* param string* param string* param string* return string*/function meta($name,$conte…

pdf怎么分割成一页一页的文件?

pdf怎么分割成一页一页的文件?相信很多使用电脑办公的小伙伴们都知道,无论是pdf文件还是ppt文件其都是由很多页组成的。一般当我们需要其中的部分内容时候,可以通过手动的方式将ppt文件来拆分成一页一页的来使用。但是这种手动的方法对于pdf文…