canal中间件集成springboot实战落地

news/2025/1/11 11:52:56/

目录

一、数据库开启相关权限功能:

  二、canal 服务端配置启动:从官网下载程序和源码到本地环境

三、canal客户端配置启动:


canal中间件集成springboot实战落地开始分享,这是目前互联网很常见的中间件,监听数据库变化、全量数据缓存等功能,起到很方便的作用,原理和使用场景可以直接参考官网,介绍的很详细,中文文档:GitHub - alibaba/canal: 阿里巴巴 MySQL binlog 增量订阅&消费组件

今天我们就直接开始分享实战使用,使用分为三大步骤,数据库开启big-log功能,canal服务端配置启动、canal客户端配置启动和数据测试:

一、数据库开启相关权限功能:

 Canal 模拟 MySQL 从节点获得数据库服务器的数据,很明显,对 MySQL服务器的配置完全可以参考MySQL的主从复制中主节点的配置。

1、先开启 Binlog 写入功能,开启bin-log权限:

SHOW VARIABLES like "%log_bin%";

数据库显示已开启状态:

2、 配置 binlog-format 为 ROW 模式:


SHOW VARIABLES like "%binlog_format%";

查询显示: 

3、 主数据库的唯一值server_id配置:


SHOW VARIABLES like "%server_id%";

 查询显示:

4、创建一个用户用来同步数据:

CREATE USER canal IDENTIFIED BY 'canal';     GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; 
--如果创建报错,可以先刷新一下权限
FLUSH PRIVILEGES;--创建后查询用户相关信息
SELECT * from mysql.user;SELECT user, select_priv,Repl_slave_priv,Repl_client_priv from mysql.user;

 创建后查询:

5、 查询此主库的状态:

show master status;

查询结果:

mysql-bin.000001 信息来源于mysql配置文件配置,my.int文件,下面会说到 

6、查询此从库状态:

show slave status;

查询为空,因为这个数据库是主库:

7、 查看当前所有binlog的日志存储

show binary logs;

结果:

 8、查看当前已经消费到了什么位置

show binlog events in 'mysql-bin.000001';

结果:

 9、以上信息均基于my.int配置如下:

 
[client]port=3306
default-character-set=utf8
[mysql]default-character-set=utf8[mysqld]# The TCP/IP Port the MySQL Server will listen on
port=3306#开启查询缓存
explicit_defaults_for_timestamp=true#Path to installation directory. All paths are usually resolved relative to this.
basedir=C:/Program Files/mysql-5.7.18-winx64/#Path to the database root
#datadir="C:/Program Files/mysql-5.7.18-winx64/data/"# The default character set that will be used when a new schema or table is
# created and no character set is defined
character-set-server=utf8# The default storage engine that will be used when create new tables when
default-storage-engine=INNODB# Set the SQL mode to strict
sql-mode="STRICT_TRANS_TABLES,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION"#用于第一次登录 可以免密
skip-grant-tablesmax_connections=100server_id=1   # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
log-bin=mysql-bin  # 开启 binlog
binlog-format=ROW   # 选择 ROW 模式 MIXED
#binlog_format=MIXED   # 选择 ROW 模式 MIXED#query_cache_size=0# table_cache=256#innodb_log_file_size=24M

相关的配置主要是这几行:

server_id=1   # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
log-bin=mysql-bin  # 开启 binlog
binlog-format=ROW   # 选择 ROW 模式 MIXED
#binlog_format=MIXED   # 选择 ROW 模式 MIXED

  二、canal 服务端配置启动:从官网下载程序和源码到本地环境

1、下载入口:进入里面有各种版本

2、解压缩到固定目录:

直接解压即可

3、修改核心配置文件:

canal.deployer-1.1.6\canal.deployer-1.1.6\conf 目录下的 canal.properties 文件,核心的几个配置如下:

# tcp bind ip 服务端ip
canal.ip =127.0.0.1
#服务端默认端口号
canal.port =11111
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
#监控的模块之一,可以是多个 canal.destinations = promotion,example
canal.destinations = promotion

4、修改需要监控的包配置:新建promotion目录,添加修改配置文件和example目录是并列关系

canal.deployer-1.1.6\canal.deployer-1.1.6\conf\promotion

# position info
canal.instance.master.address=127.0.0.1:3306
#主库binlog日志
canal.instance.master.journal.name=mysql-bin.000001
#监控的位置
canal.instance.master.position=1612
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
canal.instance.filter.regex=master-db.frend
# 监听的表,可以指定,多个用逗号分割,这里正则是监听所有
canal.instance.filter.regex=.*\\..*
#canal.instance.filter.black.regex=mysql\\.slave_.*

5、启动服务,win环境 点击 bin目录下的 startup.bat

