基于Canal+kafka监听数据库变化的最佳实践

news/2024/10/31 4:22:25/

1、前言

        工作中,我们很多时候需要根据某些状态的变化更新另一个业务的逻辑,比如订单的生成,成交等,需要更新或者通知其他的业务。我们通常的操作通过业务埋点、接口的调用或者中间件完成。

        但是状态变化的入口比较多的时候,就很容易漏掉某些地方。代码维护起来也比较麻烦。今天介绍阿里出品的 【canal】中间件完成数据库字段的监听。

2、canal的简单介绍

        canal详见介绍件官网:https://github.com/alibaba/canal

 

2.1 家族成员:

【canal.adapter】:客户端落地的适配以及功能

      

 【canal.admin】:提供WebUI的管理界面

 【canal.deployer】:canal服务

 【canal.example】:客户端提供的demo

2.2 工作原理

 3、 实践目标

        使用canal监控mysql数据的变化,将变化的数据推送到kafka,并使用canal-admin动态管理需要监控的数据库表。

 4、工具准备

其中kafka是依赖zookeeper的,所以也需要zookeeper。

5、配置并启动kafka

Kafka QuickStart

5.1 修改配置

vim config/server.properties

换成自己的IP

替换成自己zookeeper的地址

 5.2 启动server

  • 启动zookeeper脚本

# bin/zkServer.sh start

  • 启动kafka脚本

# bin/kafka-server-start.sh -daemon config/server.properties &

  •  查看是否启动成功脚本

# jps -ml

 

此时kafka启动成功。

5.3 注意事项

值得注意的是官方文档中查看topic的命令,

# bin/kafka-topics.sh --list --zookeeper 192.168.1.110:2181

在心的kafka版本中已经改变,可移步kafka官方文档: Apache Kafka

新版本中使用bootstrap-server,如下

# bin/kafka-topics.sh --list --bootstrap-server localhost:9092 

6、启动canal-admin

6.1 修改配置

改成对应的ip

 6.2 执行 conf/canal.manage.sql

         该脚本是canal-admin的管理脚本。

 6.3 启动canal-admin

sh bin/startup.sh

 6.4 查看启动状态

 6.5 访问页面

此时代表canal-admin已经启动成功,可以通过 http://127.0.0.1:8089/ 访问,

默认密码:admin/123456

7、启动canal-server

7.1 修改配置脚本

# vim conf/canal_local.properties

换成canal-admin的IP

7.2 启动服务 指定local

# sh bin/startup.sh local

7.3 查询启动状态

8、管理平台配置

8.1 查看canal服务的状态

 8.2 配置实例

 修改监听的数据库信息:

canal.instance.master.address=192.168.88.111:3306

canal.instance.dbUsername=***
canal.instance.dbPassword=***

#默认监听全库

canal.instance.filter.regex=test.test_user

#配置不可访问的库表

canal.instance.filter.black.regex=

#配置mq的主题/路由

canal.mq.topic=example

 保存即可。

8.3 启动实例

9、编写客户端监听kafka的客户端

@Test
public void test01(){// 修改打印日志的级别,不然会不停的打印debug日志,影响阅读LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();Logger root = loggerContext.getLogger("root");root.setLevel(Level.INFO);//设置消费者属性Properties properties = new Properties();properties.put("bootstrap.servers","192.168.88.111:9092");//反序列化器,与生产者的序列化器相对应properties.put("key.deserializer", StringDeserializer.class);properties.put("value.deserializer", StringDeserializer.class);//设置消费者的消费者群组properties.put(ConsumerConfig.GROUP_ID_CONFIG,"example");// properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 默认值是 lastestKafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);try {//消费者订阅主题(可以多个,支持正则表达式,进行模糊匹配)consumer.subscribe(Collections.singletonList("example"));System.out.println("-------------------------消费端准备就绪,等待消息接受------------------------------------");//kafka消费者是通过拉取的方式获得服务端消息while(true){//循环调用poll方法,获取数据。ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for(ConsumerRecord<String, String> record:records){String topic = record.topic();String value = record.value();if (StringUtils.isNotEmpty(value)) {System.out.println(String.format("topic:%s;" + "value:%s", topic,value));}}}} finally {consumer.close();}
}

 10、验证

