Flink CDC 实时mysql到mysql

news/2024/12/13 0:51:32/

CDC 的全称是 Change Data Capture ,在广义的概念上,只要是能捕获数据变更的技术,我们都可以称之为 CDC 。目前通常描述的 CDC 技术主要面向数据库的变更,是一种用于捕获数据库中数据变更的技术。

mysqlcdc需要mysql开启binlog,找到my.cnf,在[mysqld]中加入如下信息

[mysqld]

server-id=1

log-bin=mysql-bin

binlog-format=row

重启数据库。

2.创建springboot项目,pom添加依赖

<properties>
<java.version>1.8</java.version>
<flink.version>1.13.6</flink.version>
<scala.binary.version>2.11</scala.binary.version>
<slf4j.version>1.7.30</slf4j.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>1.13.6</version>
<!-- <scope>provided</scope>-->
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.17</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>${flink.version}</version>
<!--<scope>provided</scope>-->
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<!--<scope>provided</scope>-->
</dependency>

<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.2.0</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.12</artifactId>
<version>1.13.1</version>
</dependency>

</dependencies>

<build>
<plugins>
<!-- 打jar插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

Flink cdc实现mysql到mysql代码

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkMysqlToMysql {

public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));
env.enableCheckpointing(5000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 创建Table环境
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

// 注册源表和目标表
tEnv.executeSql("create table sourceTable(id bigint,organization_code VARCHAR, organization_name VARCHAR, parent_code VARCHAR, parent_name VARCHAR,PRIMARY KEY (id) NOT ENFORCED ) WITH (\n" +
//源表连接器一定得是mysql-cdc
"'connector' = 'mysql-cdc'," +
"'hostname' = 'localhost',\n" +
" 'port' = '3306',\n" +
" 'database-name' = 'quarant_db',\n" +
" 'table-name' = 'organization_info',\n" +
" 'username' = 'root',\n" +
" 'password' = 'admin'\n" +
")");
// Table result = tEnv.sqlQuery("SELECT id, name,card_num,phone,address FROM quarantine");
// tEnv.registerTable("sourceTable",result);
tEnv.executeSql("create table targetTable(id bigint,organization_code VARCHAR, organization_name VARCHAR, parent_code VARCHAR, parent_name VARCHAR,PRIMARY KEY (id) NOT ENFORCED ) WITH (\n" +
//目标表连接器是jdbc
"'connector' = 'jdbc'," +
"'url' = 'jdbc:mysql://localhost:3306/testdb?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=false',\n" +
" 'table-name' = 'organization_info',\n" +
" 'username' = 'root',\n" +
" 'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
" 'password' = 'admin'\n" +
")");
// 执行CDC过程
String query = "INSERT INTO targetTable SELECT * FROM sourceTable";
tEnv.executeSql(query).print();
}
}

运行Main方法

Flink会同步源表数据到目标表,后续源表的增删改都会实时同步至目标表中。

3.将程序打包成flink jar

idea使用快捷键control+alt+shift+s,点击Artifacts->JAR

 选择Main class,点击ok

 然后选择上面菜单栏Build Artifacts

 点击build

 生成的jar在项目目录下面有个out目录

至此,flink jar程序就写好了,可以把jar丢到flink上运行了 


http://www.ppmy.cn/news/295376.html

相关文章

Camera2 闪光灯梳理

Camera2 闪光灯 在Android Camrea2中与Flash有关的只有两个字段&#xff1a; CaptureRequest.FLAH_MODECaptureRequest.CONTROL_AE_MODE 所有在使用闪光灯时&#xff0c;必须先保证CaptureRequest.CONTROL_MODE为ATUO&#xff0c;而且必须保证CameraCharacteristics.FLASH_IN…

闪光灯的频闪模式

现在&#xff0c;一般的闪光灯都具有频闪闪光功能&#xff0c;通常标志为MULTI。 频闪闪光&#xff0c;也称多次闪光&#xff0c;是指通过让闪光灯多次闪光&#xff0c;将被摄体的连续动作拍摄到同一张照片中的功能。 频闪摄影&#xff0c;通俗来讲&#xff0c;就是在快门开启状…

Flutter闪光灯

打开手机闪光灯 PS&#xff1a;找到了一个别人写好的一个package&#xff0c;也就是说&#xff0c;不保证以后都一直能用&#xff0c;万一别人删了这个package就用不了了 1.添加依赖 在pubspec.yaml中添加&#xff1a; dev_dependencies:flutter_test:sdk: flutter#lamplamp…

Android_控制闪光灯

Android控制闪光灯 最近想做一个手电筒&#xff0c;在网上搜到一点资料 首先闪光灯可以用android.hardware.camera来控制 1. 在Manifest.xml文件中添加权限<uses-permission android:name"android.permission.CAMERA" /> 2. 打开闪光灯 try{ m_Camera Ca…

闪光灯的工作原理

闪光灯的工作原理 一、普通型闪光灯的基本工作电路 普通型闪光灯是指闪光输出的能量是不可调的闪光灯&#xff0c;即闪光灯的标称闪光指 数GN为一恒定值。其基本工作电路见图1-6-7。 电路由四部分组成&#xff1a;振荡升压部分、整流充电部分、电压指示部分和脉冲触发闪光…

什么是闪光灯

什么是闪光灯 闪光灯的英文学名为Flash Light。闪光灯也是加强曝光量的方式之一&#xff0c;尤其在昏暗的地方&#xff0c;打闪光灯有助于让景物更明亮。目前数码摄像机拥有闪光灯功能的机型并不多见。 使用闪光灯也会出现弊端&#xff0c;例如在拍人物时&#xff0c;闪光灯…

Camera:前后闪光灯

手机Camera的前后摄闪光灯虽然是同一个feature&#xff0c;但是从软硬件任意角度来讲&#xff0c;都有着很大的区别。后摄闪光灯是有专门的物理空间支持的&#xff0c;就是手机的手电筒物理控件&#xff0c;当后摄开启闪光灯拍照时候&#xff0c;会触发俩次打闪&#xff0c;第一…

Android如何打开闪光灯

Android如何打开闪光灯 在android中打开闪光灯的方法有两种&#xff0c;一种是获取硬件服务&#xff0c;通过反射的方式来操作闪光灯。另外一种是获得Camera对象&#xff0c;通过设置Camera的参数来操作闪光灯。一下是一个操作闪光灯的工具类&#xff1a;实现了两种方式操作闪…