Flink加载维度数据

embedded/2024/9/24 3:37:11/

Flink加载维度数据

1、为何要加载维度数据?

在构建实时数仓时，仅有事实数据是不够的，还需要加载维度数据来说明这些事实数据的具体含义。若只有事实数据，我们只能看到数据本身在不断变化，却不知道这些数据具体代表什么。例如，温度记录中只有城市 ID（1、2、3），只有关联维度数据之后，才能知道它们分别对应 nanjing、suzhou 等城市。因此，需要把维度数据加载进来。

2、加载维度数据的方式

下面介绍两种常见的维度数据加载方式：分布式缓存文件（Distributed Cache）和广播状态（Broadcast State）。

方式一:缓存文件

district.txt 文件：存放于 resources 资源目录下，每行一条记录，格式为“ID 城市名”，以空白字符分隔：

1   nanjing
2   suzhou
3   changzhou
4   xuzhou

主体代码

package recovery;import modules.env.Environments;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import scala.Tuple3;import javax.annotation.Nullable;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;/*** 缓存文件的注册与获取*/
public class TestCache {public static void main(String[] args) throws Exception {// 创建环境StreamExecutionEnvironment see = new Environments().build().enableCheckpoint("file:///D:/phase/flink_state_backend", 3, 1, 1).enableRetries(3, 1).enableStateBackend("hashmap", true, false).finish(RuntimeExecutionMode.STREAMING, 1, 3);// 1.注册缓存文件String path = Thread.currentThread().getContextClassLoader().getResource("district.txt").getPath();// 获取静态文件district.txt的路径see.registerCachedFile(path,"district"); // 缓存至环境中// 2.注册任务侦听器see.registerJobListener(new JobListener() {@Overridepublic void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable throwable) {// 任务提交时// 任务正常:输出jobClient,任务异常:throwableif (Objects.nonNull(jobClient)) {// 输出IDSystem.out.println(jobClient.getJobID().toString());// 输出状态try {System.err.println(jobClient.getJobStatus().get(10, TimeUnit.SECONDS).name());} catch (Exception e) {System.err.println(e.getMessage());}}else if (Objects.nonNull(throwable)) {// 异常不为空System.err.println(throwable.getMessage());}}@Overridepublic void onJobExecuted(@Nullable JobExecutionResult jobExecutionResult, @Nullable Throwable throwable) {// 任务执行// 任务正常:输出jobExecutionResult,任务异常:throwableif (Objects.nonNull(jobExecutionResult)) {System.out.println(jobExecutionResult);}else if (Objects.nonNull(throwable)){System.err.println(throwable.getMessage());}}});// 3.数据:ID,温度,时间戳// 生成水位线TimestampAssignerSupplier<Tuple3> supplier = new TimestampAssignerSupplier<Tuple3>() {@Overridepublic TimestampAssigner<Tuple3> createTimestampAssigner(Context context) {return (element,recordTimestamp) -> (Long) element._3();}};WatermarkStrategy<Tuple3> watermark = WatermarkStrategy.<Tuple3>forMonotonousTimestamps().withTimestampAssigner(supplier);// 数据see.fromCollection(Arrays.asList(new Tuple3(1,34,System.currentTimeMillis()),new Tuple3(2,36,System.currentTimeMillis()+1000),new Tuple3(1,35,System.currentTimeMillis()+2000),new Tuple3(3,32,System.currentTimeMillis()+3000),new 
Tuple3(2,33,System.currentTimeMillis()+4000)))// 4.将缓存文件中地址内容来替代数据中的ID号【通过ID关联】.setParallelism(1).assignTimestampsAndWatermarks(watermark).map(new RichMapFunction<Tuple3, Tuple3>() {Map<Integer,String> idName = new HashMap<>(); // 全局Map// 初始化资源@Overridepublic void open(Configuration parameters) throws Exception {// 读取缓存文件File district = getRuntimeContext().getDistributedCache().getFile("district");try(BufferedReader br = new BufferedReader(new FileReader(district))){ // 会自动释放()内资源String line;while (Objects.nonNull(line = br.readLine())) {String[] s = line.split("\\s+");idName.put(Integer.valueOf(s[0]),s[1]);}}catch (Exception ex){ex.printStackTrace();}}@Overridepublic Tuple3 map(Tuple3 value) throws Exception {return new Tuple3(idName.get(value._1()),value._2(),value._3());}// 释放资源@Overridepublic void close() throws Exception {idName.clear();}}).print();see.execute("cache-test");}
}

结果展示

(nanjing,34,1727094791401)
(suzhou,36,1727094792401)
(nanjing,35,1727094793401)
(changzhou,32,1727094794401)
(suzhou,33,1727094795401)

方式二：广播变量（在 DataStream API 中即广播状态 Broadcast State）

主要代码

