相关的配置文件在 GitHub 存储库中可用在 Azure 上设置 PostgreSQL 和 Kafka
本节将提供有关如何为 PostgreSQL 配置 Azure 事件中心和 Azure DB 的指针。所有你需要的是一个微软 Azure帐户 –继续前进, 并注册一个免费的!
邮政的 Azure DB
PostgreSQL 的 Azure DB 是基于开源 PostgreSQL 数据库引擎的社区版本的托管关系数据库服务,提供两种部署模式。
在编写本文时,它支持 PostgreSQL 版本11.6
您可以使用各种选项在 Azure 上设置 PostgreSQL,包括 Azure门户、Azure CLI、Azure PowerShell 、ARM 模板。 Azure CLI完成操作后,您可以使用您最喜爱的编程语言(如Java、.NET、Node.js、Python、Go 等Node.js)轻松连接到数据库Python.NETGomicrosoft.com/azure/postgresql/overview?WT.mc_id=dzone-blog-abhishgu#azure-database-for-postgresql—hyperscale-citus”rel=”无跟随”=超量量(Citus)是另一种部署模式,可用于”接近或已超过 100 GB 数据的工作负载”。
请确保您将以下 PostgreSQL 相关信息放在方便的位置,因为您将需要它们来配置后续部分中的 Debezium 连接器 – 数据库主机名(和端口)、用户名、密码
Azure 事件中心
Azure 事件中心是一个完全托管的数据流平台和事件引入服务。它还提供了一个支持阿帕奇 Kafka 协议 1.0 及更晚的 Kafka 终结点,并可与 Kafka 生态系统中的现有 Kafka 客户端应用程序和其他工具合作 Kafka Connect ,包括(在此博客中演示)。
您可以使用 Azure 门户、Azure CLI、PowerShell或 ARM 模板创建 Azure 事件中心命名空间和其他资源。 PowerShell为了确保启用 Kafka 功能,您只需要选择 或 Standard Dedicated 层(因为基本层不支持事件中心上的 Kafka)。
设置后,请确保您保持连接字符串方便,因为您将需要它来配置 Kafka 连接。可以使用 Azure门户或 Azure CLI进行此功能
安装卡夫卡
要运行 Kafka 连接, 我将使用本地 Kafka 安装只是为了方便apache.org/downloads”rel=”不跟随”-只需下载阿帕奇卡夫卡,解压缩其内容,你很好去!
下载 Debezium 连接器并启动 Kafka 连接
首先,克隆此 Git 存储库:
Java
Ⅹ
git克隆https://github.com/abhirockzz/debezium-azure-postgres-cdc
cd debezium–azure–后灰色–cdc
下载 Debezium PostgreSQL 源连接器 JARs
1.2.0是撰写本文时的最新版本
Java
x
DEBEZIUM_CONNECTOR_VERSION=1.2。0
卷曲https://repo1.maven.org/maven2/io/debezium/debezium-连接器-postgres/$DEBEZIUM_CONNECTOR_VERSION]。最终/二十年代连接器-postgres-$DEBEZIUM_CONNECTOR_VERSION] 。Final - plugin. tar. gz - 输出 debezium - 连接器 - postgres. tar. gz
焦油 -xvzfdebezium-连接器-postgres.焦油.gz
现在,您应该会看到名为 的新文件夹 debezium-connector-postgres 。将连接器 JAR 文件复制到 Kafka 安装:
Java
x
导出KAFKA_HOME=到kafka安装e的路径。g/用户/foo/工作/kafka_2。12-2.3.0|
确认
ls - lrt $KAFKA_首页/图书馆 |格雷普 · 德贝齐姆
ls - lrt $KAFKA_首页/图书馆 |格雷普 · 普罗托布夫
ls - lrt $KAFKA_首页/图书馆 |格雷普 · 后格雷斯克尔
在启动 Kafka Connect 群集之前, connect.properties 编辑文件以包含以下属性的适当值: bootstrap.servers sasl.jaas.config , producer.sasl.jaas.config consumer.sasl.jaas.config 、、(只需替换占位符)
启动 Kafka 连接群集(我在模式下运行 distributed 它):
Java
x
导出KAFKA_HOME=到kafka安装e的路径。g/用户/foo/工作/kafka_2。12-2.3.0|
sh 连接。属性
等待 Kafka 连接实例启动 - 您应该在 Azure 事件中心看到 Kafka 连接内部主题,例如。
配置 PostgreSQL
在安装连接器之前,我们需要:确保可从 Kafka 连接群集访问 PostgreSQL 实例
确保 PostrgeSQL 复制设置设置为"逻辑"
创建可用于尝试更改数据捕获功能的表
如果要使用 Azure DB 进行 PostgreSQL,请使用az postgres 服务器防火墙规则创建防火墙规则,创建命令以将 Kafka Connect 主机列入白名单。在我的情况下,它是一个本地的 Kafka Connect 群集,因此我只需导航到 Azure门户(我的PostrgreSQL 实例的连接安全部分),并选择添加当前客户端IP地址,以确保我的本地 IP 已添加到防火墙规则中,例如:
若要更改 PostgreSQL 的 Azure DB 复制模式,可以使用az postgres 服务器配置命令:
Java
x
azpostgresserverofnamegroupof服务器configuration配置集 -资源-组 -name服务器名称--名称azure。-namereplication_support--值逻辑
更新配置后,您需要重新启动可以使用CLI(az postgres服务器重新启动)或门户执行的服务器。
数据库启动并运行后,创建表 - 我 psql 在此示例中使用了 CLI,但请随时使用任何其他工具。例如,要通过 SSL 连接到 Azure 上的 PostgreSQL 数据库(系统会提示您输入密码):
Java
x
psql-h.后格雷斯。数据库。蔚蓝.com-p5432-U-W-d-设置=sslmode=要求
例子
psql-h我的-pgsql.后格雷斯
蔚蓝.com -p 5432 -U foo foo@my-pgsql -W -d后格雷斯 -设置[sslmode]=需要
创建表
创建表待办事项(id串行描述VARCHAR(30todo_statusVARCHAR(10主键KEY(id));
安装 Debezium PostgreSQL 源连接器
更新pg-source-connector下面是一个示例:
Java
x
{
"配置": |
"连接器. 类""io. debezium. 连接器. postgresql. PostgresConnector",
"数据库.主机名".postgres.数据库.azure.com",
"数据库.port""5432",
"数据库.用户"""
"数据库.密码"""
"数据库.dbname"""
• "database.server.name":" "
"plugin.name""瓦尔2json",
"表.白名单"""
}
}
让我们浏览一下配置:
connector.class:连接器类的名称(这是一个静态值)
database.hostname和 database.port : PostgreSQL 实例以及端口的 IP 地址或主机名(例如 5432 )
database.user和 database.password : PostgreSQL 实例的用户名和密码
database.dbname:数据库名称,例如postgres
database.server.name: 逻辑名称,用于标识和提供要监视的特定 PostgreSQL 数据库服务器/群集的命名空间。
table.whitelist: 逗号分隔的正则表达式列表,指定要监视哪些表以进行更改数据捕获
plugin
G。wal2json
在编写本文时,Debezium 支持以下插件 decoderbufs wal2json wal2json_rds :、、 wal2json_streaming 和 wal2json_rds_streaming pgoutput 。我 wal2json 已在此示例中使用,Azure上也支持它!
最后,安装连接器!
Java
x
卷曲-XPOST-H"内容类型:应用程序/json"-数据@pg@pg-源-连接器
989594px;">
Kafka Connect 现在将开始监视表 todos 以创建、更新和删除事件
更改数据捕获操作
插入记录:
Java
x
后格雷斯。数据库。蔚蓝.com -p 5432 -U -W -d -设置=sslmode=要求
插入到待办事项(描述todo_statustodo_status值("安装后灰色"'完成');
插入到待办事项(描述todo_statustodo_status值('安装卡夫卡''完成');
插入到待办事项(描述todo_statustodo_status值("设置源连接器"'待定');
kafkacat 使用,但您也可以使用此处列出的任何选项创建消费者应用
更新 metadata.broker.list sasl.password 和 属性, kafkacat.conf 以包括卡夫卡代理的详细信息。在不同的终端中,使用它读取 CDC 有效负载:
Java
x
康夫
出口经纪人\e.g表示事件中心-我的-事件中心-命名空间。服务总线.窗口。净:9093
导出主题\。
卡夫卡卡特-b$BROKER-t$TOPIC-o开始
Java
x
{
["架构": [...],
"有效负载": |
"之前"为空,
"后": |
"id"1,
"描述""安装 postgresql",
"todo_status""完成"
},
["源":]
"版本""1.2.0. final",
"连接器""后灰色",
"名称""填充",
"ts_ms"1593018069944,
"快照""最后",
"db""后灰色",
"表""待办事项",
"txId"602,
"lsn"184579736,
"xmin"空
},
"op""c",
"ts_ms"1593018069947,
}
事件及其及其( payload 为了简洁而 schema 省略)由 及其组成。在部分中,请注意创建操作 ( ) 的表示方式 - 表示这是一个新 payload "op": "c" "before": null INSERT ed 行, after 为行中的每列提供 source 值,提供从其中拾取此事件的 PostgreSQL 实例元数据等。
您也可以尝试与更新或删除操作相同,并反省 CDC 事件,例如