Linux安装canal

news/2024/10/23 5:40:16/

Linux安装canal

文章目录

  • Linux安装canal
    • msql修改
      • 1. mysql创建用户并授权
      • 2. 修改mysql的配置文件
      • 3. 重启mysql
      • 4. 检查是否打开binlog
      • 5. 查看binlog日志列表和当前正在写入的binlog
    • canal
      • 1. 下载canal:
      • 2. 在服务器上解压
      • 3. 修改配置文件
      • 4. 启动canal
      • 5. 查看日志是否正常
      • 6. 别忘了开放防火墙,默认端口号:11111
    • java
      • 1. pom.xml
      • 2. application.yml
      • 3. spring.factories
      • 4. CannalProperties.java
      • 5. 新建CanalUtil.java
    • 开始测试
      • 1. 启动项目
      • 2. 数据库中执行insert语句
      • 3. 控制台查看结果
      • 4. 结合到es上使用

msql修改

1. mysql创建用户并授权

-- 创建用户 用户名:canal 密码:xxxxxx
create user 'canal'@'%' identified by 'xxxxxx';
-- 授权 *.*表示所有库
grant SELECT, REPLICATION SLAVE, REPLICATION CLIENT on *.* to 'canal'@'%' identified by 'xxxxxx';

2. 修改mysql的配置文件

[mysqld]
# 打开binlog
log-bin=mysql-bin
# 选择ROW(行)模式
binlog-format=ROW
# 配置MySQL replaction需要定义,不要和canal的slaveId重复
server_id=1

3. 重启mysql

4. 检查是否打开binlog

show variables like 'log_bin';

5. 查看binlog日志列表和当前正在写入的binlog

# 查看binlog列表
show binary logs;
# 查看当前正在写入的binlog文件
show master status;

记着正在写入的binlog名称 mysql-bin.000001

canal

1. 下载canal:

地址:https://github.com/alibaba/canal/releases,选择deployer的版本

2. 在服务器上解压

tar -zxvf 压缩包名

3. 修改配置文件

相对路径:/canal/conf/example/instance.properties

//数据库连接地址
canal.instance.master.address=xx.xx.xx.xx:3306
//刚才记录的正在写入的binlog
canal.instance.master.journal.name=mysql-bin.000006
//刚才创建的数据库用户
canal.instance.dbUsername=canal
//刚才创建的数据库用户密码
canal.instance.dbPassword=XXXXXX

4. 启动canal

相对路径:/canal/bin 下

stop.sh 停止
restart.sh 重启
startup.sh 启动

到/bin下 执行 ./startup.sh 启动

5. 查看日志是否正常

相对路径:/canal/logs/example/example.log

如果直接输出下面的就没问题了,
按上面步骤正常无问题,如果有异常,请复制example.log中的异常自行百度查询

