Centos7单机部署Flink13.6及测试FinkCDC同步MySQL

news/2024/12/29 21:39:11/

一、背景

公司CDH6.3.2里面的版本是Flink1.12.0。而因为FlinkCDC2.0.0只支持Flink1.13.0以后,版本不匹配,所以只能升级版本。但是升级版本是个大工程,要编译、要parcel制作工具,而且是生产环境的升级,没办法因为要测试FlinkCDC,所以只能搭建个单机测试,等后面生产环境升级后再迁移;

二、软件安装
2.1 安装Hadoop单节点
具体的就不介绍了。。。。。
可参照:https://blog.csdn.net/J080624/article/details/67638594

2.2 安装Flink1.13.6
官网下载:https://flink.apache.org/downloads.html
2.2.1 解压安装文件到/opt/module

tar -zxvf flink-1.13.6-bin-scala_2.12.tgz -C /opt/module/

2.2.2 添加Flink到环境变量

[root@localhost ~]# vim /etc/profile.d/my_env.sh//在my_env.sh文件末尾添加如下内容:
export FLINK_HOME=/opt/module/flink-1.13.6
export PATH=$FLINK_HOME/bin:$PATH[root@localhost ~]# source /etc/profile

2.2.3 测试

[root@localhost  flink-1.13.6]# flink --version
Version: 1.13.6, Commit ID: b2ca390

2.2.4 开启8081端口
打开${flink}/conf/flink-conf.yaml文件,修改一下信息。
在这里插入图片描述
在这里插入图片描述
taskmanager.host: localhost要添加,不然会报以下的错:
TaskExecutor akka.tcp://xxx has no more allocated slots for job
在这里插入图片描述
原因:flink部署到集群上,standalone模式,需要指定TaskManager主机的地址:修改flink-conf.yaml配置并添加配置 taskmanager.host: localhost

2.2.5 启动Flink和Hadoop。

[root@locahost hadoop-3.1.3]# ./sbin/start-dfs.sh
[root@locahost flink-1.13.6]# ./bin/start-cluster.sh 
[root@locahost bin]# jps
9408 DataNode
21504 TaskManagerRunner
9633 SecondaryNameNode
9285 NameNode
23399 Jps
21210 StandaloneSessionClusterEntrypoint
[root@locahost  bin]#

2.2.6 打开UI页面
发现无法打开,原因是防火墙未关闭,联系运维开放8081端口。重新打开;
在这里插入图片描述

三、Flink自带代码测试