修改数据库字段,可以接收到修改的信息


http://www.ppmy.cn/news/5657.html

相关文章

FT2232作为JTAG烧录器的使用步骤详解

FT2232作为JTAG烧录器的使用步骤详解FT2232作为JTAG烧录器的使用步骤详解配置OpenOCD环境(已经配置好的可以跳过)【步骤 1】安装 FT2232HL 芯片的驱动&#xff0c;安装文件为 CDM21228_Setup.exe。【步骤 2】 安装 FT_Prog_v3.6.88.402 Installer.exe【步骤 3】 使用 FT Prog 软…

你易忽略的三极管电路问题1:下拉电阻

如下这个三极管共射极驱动电路中&#xff0c;B、E极之间的下拉电阻的作用&#xff1f;是否可以将其去除&#xff1f;该电阻有两个重要的作用&#xff1a;在驱动信号关闭时给三极管基极一个固定的电平。当驱动信号&#xff08;SIGNAL&#xff09;关闭时&#xff0c;若没有下拉电…

【Java基础知识复盘】String、StringBuffer、StringBuilder篇——持续更新中

本人知识复盘系列的博客并非全部原创&#xff0c;大部分摘自网络&#xff0c;只是为了记录在自己的博客方便查阅&#xff0c;往后也会陆续在本篇博客更新本人查阅到的新的知识点&#xff0c;望悉知&#xff01; String类 在 Java 中字符串属于对象&#xff0c;Java 提供了 Str…

矢量网络分析仪如何测量史密斯图及滤波器的带宽?

矢量网络分析仪是一种很神奇的测量仪器&#xff0c;它的功能很强大也值得人们去探索。今天&#xff0c;安泰测试工程师就针对矢量网络分析仪中的史密斯图及滤波器的带宽测量进行简单的介绍&#xff0c;希望能够让更多的人对此有所了解&#xff0c;并产生兴趣。 首先仪器了解一下…

试卷安全分发系统

摘要 高校教务管理过程中&#xff0c;试卷以明文形式传输和集中存储&#xff0c;存在数据泄漏安全隐患。现提出了一个基于数字证书的试卷防泄漏方案&#xff0c;采用算法加密试卷&#xff0c;试卷在传输过程中中以密文的形式传输&#xff0c;每次传输的时候都会对试卷进行签名…

Mac怎么清理缓存?这两种方法都非常好用哦

与电脑系统或应用程序非常相似&#xff0c;您的Mac也有自己的系统缓存&#xff0c;它可以在后台临时存储数据&#xff0c;以加快软件安装速度并减少互联网数据使用量&#xff08;通过Apple&#xff09;。与电脑系统或应用程序类似&#xff0c;缓存数据可能会开始堆积——占用存…

艾美捷硝酸盐/亚硝酸盐荧光法检测试剂盒基本参数说明

一氧化氮&#xff08;NO&#xff09;由神经元、内皮细胞、血小板和中性粒细胞响应于稳态刺激产生微量。这种NO被迅速清除&#xff08;t4 s&#xff09;&#xff0c;并以旁分泌的方式传递细胞信号。NO与鸟苷酸环化酶的HEME辅基相互作用&#xff0c;激活酶并导致cGMP水平升高。其…

hadoop常用指令

-ls查看HDFS目录文件 如hadoop fs -ls / -mkdir&#xff1a;在HDFS上创建目录&#xff0c;可以创建多级目录 如hadoop fs -mkdir /tmp/user001 -copyFromLocal&#xff1a;复制本地文件系统到HDFS 如hadoop fs -copyFromLocal /home/01.txt /tmp/user001/ -copyToLocal&#xf…