Canal Study Notes


Environment Setup

Setting up MySQL and enabling binlog

MySQL setup tutorial (Windows 10): https://blog.csdn.net/liwenyang1992/article/details/121513620

MySQL setup tutorial (Linux, covering MySQL 5.7, ZooKeeper, Kafka, etc.): https://blog.csdn.net/liwenyang1992/article/details/121724311

Enable binlog writing and set binlog-format to ROW mode by adding the following to my.ini (Windows 10):

# Enable binlog
# Set the log path; the mysql user needs write permission on it
log-bin=mysql-bin
# Set the log format: ROW mode
binlog-format=ROW
# server_id is required for MySQL replication; it must not collide with canal's slaveId
server_id=1

MySQL binlog comes in three formats: STATEMENT, MIXED, and ROW.

  • statement: statement level. The binlog records each SQL statement that performs a write. It saves space compared with ROW mode but can produce inconsistencies: for example, with "update xxtable set create_date=now()", replaying the binlog for recovery executes at a different time and can therefore produce different data.
    Pros: saves space.
    Cons: may cause data inconsistency.
  • row: row level. The binlog records how each affected row looks after every operation.
    Pros: absolute data consistency; whatever the SQL is and whatever functions it calls, only the resulting row changes are recorded.
    Cons: takes much more space.
  • mixed: an upgraded statement mode that resolves the known cases where statement mode is inconsistent. It still defaults to statement logging, but switches to ROW handling in certain situations: when a statement uses UUID(); when a table with an AUTO_INCREMENT column is updated; when INSERT DELAYED is executed; when a UDF is used.
    Pros: saves space while providing reasonable consistency.
    Cons: a few corner cases can still be inconsistent, and both statement and mixed are inconvenient whenever the binlog needs to be monitored.

Given this comparison, ROW format is the appropriate choice for the monitoring and analysis Canal is meant for.
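After restarting MySQL, the settings can be verified with standard server-variable queries:

-- Confirm binlog is enabled and uses ROW format
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';
-- Confirm the server_id that canal's slaveId must not collide with
SHOW VARIABLES LIKE 'server_id';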

Create a canal account in MySQL with the privileges of a MySQL slave; if the account already exists, simply grant the privileges:

-- Create the user: account canal, password canal
CREATE USER canal IDENTIFIED BY 'canal';
-- Grant privileges
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
-- Flush privileges
FLUSH PRIVILEGES;

Permission notes:

Permission           Description
SELECT               Used when taking a snapshot or reading table information.
RELOAD               Lets the connector use FLUSH statements to clear or reload internal caches, flush tables, or acquire locks.
SHOW DATABASES       Used to obtain database information.
REPLICATION SLAVE    Connects to the MySQL server and reads its binlog.
REPLICATION CLIENT   Allows statements such as SHOW MASTER STATUS, SHOW SLAVE STATUS, and SHOW BINARY LOGS.
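To double-check the account, its grants can be listed with a standard MySQL statement:

SHOW GRANTS FOR 'canal'@'%';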

Canal Configuration

Download canal from the releases page and pick the package you need, e.g. version 1.1.6:

wget https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.deployer-1.1.6.tar.gz
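A typical unpack-and-start sequence, following the official QuickStart layout (the install directory is an arbitrary choice):

mkdir /usr/local/canal
tar -zxvf canal.deployer-1.1.6.tar.gz -C /usr/local/canal
cd /usr/local/canal
# start the server (bin/stop.sh stops it)
sh bin/startup.sh
# watch the server and instance logs
tail -f logs/canal/canal.log logs/example/example.log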

In the conf directory, canal.properties is canal's general configuration file. The default canal port is 11111; the default output serverMode is tcp, and tcp, kafka, rocketMQ, rabbitMQ, and pulsarMQ are supported.

One canal server can host multiple instances: each directory under conf (such as example) is one instance, and every instance has its own instance.properties.

By default only the example instance exists. To have several instances handle different MySQL data, copy the example directory once per instance, rename the copies (the names must match what the configuration references), and list them in canal.destinations=instance1,instance2,instance3 in canal.properties (comma-separated), as sketched below.
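For example, with two additional instance directories conf/orderdb and conf/userdb (hypothetical names), canal.properties would declare:

# one entry per instance directory under conf/
canal.destinations = example,orderdb,userdb
# canal can also pick up added/removed instance directories automatically
canal.auto.scan = true
canal.auto.scan.interval = 5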

Edit the instance.properties file under conf/example:

# Must NOT equal the server-id configured in MySQL
canal.instance.mysql.slaveId=1002
# Change to your own database address
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position = 
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# Change to your own database credentials
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# The database encoding mapped to a Java charset name, e.g. UTF-8, GBK, ISO-8859-1
canal.instance.connectionCharset = UTF-8
# table regex (the author hit errors with this on 1.1.6)
canal.instance.filter.regex = .*\\..*
  • If the system has only one CPU, set canal.instance.parser.parallel to false.
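For reference, common canal.instance.filter.regex values as documented in the canal wiki (mytest and user are placeholder names):

# all tables
canal.instance.filter.regex = .*\\..*
# all tables in the mytest schema
canal.instance.filter.regex = mytest\\..*
# a single table
canal.instance.filter.regex = mytest.user
# multiple rules, comma separated
canal.instance.filter.regex = mytest\\..*,mytest2.user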

For configuration details see: https://github.com/alibaba/canal/wiki/QuickStart

Database and SQL Preparation

Create the goodsdb database and a book information table:

DROP TABLE IF EXISTS canal_book_t;
CREATE TABLE canal_book_t (
    id int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT 'Primary key ID',
    book_name varchar(128) NOT NULL COMMENT 'Book title',
    book_author varchar(64) NOT NULL COMMENT 'Author',
    book_description varchar(1024) DEFAULT NULL COMMENT 'Description',
    book_price decimal(8,2) NOT NULL COMMENT 'Price',
    book_publish_date date NOT NULL COMMENT 'Publish date',
    create_user varchar(64) NOT NULL COMMENT 'Created by',
    create_time datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Creation time',
    PRIMARY KEY (id),
    KEY idx_book_name (book_name),
    KEY idx_book_price (book_price)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT 'Book information table';

Insert a single row:

INSERT INTO canal_book_t(book_name,book_author,book_description,book_price,book_publish_date,create_user,create_time)
VALUES('领域驱动设计精粹','覃宇','领域驱动设计(DDD)是时下软件设计领域中的热门话题,它通过指导我们构建领域模型,来表达丰富的软件功能需求,并由此实现可以满足用户真正需要的软件。',64.99,'2018-07-10','李四',NOW());

Insert multiple rows:

INSERT INTO canal_book_t(book_name,book_author,book_description,book_price,book_publish_date,create_user,create_time)
VALUES('Spring Data JPA 从入门到精通','张振华','《Spring Data JPA从入门到精通》以Spring Boot为技术基础,从入门到精通,由浅入深地介绍Spring Data JPA的使用。有语法,有实践,有原理剖析。',59.00,'2018-05-30','张三',NOW()),('铁军团队','欧德张','本书深刻揭示阿里铁军的团建模式、目标设定、PK文化、制度保障等可执行的方法体系。',69.00,'2021-05-20','王五',NOW());

Update multiple columns:

UPDATE canal_book_t SET book_name = 'DDD领域驱动设计精粹',book_author = 'Vaughn Vernon' WHERE id = 1;

Update multiple rows:

UPDATE canal_book_t SET book_price = book_price * 0.7 WHERE book_price > 60.00;

Delete a single row:

DELETE FROM canal_book_t WHERE id = 1;

Batch delete:

DELETE FROM canal_book_t WHERE id IN (2,3);
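To confirm the statements reached the binlog, standard MySQL statements can be used (the binlog file name on your machine will differ):

-- Current binlog file and write position
SHOW MASTER STATUS;
-- Inspect recorded events (file name is an example)
SHOW BINLOG EVENTS IN 'mysql-bin.000001' LIMIT 20;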

Usage

Direct use via the Client API

For Canal Client API code and internals, see the official docs: https://github.com/alibaba/canal/wiki/ClientAPI, https://github.com/alibaba/canal/wiki/ClientExample

A brief look at Canal's data object format (EntryProtocol.proto):

Entry
    Header
        logfileName  [binlog file name]
        logfileOffset  [binlog position]
        executeTime  [timestamp of the change recorded in the binlog, second precision]
        schemaName
        tableName
        eventType  [insert/update/delete type]
    entryType  [transaction head BEGIN / transaction tail END / data ROWDATA]
    storeValue  [byte data, expandable; the corresponding type is RowChange]

RowChange
    isDdl  [whether this is a DDL change, e.g. create table/drop table]
    sql  [the concrete DDL statement]
    rowDatas  [the changed rows of an insert/update/delete; may be multiple, since one binlog event can cover several changes, e.g. a batch]
        beforeColumns  [array of Column: the fields before the change]
        afterColumns  [array of Column: the fields after the change]

Column
    index
    sqlType  [JDBC type]
    name  [column name]
    isKey  [whether it is part of the primary key]
    updated  [whether the value changed]
    isNull  [whether the value is null]
    value  [the concrete content; note it is string text]

Configuration

Edit the instance.properties configuration under /conf/example:

#################################################
## mysql serverId , v1.0.26+ will autoGen
canal.instance.mysql.slaveId=1002
# enable gtid use true/false
canal.instance.gtidon=false
# position info
canal.instance.master.address=192.168.3.100:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.enableDynamicQueuePartition=false
#canal.mq.partitionsNum=3
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################

Java Example

Fetching data directly via the Canal Client API:

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

public class Main {
    public static void main(String[] args) {
        // Create the Canal connection (canal installed locally)
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
        // Open the connection
        connector.connect();
        // Subscribe to the database to monitor
        connector.subscribe("goodsdb.*");
        boolean flag = true;
        final int batchSize = 100;
        while (flag) {
            // Message: one batch canal pulled from the binlog; a message can contain the results of several SQL statements
            // Message message = connector.get(batchSize, 10L, TimeUnit.SECONDS);
            Message message = connector.get(batchSize);
            // Entry: corresponds to one SQL command; one statement may affect multiple rows
            List<CanalEntry.Entry> entries = message.getEntries();
            if (entries.isEmpty()) {
                System.out.println("No data fetched, sleeping for 1 second");
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException exception) {
                    System.err.println(exception);
                }
                continue;
            }
            for (CanalEntry.Entry entry : entries) {
                // EntryType marks the start of a transaction, row data, or the end of a transaction
                CanalEntry.EntryType entryType = entry.getEntryType();
                // Only handle ROWDATA entries
                if (Objects.equals(CanalEntry.EntryType.ROWDATA, entryType)) {
                    System.out.println("eventType:" + entry.getHeader().getEventType());
                    // storeValue is serialized and cannot be used directly
                    ByteString storeValue = entry.getStoreValue();
                    try {
                        // Deserialize into a RowChange
                        RowChange rowChange = RowChange.parseFrom(storeValue);
                        System.out.println(rowChange);
                    } catch (InvalidProtocolBufferException exception) {
                        System.err.println(exception);
                    }
                }
            }
        }
        // Release the connection
        connector.disconnect();
    }
}

Unit test case:

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.TimeUnit;

@Slf4j
public class CanalClientTest {

    @Test
    public void test() {
        // Create the Canal connection
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
        // Open the connection
        connector.connect();
        // Subscribe to the table to monitor
        connector.subscribe("goodsdb.canal_book_t");
        boolean flag = true;
        while (flag) {
            // Fetch a Message
            Message message = connector.get(100);
            List<CanalEntry.Entry> entries = message.getEntries();
            if (entries.isEmpty()) {
                log.info("No data yet, sleeping for 1 second!");
                try {
                    TimeUnit.SECONDS.sleep(1L);
                } catch (InterruptedException exception) {
                    log.error("Exception: {}", exception.getMessage());
                }
            } else {
                for (CanalEntry.Entry entry : entries) {
                    // Read the table name from CanalEntry.Header (not used further here)
                    String tableName = entry.getHeader().getTableName();
                    // Read the entry type
                    CanalEntry.EntryType entryType = entry.getEntryType();
                    // Only handle ROWDATA entries
                    if (CanalEntry.EntryType.ROWDATA.equals(entryType)) {
                        // The serialized payload
                        ByteString storeValue = entry.getStoreValue();
                        try {
                            // Deserialize
                            RowChange rowChange = RowChange.parseFrom(storeValue);
                            // Read the event type
                            CanalEntry.EventType eventType = rowChange.getEventType();
                            // Read the changed rows
                            List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
                            switch (eventType) {
                                // The three DML types are handled identically: log before/after columns
                                case INSERT:
                                case UPDATE:
                                case DELETE:
                                    for (CanalEntry.RowData rowData : rowDatasList) {
                                        List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
                                        List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                                        log.info("{} , BeforeColumnsList is {};AfterColumnsList is {}",
                                                eventType, beforeColumnsList, afterColumnsList);
                                    }
                                    break;
                                default:
                                    log.info("RowChange Type is {}", eventType);
                            }
                        } catch (InvalidProtocolBufferException exception) {
                            log.error("Deserialization exception: {}", exception.getMessage());
                        }
                    }
                }
            }
        }
        connector.disconnect();
    }
}

Indirect use by syncing through Kafka

Canal 1.1.6 kept failing here, so canal 1.1.5 is used for the configuration and startup below.

The topic created is T-Canal-Byte. Note: because of the serialization format, do not publish content to this topic by hand, or deserialization will fail.

Configuration

Edit the canal.properties configuration under /conf:

#################################################
######### 		common argument		#############
#################################################
# tcp bind ip
canal.ip =
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458

# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
#canal.admin.register.auto = true
#canal.admin.register.cluster =
#canal.admin.register.name =

canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, rocketMQ, rabbitMQ
# changed: switch the protocol
canal.serverMode = kafka
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024 
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true

## detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false

# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size =  1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60

# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30

# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
canal.instance.filter.dml.insert = false
canal.instance.filter.dml.update = false
canal.instance.filter.dml.delete = false

# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED 
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

# binlog ddl isolation
canal.instance.get.ddl.isolation = false

# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256

# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360

#################################################
######### 		destinations		#############
#################################################
canal.destinations = example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
# set this value to 'true' means that when binlog pos not found, skip to latest.
# WARN: pls keep 'false' in production env, or if you know what you want.
canal.auto.reset.latest.pos.mode = false

canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml

##################################################
######### 	      MQ Properties      #############
##################################################
# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =
canal.aliyun.uid=

# changed: flatMessage
canal.mq.flatMessage = false
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local

canal.mq.database.hash = true
canal.mq.send.thread.size = 30
canal.mq.build.thread.size = 8

##################################################
######### 		     Kafka 		     #############
##################################################
# changed: the kafka server address
kafka.bootstrap.servers = 192.168.3.100:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0

kafka.kerberos.enable = false
kafka.kerberos.krb5.file = "../conf/kerberos/krb5.conf"
kafka.kerberos.jaas.file = "../conf/kerberos/jaas.conf"

##################################################
######### 		    RocketMQ	     #############
##################################################
rocketmq.producer.group = test
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
rocketmq.namesrv.addr = 127.0.0.1:9876
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false
rocketmq.tag = 

##################################################
######### 		    RabbitMQ	     #############
##################################################
rabbitmq.host =
rabbitmq.virtual.host =
rabbitmq.exchange =
rabbitmq.username =
rabbitmq.password =
rabbitmq.deliveryMode =

Edit the instance.properties configuration under /conf/example:

#################################################
## mysql serverId , v1.0.26+ will autoGen
# changed
canal.instance.mysql.slaveId=1011
# enable gtid use true/false
canal.instance.gtidon=false
# position info
# changed
canal.instance.master.address=192.168.3.100:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
# mq config
# changed
canal.mq.topic=T-Canal-Byte
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#################################################

Note: commonly used Kafka commands

./bin/kafka-topics.sh --create --zookeeper 192.168.3.100:2181 --replication-factor 1 --partitions 1 --topic T-Canal-Byte
./bin/kafka-topics.sh --list --zookeeper 192.168.3.100:2181
./bin/kafka-console-consumer.sh --bootstrap-server 192.168.3.100:9092 --from-beginning --topic T-Canal-Byte
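To check how far the Spring Boot consumer group (configured in application.properties below) has read, the standard consumer-groups tool can be used:

./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.3.100:9092 --describe --group canal-kafka-consumer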

Java code (based on Spring Boot)

Spring Boot + Kafka + Canal + Redis example, starting with the Maven pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.1</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.lwy.it</groupId>
    <artifactId>mysql-2-redis</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>mysql-2-redis</name>
    <description>Demo project for MySQL Binlog Through Kafka to Update Redis Cache</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <!-- RedisTemplate uses commons-pool2 as its connection pool -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.6</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.protocol</artifactId>
            <version>1.1.6</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

application.properties:

############################################################
# Redis configuration
############################################################
spring.redis.database=0
spring.redis.host=127.0.0.1
spring.redis.port=6379
spring.redis.password=
spring.redis.lettuce.pool.max-active=18
spring.redis.lettuce.pool.max-wait=-1
spring.redis.lettuce.pool.max-idle=10
spring.redis.lettuce.pool.min-idle=2
spring.redis.lettuce.shutdown-timeout=100ms
spring.redis.timeout=100
spring.cache.cache-names=myCache
# RedisTemplate uses commons-pool2 as its connection pool
spring.redis.lettuce.pool.enabled=true

############################################################
# Kafka configuration
############################################################
# Kafka cluster addresses, comma-separated
spring.kafka.bootstrap-servers=192.168.3.100:9092

# Consumer settings
# Consumer group
spring.kafka.consumer.group-id=canal-kafka-consumer
# Auto-commit interval. In Spring Boot 2.x the value is a Duration and must follow its format, e.g. 1S, 1M, 1H, 2D
spring.kafka.consumer.auto-commit-interval=1S
# What the consumer does when reading a partition without an offset, or with an invalid offset:
# none: if no previous offset is found for the consumer (neither auto- nor manually maintained), throw an exception
# earliest: with a committed offset in a partition, consume from it; without one, consume from the beginning
# latest: with a committed offset in a partition, consume from it; without one, consume from the newest data
spring.kafka.consumer.auto-offset-reset=earliest
# Whether to auto-commit offsets; defaults to true. To avoid duplicates and data loss, set it to false and commit manually
spring.kafka.consumer.enable-auto-commit=false
# Key deserializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Value deserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer

# record: commit after each record is handled by the listener (ListenerConsumer)
# batch: commit after each poll() batch is handled by the ListenerConsumer
# time: commit after each poll() batch is handled, once the time since the last commit exceeds TIME
# count: commit after each poll() batch is handled, once the number of processed records reaches COUNT
# count_time: commit when either the TIME or the COUNT condition is met
# manual: commit after each poll() batch is handled and Acknowledgment.acknowledge() is called manually
# manual_immediate: commit immediately when Acknowledgment.acknowledge() is called; generally the recommended mode
spring.kafka.listener.ack-mode=manual_immediate
# Number of listener container threads
spring.kafka.listener.concurrency=1

Configuration classes and a Redis utility class:

package com.lwy.it.config;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.impl.LaissezFaireSubTypeValidator;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.cache.interceptor.CacheErrorHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

/**
 * Uses Jackson2JsonRedisSerializer for serialization and deserialization.
 */
@Configuration
public class RedisConfiguration extends CachingConfigurerSupport {

    @Bean("redisTemplate")
    public RedisTemplate redisTemplate(LettuceConnectionFactory lettuceConnectionFactory) {
        RedisTemplate redisTemplate = new RedisTemplate();
        // Enable native connection sharing
        lettuceConnectionFactory.setShareNativeConnection(true);
        // Set the connection factory
        redisTemplate.setConnectionFactory(lettuceConnectionFactory);
        // Use Jackson2JsonRedisSerializer for Redis values
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        // Which members to detect: fields, getters and setters, of any visibility
        // ANY lets fields of every access level be picked up automatically
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        // Record type info when serializing; the class must be non-final (final classes such as String or Integer would throw)
        objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
        // JSON serialization for values
        redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
        // StringRedisSerializer for keys
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        // StringRedisSerializer for hash keys
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        // JSON serialization for hash values
        redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
        return redisTemplate;
    }

    @Override
    public CacheErrorHandler errorHandler() {
        return new RedisCacheErrorHandler();
    }
}
package com.lwy.it.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.cache.Cache;
import org.springframework.cache.interceptor.CacheErrorHandler;

@Slf4j
public class RedisCacheErrorHandler implements CacheErrorHandler {

    @Override
    public void handleCacheGetError(RuntimeException exception, Cache cache, Object key) {
        log.error(exception.getMessage(), exception);
    }

    @Override
    public void handleCachePutError(RuntimeException exception, Cache cache, Object key, Object value) {
        log.error(exception.getMessage(), exception);
    }

    @Override
    public void handleCacheEvictError(RuntimeException exception, Cache cache, Object key) {
        log.error(exception.getMessage(), exception);
    }

    @Override
    public void handleCacheClearError(RuntimeException exception, Cache cache) {
        log.error(exception.getMessage(), exception);
    }
}
package com.lwy.it.util;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.QueryTimeoutException;
import org.springframework.data.redis.RedisConnectionFailureException;
import org.springframework.data.redis.core.HashOperations;
import org.springframework.data.redis.core.ListOperations;
import org.springframework.data.redis.core.RedisOperations;
import org.springframework.data.redis.core.SetOperations;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;

@Component
@Slf4j
public final class RedisOperateUtil {

    // Default maximum time to live (seconds): 7 days
    private static final long EXPIRED_TIME = 604800L;
    // Key prefix
    private static final String PREFIX = "RedisCache:";

    @Autowired
    private RedisOperations redisTemplate;

    private final String generateRedisKey(final String key) {
        if (StringUtils.hasText(key)) {
            StringBuilder stringBuilder = new StringBuilder(PREFIX);
            stringBuilder.append(key);
            return stringBuilder.toString();
        } else {
            throw new NullPointerException("Redis Key can not be null or empty.");
        }
    }

    private final Set<String> generateRedisKey(final Set<String> keys) {
        if (CollectionUtils.isEmpty(keys)) {
            throw new NullPointerException("Redis Key Set can not be null or empty.");
        }
        Set<String> set = new HashSet<>();
        for (String key : keys) {
            set.add(this.generateRedisKey(key));
        }
        return set;
    }

    /** Applies the default 7-day expiry when the key has no TTL yet; the key is used as-is here. */
    private final void setDefaultExpiredTime(final String key) {
        Long ttl = this.ttl(key, TimeUnit.SECONDS);
        if (!Objects.isNull(ttl) && ttl.equals(-1L)) {
            this.expire(key, EXPIRED_TIME, TimeUnit.SECONDS);
        }
    }

    /** DEL key. Returns whether the key was deleted. */
    public Boolean del(final String key) {
        Boolean result = null;
        try {
            result = redisTemplate.delete(this.generateRedisKey(key));
        } catch (Exception exception) {
            log.error(exception.getMessage(), exception);
        }
        return result;
    }

    /** DEL key [key ...]. Returns the number of deleted keys. */
    public Long del(final Set<String> keys) {
        Long result = null;
        try {
            result = redisTemplate.delete(this.generateRedisKey(keys));
        } catch (Exception exception) {
            log.error(exception.getMessage(), exception);
        }
        return result;
    }

    /** EXISTS key. Returns whether the key exists. */
    public Boolean exists(final String key) {
        Boolean result = null;
        try {
            result = redisTemplate.hasKey(this.generateRedisKey(key));
        } catch (RedisConnectionFailureException | QueryTimeoutException exception) {
            log.error(exception.getMessage(), exception);
        }
        return result;
    }

    /** EXISTS key1 key2. Returns how many of the given keys exist; keys mentioned and existing several times are counted several times. */
    public Long exists(final Set<String> keys) {
        Long result = null;
        try {
            result = redisTemplate.countExistingKeys(this.generateRedisKey(keys));
        } catch (RedisConnectionFailureException | QueryTimeoutException exception) {
            log.error(exception.getMessage(), exception);
        }
        return result;
    }

    /** EXPIRE key seconds. Returns null when used inside a pipeline/transaction. */
    public Boolean expire(final String key, final long timeout, TimeUnit unit) {
        Boolean result = null;
        try {
            result = redisTemplate.expire(this.generateRedisKey(key), timeout, unit);
        } catch (Exception exception) {
            log.error(exception.getMessage(), exception);
        }
        return result;
    }

    /** TTL key. Returns the remaining TTL in the given unit; a null unit defaults to seconds. */
    public Long ttl(final String key, TimeUnit timeUnit) {
        if (Objects.isNull(timeUnit)) {
            timeUnit = TimeUnit.SECONDS;
        }
        Long result = null;
        try {
            result = redisTemplate.getExpire(this.generateRedisKey(key), timeUnit);
        } catch (RedisConnectionFailureException | QueryTimeoutException exception) {
            log.error(exception.getMessage(), exception);
        }
        return result;
    }

    /** TTL key, in seconds. */
    public Long ttl(final String key) {
        return this.ttl(key, TimeUnit.SECONDS);
    }

    /** SET key value, with the default 7-day expiry. */
    public <V> void set(final String key, final V value) {
        this.setex(key, value, EXPIRED_TIME, TimeUnit.SECONDS);
    }

    /** GET key. Returns the stored value. */
    public <V> V get(final String key) {
        ValueOperations<String, V> operations = redisTemplate.opsForValue();
        V result = null;
        try {
            result = operations.get(this.generateRedisKey(key));
        } catch (Exception exception) {
            log.error(exception.getMessage(), exception);
        }
        return result;
    }

    /** GETSET key value. Sets the key and returns its old value. */
    public <V> V getset(final String key, final V value) {
        ValueOperations<String, V> operations = redisTemplate.opsForValue();
        V result = null;
        final String redisKey = this.generateRedisKey(key);
        try {
            result = operations.getAndSet(redisKey, value);
            this.setDefaultExpiredTime(redisKey);
        } catch (Exception exception) {
            log.error(exception.getMessage(), exception);
        }
        return result;
    }

    /** MSET key value [key value ...]. Sets no expiry, hence deprecated. The map must not be empty. */
    @Deprecated
    public <V> void mset(final Map<String, V> map) {
        if (!CollectionUtils.isEmpty(map)) {
            ValueOperations<String, V> operations = redisTemplate.opsForValue();
            try {
                operations.multiSet(map);
            } catch (Exception exception) {
                log.error(exception.getMessage(), exception);
            }
        } else {
            log.warn("Parameters map is null or empty");
        }
    }

    /** MGET key [key ...]. A LinkedHashSet of distinct keys is recommended; returns the result list. */
    public <V> List<V> mget(final Set<String> keys) {
        List<V> result = Collections.emptyList();
        if (!CollectionUtils.isEmpty(keys)) {
            ValueOperations<String, V> operations = redisTemplate.opsForValue();
            try {
                result = operations.multiGet(this.generateRedisKey(keys));
            } catch (RedisConnectionFailureException | QueryTimeoutException exception) {
                log.error(exception.getMessage(), exception);
            }
        }
        return result;
    }

    /** SETEX key seconds value. A null unit defaults to seconds. */
    public <V> void setex(final String key, final V value, final long timeout, TimeUnit unit) {
        if (Objects.isNull(unit)) {
            unit = TimeUnit.SECONDS;
        }
        ValueOperations<String, V> operations = redisTemplate.opsForValue();
        try {
            operations.set(this.generateRedisKey(key), value, timeout, unit);
        } catch (Exception exception) {
            log.error(exception.getMessage(), exception);
        }
    }

    /** SETNX key value. Returns true when the value was set, false when the key already existed. */
    public <V> Boolean setnx(final String key, final V value, final long timeout, TimeUnit unit) {
        if (Objects.isNull(unit)) {
            unit = TimeUnit.SECONDS;
        }
        ValueOperations<String, V> operations = redisTemplate.opsForValue();
        Boolean result = null;
        try {
            result = operations.setIfAbsent(this.generateRedisKey(key), value, timeout, unit);
        } catch (Exception exception) {
            log.error(exception.getMessage(), exception);
        }
        return result;
    }

    /** INCR key. A missing key is initialized to 0 first; a non-numeric value is an error. Returns the value after adding 1. */
    public Long incr(final String key) {
        return this.incrby(key, 1L);
    }

    /** INCRBY key increment. Returns the value after adding the increment. */
    public Long incrby(final String key, final long delta) {
        Long result = null;
        ValueOperations<String, Long> operations = redisTemplate.opsForValue();
        final String redisKey = this.generateRedisKey(key);
        try {
            result = operations.increment(redisKey, delta);
            this.setDefaultExpiredTime(redisKey);
        } catch (Exception exception) {
            log.error(exception.getMessage(), exception);
        }
        return result;
    }

    /** DECR key. Returns the value after subtracting 1. */
    public Long decr(final String key) {
        return this.decrby(key, 1L);
    }

    /** DECRBY key decrement. A missing key is initialized to 0 first; a non-numeric value is an error. Returns the value after the decrement. */
    public Long decrby(final String key, final long delta) {
        ValueOperations<String, Long> operations = redisTemplate.opsForValue();
        Long result = null;
        final String redisKey = this.generateRedisKey(key);
        try {
            result = operations.decrement(redisKey, delta);
            this.setDefaultExpiredTime(redisKey);
        } catch (Exception exception) {
            log.error(exception.getMessage(), exception);
        }
        return result;
    }

    /** HDEL key field [field ...]. Returns the number of fields actually removed, ignoring missing ones. */
    public Long hdel(final String key, final String... hashKeys) {
        Long result = null;
        try {
            result = redisTemplate.opsForHash().delete(this.generateRedisKey(key), hashKeys);
        } catch (Exception exception) {
            log.error(exception.getMessage(), exception);
        }
        return result;
    }

    /** HEXISTS key field. Returns true when the hash contains the field, false when it does not or the key is missing. */
    public Boolean hexists(final String key, final String hashKey) {
        Boolean result = null;
        try {
            result = redisTemplate.opsForHash().hasKey(this.generateRedisKey(key), hashKey);
        } catch (RedisConnectionFailureException | QueryTimeoutException exception) {
            log.error(exception.getMessage(), exception);
        }
        return result;
    }

    /** HGET key field. Returns the field value, or null when the field or the key is missing. */
    public <V> V hget(final String key, final String hashKey) {
        HashOperations<String, String, V> operations = redisTemplate.opsForHash();
        V result = null;
        try {
            result = operations.get(this.generateRedisKey(key), hashKey);
        } catch (Exception exception) {
            log.error(exception.getMessage(), exception);
        }
        return result;
    }

    /** HGETALL key. Returns all fields and values; an empty map when the key is missing. */
    public <V> Map<String, V> hgetall(final String key) {
        HashOperations<String, String, V> operations = redisTemplate.opsForHash();
        Map<String, V> result = null;
        try {
            result = operations.entries(this.generateRedisKey(key));
        } catch (RedisConnectionFailureException | QueryTimeoutException exception) {
            log.error(exception.getMessage(), exception);
        }
        return result;
    }

    /** HKEYS key. Returns all fields of the hash; an empty set when the key is missing. */
    public Set<String> hkeys(final String key) {
        Set<String> result = null;
        try {
            result = redisTemplate.opsForHash().keys(this.generateRedisKey(key));
        } catch (RedisConnectionFailureException | QueryTimeoutException exception) {
            log.error(exception.getMessage(), exception);
        }
        return result;
    }

    /** HLEN key. Returns the number of fields in the hash. */
    public Long hlen(final String key) {
        Long result = null;
        try {
            result = redisTemplate.opsForHash().size(this.generateRedisKey(key));
        } catch (RedisConnectionFailureException | QueryTimeoutException exception) {
            log.error(exception.getMessage(), exception);
        }
        return result;
    }

    /** HMGET key field [field ...]. A LinkedHashSet is recommended; values come back in the order the fields were requested. */
    public <V> List<V> hmget(final String key, final Set<String> hashKeys) {
        HashOperations<String, String, V> operations = redisTemplate.opsForHash();
        List<V> result = null;
        try {
            result = operations.multiGet(this.generateRedisKey(key), hashKeys);
        } catch (RedisConnectionFailureException | QueryTimeoutException exception) {
            log.error(exception.getMessage(), exception);
        }
        return result;
    }

    /** HSET key field value. */
    public <V> void hset(final String key, final String hashKey, final V value) {
        final String redisKey = this.generateRedisKey(key);
        try {
            redisTemplate.opsForHash().put(redisKey, hashKey, value);
            this.setDefaultExpiredTime(redisKey);
        } catch (Exception exception) {
            log.error(exception.getMessage(), exception);
        }
    }

    /** HMSET key field value [field value ...]. */
    public <V> void hmset(final String key, final Map<String, V> map) {
        final String redisKey = this.generateRedisKey(key);
        if (!CollectionUtils.isEmpty(map)) {
            try {
                redisTemplate.opsForHash().putAll(redisKey, map);
                this.setDefaultExpiredTime(redisKey);
            } catch (Exception exception) {
                log.error(exception.getMessage(), exception);
            }
        }
    }

    /** HSETNX key field value. Sets the field only when it does not exist yet; returns true when set, false otherwise. */
    public <V> Boolean hsetnx(final String key, final String hashKey, final V value) {
        Boolean result = null;
        final String redisKey = this.generateRedisKey(key);
        try {
            result = redisTemplate.opsForHash().putIfAbsent(redisKey, hashKey, value);
            this.setDefaultExpiredTime(redisKey);
        } catch (Exception exception) {
            log.error(exception.getMessage(), exception);
        }
        return result;
    }

    /** HVALS key. Returns all values of the hash; an empty list when the key is missing. */
    public <V> List<V> hvals(String key) {
        List<V> result = null;
        try {
            result = redisTemplate.opsForHash().values(this.generateRedisKey(key));
        } catch (RedisConnectionFailureException | QueryTimeoutException exception) {
            log.error(exception.getMessage(), exception);
        }
        return result;
    }

    /** BLPOP key timeout. Returns the popped element; a null unit defaults to seconds. */
    public <V> V blpop(final String key, final long timeout, TimeUnit unit) {
        if (Objects.isNull(unit)) {
            unit = TimeUnit.SECONDS;
        }
        ListOperations<String, V> operations = redisTemplate.opsForList();
        V result = null;
        try {
            result = operations.leftPop(this.generateRedisKey(key), timeout, unit);
        } catch (RedisConnectionFailureException | QueryTimeoutException exception) {
            log.error(exception.getMessage(), exception);
        }
        return result;
    }

    /** BRPOP key timeout. Returns the popped element; a null unit defaults to seconds. */
    public <V> V brpop(final String key, final long timeout, TimeUnit unit) {
        if (Objects.isNull(unit)) {
            unit = TimeUnit.SECONDS;
        }
        ListOperations<String, V> operations = redisTemplate.opsForList();
        V result = null;
        try {
            result = operations.rightPop(this.generateRedisKey(key), timeout, unit);
        } catch (RedisConnectionFailureException | QueryTimeoutException exception) {
            log.error(exception.getMessage(), exception);
        }
        return result;
    }

    /** LINDEX key index. Returns the element at the index. */
    public <V> V lindex(final String key, final long index) {
        ListOperations<String, V> operations = redisTemplate.opsForList();
        V result = null;
        try {
            result = operations.index(this.generateRedisKey(key), index);
        } catch (RedisConnectionFailureException | QueryTimeoutException exception) {
            log.error(exception.getMessage(), exception);
        }
        return result;
    }

    /** LLEN key. Returns the list length. */
    public Long llen(final String key) {
        Long result = null;
        try {
            result = redisTemplate.opsForList().size(this.generateRedisKey(key));
        } catch (RedisConnectionFailureException | QueryTimeoutException exception) {
            log.error(exception.getMessage(), exception);
        }
        return result;
    }

    /** LPOP key. Removes and returns the first element. */
    public <V> V lpop(final String key) {
        ListOperations<String, V> operations = redisTemplate.opsForList();
        V result = null;
        try {
            result = operations.leftPop(this.generateRedisKey(key));
        } catch (RedisConnectionFailureException | QueryTimeoutException exception) {
            log.error(exception.getMessage(), exception);
        }
        return result;
    }

    /** LPUSH key value. Returns the list length after the push. */
    public <V> Long lpush(final String key, final V value) {
        ListOperations<String, V> operations = redisTemplate.opsForList();
        Long result = null;
        final String redisKey = this.generateRedisKey(key);
        try {
            result = operations.leftPush(redisKey, value);
            this.setDefaultExpiredTime(redisKey);
        } catch (Exception exception) {
            log.error(exception.getMessage(), exception);
        }
        return result;
    }

    /** LPUSH key value [value ...]. Returns the list length after the push. */
    public <V> Long lpush(final String key, final Collection<V> values) {
        ListOperations<String, V> operations = redisTemplate.opsForList();
        Long result = null;
        final String redisKey = this.generateRedisKey(key);
        try {
            result = operations.leftPushAll(redisKey, values);
            this.setDefaultExpiredTime(redisKey);
        } catch (Exception exception) {
            log.error(exception.getMessage(), exception);
        }
        return result;
    }

    /** LRANGE key start stop. Returns the elements in the given range. */
    public <V> List<V> lrange(final String key, final long start, final long end) {
        ListOperations<String, V> operations = redisTemplate.opsForList();
        List<V> result = null;
        try {
            result = operations.range(this.generateRedisKey(key), start, end);
        } catch (Exception exception) {
            log.error(exception.getMessage(), exception);
        }
        return result;
    }

    /** LSET key index value. */
    public <V> void lset(final String key, final long index, final V value) {
        ListOperations<String, V> operations = redisTemplate.opsForList();
        try {
            operations.set(this.generateRedisKey(key), index, value);
        } catch (Exception exception) {
            log.error(exception.getMessage(), exception);
        }
    }

    /** RPOP key. Removes and returns the last element. */
    public <V> V rpop(final String key) {
        ListOperations<String, V> operations = redisTemplate.opsForList();
        V result = null;
        try {
            result = operations.rightPop(this.generateRedisKey(key));
        } catch (RedisConnectionFailureException | QueryTimeoutException exception) {
            log.error(exception.getMessage(), exception);
        }
        return result;
    }

    /** RPUSH key value. Returns the list length after the push. */
    public <V> Long rpush(final String key, final V value) {
        ListOperations<String, V> operations = redisTemplate.opsForList();
        Long result = null;
        final String redisKey = this.generateRedisKey(key);
        try {
            result = operations.rightPush(redisKey, value);
            this.setDefaultExpiredTime(redisKey);
        } catch (Exception exception) {
            log.error(exception.getMessage(), exception);
        }
        return result;
    }

    /** RPUSH key value [value ...]. Returns the list length after the push. */
    public <V> Long rpush(final String key, Collection<V> values) {
        ListOperations<String, V> operations = redisTemplate.opsForList();
        Long result = null;
        final String redisKey = this.generateRedisKey(key);
        try {
            result = operations.rightPushAll(redisKey, values);
            this.setDefaultExpiredTime(redisKey);
        } catch (Exception exception) {
            log.error(exception.getMessage(), exception);
        }
        return result;
    }

    /** SADD key member [member ...]. Returns the number of members newly added, ignoring duplicates. */
    public <V> Long sadd(final String key, final V... values) {
        SetOperations<String, V> operations = redisTemplate.opsForSet();
        Long result = null;
        final String redisKey = this.generateRedisKey(key);
        try {
            result = operations.add(redisKey, values);
            this.setDefaultExpiredTime(redisKey);
        } catch (Exception exception) {
            log.error(exception.getMessage(), exception);
        }
        return result;
    }

    /** SCARD key. Returns the set size, 0 when the key is missing. */
    public <V> Long scard(final String key) {
        Long result = null;
        try {
            result = redisTemplate.opsForSet().size(this.generateRedisKey(key));
        } catch (RedisConnectionFailureException | QueryTimeoutException exception) {
            log.error(exception.getMessage(), exception);
        }
        return result;
    }

    /** SISMEMBER key member. Returns true when the member is in the set, false when it is not or the key is missing. */
    public Boolean sismember(final String key, final Object object) {
        Boolean result = null;
        try {
            result = redisTemplate.opsForSet().isMember(this.generateRedisKey(key), object);
        } catch (RedisConnectionFailureException | QueryTimeoutException exception) {
            log.error(exception.getMessage(), exception);
        }
        return result;
    }

    /** SMEMBERS key. Returns all members of the set. */
    public <V> Set<V> smembers(final String key) {
        SetOperations<String, V> operations = redisTemplate.opsForSet();
        Set<V> result = null;
        try {
            result = operations.members(this.generateRedisKey(key));
        } catch (RedisConnectionFailureException | QueryTimeoutException exception) {
            log.error(exception.getMessage(), exception);
        }
        return result;
    }

    /** SREM key member [member ...]. Returns the number of members actually removed, ignoring missing ones. */
    public Long srem(final String key, final Object... values) {
        Long result = null;
        try {
            result = redisTemplate.opsForSet().remove(this.generateRedisKey(key), values);
        } catch (Exception exception) {
            log.error(exception.getMessage(), exception);
        }
        return result;
    }
}

Kafka client implementation:

package com.lwy.it.mq;

import com.alibaba.otter.canal.client.CanalMessageDeserializer;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.lwy.it.BookInfoVO;
import com.lwy.it.util.ListColumnToBeanUtil;
import com.lwy.it.util.RedisOperateUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.concurrent.TimeUnit;

@Component
@Slf4j
public class CanalKafkaListener {

    private ObjectMapper objectMapper = new ObjectMapper();

    @Autowired
    private RedisOperateUtil redisOperateUtil;

    @KafkaListener(topics = {"T-Canal-Byte"}, groupId = "canal-kafka-consumer")
    public void onMessage(ConsumerRecord<String, byte[]> consumerRecord, Acknowledgment acknowledgment,
                          @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        Message message = CanalMessageDeserializer.deserializer(consumerRecord.value());
        List<CanalEntry.Entry> entries = message.getEntries();
        for (CanalEntry.Entry entry : entries) {
            // The serialized payload
            ByteString byteString = entry.getStoreValue();
            // Deserialize
            CanalEntry.RowChange rowChange = null;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(byteString);
            } catch (InvalidProtocolBufferException exception) {
                log.error("Cause: {}, stack trace: {}", exception.getMessage(), exception);
            }
            // Read the event type
            CanalEntry.EventType eventType = rowChange.getEventType();
            // Read the changed rows
            List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
            switch (eventType) {
                case INSERT:
                    if (!rowDatasList.isEmpty()) {
                        for (CanalEntry.RowData rowData : rowDatasList) {
                            // Cache the inserted row under its book id
                            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                            BookInfoVO bookInfoVO = ListColumnToBeanUtil.ListColumnToBean(afterColumnsList, BookInfoVO.class);
                            redisOperateUtil.setex("bookId:" + bookInfoVO.getId(), bookInfoVO, 1L, TimeUnit.HOURS);
                        }
                    }
                    break;
                case UPDATE:
                    if (!rowDatasList.isEmpty()) {
                        for (CanalEntry.RowData rowData : rowDatasList) {
                            // Replace the cached entry with the post-update row
                            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                            BookInfoVO bookInfoVO = ListColumnToBeanUtil.ListColumnToBean(afterColumnsList, BookInfoVO.class);
                            redisOperateUtil.del("bookId:" + bookInfoVO.getId());
                            redisOperateUtil.setex("bookId:" + bookInfoVO.getId(), bookInfoVO, 1L, TimeUnit.HOURS);
                        }
                    }
                    break;
                case DELETE:
                    if (!rowDatasList.isEmpty()) {
                        for (CanalEntry.RowData rowData : rowDatasList) {
                            // Evict the cached entry for the deleted row
                            List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
                            BookInfoVO bookInfoVO = ListColumnToBeanUtil.ListColumnToBean(beforeColumnsList, BookInfoVO.class);
                            redisOperateUtil.del("bookId:" + bookInfoVO.getId());
                        }
                    }
                    break;
                default:
                    log.info("RowChange Type is {}", eventType);
            }
        }
        acknowledgment.acknowledge();
    }
}

Other utility classes:

package com.lwy.it.util;

import com.alibaba.otter.canal.protocol.CanalEntry;
import lombok.extern.slf4j.Slf4j;

import java.lang.reflect.Field;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.List;
import java.util.Objects;

@Slf4j
public class ListColumnToBeanUtil {

    public static final <T> T ListColumnToBean(List<CanalEntry.Column> list, Class<T> targetClass) {
        if (list == null || list.isEmpty()) {
            throw new NullPointerException("The CanalEntry.Column list is empty");
        }
        T obj = null;
        try {
            obj = targetClass.newInstance();
            for (CanalEntry.Column column : list) {
                String columnName = column.getName();
                // Map underscore column names to camel-case field names
                String columnFieldName = StringUtil.underlineToCamel(columnName);
                int sqlType = column.getSqlType();
                boolean isNull = column.getIsNull();
                String value = column.getValue();
                Field field = targetClass.getDeclaredField(columnFieldName);
                if (Objects.nonNull(field) && !isNull) {
                    field.setAccessible(true);
                    field.set(obj, getValue(sqlType, value));
                }
            }
        } catch (InstantiationException exception) {
            log.error(exception.getMessage(), exception);
        } catch (IllegalAccessException exception) {
            log.error(exception.getMessage(), exception);
        } catch (NoSuchFieldException exception) {
            log.error(exception.getMessage(), exception);
        }
        return obj;
    }

    private static final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
    private static final SimpleDateFormat dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    // Converts the string value according to the JDBC type code (see java.sql.Types)
    private static final Object getValue(int sqlType, String value) {
        Object result = null;
        switch (sqlType) {
            case 4: // Types.INTEGER
                try {
                    result = Integer.parseInt(value);
                } catch (NumberFormatException exception) {
                    log.error(exception.getMessage(), exception);
                }
                break;
            case 12: // Types.VARCHAR
                result = value;
                break;
            case 3: // Types.DECIMAL
                try {
                    result = Double.parseDouble(value);
                } catch (NumberFormatException exception) {
                    log.error(exception.getMessage(), exception);
                }
                break;
            case 91: // Types.DATE
                try {
                    result = dateFormat.parse(value);
                } catch (ParseException exception) {
                    log.error(exception.getMessage(), exception);
                }
                break;
            case 93: // Types.TIMESTAMP
                try {
                    result = dateTimeFormat.parse(value);
                } catch (ParseException exception) {
                    log.error(exception.getMessage(), exception);
                }
                break;
            default:
                log.warn("Unhandled sqlType: {}", sqlType);
        }
        return result;
    }
}
package com.lwy.it.util;

public class StringUtil {

    /** Converts an underscore_separated name to camelCase. */
    public static final String underlineToCamel(final String string) {
        if (isNullOrEmpty(string)) {
            return "";
        }
        int length = string.length();
        char[] chars = string.toCharArray();
        boolean flag = false;
        StringBuilder stringBuilder = new StringBuilder();
        for (int i = 0; i < length; i++) {
            char ch = Character.toLowerCase(chars[i]);
            if (ch == '_') {
                flag = true;
                continue;
            } else {
                if (flag) {
                    stringBuilder.append(Character.toUpperCase(ch));
                    flag = false;
                } else {
                    stringBuilder.append(ch);
                }
            }
        }
        return stringBuilder.toString();
    }

    public static final boolean isNullOrEmpty(String string) {
        return (string == null || string.isEmpty() || !containsText(string));
    }

    private static final boolean containsText(String str) {
        int strLen = str.length();
        for (int i = 0; i < strLen; i++) {
            if (!Character.isWhitespace(str.charAt(i))) {
                return true;
            }
        }
        return false;
    }
}
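A quick, hypothetical sanity check of the underscore-to-camel conversion (StringUtilDemo is not part of the original project):

package com.lwy.it.util;

public class StringUtilDemo {
    public static void main(String[] args) {
        // Columns such as book_name map onto the BookInfoVO field bookName
        System.out.println(StringUtil.underlineToCamel("book_name"));    // bookName
        System.out.println(StringUtil.underlineToCamel("create_time"));  // createTime
    }
}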

Example VO:

package com.lwy.it;

import lombok.Data;

import java.io.Serializable;
import java.util.Date;

@Data
public class BookInfoVO implements Serializable {
    private int id;
    private String bookName;
    private String bookAuthor;
    private String bookDescription;
    private double bookPrice;
    private Date bookPublishDate;
    private String createUser;
    private Date createTime;
}

Standalone Java consumer demo:

package com.lwy.it;

import com.alibaba.otter.canal.client.CanalMessageDeserializer;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;

@Slf4j
public class KafkaConsumerClient {

    public static Properties initProperties() {
        Properties properties = new Properties();
        // broker addresses
        properties.put("bootstrap.servers", "192.168.3.100:9092");
        // Consumer group ID: within a group a message is consumed only once; consumers in different groups can each consume it
        properties.put("group.id", "canal-group");
        // Disable offset auto-commit
        properties.put("enable.auto.commit", "false");
        // Auto-commit interval
        properties.put("auto.commit.interval.ms", "10000");
        // Defaults to latest; to consume a partition from the beginning, switch to earliest and change the group name for it to take effect
        // properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Deserializers
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        return properties;
    }

    /** Consume messages. */
    public static void consumer() {
        Properties properties = initProperties();
        Consumer<String, byte[]> consumer = new KafkaConsumer<String, byte[]>(properties);
        // Subscribe to the topic
        consumer.subscribe(Arrays.asList("T-Canal-Byte"));
        while (true) {
            // Poll with a 1-second blocking timeout
            ConsumerRecords<String, byte[]> consumerRecords = consumer.poll(Duration.ofMillis(1000));
            consumerRecords.forEach((ConsumerRecord<String, byte[]> consumerRecord) -> {
                System.out.println("~~~~~~~~~~~~~~~~~~~~~");
                System.out.println(CanalMessageDeserializer.deserializer(consumerRecord.value()));
                log.info("Topic is {},Offset is {},Key is {},Value is {}",
                        consumerRecord.topic(), consumerRecord.offset(), consumerRecord.key(), consumerRecord.value());
                System.out.println("~~~~~~~~~~~~~~~~~~~~~");
            });
            // Commit manually only when something was received
            if (!consumerRecords.isEmpty()) {
                // commitSync blocks the current thread (with automatic retry on failure)
                // consumer.commitSync();
                // commitAsync does not block and does not retry; the callback receives the commit result for logging
                consumer.commitAsync(new OffsetCommitCallback() {
                    @Override
                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                        if (Objects.isNull(exception)) {
                            System.out.println("Manual offset commit succeeded: " + offsets.toString());
                        } else {
                            System.out.println("Manual offset commit failed: " + offsets.toString());
                            exception.printStackTrace();
                        }
                    }
                });
            }
        }
    }

    public static void main(String[] args) {
        consumer();
    }
}

References:

https://github.com/alibaba/canal/wiki

https://www.jianshu.com/p/63c4ef2893e5