[root@localhost bin]#  ./flink run ../examples/batch/WordCount.jar
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID aed77c0e3c8d6a7abc0d7ffbd9f86e16
Program execution finished
Job with JobID aed77c0e3c8d6a7abc0d7ffbd9f86e16 has finished.
Job Runtime: 369 ms
Accumulator Results: 
- 748bb343c29c89864924c9572dc09c07 (java.util.ArrayList) [170 elements](a,5)
(action,1)
(after,1)
(against,1)
(all,2)
(and,12)
(arms,1)
(arrows,1)
(awry,1)
(ay,1)
(bare,1)
(be,4)
(bear,3)
(bodkin,1)
(bourn,1)
(but,1)
(by,2)
(calamity,1)
(cast,1)
(coil,1)
(come,1)
(conscience,1)
(consummation,1)
(contumely,1)
(country,1)
(cowards,1)
(currents,1)
(d,4)
(death,2)
(delay,1)
(despis,1)
(devoutly,1)
(die,2)
(does,1)
(dread,1)
(dream,1)
(dreams,1)
(end,2)
(enterprises,1)
(er,1)
(fair,1)
(fardels,1)
(flesh,1)
(fly,1)
(for,2)
(fortune,1)
(from,1)
(give,1)
(great,1)
(grunt,1)
(have,2)
(he,1)
(heartache,1)
(heir,1)
(himself,1)
(his,1)
(hue,1)
(ills,1)
(in,3)
(insolence,1)
(is,3)
(know,1)
(law,1)
(life,2)
(long,1)
(lose,1)
(love,1)
(make,2)
(makes,2)
(man,1)
(may,1)
(merit,1)
(might,1)
(mind,1)
(moment,1)
(more,1)
(mortal,1)
(must,1)
(my,1)
(name,1)
(native,1)
(natural,1)
(no,2)
(nobler,1)
(not,2)
(now,1)
(nymph,1)
(o,1)
(of,15)
(off,1)
(office,1)
(ophelia,1)
(opposing,1)
(oppressor,1)
(or,2)
(orisons,1)
(others,1)
(outrageous,1)
(pale,1)
(pangs,1)
(patient,1)
(pause,1)
(perchance,1)
(pith,1)
(proud,1)
(puzzles,1)
(question,1)
(quietus,1)
(rather,1)
(regard,1)
(remember,1)
(resolution,1)
(respect,1)
(returns,1)
(rub,1)
(s,5)
(say,1)
(scorns,1)
(sea,1)
(shocks,1)
(shuffled,1)
(sicklied,1)
(sins,1)
(sleep,5)
(slings,1)
(so,1)
(soft,1)
(something,1)
(spurns,1)
(suffer,1)
(sweat,1)
(take,1)
(takes,1)
(than,1)
(that,7)
(the,22)
(their,1)
(them,1)
(there,2)
(these,1)
(this,2)
(those,1)
(thought,1)
(thousand,1)
(thus,2)
(thy,1)
(time,1)
(tis,2)
(to,15)
(traveller,1)
(troubles,1)
(turn,1)
(under,1)
(undiscover,1)
(unworthy,1)
(us,3)
(we,4)
(weary,1)
(what,1)
(when,2)
(whether,1)
(whips,1)
(who,2)
(whose,1)
(will,1)
(wish,1)
(with,3)
(would,2)
(wrong,1)
(you,1)
[root@flink0 bin]# 

四、自定义wordcount测试

