flink cdc2.2.1同步postgresql表

embedded/2025/2/12 6:20:20/

目录

  • 简要说明
  • maven依赖
  • 样例代码

简要说明

flink1.14.4 和 flink cdc2.2.1下,采用flink sql方式,postgresql同步表数据,本文采用的是上传jar包,利用flink REST api的方式进行sql执行。

maven依赖

<properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><flink.version>1.14.4</flink.version><flink-cdc.version>2.2.1</flink-cdc.version><scala.binary.version>2.12</scala.binary.version></properties>
<dependencies><!-- flink --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.12</artifactId><version>1.14.4</version><!--<scope>provided</scope>--></dependency><!-- flink cdc --><dependency><groupId>com.ververica</groupId><artifactId>flink-sql-connector-mysql-cdc</artifactId><version>${flink-cdc.version}</version></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-sql-connector-oracle-cdc</artifactId><version>${flink-cdc.version}</version></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-sql-connector-postgres-cdc</artifactId><version>${flink-cdc.version}</version></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-sql-connector-sqlserver-cdc</artifactId><version>${flink-cdc.version}</version></dependency><!-- database driver --><!-- postgresql --><dependency><groupId>org.postgresql</groupId><artifactId>postgresql</artifactId><version>42.2.5</version></dependency><!-- json --><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>2.9.9.3</version></dependency><!-- lombok --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.24</version></dependency><!-- log --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.7</version><scope>runtime</scope></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version><scope>runtime</scope></dependency><!-- junit --><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version><scope>test</scope></dependency>

样例代码

