接上文:一文说清flink从编码到部署上线
环境说明:MySQL:5.7;flink:1.14.0;hadoop:3.0.0;操作系统:CentOS 7.6;JDK:1.8.0_401。
常见的文章中,kafka数据结构相对简单,本文根据实际项目数据,说明怎样读取解析复杂kafka数据。并将解析的数据输出到控制台。
1.模拟数据
1.1 模拟数据
{"reportFormat": "2","reportVersion": 1,"reports": [{"filename": "1733277155032RvReport","c": {"objStationInfo": {"sStationName": "LLP入口","ucStationDir": 1,"sStationID": 500001},"objVehicle": {"sUUID": "fdabd178-a169-11eb-9483-b95959072a9d","w64Timestamp": "1733881971628","objRfidInfo": {"sReaderID": "10","objTagData": {"sTID": "1234567891","sEPC": "1234567890"}},"ucReportType": "8","ucVehicleType": "1"}}}]
}
kafka_39">1.2 添加到kafka
使用kafka工具,kafkatool2,具体操作如下:
连接到kafka:
连接成功:
添加数据:
添加成功:
2.代码实现
2.1 EnvUtil实现
EnvUtil用于创建flink的运行环境。
package com.zl.utils;import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import java.time.Duration;
import java.time.ZoneOffset;
import java.util.concurrent.TimeUnit;/*** EnvUtil* @description:*/
public class EnvUtil {/*** 设置flink执行环境* @param parallelism 并行度*/public static StreamExecutionEnvironment setFlinkEnv(int parallelism) {// System.setProperty("HADOOP_USER_NAME", "用户名") 对应的是 hdfs文件系统目录下的路径:/user/用户名的文件夹名,本文为rootSystem.setProperty("HADOOP_USER_NAME", "root");Configuration conf = new Configuration();conf.setInteger("rest.port", 1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);if (parallelism >0 ){//设置并行度env.setParallelism(parallelism);} else {env.setParallelism(1);// 默认1}// 添加重启机制env.setRestartStrategy(RestartStrategies.fixedDelayRestart(50, Time.minutes(6)));// 启动checkpoint,设置模式为精确一次 (这是默认值),10*60*1000=60000env.enableCheckpointing(600000, CheckpointingMode.EXACTLY_ONCE);//rocksdb状态后端,启用增量checkpointenv.setStateBackend(new EmbeddedRocksDBStateBackend(true));//设置checkpoint路径CheckpointConfig checkpointConfig = env.getCheckpointConfig();// 同一时间只允许一个 checkpoint 进行(默认)checkpointConfig.setMaxConcurrentCheckpoints(1);//最小间隔,10*60*1000=60000checkpointConfig.setMinPauseBetweenCheckpoints(60000);// 取消任务后,checkpoint仍然保存checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//checkpoint容忍失败的次数checkpointConfig.setTolerableCheckpointFailureNumber(5);//checkpoint超时时间 默认10分钟checkpointConfig.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(10));//禁用operator chain(方便排查反压)env.disableOperatorChaining();return env;}public static StreamTableEnvironment getFlinkTenv(StreamExecutionEnvironment env) {StreamTableEnvironment tenv = StreamTableEnvironment.create(env);//设置时区 东八tenv.getConfig().setLocalTimeZone(ZoneOffset.ofHours(8));Configuration configuration = tenv.getConfig().getConfiguration();// 开启miniBatchconfiguration.setString("table.exec.mini-batch.enabled", "true");// 批量输出的间隔时间configuration.setString("table.exec.mini-batch.allow-latency", "5 s");// 防止OOM设置每个批次最多缓存数据的条数,可以设为2万条configuration.setString("table.exec.mini-batch.size", "20000");// 开启LocalGlobalconfiguration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");//设置TTL API指定tenv.getConfig().setIdleStateRetention(Duration.ofHours(25));return tenv;}}
2.2 FlinkSourceUtil实现
FlinkSourceUtil用于连接kafka。
package com.zl.kafka.domain;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import org.apache.commons.lang3.StringUtils;/*** @desc:*/
@ToString
@Data
@NoArgsConstructor
@AllArgsConstructor
public class RvTable {private String uniqueId;//flink生成的唯一键private long reportTime;// 过车时间private String dt; // 分区字段private String dh; // 小时private String reportFormat;private int reportVersion;private String filename;public String sStationName; // 采集点名称public String ucStationDir; // 采集点方向编号public String sStationID; // 采集点编号private String sUUID;private long w64Timestamp; //事件时间(毫秒级别)private String sReaderID;//射频设备(模块)代码private String sTIDR;private String sEPCR;private int ucReportType;//8->视频 2->射频 138,202->视频+射频private int ucVehicleType;public void parseTableColunm() {this.reportTime = this.w64Timestamp;this.uniqueId = this.sUUID;}
}
2.3 RvTable实现
RvTable解析数据最后存储的model。
package com.zl.kafka.domain;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import org.apache.commons.lang3.StringUtils;/*** @desc:*/
@ToString
@Data
@NoArgsConstructor
@AllArgsConstructor
public class RvTable {private String uniqueId;//flink生成的唯一键private long reportTime;// 过车时间private String dt; // 分区字段private String dh; // 小时private String reportFormat;private int reportVersion;private String filename;public String sStationName; // 采集点名称public String ucStationDir; // 采集点方向编号public String sStationID; // 采集点编号private String sUUID;private long w64Timestamp; //事件时间(毫秒级别)private String sReaderID;//射频设备(模块)代码private String sTIDR;private String sEPCR;private int ucReportType;//8->视频 2->射频 138,202->视频+射频private int ucVehicleType;public void parseTableColunm() {this.reportTime = this.w64Timestamp;this.uniqueId = this.sUUID;}}
2.4 核心逻辑实现
package com.zl.kafka;import com.alibaba.fastjson.JSON;
import com.zl.kafka.domain.RvTable;
import com.zl.utils.EnvUtil;
import com.zl.utils.FlinkSourceUtil;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;import java.text.SimpleDateFormat;
import java.util.Date;public class KafkaExample {public static void main(String[] args) throws Exception {// 配置运行环境,并行度1StreamExecutionEnvironment env = EnvUtil.setFlinkEnv(1);// 程序间隔离,每个程序单独设置env.getCheckpointConfig().setCheckpointStorage("hdfs://10.86.97.191:9000/flinktest/KafkaExample");/// 读取kafka数据SingleOutputStreamOperator<String> rvSourceStream = env.addSource(FlinkSourceUtil.getKafkaSource("rvGroup","rv-test","10.86.97.21:9092","earliest"))// earliest/latest.setParallelism(1).uid("getRV").name("getRV");// 解析转换数据格式SingleOutputStreamOperator<String> rvParseStream = null;try {rvParseStream = rvSourceStream.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) {if (StringUtils.isEmpty(value)) {return;}parseRVData(value, out);}}).setParallelism(1).uid("rvParse").name("rvParse");} catch (Exception e) {e.printStackTrace();}rvParseStream.print();env.execute("rvParseJob");}// mainpublic static void parseRVData(String jsonStr, Collector<String> out) {try {if (StringUtils.isEmpty(jsonStr) || !isJSON(jsonStr)) {return;}JSONObject in = JSONObject.parseObject(jsonStr);// =====报告头信息 =====String reportFormat = stringDefaultIfEmpty(in.getString("reportFormat"));int reportVersion = intDefaultIfEmpty(in.getInteger("reportVersion"));JSONArray reports = in.getJSONArray("reports");if (reports != null) {for (int i = 0; i < reports.size(); i++) {RvTable rvTable = new RvTable();JSONObject record = reports.getJSONObject(i);if (record != null) {String filename = stringDefaultIfEmpty(record.getString("filename"));JSONObject c = record.getJSONObject("c");if (c != null) {// ===== 采集点信息 =====JSONObject objStationInfo = c.getJSONObject("objStationInfo");if(objStationInfo != null) {rvTable.setSStationID(stringDefaultIfEmpty(objStationInfo.getString("sStationID")));rvTable.setSStationName(stringDefaultIfEmpty(objStationInfo.getString("sStationName")));rvTable.setUcStationDir(stringDefaultIfEmpty(objStationInfo.getString("ucStationDir")));}JSONObject objVehicle = c.getJSONObject("objVehicle");if (objVehicle != null) {// ===== 车辆报告信息 =====rvTable.setSUUID(stringDefaultIfEmpty(objVehicle.getString("sUUID")));rvTable.setW64Timestamp(objVehicle.getLong("w64Timestamp"));rvTable.setUcReportType(intDefaultIfEmpty(objVehicle.getInteger("ucReportType")));rvTable.setUcVehicleType(intDefaultIfEmpty(objVehicle.getInteger("ucVehicleType")));// ===== 车辆报告信息/射频车辆信息 =====JSONObject objRfidInfo = objVehicle.getJSONObject("objRfidInfo");if (objRfidInfo != null) {rvTable.setSReaderID(stringDefaultIfEmpty(objRfidInfo.getString("sReaderID")));JSONObject objTagData = objRfidInfo.getJSONObject("objTagData");if (objTagData != null) {rvTable.setSTIDR(stringDefaultIfEmpty(objTagData.getString("sTID")));rvTable.setSEPCR(stringDefaultIfEmpty(objTagData.getString("sEPC")));}}// ===== 自加特殊处理字段 =====long timestamp = rvTable.getW64Timestamp();SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH");Date date = new Date(timestamp);String[] s = simpleDateFormat.format(date).split(" ");rvTable.setDt(s[0]);rvTable.setDh(s[1]);out.collect(JSONObject.toJSONString(rvTable));}// if (objVehicle != null)}// if (c != null)}// if (record != null)}// for 循环}} catch (Exception e) {e.printStackTrace();// 此处把解析后的数据存储到数据库……}}// parseRVDatapublic static boolean isJSON(String str) {boolean result;try {JSON.parse(str);result = true;} catch (Exception e) {result = false;}return result;}public static int intDefaultIfEmpty(Integer num) {if (num == null) {num = 0;return num;}return num;}public static String stringDefaultIfEmpty(String str) {return StringUtils.defaultIfEmpty(str, "ENULL");}public static Long longDefaultIfEmpty(Long num) {if (num == null) {num = 0l;return num;}return num;}public static Double doubleDefaultIfEmpty(Double num) {if (num == null) {num = 0.0;return num;}return num;}
}
2.5 pom.xml
注意修改此处:
3.运行效果
3.1 运行日志
3.2 web UI
访问:http://IP:1000/
4.部署
相关构建、部署,参考:一文说清flink从编码到部署上线
部署脚本:
flink run-application -t yarn-application -Dparallelism.default=1 -Denv.java.opts=" -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8" -Dtaskmanager.memory.process.size=1g -Dyarn.application.name="FlinkCdcKafka" -Dtaskmanager.numberOfTaskSlots=1 -c com.zl.kafka.KafkaExample /home/FlickCDC-1.0-SNAPSHOT-jar-with-dependencies.jar