2023-05-06 18:19:46.374 [New I/O server worker #1-2] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^数据库名\..*$
2023-05-06 18:23:11.578 [New I/O server worker #1-4] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - subscribe filter change to 数据库名\..*
2023-05-06 18:23:11.579 [New I/O server worker #1-4] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^数据库名\..*$
2023-05-06 18:27:43.075 [New I/O server worker #1-1] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - subscribe filter change to 数据库名\..*
2023-05-06 18:27:43.075 [New I/O server worker #1-1] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^数据库名\..*$

6. 别忘了开放防火墙,默认端口号:11111

java

1. pom.xml

<!--canal-->
<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.3</version>
</dependency>

2. application.yml

canal-monitor-mysql:host-name: ip地址port: 端口号database-name: 数据库名table-name: 表名,暂时未使用

3. spring.factories

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\com.w.yizhi.common.oss.CanalUtil

4. CannalProperties.java

package com.w.yizhi.common.oss.properties;import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;@Data
@Configuration
@ConfigurationProperties(prefix = "canal-monitor-mysql")
public class CannalProperties {private String hostName;private Integer port;private String tableName;private String databaseName;}

5. 新建CanalUtil.java

package com.w.yizhi.common.oss;import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.w.yizhi.common.oss.properties.CannalProperties;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import com.alibaba.otter.canal.protocol.Message;import java.net.InetSocketAddress;
import java.util.List;@Component
@EnableConfigurationProperties({ CannalProperties.class })
@Slf4j
public class CanalUtil {private final static int BATCH_SIZE = 10000;/*** 打印canal server解析binlog获得的实体类信息*/private static void handleDATAChange(List<CanalEntry.Entry> entrys) {for (CanalEntry.Entry entry : entrys) {if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {continue;}//RowChange对象,包含了一行数据变化的所有特征CanalEntry.RowChange rowChage;try {rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),e);}CanalEntry.EventType eventType = rowChage.getEventType();log.info("Canal监测到更新:【{}】", entry.getHeader().getTableName());for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {switch (eventType) {/*** 删除操作*/case DELETE:printColumn(rowData.getBeforeColumnsList());break;/*** 添加操作*/case INSERT:printColumn(rowData.getAfterColumnsList());break;/*** 更新操作*/case UPDATE:printColumn(rowData.getAfterColumnsList());break;default:break;}}}}private static void printColumn(List<CanalEntry.Column> columns) {for (CanalEntry.Column column : columns) {System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());}}@Beanpublic Boolean startMonitorSQL(CannalProperties cannalProperties) {while (true) {CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(cannalProperties.getHostName(), cannalProperties.getPort()), "example", "", "");try {//打开连接connector.connect();log.info("数据库检测连接成功!");connector.checkValid();//订阅数据库表,全部表qconnector.subscribe(cannalProperties.getTableName() + "\\..*");//回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿connector.rollback();while (true) {// 获取指定数量的数据Message message = connector.getWithoutAck(BATCH_SIZE);long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {Thread.sleep(1000);} else {handleDATAChange(message.getEntries());}// 提交确认connector.ack(batchId);}} catch (Exception e) {e.printStackTrace();log.error("成功断开监测连接!尝试重连");} finally {connector.disconnect();//防止频繁访问数据库链接: 线程睡眠 10秒try {Thread.sleep(10 * 1000);} catch (InterruptedException e) {e.printStackTrace();}}}}
}

开始测试

1. 启动项目

09:27:13.344 [restartedMain] INFO  c.w.y.c.o.CanalUtil - [canal,92] - 数据库检测连接成功!

2. 数据库中执行insert语句

INSERT INTO `ry-admin`.`bs_note_info` (`id`, `title`, `context`, `file_name`, `bucket_name`, `original`, `type`, `file_size`, `create_user_id`, `create_user_name`, `create_time`, `update_user_id`, `update_user_name`, `update_time`, `del_flag`) VALUES (NULL, 'title', 'context', NULL, NULL, NULL, NULL, NULL, 1, 'admin', '2023-04-26 21:09:09', NULL, NULL, NULL, '0');

3. 控制台查看结果

09:30:13.345 [restartedMain] INFO  c.w.y.c.o.CanalUtil - [handleDATAChange,47] - Canal监测到更新:【bs_note_info】
id : 14    update=true
title : title    update=true
context : context    update=true
file_name :     update=true
bucket_name :     update=true
original :     update=true
type :     update=true
file_size :     update=true
create_user_id : 1    update=true
create_user_name : admin    update=true
create_time : 2023-04-26 21:09:09    update=true
update_user_id :     update=true
update_user_name :     update=true
update_time :     update=true
del_flag : 0    update=true

4. 结合到es上使用


http://www.ppmy.cn/news/63113.html

相关文章

C++的类型转换

文章目录 1. C语言中的类型转换2. 为什么C需要四种类型转换3. C强制类型转换3.1 static_cast3.2 reinterpret_cast3.3 const_cast3.4 dynamic_cast 1. C语言中的类型转换 在C语言中&#xff0c;如果赋值运算符左右两侧类型不同&#xff0c;或者形参与实参类型不匹配&#xff0…

685页40万字某省市场监管智慧应用一体化项目(word可编辑)

1.2.3.1 数字XX公共能力建设现状 1.2.3.1.1 数字XX通用基础应用平台现状 通用基础应用平台提供具有共性特征的跨部门、跨层级业务应用&#xff0c;与本项目有关的平台包括某省网上办事大厅、某省政务服务 APP 统一平台&#xff08;X政通 APP&#xff09;、某省公共信用信息平…

Linux | 学习笔记(适合小白)

操作系统概述&#xff1a; 计算机是由硬件和软件这两个主要部分组成的操作系统是软件的一类&#xff0c;主要作用是协助用户调度硬件工作&#xff0c;充当用户和计算机硬件之间的桥梁常见的操作系统&#xff1a;PC端&#xff1a;Windows&#xff0c;Linux&#xff0c;MacOS&…

热门的免费 API 合辑整理

快递物流查询 全国快递物流查询&#xff1a;目前已支持600快递公司的快递信息查询。 跨境国际快递物流查询 &#xff1a; 支持900物流商&#xff0c;提供实时查询和单号订阅API接口。 物流时效性查询&#xff1a;预估从下单开始直到收到货物的时间&#xff0c;计算物流的时效…

go读request.Body内容踩坑记

go读request.Body内容踩坑记 踩坑代码如下&#xff0c;当时是想获取body传过来的json func demo(c *httpserver.Context) {type ReqData struct {Id int json:"id" validate:"required" schema:"id"Title string json:"…

xib替代main.storyboard

xib替代main.storyboard 其实xib和storyboard在编译时都会变成nib文件。 删除storyboard 删除main.storyboard和ViewController 创建新VC 因为上一步干脆删掉了自带的ViewController&#xff0c;所以这里创建一个新的VC。 创建 创建自定义VC&#xff0c;叫做“TestXibVi…

《Linux 内核设计与实现》12. 内存管理

文章目录 页区获得页获得填充为 0 的页释放页 kmalloc()gfp_mask 标志kfree()vmalloc() slab 层slab 层的设计slab 分配器的接口 在栈上的静态分配单页内核栈 高端内存的映射永久映射临时映射 每个 CPU 的分配新的每个 CPU 接口 页 struct page 结构表示系统中的物理页&#x…

Rosetta从头蛋白抗体设计、结构优化及在药物研发中的应用

Rosetta从头蛋白抗体设计、结构优化及在药物研发中的应用 第一天 内容 主要知识点 从蛋白质折叠到蛋白质设计教学目标&#xff1a;了解本方向内容、理论基础、研究意义。 蛋白质折叠与结构预测简介 主链二面角与二级结构侧链堆积与三级结构蛋白质设计简介 蛋白质设计的分…