package com.lzl;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*** @author lzl* @create 2023-05-18 15:53* @name WordCount*/
public class WordCount {public static void main(String[] args) throws Exception{//TODO 1.创建环境final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//TODO 2.输入流窗口的信息配置DataStream<String> dataStream = env.socketTextStream("10.110.17.182", 9999, "\n");//TODO 3.数据转换DataStream<Tuple2<String,Integer>> countData = dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] words = value.toLowerCase().split("\\W+");for(String word:words) {if (word.length() > 0) {out.collect(new Tuple2<>(word,1));}}}}).keyBy(value -> value.f0).sum(1);//TODO 4.数据打印到控制台countData.print();//TODO 5.执行任务env.execute("CountSocketWord");}
}

本地代码测试报错:
在这里插入图片描述
原因:又是防火墙的问题,防火墙没开通9999端口。

[root@flink0 software]# firewall-cmd --zone=public --add-port=9999/tcp --permanent
success
[root@flink0 software]# systemctl restart firewalld
[root@flink0 software]# 

在这里插入图片描述
在这里插入图片描述
本地测试ok~!

4.2 上传Flink Web测试
填写Entry Class(注意格式,可复制),提交。
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述
报错:是因为9999端口没启动。
在这里插入图片描述
重新提交!
在这里插入图片描述
显示running。
输入单词之后,有收到字节大小,以及在标准输出控制台,可以看到它的输出。
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
测试ok!

五、FlinkCDC同步MySQL数据测试
5.1 自定义反序列MyDeserializationSchema

package com.lzl;import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;import java.util.List;/*** @author lzl* @create 2023-05-18 15:41* @name MyDeserializationSchema*/
public class MyDeserializationSchema implements DebeziumDeserializationSchema<JSONObject> {private static final long serialVersionUID = -3168848963265670603L;public MyDeserializationSchema() {}@Overridepublic void deserialize(SourceRecord record, Collector<JSONObject> out) {Struct dataRecord = (Struct) record.value();Struct afterStruct = dataRecord.getStruct("after");Struct beforeStruct = dataRecord.getStruct("before");/*todo 1,同时存在 beforeStruct 跟 afterStruct数据的话,就代表是update的数据2,只存在 beforeStruct 就是delete数据3,只存在 afterStruct数据 就是insert数据*/JSONObject logJson = new JSONObject();String canal_type = "";List<Field> fieldsList = null;if (afterStruct != null && beforeStruct != null) {System.out.println("这是修改数据");canal_type = "update";fieldsList = afterStruct.schema().fields();//todo 字段与值for (Field field : fieldsList) {String fieldName = field.name();Object fieldValue = afterStruct.get(fieldName);
//            System.out.println("*****fieldName=" + fieldName+",fieldValue="+fieldValue);logJson.put(fieldName, fieldValue);}} else if (afterStruct != null) {System.out.println("这是新增数据");canal_type = "insert";fieldsList = afterStruct.schema().fields();//todo 字段与值for (Field field : fieldsList) {String fieldName = field.name();Object fieldValue = afterStruct.get(fieldName);
//            System.out.println("*****fieldName=" + fieldName+",fieldValue="+fieldValue);logJson.put(fieldName, fieldValue);}} else if (beforeStruct != null) {System.out.println("这是删除数据");canal_type = "delete";fieldsList = beforeStruct.schema().fields();//todo 字段与值for (Field field : fieldsList) {String fieldName = field.name();Object fieldValue = beforeStruct.get(fieldName);
//            System.out.println("*****fieldName=" + fieldName+",fieldValue="+fieldValue);logJson.put(fieldName, fieldValue);}} else {System.out.println("一脸蒙蔽了");}//todo 拿到databases table信息Struct source = dataRecord.getStruct("source");Object db = source.get("db");Object table = source.get("table");Object ts_ms = source.get("ts_ms");logJson.put("canal_database", db);logJson.put("canal_table", table);logJson.put("canal_ts", ts_ms);logJson.put("canal_type", canal_type);//todo 拿到topicString topic = record.topic();System.out.println("topic = " + topic);//todo 主键字段Struct pk = (Struct) record.key();List<Field> pkFieldList = pk.schema().fields();int partitionerNum = 0;for (Field field : pkFieldList) {Object pkValue = pk.get(field.name());partitionerNum += pkValue.hashCode();}int hash = Math.abs(partitionerNum) % 3;logJson.put("pk_hashcode", hash);out.collect(logJson);}@Overridepublic TypeInformation<JSONObject> getProducedType() {return BasicTypeInfo.of(JSONObject.class);}
}

5.2 写入MySQL类

package com.lzl;import com.alibaba.fastjson.JSONObject;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;/*** @author lzl* @create 2023-05-12 18:24* @name Writer*/public class MysqlWriter extends RichSinkFunction<JSONObject> {private static final Logger LOGGER = LoggerFactory.getLogger(MysqlWriter.class);private Connection connection = null;private PreparedStatement insertStatement = null;private PreparedStatement updateStatement = null;private PreparedStatement deleteStatement= null;//目标库的信息@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);if (connection == null) {Class.forName("com.mysql.jdbc.Driver");//加载数据库驱动connection = DriverManager.getConnection("jdbc:mysql://10.110.17.37:3306/flink_cdc?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=UTF-8","root","xxb@5196");//获取连接}insertStatement = connection.prepareStatement(  // 获取执行语句"insert into flink_cdc.student_2 values (?,?,?,?)");  //插入数据updateStatement = connection.prepareStatement(  // 获取执行语句"update flink_cdc.student_2 set name=?,age=?,dt=? where id=?");  //更新数据deleteStatement = connection.prepareStatement(  // 获取执行语句"delete from flink_cdc.student_2 where id=?");  //删除数据}//执行插入和更新语句@Overridepublic void invoke(JSONObject value, Context context) throws Exception {// 获取binlogInteger id = (Integer) value.get("id");String name = (String) value.get("name");Integer age = (Integer) value.get("age");String dt = (String) value.get("dt");String canal_type =(String) value.get("canal_type");if(canal_type =="insert"){insertStatement.setInt(1, id);insertStatement.setString(2, name);insertStatement.setInt(3, age);insertStatement.setString(4, dt);insertStatement.execute();
//            LOGGER.info(insertStatement.toString();}if (canal_type =="update"){// 每条数据到来后,直接执行更新语句  这里强调注意:1,2,3,4序号必须与占位符(?)对应起来,比如第一位是name,id最后一位updateStatement.setString(1,name);updateStatement.setInt(2, age);updateStatement.setString(3, dt);updateStatement.setInt(4, id);updateStatement.execute();  // 执行更新语句
//            LOGGER.info(updateStatement.toString());}if (canal_type =="delete"){deleteStatement.setInt(1, id);deleteStatement.execute();}
//        // 每条数据到来后,直接执行更新语句  这里强调注意:1,2,3,4序号必须与占位符(?)对应起来,比如第一位是name,id最后一位
//        updateStatement.setString(1,name);
//        updateStatement.setInt(2, age);
//        updateStatement.setString(3, dt);
//        updateStatement.setInt(4, id);
//        updateStatement.execute();  // 执行更新语句
//        LOGGER.info(updateStatement.toString());//如果更新数为0,则执行插入语句
//        if(updateStatement.getUpdateCount() == 0)
//            insertStatement.setInt(1, id);
//            insertStatement.setString(2, name);
//            insertStatement.setInt(3, age);
//            insertStatement.setString(4, dt);
//            insertStatement.execute();
//            LOGGER.info(insertStatement.toString());
//}//关闭数据库连接@Overridepublic void close() throws Exception {super.close();if (connection != null) {connection.close();}if (updateStatement != null) {updateStatement.close();}if (insertStatement != null) {insertStatement.close();}
//        super.close();}
}

5.3 主类
Flink_CDC_To_MySqlBinlog

package com.lzl;import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;/*** @author lzl* @create 2023-05-18 17:33* @name Flink_CDC_To_MySqlBinlog*/
public class Flink_CDC_To_MySqlBinlog {public static void main(String[] args) throws Exception{//TODO 1.获取Flink的执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//TODO 2.Flink-CDC将读取 binlog的位置信息以状态的方式保存在 CK,如果想要做到断点续传 ,需要从 Checkpoint或者 Savepoint启动程序//2.1 开启 Checkpoint,每隔 5秒钟做一次 CKenv.enableCheckpointing(5000L);//2.2 指定 CK的一致性语义env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//2.3 设置任务关闭的时候保留最后一次 CK数据env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//2.4 指定从 CK自动重启策略env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));//2.5 设置状态后端env.setStateBackend(new FsStateBackend("hdfs://flink0:8020/flinkCDCTest"));//2.6 设置访问 HDFS的用户名System.setProperty("HADOOP_USER_NAME", "root");//TODO 3.通过FlinkCDC构建SourceFunction
//        DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()SourceFunction<JSONObject> sourceFunction = MySqlSource.<JSONObject>builder().hostname("10.110.17.52").port(3306).username("root").password("xxb@5196").databaseList("flink_cdc").tableList("flink_cdc.student")  //表前一定要加上库名.deserializer(new MyDeserializationSchema())  //自己定义反序列.startupOptions(StartupOptions.initial()) //全量同步.build();//TODO 4.使用 CDC Source从 MySQL读取数据
//        DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction);DataStreamSource<JSONObject> dataStream = env.addSource(sourceFunction);//TODO 5.数据打印dataStream.print("=====>>>>>>>");//TODO 6.写入另一个MySQL中dataStream.addSink(new MysqlWriter());//TODO 7.启动任务env.execute("Flink_CDC_To_MySqlBinlog");}
}

5.4 打包上传
打包顺序:先clean–>再build–>package
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
查看控制台标准输出:
在这里插入图片描述
5.5 验证MySQL的增删改同步
新增:
源数据库:
在这里插入图片描述
Flink控制台:
在这里插入图片描述
目标库:
在这里插入图片描述

更改:(张良年龄改为18)
在这里插入图片描述
在这里插入图片描述
目标库:
在这里插入图片描述
删除:(删掉张良这一条数据)
在这里插入图片描述
目标库:
在这里插入图片描述
实现了数据的增删改同步~!。

5.6 、查看Hadoop上的CK检查点存储~!
在这里插入图片描述
CK也存储成功!


http://www.ppmy.cn/news/74965.html

相关文章

ubuntu中安装autogpt,python虚拟环境安装使用

ubuntu中安装autogpt&#xff0c;python虚拟环境安装使用 git安装 https://gitforwindows.org python3.10安装&#xff1a; autogpt支持python版本是3.10&#xff0c;ubuntu20.04中默认版本3.8是不支持的。 安装虚拟环境 sudo add-apt-repository ppa:deadsnakes/ppa sudo…

数据结构(C语言):一元多项式的操作(链表实现)

一、题目 一元多项式的操作 设有两个一元多项式&#xff1a; p(x)p0p1xp2x2pnxn q(x)q0q1xq2x2qmxm 多项式项的系数为实数&#xff0c;指数为整数&#xff0c;设计实现一元多项式操作的程序&#xff1a; ① 多项式链表建立&#xff1a;以&#xff08;系数&#xff0c;指数…

亚马逊开放个人卖家验证入口?亚马逊卖家验证到底怎么搞?

亚马逊卖家账户的安全对于所有卖家来说都非常重要。如果卖家想要在亚马逊上长期稳定地发展&#xff0c;赚取更多的钱并推出更多热卖产品&#xff0c;就必须确保他们的亚马逊卖家账户安全&#xff0c;特别是一直存在的亚马逊账户验证问题。 近期&#xff0c;根据亚马逊官方披露的…

蓝精灵协会启动第二阶段的 NFT 连续发售活动

四个月前&#xff0c;蓝精灵协会推出了一款完全上链的 NFT 游戏&#xff0c;参与的钱包数量超过 85,000 个&#xff0c;并进入了前 100 Dapps 排名&#xff0c;成为了 Web3 领域的一匹黑马。 两周前&#xff0c;我们开始了第二阶段的连续销售活动&#xff0c;旨在建立一个前沿 …

Pytorch从零开始实现Vision Transformer (from scratch)

Pytorch从零开始实现Vision Transformer 前言一、Vision Transformer架构介绍1. Patch Embedding2. Multi-Head Attention3. Transformer BlockFeed Forward 二、预备知识1. Einsum2. Einops 三、Vision Transformer代码实现0. 导入库1. Patch Embedding2. Residual & Norm…

WebSocket的那些事(2-实操篇)

目录 一、概述二、Websocket API1、引入相关依赖2、配置WebSocket处理器3、WebSocket配置4、测试 三、总结 一、概述 在上一节 WebSocket的那些事&#xff08;1-概念篇&#xff09;中我们简单的介绍了关于WebSocket协议的相关概念、与HTTP的联系区别等等。 这一节将会带来Web…

MySQL表设计原则

前言 这里简单整理一些常用的数据库表设计原则以及常用字段的使用范围。 表的设计准则 1、命名规范 表名、字段名必须使用小写字母或者数字&#xff0c;禁止使用数字开头&#xff0c;禁止使用拼音&#xff0c;并且一般不使用英文缩写。主键索引名为 pk_字段名&#xff1b;唯…

linux 安装 ffmpeg

linux 安装 ffmpeg windows上安装&#xff0c;直接下载压缩包解压。linux安装&#xff0c;找了半天各种技术文章&#xff0c;说最好编译安装&#xff0c;按照步骤安装编译环境编译成功了&#xff0c;但是使用的时候总要安装各种外部库&#xff0c;转码转不了等等问题...... 最…