正常启动是这种控制台日志

6、server 日志查看目录在:

 日志文件里面有详细信息

三、canal客户端配置启动:

1、jar包引入:

     <!-- 引入canal --><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.6</version><exclusions><exclusion><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId></exclusion></exclusions></dependency><!--我这边不手动引入会报找不到com.alibaba.otter.canal.protocol.Message--><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.protocol</artifactId><version>1.1.6</version></dependency>

2、配置信息相关


canal:server:ip: 127.0.0.1port: 11111promotion:destination: promotion#subscribe: .*\..*batchSize: 1000

3、客户端初始化类

package com.nandao.datasource.dynamic.mybatis.config;import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import java.net.InetSocketAddress;@Configuration
@EnableScheduling
@EnableAsync
public class CanalPromotionConfig {@Value("${canal.server.ip}")private String canalServerIp;@Value("${canal.server.port}")private int canalServerPort;@Value("${canal.server.username:blank}")private String userName;@Value("${canal.server.password:blank}")private String password;@Value("${canal.promotion.destination}")private String destination;@Bean("promotionConnector")public CanalConnector newSingleConnector(){//有默认账号,初始化的时候可以不加String userNameStr = "blank".equals(userName) ? "" : userName;String passwordStr = "blank".equals(password) ? "" : password;return CanalConnectors.newSingleConnector(new InetSocketAddress(canalServerIp,canalServerPort), destination, userNameStr, passwordStr);}
}

4、监听的业务类

package com.nandao.datasource.dynamic.mybatis.service.impl;import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.nandao.datasource.dynamic.mybatis.service.IProcessCanalData;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;@Service
@Slf4j
public class PromotionData implements IProcessCanalData {private final static String SMS_HOME_ADVERTISE = "sms_home_advertise";private final static String SMS_HOME_BRAND = "sms_home_brand";private final static String SMS_HOME_NEW_PRODUCT = "sms_home_new_product";private final static String SMS_HOME_RECOMMEND_PRODUCT = "sms_home_recommend_product";/*存储从表名到Redis缓存的键*/private Map<String,String> tableMapKey = new HashMap<>();@Autowired@Qualifier("promotionConnector")private CanalConnector connector;@Value("${canal.promotion.subscribe:server}")private String subscribe;@Value("${canal.promotion.batchSize}")private int batchSize;@PostConstruct@Overridepublic void connect() {connector.connect();if("server".equals(subscribe))connector.subscribe(null);//可以直接采用服务端配置的扫描表的范围elseconnector.subscribe(subscribe);//自定义表的范围,不依赖服务端connector.rollback();}@PreDestroy@Overridepublic void disConnect() {connector.disconnect();}@Async@Scheduled(initialDelayString="${canal.promotion.initialDelay:5000}",fixedDelayString = "${canal.promotion.fixedDelay:5000}")@Overridepublic void processData() {try {if(!connector.checkValid()){log.warn("与Canal服务器的连接失效!!!重连,下个周期再检查数据变更");this.connect();}else{Message message = connector.getWithoutAck(batchSize);//获取batchSize条数据long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {log.info("本次[{}]没有检测到促销数据更新。",batchId);}else{log.info("本次[{}]促销数据本次共有[{}]次更新需要处理",batchId,size);/*一个表在一次周期内可能会被修改多次,而对Redis缓存的处理只需要处理一次即可*/Set<String> factKeys = new HashSet<>();for(CanalEntry.Entry entry : message.getEntries()){if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN|| entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {continue;}CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());String tableName = entry.getHeader().getTableName();//获取表名if(log.isDebugEnabled()){CanalEntry.EventType eventType = rowChange.getEventType();log.debug("数据变更详情:来自binglog[{}.{}],数据源{}.{},变更类型{}",entry.getHeader().getLogfileName(),entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(),tableName,eventType);}factKeys.add(tableMapKey.get(tableName));}for(String key : factKeys){if(StringUtils.isNotEmpty(key))  {log.info("删除需要");}}connector.ack(batchId); // 提交确认log.info("本次[{}]处理促销Canal同步数据完成",batchId);}}} catch (Exception e) {log.error("处理促销Canal同步数据失效,请检查:",e);}}
}

处理字段信息:增删改

for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {if (eventType == CanalEntry.EventType.DELETE) {List<CanalEntry.Column> columns = rowData.getBeforeColumnsList();for (CanalEntry.Column column : columns) {if(column.getName().equals("id")) {deleteDoc(column.getValue());break;}}} else if (eventType == CanalEntry.EventType.INSERT) {List<CanalEntry.Column> columns = rowData.getAfterColumnsList();ProductESVo productESVo = new ProductESVo();JSONObject jsonData=new JSONObject();//jsonData.put("", tableName);String docId = makeVo(columns,productESVo);String docIdNew = makeVoNew(columns,jsonData, tableName);insertDoc(docId,productESVo);insertDocNew(docId,jsonData);} else {List<CanalEntry.Column> columns = rowData.getAfterColumnsList();ProductESVo productESVo = new ProductESVo();String docId = makeVo(columns,productESVo);if(null != docId){if(null == productESVo){log.info("商品的删除状态字段update为已删除,从ES中移除");deleteDoc(docId);}else  updateDoc(docId,productESVo);}}}

 遍历字段:

 private String makeVoNew(List<CanalEntry.Column> columns,JSONObject jsonData,String tableName ){String docId = null;for (CanalEntry.Column column : columns) {String colName = column.getName();String colValue = column.getValue();jsonData.put(colName,colValue);if(colName.equals(T_ID)) {docId = colValue;}if(parentTableList.contains(tableName)){jsonData.put("relation_flag",tableName);}if(subTableList.contains(tableName)){jsonData.put("relation_flag",tableName);jsonData.put("parent",docId);}}return docId;}

5、启动客户端服务:5秒监听一次

 6、修改数据库数据:立刻看到监听到数据

 看到监听的数据消费,到此,canal 实战流程分享完毕,大家一定要动手操作演练,才能熟练掌握,下篇我们分享canal实战过程中遇到的问题,敬请期待!


http://www.ppmy.cn/news/6942.html

相关文章

Android 10.0 修改LocalOnlyHotspot默认的SSID和密码

目录 1.概述 2.修改LocalOnlyHotspot默认的SSID和密码的核心代码 3.修改LocalOnlyHotspot默认的SSID

微信小程序 | 小程序组件化开发

&#x1f5a5;️ 微信小程序 专栏&#xff1a;小程序组件化开发 &#x1f9d1;‍&#x1f4bc; 个人简介&#xff1a;一个不甘平庸的平凡人&#x1f36c; ✨ 个人主页&#xff1a;CoderHing的个人主页 &#x1f340; 格言: ☀️ 路漫漫其修远兮,吾将上下而求索☀️ &#x1f44…

20 个常用的 pandas 使用技巧

大家好&#xff0c;我是小寒。 今天来分享 20 个常用的 pandas 使用技巧。如果觉得不错&#xff0c;点赞、转发安排起来。 1、以 Markdown 格式输出 DataFrame import pandas as pddf pd.DataFrame({a: [1, 2, 3, 4],b: [5, 6, 7, 8]})# You can control the printing of th…

YOLOV5融合SE注意力机制和SwinTransformer模块开发实践的中国象棋检测识别分析系统

本文紧接前文&#xff1a; 《基于yolov5s实践国际象棋目标检测模型开发》 《yolov5s融合SPD-Conv用于提升小目标和低分辨率图像检测性能实践五子棋检测识别》 首先来看下最终效果&#xff1a; 在我棋类检测系统开发之——五子棋检测那篇博文写完之后就萌生了想做一下基于目标…

有效操作:Ubuntu上已经安装最新node但是node -v返回的版本号确实错的;ubuntu第一次启动vue项目报npm版本错误

** 如已经安装过最新版的node话可直接跳到操作6&#xff1a; 1.查看node版本&#xff0c;没安装的请先安装&#xff1b; node -v 如果安装成功的话会返回版本号&#xff1a; 2.如果nodejs包出错需要重新安装的话&#xff0c;删除不干净会有可能出现问题&#xff0c;下面就介…

国考省考行测:细节理解,对错判断,要素查找,问什么,找什么,对比分析

国考省考行测&#xff1a;细节理解&#xff0c;对错判断&#xff0c;要素查找&#xff0c;问什么&#xff0c;找什么&#xff0c;对比分析 2022找工作是学历、能力和运气的超强结合体! 公务员特招重点就是专业技能&#xff0c;附带行测和申论&#xff0c;而常规国考省考最重要…

c++算法基础必刷题目——模拟

文章目录模拟算法1、字符串的展开2、多项式输出3、机器翻译模拟算法 模拟算法是一种最基本的算法思想&#xff0c;是对程序员基本编程能力的一种考查&#xff0c;其解决方法就是根据题目给出的规则对题目要求的相关过程进行编程模拟。在解决模拟类问题时&#xff0c;需要注意字…

关于CSS选择器优先级的规则说明

简单规则&#xff1a; !important > 行内样式 > id选择器 > 类选择器 > 元素选择器 > 通配选择器 选择器举例说明&#xff1a; !important&#xff1a; <h1 id"title">好好学习&#xff0c;天天向上</h1> <style type"text/…