sql:
CREATE TABLE `new_table1_37877` (
id INT,
name STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'debezium.database.tablename.case.insensitive'='false',
'debezium.log.mining.continuous.mine'='true',
'password'='*****',
'hostname'='***.**.**.***',
'debezium.log.mining.strategy'='online_catalog',
'connector'='postgres-cdc',
'port'='5432',
'schema-name'='public',
'database-name'='test',
'table-name'='new_table1',
'username'='******',
'slot.name'='flink_slot',
'decoding.plugin.name'='pgoutput'
);
CREATE TABLE `new_table1_bak_37877` (
id INT,
name STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'password'='*****',
'connector'='jdbc',
'table-name'='public.new_table1_bak',
'url'='jdbc:postgresql://地址:5432/test',
'username'='用户'
);
insert into new_table1_bak_37877 select * from new_table1_37877;
参数类:
@Data
public class InputOutputParams {/*** 作业名称*/private String jobName;/*** 代码文本,分号分隔的flink sql语句*/private String codeText;}
main方法:
public class FlinkMain {/*** flink job 运行入口** @param args 运行参数*/public static void main(String[] args) throws IOException {if (args == null || args.length == 0) {throw new RuntimeException("运行参数为空");}// 取第一个参数(必须是json字符串)为运行参数String json = args[0];ObjectMapper objectMapper =new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);InputOutputParams params = objectMapper.readValue(json, InputOutputParams.class);// 获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 开启快照点,每 3 * 60秒保存一次快照env.enableCheckpointing(3 * 60 * 1000L);//检查点可容忍失败阈值env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);//检查点超时时间env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000);// 同一时间只允许一个 checkpoint 进行env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);// 开启在 job 中止后仍然保留的 externalized checkpointsenv.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// 重启策略,最多尝试重启3次,每次重启的时间间隔为20秒env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(20L, TimeUnit.SECONDS)));env.setParallelism(1);EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();// 获取表执行环境StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);tEnv.getConfig().getConfiguration().setString("pipeline.name", params.getJobName());// 执行操作sqlString codeText = params.getCodeText();if (codeText == null || codeText.trim().isEmpty()) {throw new RuntimeException("flink sql is empty");}String[] flinkSqlArr = codeText.split(";");for (String flinkSql : flinkSqlArr) {if (flinkSql != null && !flinkSql.trim().isEmpty()) {tEnv.executeSql(flinkSql);}}}
}

将项目打包成不带依赖的jar

<build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-dependency-plugin</artifactId><version>2.10</version><executions><execution><id>copy-dependencies</id><phase>package</phase><goals><!-- 复制依赖jar包 --><goal>copy-dependencies</goal></goals><configuration><!-- 依赖jar包输出目录 --><outputDirectory>${project.build.directory}/lib</outputDirectory></configuration></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-jar-plugin</artifactId><version>2.4</version><configuration><archive><manifest><!-- main方法所在主类 --><mainClass>com.test.FlinkMain</mainClass></manifest></archive></configuration></plugin></plugins></build>

然后将lib下的依赖全部拷贝到flink的lib下,将刚才打包好的jar界面上传
在这里插入图片描述
然后通过postman调用flink的REST api接口提交sql,接口文档地址:https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/rest_api/
在这里插入图片描述


http://www.ppmy.cn/embedded/161528.html

相关文章

AWS云设施攻击

AWS云设施攻击 云设施攻击列举AWS云基础设施互联网上云资源的侦擦域和子域侦察服务特定域名 通过云服务提供商的api进行侦察AWS CLI使用云服务认证公共共享资源从S3存储桶获取账户ID在其他账户中枚举IAM账户 初始IAM侦察检查受损害的凭证检查IAM权限 IAM资源枚举枚举IAM资源使用…

利用ETL工具进行数据挖掘

ETL的基本概念 数据抽取&#xff08;Extraction&#xff09;&#xff1a;从不同源头系统中获取所需数据的步骤。比如从mysql中拿取数据就是一种简单的抽取动作&#xff0c;从API接口拿取数据也是。 数据转换&#xff08;Transformation&#xff09;&#xff1a;清洗、整合和转…

如何在Vscode中接入Deepseek

一、获取Deepseek APIKEY 首先&#xff0c;登录Deepseek官网的开放平台&#xff1a;DeepSeek 选择API开放平台&#xff0c;然后登录Deepseek后台。 点击左侧菜单栏“API keys”&#xff0c;并创建API key。 需要注意的是&#xff0c;生成API key复制保存到本地&#xff0c;丢失…

Spring Boot牵手Redisson:分布式锁实战秘籍

一、引言 在当今的分布式系统架构中,随着业务规模的不断扩大和系统复杂度的日益增加,如何确保多个服务节点之间的数据一致性和操作的原子性成为了一个至关重要的问题。在单机环境下,我们可以轻松地使用线程锁或进程锁来控制对共享资源的访问,但在分布式系统中,由于各个服务…

Docker 1. 基础使用

1. Docker Docker 是一个 基于容器的虚拟化技术&#xff0c;它能够将应用及其依赖打包成 轻量级、可移植 的容器&#xff0c;并在不同的环境中运行。 2. Docker指令 &#xff08;1&#xff09;查看已有镜像 docker images &#xff08;2&#xff09;删除镜像 docker rmi …

EXCEL数据解析与加密处理方法

在 Excel 中&#xff0c;你可以通过以下步骤将字符串 h1,-109218;h10,-103431;h101,-102563;... 解析为两列数据&#xff08;一列为 h 变量&#xff0c;另一列为对应的数字&#xff09;&#xff1a; 步骤 1&#xff1a;准备数据 将字符串 h1,-109218;h10,-103431;h101,-102563…

effective-Objective-C 第五章学习笔记

内存管理 文章目录 内存管理理解引用计数属性存取方法的内存管理自动释放池要点 以ARC简化引用计数使用ARC时必须遵循的方法命名规则变量的内存管理语义ARC清理实例变量覆写内存管理方法要点 在dealloc方法中只释放引用并解除监听要点 编写异常安全代码时留意内存管理问题要点 …

用语言模型探索语音风格空间:无需情感标签的情 感TTS

用语言模型探索语音风格空间&#xff1a;无需情感标签的情感TTS 引言 研究背景 许多情感文本转语音&#xff08;E-TTS&#xff09;框架依赖于人工标注的情感标签&#xff0c;这些标签往往不准确且难以获取。情感韵律的学习具有挑战性&#xff0c;因为情感本身具有主观性。 …