package recovery;import modules.env.Environments;
import modules.time.Timer;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.WindowStagger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import scala.Tuple2;
import scala.Tuple3;import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;/*** 广播变量的发送与获取* 连接流 connect*/
public class TestBroadcastConnect {public static void main(String[] args) throws Exception {// 1.创建环境StreamExecutionEnvironment see = new Environments().build().enableCheckpoint("file:///D:/phase/flink_state_backend", 3, 1, 1).enableRetries(3, 1).enableStateBackend("hashmap", true, false).finish(RuntimeExecutionMode.STREAMING, 1, 3);// 2.广播变量MapStateDescriptor desc1 = new MapStateDescriptor("idCity", Integer.class, String.class); // 描述特征BroadcastStream<Tuple2> broadcastStream = see.fromCollection(Arrays.asList(// 广播出去的内容new Tuple2(1, "nanjing"),new Tuple2(2, "suzhou"),new Tuple2(3, "wuxi"))).broadcast(desc1); // 广播流// 3.数据:ID,温度,时间戳see.fromCollection(Arrays.asList(new Tuple3(1,34,System.currentTimeMillis()),new Tuple3(2,36,System.currentTimeMillis()+1000),new Tuple3(1,35,System.currentTimeMillis()+2000),new Tuple3(3,32,System.currentTimeMillis()+3000),new Tuple3(2,33,System.currentTimeMillis()+4000))).setParallelism(1).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3>forMonotonousTimestamps().withTimestampAssigner((SerializableTimestampAssigner<Tuple3>) (element,recordTimestamp) -> (Long) element._3()))// 4.连接流:与广播流数据进行连接(获取广播变量,变为广播连接流).connect(broadcastStream)// 5.将广播变量中地址内容来替代数据中的ID号【通过ID关联】.process(new BroadcastProcessFunction<Tuple3, Tuple2, Tuple3>() {@Overridepublic void processElement(Tuple3 value, BroadcastProcessFunction<Tuple3, Tuple2, Tuple3>.ReadOnlyContext ctx, Collector<Tuple3> out) throws Exception {Object v = ctx.getBroadcastState(desc1).get(value._1()); // 取out.collect(new Tuple3(v,value._2(),value._3()));}@Overridepublic void processBroadcastElement(Tuple2 value, BroadcastProcessFunction<Tuple3, Tuple2, Tuple3>.Context ctx, Collector<Tuple3> out) throws Exception {ctx.getBroadcastState(desc1).put(value._1,value._2); // 存}})// 6.业务: 平均温度.keyBy(t3->t3._1().toString()).window(Timer.tumbling(5,0,TimeUnit.SECONDS ,WindowStagger.NATURAL)).process(new ProcessWindowFunction<Tuple3, Tuple2, String, TimeWindow>() {@Overridepublic void process(String 
city, ProcessWindowFunction<Tuple3, Tuple2, String, TimeWindow>.Context context, Iterable<Tuple3> elements, Collector<Tuple2> out) throws Exception {float avg = 0.0f;int count = 0;Iterator<Tuple3> it = elements.iterator();while(it.hasNext()){count++;avg += (Integer) it.next()._2();}avg /= count;// 将平均温度往后送out.collect(new Tuple2(city,avg));}})// 相当于print()操作.addSink(new SinkFunction<Tuple2>() {@Overridepublic void invoke(Tuple2 value, Context context) throws Exception {System.out.println(value);}});see.execute("broadcast-connect");}
}

结果展示

(nanjing,34.5)
(suzhou,36.0)
(wuxi,32.0)
(suzhou,33.0)

http://www.ppmy.cn/embedded/115881.html

相关文章

faiss安装 (CPU版本)

faiss版本：faiss-v1.7.4。cd faiss-v1.7.4；cmake -B build . -DBUILD_TESTING=OFF -DFAISS_ENABLE_GPU=OFF -DFAISS_ENABLE_PYTHON=OFF；make -C build -j faiss；默认安装路径如下：-- Installing: /usr/local/lib64/libfaiss.a -- Installing: /usr/local/include/faiss…

SQLAlchemy打印ORM的SQL

SQLAlchemy打印ORM的SQL from sqlalchemy.dialects import mysqlquery = print("原生SQL:", query.statement.compile(dialect=mysql.dialect

iTerm2下载并配置

一、iTerm2下载 安装下载：https://iterm2.com/downloads.html 二、oh-my-zsh下载 oh-my-zsh 用于主题安装，优化iTerm2界面显示 简单粗暴安装： brew install zsh git 三、利用oh-my-zsh配置iTerm2主题 1.配置文件位置 在oh-my-zsh的配置文件中…

【delphi】正则判断windows完整合法文件名,包括路径

在 Delphi 中，可以使用正则表达式来检查 Windows 文件名称或路径是否合法。合法的文件名和路径要求符合以下几点： 禁止的字符：文件名和路径不能包含以下字符：<, >, :, ", /, \, |, ?, *。文件名不能以空格或点结束。…

【C++】list详解及模拟实现

目录 1. list介绍 2. list使用 2.1 修改相关 2.2 遍历 2.3 构造 2.4 迭代器 2.5 容量相关 2.6 元素访问 2.7 操作相关 3. 模拟实现 3.1 节点类 3.1.1 初始结构 3.1.2 节点的构造函数 3.2 迭代器类 3.2.1 初始结构 3.2.2 迭代器 3.2.3 迭代器-- 3.2.4 解引…

vue 案例使用

el-switch 按键的使用 <el-switch class="switchStyle" v-model="boolValue" :active-value="1" :inactive-value="0" active-text="ON" inactive-text="OFF" active-color="#13ce66" inactive-color="…

基于等保2.0标准——区块链安全扩展要求探讨

在数字经济时代，区块链作为新技术，能够推进经济社会规则体系重构，在经济金融、司法审判、食品追溯、商业贸易、公共信用等领域已有广泛应用。但在规划、建设、运维区块链的同时，也需要全面评估与防范区块链应用带来的安全隐患。 …

C语言编译四大阶段

目录 一、引言 二、预处理阶段 三、编译阶段 四、汇编阶段 五、链接阶段 六、总结 本文将详细介绍C语言编译的四个阶段，包括预处理、编译、汇编和链接。通过学习这些阶段，读者可以更好地理解C语言程序的编译过程，提高编程效率。 一、引…