Reading Complex Kafka Data with Flink


Continuing from the previous article: 一文说清flink从编码到部署上线 (Flink from coding to deployment, explained in one article).
Environment: MySQL 5.7; Flink 1.14.0; Hadoop 3.0.0; OS: CentOS 7.6; JDK 1.8.0_401.

In most published examples, the Kafka payload structure is fairly simple. Based on data from a real project, this article shows how to read and parse complex Kafka messages with Flink, printing the parsed results to the console.

1. Test Data

1.1 Sample Message

{"reportFormat": "2","reportVersion": 1,"reports": [{"filename": "1733277155032RvReport","c": {"objStationInfo": {"sStationName": "LLP入口","ucStationDir": 1,"sStationID": 500001},"objVehicle": {"sUUID": "fdabd178-a169-11eb-9483-b95959072a9d","w64Timestamp": "1733881971628","objRfidInfo": {"sReaderID": "10","objTagData": {"sTID": "1234567891","sEPC": "1234567890"}},"ucReportType": "8","ucVehicleType": "1"}}}]
}

kafka_39">1.2 添加到kafka

Use a Kafka GUI client such as Kafka Tool 2:
connect to the Kafka cluster;
once the connection succeeds, add the sample message above to the target topic;
confirm that the message was added successfully.
If you would rather produce the message programmatically, a sketch follows.
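As an alternative to the GUI tool, the message can be sent with the plain Kafka Java client. This is a minimal sketch, not part of the original project: the class name SampleProducer is hypothetical, and the broker address and topic ("10.86.97.21:9092", "rv-test") are taken from the consumer code in section 2.4; adjust them to your cluster.

package com.zl.kafka;

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SampleProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.86.97.21:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        // The sample message from section 1.1, as a single-line JSON string.
        String message = "{\"reportFormat\":\"2\",\"reportVersion\":1,\"reports\":[{\"filename\":\"1733277155032RvReport\",\"c\":{\"objStationInfo\":{\"sStationName\":\"LLP入口\",\"ucStationDir\":1,\"sStationID\":500001},\"objVehicle\":{\"sUUID\":\"fdabd178-a169-11eb-9483-b95959072a9d\",\"w64Timestamp\":\"1733881971628\",\"objRfidInfo\":{\"sReaderID\":\"10\",\"objTagData\":{\"sTID\":\"1234567891\",\"sEPC\":\"1234567890\"}},\"ucReportType\":\"8\",\"ucVehicleType\":\"1\"}}}]}";

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // send() is asynchronous; get() blocks until the broker acknowledges the write.
            producer.send(new ProducerRecord<>("rv-test", message)).get();
        }
    }
}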

2. Code Implementation

2.1 EnvUtil

EnvUtil creates the Flink runtime environment.

package com.zl.utils;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;
import java.time.ZoneOffset;
import java.util.concurrent.TimeUnit;

/**
 * EnvUtil
 * @description: creates the Flink execution environment.
 */
public class EnvUtil {

    /**
     * Configures the Flink streaming environment.
     * @param parallelism desired parallelism; values <= 0 fall back to 1
     */
    public static StreamExecutionEnvironment setFlinkEnv(int parallelism) {
        // HADOOP_USER_NAME maps to the HDFS home directory /user/<name>; here it is root.
        System.setProperty("HADOOP_USER_NAME", "root");

        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 1000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        if (parallelism > 0) {
            env.setParallelism(parallelism);
        } else {
            env.setParallelism(1); // default
        }

        // Restart strategy: up to 50 restarts, 6 minutes apart.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(50, Time.minutes(6)));

        // Checkpoint every 10 minutes (10*60*1000 = 600000 ms), exactly-once mode (the default).
        env.enableCheckpointing(600000, CheckpointingMode.EXACTLY_ONCE);

        // RocksDB state backend with incremental checkpoints enabled.
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));

        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        // Only one checkpoint at a time (the default).
        checkpointConfig.setMaxConcurrentCheckpoints(1);
        // Minimum pause between checkpoints: 1 minute (60000 ms).
        checkpointConfig.setMinPauseBetweenCheckpoints(60000);
        // Keep checkpoints after the job is cancelled.
        checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // Tolerate up to 5 checkpoint failures.
        checkpointConfig.setTolerableCheckpointFailureNumber(5);
        // Checkpoint timeout: 10 minutes (the default).
        checkpointConfig.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(10));

        // Disable operator chaining to make backpressure easier to diagnose.
        env.disableOperatorChaining();
        return env;
    }

    public static StreamTableEnvironment getFlinkTenv(StreamExecutionEnvironment env) {
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
        // Time zone: UTC+8.
        tenv.getConfig().setLocalTimeZone(ZoneOffset.ofHours(8));

        Configuration configuration = tenv.getConfig().getConfiguration();
        // Enable mini-batch aggregation.
        configuration.setString("table.exec.mini-batch.enabled", "true");
        // Flush interval for mini batches.
        configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
        // Cap buffered rows per batch to avoid OOM; 20,000 is a reasonable setting.
        configuration.setString("table.exec.mini-batch.size", "20000");
        // Enable local-global (two-phase) aggregation.
        configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
        // Idle state TTL, set through the API.
        tenv.getConfig().setIdleStateRetention(Duration.ofHours(25));
        return tenv;
    }
}
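For context, downstream jobs obtain their environments from these two helpers. A minimal usage sketch (the class name EnvDemo is hypothetical; section 2.4 shows the real call site):

import com.zl.utils.EnvUtil;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class EnvDemo {
    public static void main(String[] args) {
        // Parallelism 1; the Flink web UI is exposed on rest.port 1000 (see section 3.2).
        StreamExecutionEnvironment env = EnvUtil.setFlinkEnv(1);
        // Only needed for Table/SQL jobs; plain DataStream jobs can skip this.
        StreamTableEnvironment tenv = EnvUtil.getFlinkTenv(env);
    }
}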

2.2 FlinkSourceUtil

FlinkSourceUtil is the helper that builds the Kafka source for the job.

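What follows is a minimal sketch of this helper, inferred from how getKafkaSource(groupId, topic, bootstrapServers, offsetReset) is called in section 2.4 and assuming the FlinkKafkaConsumer API that ships with Flink 1.14; the project's actual implementation is in the repository linked in section 5.

package com.zl.utils;

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class FlinkSourceUtil {

    /**
     * Builds a Kafka source that emits each record as a plain string.
     * @param groupId          consumer group id
     * @param topic            topic to subscribe to
     * @param bootstrapServers broker list, e.g. "host:9092"
     * @param offsetReset      "earliest" or "latest" start position
     */
    public static SourceFunction<String> getKafkaSource(String groupId, String topic,
                                                        String bootstrapServers, String offsetReset) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);
        if ("earliest".equals(offsetReset)) {
            consumer.setStartFromEarliest();
        } else {
            consumer.setStartFromLatest();
        }
        return consumer;
    }
}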

2.3 RvTable

RvTable is the model that the parsed data is ultimately stored in.

package com.zl.kafka.domain;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;

/**
 * @desc: model holding one parsed vehicle report.
 */
@ToString
@Data
@NoArgsConstructor
@AllArgsConstructor
public class RvTable {
    private String uniqueId;       // unique key generated on the Flink side
    private long reportTime;       // vehicle-passing time
    private String dt;             // partition field (date)
    private String dh;             // hour
    private String reportFormat;
    private int reportVersion;
    private String filename;
    public String sStationName;    // station name
    public String ucStationDir;    // station direction code
    public String sStationID;      // station id
    private String sUUID;
    private long w64Timestamp;     // event time (milliseconds)
    private String sReaderID;      // RFID reader (module) id
    private String sTIDR;
    private String sEPCR;
    private int ucReportType;      // 8 -> video, 2 -> RFID, 138/202 -> video + RFID
    private int ucVehicleType;

    public void parseTableColunm() {
        this.reportTime = this.w64Timestamp;
        this.uniqueId = this.sUUID;
    }
}
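One detail worth noting: Lombok's @Data derives accessor names by capitalizing the first letter of the field, so the field sUUID yields getSUUID()/setSUUID(...), which is exactly what the parsing code in section 2.4 calls. A small demonstration (the class name RvTableDemo is hypothetical):

import com.zl.kafka.domain.RvTable;

public class RvTableDemo {
    public static void main(String[] args) {
        RvTable t = new RvTable();
        t.setSUUID("fdabd178-a169-11eb-9483-b95959072a9d"); // field sUUID -> setSUUID
        t.setW64Timestamp(1733881971628L);                  // field w64Timestamp -> setW64Timestamp
        t.parseTableColunm(); // copies sUUID/w64Timestamp into uniqueId/reportTime
        System.out.println(t.getUniqueId()); // prints the UUID set above
    }
}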

2.4 Core Logic

package com.zl.kafka;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.zl.kafka.domain.RvTable;
import com.zl.utils.EnvUtil;
import com.zl.utils.FlinkSourceUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;
import java.util.Date;

public class KafkaExample {

    public static void main(String[] args) throws Exception {
        // Build the execution environment with parallelism 1.
        StreamExecutionEnvironment env = EnvUtil.setFlinkEnv(1);
        // Checkpoint storage is configured per job so jobs stay isolated from each other.
        env.getCheckpointConfig().setCheckpointStorage("hdfs://10.86.97.191:9000/flinktest/KafkaExample");

        // Read the raw data from Kafka.
        SingleOutputStreamOperator<String> rvSourceStream = env.addSource(
                FlinkSourceUtil.getKafkaSource("rvGroup", "rv-test", "10.86.97.21:9092", "earliest")) // earliest/latest
                .setParallelism(1).uid("getRV").name("getRV");

        // Parse and transform the raw JSON.
        SingleOutputStreamOperator<String> rvParseStream = null;
        try {
            rvParseStream = rvSourceStream.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public void flatMap(String value, Collector<String> out) {
                    if (StringUtils.isEmpty(value)) {
                        return;
                    }
                    parseRVData(value, out);
                }
            }).setParallelism(1).uid("rvParse").name("rvParse");
        } catch (Exception e) {
            e.printStackTrace();
        }

        rvParseStream.print();

        env.execute("rvParseJob");
    }// main

    public static void parseRVData(String jsonStr, Collector<String> out) {
        try {
            if (StringUtils.isEmpty(jsonStr) || !isJSON(jsonStr)) {
                return;
            }
            JSONObject in = JSONObject.parseObject(jsonStr);

            // ===== report header =====
            String reportFormat = stringDefaultIfEmpty(in.getString("reportFormat"));
            int reportVersion = intDefaultIfEmpty(in.getInteger("reportVersion"));

            JSONArray reports = in.getJSONArray("reports");
            if (reports != null) {
                for (int i = 0; i < reports.size(); i++) {
                    RvTable rvTable = new RvTable();
                    JSONObject record = reports.getJSONObject(i);
                    if (record != null) {
                        String filename = stringDefaultIfEmpty(record.getString("filename"));
                        // Copy the header fields onto the model.
                        rvTable.setReportFormat(reportFormat);
                        rvTable.setReportVersion(reportVersion);
                        rvTable.setFilename(filename);

                        JSONObject c = record.getJSONObject("c");
                        if (c != null) {
                            // ===== station info =====
                            JSONObject objStationInfo = c.getJSONObject("objStationInfo");
                            if (objStationInfo != null) {
                                rvTable.setSStationID(stringDefaultIfEmpty(objStationInfo.getString("sStationID")));
                                rvTable.setSStationName(stringDefaultIfEmpty(objStationInfo.getString("sStationName")));
                                rvTable.setUcStationDir(stringDefaultIfEmpty(objStationInfo.getString("ucStationDir")));
                            }

                            JSONObject objVehicle = c.getJSONObject("objVehicle");
                            if (objVehicle != null) {
                                // ===== vehicle report =====
                                rvTable.setSUUID(stringDefaultIfEmpty(objVehicle.getString("sUUID")));
                                // Default to 0 when missing, to avoid an NPE on unboxing.
                                rvTable.setW64Timestamp(longDefaultIfEmpty(objVehicle.getLong("w64Timestamp")));
                                rvTable.setUcReportType(intDefaultIfEmpty(objVehicle.getInteger("ucReportType")));
                                rvTable.setUcVehicleType(intDefaultIfEmpty(objVehicle.getInteger("ucVehicleType")));

                                // ===== vehicle report / RFID info =====
                                JSONObject objRfidInfo = objVehicle.getJSONObject("objRfidInfo");
                                if (objRfidInfo != null) {
                                    rvTable.setSReaderID(stringDefaultIfEmpty(objRfidInfo.getString("sReaderID")));
                                    JSONObject objTagData = objRfidInfo.getJSONObject("objTagData");
                                    if (objTagData != null) {
                                        rvTable.setSTIDR(stringDefaultIfEmpty(objTagData.getString("sTID")));
                                        rvTable.setSEPCR(stringDefaultIfEmpty(objTagData.getString("sEPC")));
                                    }
                                }

                                // ===== derived fields =====
                                long timestamp = rvTable.getW64Timestamp();
                                SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH");
                                Date date = new Date(timestamp);
                                String[] s = simpleDateFormat.format(date).split(" ");
                                rvTable.setDt(s[0]);
                                rvTable.setDh(s[1]);

                                out.collect(JSONObject.toJSONString(rvTable));
                            }// if (objVehicle != null)
                        }// if (c != null)
                    }// if (record != null)
                }// for
            }
        } catch (Exception e) {
            e.printStackTrace();
            // The problematic data could be stored to a database here for follow-up.
        }
    }// parseRVData

    public static boolean isJSON(String str) {
        try {
            JSON.parse(str);
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    public static int intDefaultIfEmpty(Integer num) {
        return num == null ? 0 : num;
    }

    public static String stringDefaultIfEmpty(String str) {
        return StringUtils.defaultIfEmpty(str, "ENULL");
    }

    public static Long longDefaultIfEmpty(Long num) {
        return num == null ? 0L : num;
    }

    public static Double doubleDefaultIfEmpty(Double num) {
        return num == null ? 0.0 : num;
    }
}
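Because parseRVData is a plain static method, the parsing logic can be sanity-checked without Kafka or a running cluster by handing it a throwaway Collector. A minimal sketch (the class name ParseSmokeTest is hypothetical; pass the sample JSON from section 1.1 as the first program argument):

package com.zl.kafka;

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.util.Collector;

public class ParseSmokeTest {
    public static void main(String[] args) {
        final List<String> results = new ArrayList<>();
        // Collect everything parseRVData emits into a plain list.
        KafkaExample.parseRVData(args[0], new Collector<String>() {
            @Override
            public void collect(String record) {
                results.add(record);
            }

            @Override
            public void close() {
            }
        });
        results.forEach(System.out::println); // one JSON string per parsed report
    }
}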

2.5 pom.xml

Note: pom.xml must be adjusted to match your own environment.
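For reference, a Flink 1.14.0 Kafka job typically declares dependencies like the ones below. These coordinates and versions are assumptions on my part, not copied from the project's pom.xml; see the repository in section 5 for the actual file.

<!-- Kafka connector matching Flink 1.14.0 (Scala 2.12 build) -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.14.0</version>
</dependency>
<!-- fastjson, used by the parsing code in section 2.4 -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.83</version>
</dependency>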

3. Results

3.1 Run Log

The run log shows the job printing one JSON string per parsed RvTable record.

3.2 Web UI

Visit http://IP:1000/ (port 1000 is the rest.port configured in EnvUtil) to see the running job in the Flink web UI.

4. Deployment

For the build and deployment process, see: 一文说清flink从编码到部署上线 (Flink from coding to deployment, explained in one article).
Deployment command:

flink run-application -t yarn-application \
  -Dparallelism.default=1 \
  -Denv.java.opts="-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8" \
  -Dtaskmanager.memory.process.size=1g \
  -Dyarn.application.name="FlinkCdcKafka" \
  -Dtaskmanager.numberOfTaskSlots=1 \
  -c com.zl.kafka.KafkaExample \
  /home/FlickCDC-1.0-SNAPSHOT-jar-with-dependencies.jar


5. Complete Code

The complete code is available at: https://gitee.com/core815/flink-cdc-mysql

