SpringBoot整合Flink CDC实时同步postgresql变更数据,基于WAL日志

embedded/2024/9/23 9:37:22/

SpringBoot整合Flink CDC实时同步postgresql变更数据,基于WAL日志

    • 一、前言
    • 二、技术介绍(Flink CDC)
      • 1、Flink CDC
      • 2、Postgres CDC
    • 三、准备工作
    • 四、代码示例
    • 五、总结

一、前言

在工作中经常会遇到要实时获取数据库(postgresql、mysql等)的变更数据,主要体现数据的实时性;mysql数据库有canal工具实现很简单,但是基于postgresql数据库获取实时数据就比较复杂,之前已经写过一篇获取postgresql数据库实时数据的文章,如下:

【技术实现】java实时同步postgresql变更数据,基于WAL日志

但是,之前的实现方式比较繁琐,不利于维护,所有本文整合Flink CDC通过一个比较简单的方式实现;

二、技术介绍(Flink CDC)

1、Flink CDC

Flink CDC(Change Data Capture)是一个基于Apache Flink构建的开源数据变更捕获(CDC)框架。其核心功能是从各种关系型数据库(如MySQL、PostgreSQL、Oracle等)中捕获数据变更(如增删改操作),并将这些变更以流的形式提供给Flink等流处理引擎进行处理;
1)CDC(Change Data Capture):数据变更捕获的简称,用于监测并捕获数据库的变动,然后将这些变更按照发生顺序捕获,并写入到目标存储系统(如数据仓库、数据湖、消息队列等)。
2)Flink CDC:基于Flink的CDC实现,将CDC技术与Flink流处理引擎相结合,实现数据的实时捕获、处理和传输。

2、Postgres CDC

1)Postgres CDC(Change Data Capture)连接器是用于从PostgreSQL数据库捕获数据变更(如增删改操作)并将其以流的形式提供给数据处理引擎(如Flink)的组件;
2)PostgreSQL版本:Postgres CDC连接器通常支持PostgreSQL的多个版本,具体版本可能因连接器版本不同而有所差异。常见的支持版本包括9.6、10、11、12、13、14等;

三、准备工作

1、安装postgresql数据库,并创建库和测试使用的表,这里不再列举详细步骤;
在这里插入图片描述
2、修改postgresql数据库配置,通过wal日志监听变更数据

修改postgresql.conf文件,重启服务
wal_level=logical

3、springboot关键maven依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>1.19.0</version>
</dependency>
<dependency><groupId>com.ververica</groupId><artifactId>flink-connector-postgres-cdc</artifactId><version>3.0.1</version>
</dependency>

注:其它依赖不在列举,可以通过获取源码查看

四、代码示例

InitAction02.java

package com.sk.proxytest.init;import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
import com.ververica.cdc.connectors.postgres.source.PostgresSourceBuilder;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;@Configuration
public class InitAction02 {@PostConstructpublic void run() throws Exception {DebeziumDeserializationSchema<String> deserializer =new JsonDebeziumDeserializationSchema();JdbcIncrementalSource<String> postgresIncrementalSource =PostgresSourceBuilder.PostgresIncrementalSource.<String>builder().hostname("127.0.0.1").port(5432).database("postgres").schemaList("public").tableList("public.student").username("postgres").password("password").slotName("flink").decodingPluginName("pgoutput") // use pgoutput for PostgreSQL 10+.deserializer(deserializer).includeSchemaChanges(true) // output the schema changes as well.splitSize(2) // the split size of each snapshot split.build();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.enableCheckpointing(3000);env.fromSource(postgresIncrementalSource,WatermarkStrategy.noWatermarks(),"PostgresParallelSource").setParallelism(2).addSink(new CustomSink());//.print();env.execute("Output Postgres Snapshot");}}

CustomSink.java

package com.sk.proxytest.init;import lombok.extern.log4j.Log4j2;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;@Log4j2
public class CustomSink extends RichSinkFunction<String> {@Overridepublic void invoke(String value, Context context) throws Exception {log.info("============数据发生变化:{}", value);}
}

执行结果:

1)新增数据
在这里插入图片描述

2)变更数据输出

