CDH6.3.2引入debezium-connector-mysql-1.9.7监听mysql事件

news/2024/11/25 17:28:14/

1、首先说明一下为啥选用debezium,它能够根据事务的提交顺序向外推送数据,这一点非常重要。再有一个结合kafka集群能够保证高可用,对于熟悉java语言的朋友后面一篇博文会介绍怎样编写插件将事件自定义路由到你想要的主题甚至分区中。
提高按顺序消费事件的并发能力。

如果觉得好,请关注一下,后续将推出编写插件支持按照表名hash取模将事件分配到不同的主题或者分区当中支持多线程顺序并发消费,实现表与表之间的数据一致性

自定义插件链接地址:http://t.csdn.cn/xAPoH

下载链接地址
https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.9.7.Final/debezium-connector-mysql-1.9.7.Final-plugin.tar.gz

2、首先将文件包下载到/opt文件夹中

cd /opt
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.9.7.Final/debezium-connector-mysql-1.9.7.Final-plugin.tar.gz

3、在将文件分发到其他几台部署了kafka服务器上

scp debezium-connector-mysql-1.9.7.Final-plugin.tar.gz hadoop102:/opt
scp debezium-connector-mysql-1.9.7.Final-plugin.tar.gz hadoop103:/opt

4、接下来就是安装三台服务器步骤一样,hadoop101,hadoop102,hadoop103

tar -zxvf debezium-connector-mysql-1.9.7.Final-plugin.tar.gz
cd /opt/debezium-connector-mysql
mv *.jar /opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/kafka/libs

5、编写配置文件

cd /opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/etc/kafka/conf.dist/
cp connect-distributed.properties /opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/kafka/config/
mv connect-distributed.properties mysql-connect-distributed.properties
cp connect-log4j.properties /opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/kafka/config

6、修改配置文件

vi mysql-connect-distributed.properties
bootstrap.servers=hadoop101:9092,hadoop102:9092,hadoop103:9092
#如果想发送普通的json格式而不是avro格式的话,很简单key.converter.schemas.enable和value.converter.schemas.enable设置为false就行。这样就能发送普通的json格式数据。
key.converter.schemas.enable=true
value.converter.schemas.enable=true
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
plugin.path=/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/kafka/libs/
group.id=cdc-connect-mysql
offset.storage.topic=cdc-connect-mysql-offsets
offset.storage.replication.factor=3
offset.storage.partitions=50
config.storage.topic=cdc-connect-mysql-configs
config.storage.replication.factor=3
config.storage.partitions=1
status.storage.topic=cdc-connect-mysql-status
status.storage.replication.factor=3
status.storage.partitions=10

7、重启kafka装载插件
在这里插入图片描述
8、三台服务器都启动脚本运行

cd /opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/kafka/
./bin/connect-distributed.sh config/cdc-mysql-connect-distributed1.properties

如下图启动成功
在这里插入图片描述
9、测试是否成功等请求命令,我是在postman请求,方便管理

1、注册信连接器
请方式:POST 
请求地址:http://hadoop101:8083/connectors
请求类型:application/json
请求数据:
{"name": "debezium-test-5017","config": {"connector.class": "io.debezium.connector.mysql.MySqlConnector","database.hostname": "hadoop101","database.port": "3306","database.user": "root","database.password": "XXXXX","database.include.list": "spark_event_data","database.dbname": "spark_event_data","database.serverTimezone": "UTC","database.server.id": "316545017","database.server.name": "debezium_mysql_wp","database.history.kafka.bootstrap.servers": "hadoop101:9092,hadoop102:9092,hadoop103:9092","database.history.kafka.topic": "db-event-history"}}
2、查看连接状态
请方式:GET 
请求地址:http://hadoop101:8083/connectors/debezium-test-5017/status
返回数据:
{"name": "debezium-test-5017","connector": {"state": "RUNNING","worker_id": "xxxxx:8083"},"tasks": [{"id": 0,"state": "RUNNING","worker_id": "xxxx:8083"}],"type": "source"
}
3、停止连接器
请方式:DELETE 
请求地址:http://hadoop101:8083/connectors/debezium-test-5017

http://www.ppmy.cn/news/43178.html

相关文章

C++类和对象终章——友元函数 | 友元类 | 内部类 | 匿名对象 | 关于拷贝对象时一些编译器优化

文章目录💐专栏导读💐文章导读🌷友元🌺概念🌺友元函数🍁友元函数的重要性质🌺友元类🍁友元类的重要性质🌷内部类(不常用)🌺内部类的性…

2023.04.16 学习周报

文章目录摘要文献阅读1.题目2.摘要3.简介4.Dual-Stage Attention-Based RNN4.1 问题定义4.2 模型4.2.1 Encoder with input attention4.2.2 Decoder with temporal attention4.2.3 Training procedure5.实验5.1 数据集5.2 参数设置和评价指标5.3 实验结果6.结论MDS降维算法梯度…

实力爆表,日日新成为AI领航者

目录正式发布自建算力SenseChat编程能力图像生成后言上周五,阿里发布大模型通义千问,正式开始邀请内测。本周一,人工智能巨头商汤科技正式发布“日日新”大模型体系,全面丰富的产品体系,多个功能表现超预期&#xff0c…

13.Java面向对象----嵌套类

Java面向对象—嵌套类、内部类、匿名类 一、static静态 在《Java编程思想》有这样一段话:   “static方法就是没有this的方法。在static方法内部不能调用非静态方法,反过来是可以的。而且可以在没有创建任何对象的前提下,仅仅通过类本身来…

SadTalker项目上手教程

背景 最近发现一个很有趣的GitHub项目SadTalker,它能够将一张图片跟一段音频合成一段视频,看起来毫无违和感,如果不仔细看,甚至很难辨别真假,预计未来某一天,一大波网红即将失业。 虽然这个项目目前的主要…

【系统集成项目管理工程师】信息系统集成专业知识

信息系统集成专业技术 一 信息系统建设 1 信息系统生命周期 立项阶段:即概念阶段或需求阶段,这一阶段根据用户业务发展和经营管理的需要,提出建设信息系统的初步构想,然后对企业信息系统的需求进行深入调研和分析,形…

servlet处理数据输入到数据库与输出数据+list输出-----web应用与开发

实验名称 JSP访问数据库的应用 实验内容 使用MVC模式。视图部分由两个JSP页面构成,其中一个页面(insert.jsp)负责提供输入学生信息(学号、姓名、性别、年龄、所在系)的视图,即用户可以在该页面输入要添加的记录,然后将要添加的记录提交给servlet。Servlet负责插入记录…

ZNS 架构实现 : 解决传统SSD问题的高性能存储栈设计

声明 主页:元存储的博客_CSDN博客 依公开知识及经验整理,如有误请留言。 个人辛苦整理,付费内容,禁止转载。 内容摘要 2.2 ZNS 的架构实现 先看看 支持zone 存储的 SMR HDD 以及 支持 zonefs 的 nvme ssd 的整个存储栈形态 其中对…