mongodb使用debezium

devtools/2024/10/19 4:21:33/

前置

服务器上需要安装jdk11
jdk下载地址

kafka安装

官网下载地址

安装教程

debezium__9">debezium 安装

运行 Debezium 连接器需要 Java 11 或更高版本
Debezium 并不是一个独立的软件,而是很多个 Kafka 连接器的总称。这些 Kafka 连接器分别对应不同的数据库,比如 MySQL、Oracle 等。按 Kafka 连接器的常见命名规则,可能我们会把它们叫做 MySQL Kafka Source Connector 之类。

部署

debezium_15">1.下载对应版本的debezium插件

插件地址
在这里插入图片描述
在这里插入图片描述

2.文件解压

将下载的文件解压,将解压后的文件放到kafka的plugin文件夹下(该plugin文件夹为自己创建的plugin文件夹)*,例如
在这里插入图片描述

3. 通过 kafka connect部署

kafka connect有两种部署方式,一是单机部署,二是分布式部署。单机部署配置kafka/config/connect-standalone.properties 文件,分布式部署则配置kafka/config/connect-distributed.properties。分布式部署支持通过rest api管理connector

此处是单机部署,配置文件为kafka/config/connect-standalone.properties,主要修改以下内容:

# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/usr/local/kafka/plugin

4.启动kafka-connect

需要先启动kafka

bin/connect-standalone.sh config/connect-standalone.properties

debezium_39">5.创建对应的debezium配置文件

在这里插入图片描述

curl -X POST http://${debezium所在服务器}:8083/connectors

{"name": "cdc-connector","config": {"connector.class": "io.debezium.connector.mongodb.MongoDbConnector","mongodb.connection.string": "mongodb://root:123456@192.168.2.18:27017,192.168.2.19:27017/?authSource=admin","collection.include.list": "db_cdc_1.c_cdc_2","topic.prefix": "mycdc","capture.mode":"change_streams"}
}
  • 如果需要在cdc输出的语句上显示before信息,需要开启mongodb版本 6.0 中的新增功能changeStreamPreAndPostImages,并且在capture.mode上使用change_streams_with_pre_image或change_streams_update_full_with_pre_image
  • 如果capture.mode未设置成change_streams_with_pre_imagechange_streams_update_full_with_pre_image的话,在进行删除时cdc输出会没有before信息
    在这里插入图片描述
db.runCommand({collMod: "对应的controllerName", changeStreamPreAndPostImages: {enabled: true} 
})
例如:
use db_cdc_1
db.runCommand({collMod: "c_cdc_2", changeStreamPreAndPostImages: {enabled: true} 
}){"name": "cdc-connector","config": {"connector.class": "io.debezium.connector.mongodb.MongoDbConnector","mongodb.connection.string": "mongodb://root:123456@192.168.2.18:27017,192.168.2.19:27017/?authSource=admin","collection.include.list": "db_cdc_1.c_cdc_2","topic.prefix": "mycdc","capture.mode":"change_streams_with_pre_image"}
}

在这里插入图片描述

重点参数

参数描述
connector.class固定值io.debezium.connector.mongodb.MongoDbConnector
mongodb.connection.stringmongodb连接信息
collection.include.list需要监听的具体collection
topic.prefixkafkaTopic前缀
capture.mode输出模式(默认:change_streams_update_full)

capture.mode

模式描述
change_streams输出变化流,但是在进行update操作时,不输出after字段
change_streams_update_full在change_streams的基础上,增加after字段,用于输出现在变化后的数据的内容
change_streams_with_pre_image在change_streams的基础上,增加before字段的输出,但需要进行配置
change_streams_update_full_with_pre_image在change_streams_with_pre_image的基础上增加,增加after字段,用于输出现在变化后的数据的内容

其他未使用参数

参数描述
database.include.list需要监听的具体database
database.exclude.list不监听的database(不要与database.include.list填写相同的db)
collection.exclude.list不监听的collection(不要与collection.include.list填写相同的collection)
snapshot.mode指定在连接器启动时执行快照的条件。Initial(默认:重头开始)当连接器启动时,如果没有在其偏移主题中检测值,它会执行数据库的快照。never(从当前位置开始)当连接器启动时,它会跳过快照进程,并立即开始将数据库记录的操作流传输到 oplog。