2024-07-31T00:00:15,761 INFO  [debezium-reader-0] io.debezium.util.Threads$3: Creating thread debezium-postgresconnector-postgres_cdc_source-keep-alive
2024-07-31T00:00:15,761 INFO  [debezium-reader-0] io.debezium.connector.postgresql.PostgresStreamingChangeEventSource: Processing messages
2024-07-31T00:00:15,762 INFO  [debezium-reader-0] io.debezium.connector.postgresql.connection.WalPositionLocator: Message with LSN 'LSN{0/3588018}' arrived, switching off the filtering
2024-07-31T00:00:16,678 INFO  [Sink: Unnamed (1/4)#0] com.sk.proxytest.init.CustomSink: ============数据发生变化:{"before":null,"after":{"id":8,"name":"8","age":8,"remark":"8"},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"postgres_cdc_source","ts_ms":1722355215252,"snapshot":"false","db":"postgres","sequence":"[null,\"56131608\"]","schema":"public","table":"student","txId":932,"lsn":56131608,"xmin":null},"op":"c","ts_ms":1722355216336,"transaction":null}

五、总结

Postgres CDC 连接器是一个 Flink Source 连接器,它将首先读取数据库快照,然后继续读取二进制日志,即使发生故障,也会进行一次处理;

Postgres CDC 连接器

👇🏻 👇🏻 👇🏻注:文章源代码关注下面公众号获取👇🏻 👇🏻 👇🏻


http://www.ppmy.cn/embedded/88356.html

相关文章

SaaS 服务:满足个性化需求

软件即服务&#xff08;SaaS&#xff09;模式在当今企业IT架构中扮演着越来越重要的角色。它不仅为企业提供了一种灵活、成本效益高的软件使用方式&#xff0c;还通过持续的服务更新和优化&#xff0c;帮助企业保持技术前沿。观测云&#xff0c;作为中国首款 SaaS 可观测服务平…

Mac电脑数据恢复软件 Disk Drill 企业版安装

Mac分享吧 文章目录 效果一、下载软件二、开始安装1、双击运行软件&#xff0c;将拖入文件夹中&#xff0c;等待安装完毕2、应用程序显示软件图标&#xff0c;运行软件&#xff0c;点击安装&#xff0c;软件页面打开表示安装成功 三、运行测试1、打开软件&#xff0c;恢复一个…

Spark核心知识要点(三)

1、为什么要进行序列化序列化&#xff1f; 可以减少数据的体积&#xff0c;减少存储空间&#xff0c;高效存储和传输数据&#xff0c;不好的是使用的时候要反序列化&#xff0c;非常消耗CPU。 2、Yarn中的container是由谁负责销毁的&#xff0c;在Hadoop Mapreduce中containe…

kafka底层原理性能优化详解:大案例解析(第29天)

系列文章目录 一、Kafka简介 二、Kafka架构设计 三、消息传递机制 四、实例说明&#xff08;案例解析&#xff09; 五、kafka性能优化(案例解析) 文章目录 系列文章目录前言一、Kafka简介二、Kafka架构设计1. Producer&#xff08;生产者&#xff09;&#xff1a;2. Broker&am…

本地VSCode连接远程linux环境服务器的docker

目录 1、安装远程SSH 2、连接远程主机 3、远程中安装docker 4、查看容器 &#xff08;1&#xff09;直接查看容器和镜像 &#xff08;2&#xff09;使用命令查看 最近在新服务器中执行程序&#xff0c;要用到远程的docker。但是命令行环境下查看代码非常不方便&#xff0…

ActiViz实战:自定义状态按钮控件

文章目录 一、需求二、实现源码一、需求 自定义状态按钮控件,拥有选中和非选中两种状态拥有鼠标覆盖效果实现按钮点击事件按钮贴图二、实现源码 private void button1_Click(object sender, EventArgs e){CreateStatePngButton("E:\\PNGImage\\AP1.png", "E:\\…

【数值计算方法】数值积分微分-python实现-p2

原文链接&#xff1a;https://www.cnblogs.com/aksoam/p/18279394 更多精彩&#xff0c;关注博客园主页&#xff0c;不断学习&#xff01;不断进步&#xff01; 我的主页 csdn很少看私信&#xff0c;有事请b站私信 博客园主页-发文字笔记-常用 有限元鹰的主页 内容&#xf…

NodePort:固定端口

NodePort:固定端口 ## ************************************************** # 测试固定端口 # ## ************************************************* apiVersion: apps/v1 kind: Deployment metadata:name: kevin-fixed-portnamespace: default spec:# 副本数量#replicas: …