更多参数请参考
在这里插入图片描述

cdc结果

原数据

{"userId": "1000000","allPoints": 190,"createTime": {"$date": "2024-04-25T13:31:59.678Z"},"updateTime": {"$date": "2024-04-25T13:31:59.678Z"}
}

添加数据

在这里插入图片描述
在这里插入图片描述
capture.mode两种模式输出结果一样

push数据

{$push: {"history":{"historyId": "1","changerPoints": 0,"beforePoints": 0,"afterPoints": 0,"status": "0","createTime": {"$date": "2024-01-01T16:00:00.000Z"},"comment": "测试数据","versionNo": 0}},
}

在这里插入图片描述

第一次添加

{"payload": {"before": null,"after": null,"updateDescription": {"removedFields": null,"updatedFields": "{\"history\": [{\"historyId\": \"1\", \"changerPoints\": 0, \"beforePoints\": 0, \"afterPoints\": 0, \"status\": \"0\", \"createTime\": {\"$date\": \"2024-01-01T16:00:00.000Z\"}, \"comment\": \"测试数据\", \"versionNo\": 0}]}","truncatedArrays": null}}

在这里插入图片描述

第二次添加

在这里插入图片描述

{"payload": {"before": null,"after": null,"updateDescription": {"removedFields": null,"updatedFields": "{\"history.1\": {\"historyId\": \"2\", \"changerPoints\": 0, \"beforePoints\": 0, \"afterPoints\": 0, \"status\": \"0\", \"createTime\": {\"$date\": \"2024-01-01T16:00:00.000Z\"}, \"comment\": \"测试数据\", \"versionNo\": 0}}","truncatedArrays": null}}

在这里插入图片描述

如果capture.modechange_streams_update_full,则会在after字段上显示现在修改的这条数据的完整数据,例如
在这里插入图片描述

修改数组中的值

在这里插入图片描述

{"payload": {"before": null,"after": null,"updateDescription": {"removedFields": null,"updatedFields": "{\"history.1.historyId\": \"100\"}","truncatedArrays": null}}

在这里插入图片描述
如果capture.modechange_streams_update_full,则会在after字段上显示现在修改的这条数据现有的完整数据,例如
在这里插入图片描述

pull操作

在这里插入图片描述

{$pull: {history: {historyId: "100",},},
}

在这里插入图片描述
此时会把现有的所有数据都返回

{"payload": {"before": null,"after": null,"updateDescription": {"removedFields": null,"updatedFields": "{\"history\": [{\"historyId\": \"1\", \"changerPoints\": 0, \"beforePoints\": 0, \"afterPoints\": 0, \"status\": \"0\", \"createTime\": {\"$date\": \"2024-01-01T16:00:00.000Z\"}, \"comment\": \"测试数据\", \"versionNo\": 0}, {\"historyId\": \"2\", \"changerPoints\": 0, \"beforePoints\": 0, \"afterPoints\": 0, \"status\": \"0\", \"createTime\": {\"$date\": \"2024-01-01T16:00:00.000Z\"}, \"comment\": \"测试数据\", \"versionNo\": 0}, {\"historyId\": \"3\", \"changerPoints\": 0, \"beforePoints\": 0, \"afterPoints\": 0, \"status\": \"0\", \"createTime\": {\"$date\": \"2024-01-01T16:00:00.000Z\"}, \"comment\": \"测试数据\", \"versionNo\": 0}, {\"historyId\": \"4\", \"changerPoints\": 0, \"beforePoints\": 0, \"afterPoints\": 0, \"status\": \"0\", \"createTime\": {\"$date\": \"2024-01-01T16:00:00.000Z\"}, \"comment\": \"测试数据\", \"versionNo\": 0}]}","truncatedArrays": null}}

在这里插入图片描述
如果capture.modechange_streams_update_full,则会在after字段上显示现在修改的这条数据现有的完整数据,例如在这里插入图片描述

删除字段

在这里插入图片描述

{  "payload": {"before": null,"after": null,"updateDescription": {"removedFields": ["updateTime"],"updatedFields": "{}","truncatedArrays": null}}

在这里插入图片描述
如果capture.modechange_streams_update_full,则会在after字段上显示现在修改的这条数据现有的完整数据(此处删除的是另外一个字段),例如
在这里插入图片描述

删除数据

在这里插入图片描述
在这里插入图片描述
如果capture.mode未设置成change_streams_with_pre_imagechange_streams_update_full_with_pre_image的话,在进行删除时cdc输出会没有before信息
在这里插入图片描述
通过开启mongodb版本 6.0 中的新增功能changeStreamPreAndPostImages,并且在capture.mode上使用change_streams_with_pre_image或change_streams_update_full_with_pre_image即可解决

db.runCommand({collMod: "对应的controllerName", changeStreamPreAndPostImages: {enabled: true} 
})
例如:
use db_cdc_1
db.runCommand({collMod: "c_cdc_2", changeStreamPreAndPostImages: {enabled: true} 
}){"name": "cdc-connector","config": {"connector.class": "io.debezium.connector.mongodb.MongoDbConnector","mongodb.connection.string": "mongodb://root:123456@192.168.2.18:27017,192.168.2.19:27017/?authSource=admin","collection.include.list": "db_cdc_1.c_cdc_2","topic.prefix": "mycdc","capture.mode":"change_streams_with_pre_image"}
}

在这里插入图片描述


http://www.ppmy.cn/devtools/27629.html

相关文章

TiDB系列之:部署TiDB集群常见报错解决方法

TiDB系列之:部署TiDB集群常见报错解决方法 一、部署TiDB集群二、unsupported filesystem ext3三、soft limit of nofile四、THP is enabled五、numactl not usable六、net.ipv4.tcp_syncookies 1七、service irqbalance not found,八、登陆TiDB数据库 一、部署TiDB…

Json 反序列化错误

Json 反序列化错误 JsonReaderException: Unexpected character encountered while parsing value: . Path ‘’, l 可能是因为字符串是BOM UTF-8格式,需要把BOM去掉 public byte[] RemoveBOMData(byte[] bytesWithBom) {// UTF-8 BOM的字节表示&#…

牛客NC353 回文子串的数量【中等 字符串,枚举,回文 C++/Java/Go/PHP 高频】

题目 题目链接: https://www.nowcoder.com/practice/3e8b48c812864b0eabba0b8b25867738 思路 参考答案C class Solution {public:/*** 代码中的类名、方法名、参数名已经指定,请勿修改,直接返回方法规定的值即可*** param str string字符串…

ElasticSearch自动补全

一、拼音分词器: 当用户在搜索框输入字符时,我们应该提示出与该字符有关的搜索项,如图: 这种根据用户输入的字母,提示完整词条的功能,就是自动补全了。 GET /_analyze {"text":"我爱螺蛳粉…

QT - 创建Qt Widgets Application项目

在Qt中结合OpenGL使用,可以创建一个Qt Widgets应用程序项目。在创建项目时,您可以选择使用OpenGL模板来生成一个已经集成了OpenGL的项目。这个模板会自动帮助您集成OpenGL和Qt,并生成一个基本的OpenGL窗口。您可以在这个窗口中进行OpenGL的开…

第九章动态规划——不同的搜索二叉树

目录 力扣题号:96. 不同的二叉搜索树 - 力扣(LeetCode) 题目描述 示例 1: 示例 2: 提示: 思路 什么是二叉搜索树 发现规律 当n为1和n为2时 当输入的n为3时 如果是以 1 为头节点 如果是以2为头节…

【Github】sync fork后,意外关闭之前提交分支的pr申请 + 找回被关闭的pr请求分支中的文件

【Github】sync fork后,意外关闭之前提交分支的pr申请 找回被关闭的pr请求分支中的文件 写在最前面原因解析提交pr,pr是什么?rebase 或者 merge 命令 找到分支中被删除的文件找到被关闭的提交请求pr方法1:在公共仓库被关闭的pr中…

jvm面试题30问

什么是JVM的跨平台? 什么是JVM的语言无关性? 什么是JVM的解释执行 什么是JIT? JIT:在Java编程语言和环境中,即时编译器(JIT compiler,just-in-time compiler)是一个把Java的字节码(…