Flink加载维度数据

server/2024/9/24 7:44:24/

Flink加载维度数据

1、为何要加载维度数据?

在构建实时数仓时,不能只有事实数据,还需要加载维度数据来说明这些事实数据的具体含义。如果只有事实数据,就相当于只看到数值在不断变化,却不知道这些数值具体代表什么。例如,事实数据中只记录了地区ID,必须关联维度数据才能知道该ID对应的城市名称。因此,我们需要把维度数据加载进来。

2、加载维度数据的方式

此处,将提供两种常见的用于加载维度数据的方式。

方式一:缓存文件

district.txt文件:存放于resources资源目录下,每行一条记录,格式为“ID 城市名”,两列之间以空白字符分隔

1   nanjing
2   suzhou
3   changzhou
4   xuzhou

主体代码

package recovery;import modules.env.Environments;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import scala.Tuple3;import javax.annotation.Nullable;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;/*** 缓存文件的注册与获取*/
public class TestCache {public static void main(String[] args) throws Exception {// 创建环境StreamExecutionEnvironment see = new Environments().build().enableCheckpoint("file:///D:/phase/flink_state_backend", 3, 1, 1).enableRetries(3, 1).enableStateBackend("hashmap", true, false).finish(RuntimeExecutionMode.STREAMING, 1, 3);// 1.注册缓存文件String path = Thread.currentThread().getContextClassLoader().getResource("district.txt").getPath();// 获取静态文件district.txt的路径see.registerCachedFile(path,"district"); // 缓存至环境中// 2.注册任务侦听器see.registerJobListener(new JobListener() {@Overridepublic void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable throwable) {// 任务提交时// 任务正常:输出jobClient,任务异常:throwableif (Objects.nonNull(jobClient)) {// 输出IDSystem.out.println(jobClient.getJobID().toString());// 输出状态try {System.err.println(jobClient.getJobStatus().get(10, TimeUnit.SECONDS).name());} catch (Exception e) {System.err.println(e.getMessage());}}else if (Objects.nonNull(throwable)) {// 异常不为空System.err.println(throwable.getMessage());}}@Overridepublic void onJobExecuted(@Nullable JobExecutionResult jobExecutionResult, @Nullable Throwable throwable) {// 任务执行// 任务正常:输出jobExecutionResult,任务异常:throwableif (Objects.nonNull(jobExecutionResult)) {System.out.println(jobExecutionResult);}else if (Objects.nonNull(throwable)){System.err.println(throwable.getMessage());}}});// 3.数据:ID,温度,时间戳// 生成水位线TimestampAssignerSupplier<Tuple3> supplier = new TimestampAssignerSupplier<Tuple3>() {@Overridepublic TimestampAssigner<Tuple3> createTimestampAssigner(Context context) {return (element,recordTimestamp) -> (Long) element._3();}};WatermarkStrategy<Tuple3> watermark = WatermarkStrategy.<Tuple3>forMonotonousTimestamps().withTimestampAssigner(supplier);// 数据see.fromCollection(Arrays.asList(new Tuple3(1,34,System.currentTimeMillis()),new Tuple3(2,36,System.currentTimeMillis()+1000),new Tuple3(1,35,System.currentTimeMillis()+2000),new Tuple3(3,32,System.currentTimeMillis()+3000),new 
Tuple3(2,33,System.currentTimeMillis()+4000)))// 4.将缓存文件中地址内容来替代数据中的ID号【通过ID关联】.setParallelism(1).assignTimestampsAndWatermarks(watermark).map(new RichMapFunction<Tuple3, Tuple3>() {Map<Integer,String> idName = new HashMap<>(); // 全局Map// 初始化资源@Overridepublic void open(Configuration parameters) throws Exception {// 读取缓存文件File district = getRuntimeContext().getDistributedCache().getFile("district");try(BufferedReader br = new BufferedReader(new FileReader(district))){ // 会自动释放()内资源String line;while (Objects.nonNull(line = br.readLine())) {String[] s = line.split("\\s+");idName.put(Integer.valueOf(s[0]),s[1]);}}catch (Exception ex){ex.printStackTrace();}}@Overridepublic Tuple3 map(Tuple3 value) throws Exception {return new Tuple3(idName.get(value._1()),value._2(),value._3());}// 释放资源@Overridepublic void close() throws Exception {idName.clear();}}).print();see.execute("cache-test");}
}

结果展示

(nanjing,34,1727094791401)
(suzhou,36,1727094792401)
(nanjing,35,1727094793401)
(changzhou,32,1727094794401)
(suzhou,33,1727094795401)

方式二:广播变量

主要代码

package recovery;import modules.env.Environments;
import modules.time.Timer;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.WindowStagger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import scala.Tuple2;
import scala.Tuple3;import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;/*** 广播变量的发送与获取* 连接流 connect*/
public class TestBroadcastConnect {public static void main(String[] args) throws Exception {// 1.创建环境StreamExecutionEnvironment see = new Environments().build().enableCheckpoint("file:///D:/phase/flink_state_backend", 3, 1, 1).enableRetries(3, 1).enableStateBackend("hashmap", true, false).finish(RuntimeExecutionMode.STREAMING, 1, 3);// 2.广播变量MapStateDescriptor desc1 = new MapStateDescriptor("idCity", Integer.class, String.class); // 描述特征BroadcastStream<Tuple2> broadcastStream = see.fromCollection(Arrays.asList(// 广播出去的内容new Tuple2(1, "nanjing"),new Tuple2(2, "suzhou"),new Tuple2(3, "wuxi"))).broadcast(desc1); // 广播流// 3.数据:ID,温度,时间戳see.fromCollection(Arrays.asList(new Tuple3(1,34,System.currentTimeMillis()),new Tuple3(2,36,System.currentTimeMillis()+1000),new Tuple3(1,35,System.currentTimeMillis()+2000),new Tuple3(3,32,System.currentTimeMillis()+3000),new Tuple3(2,33,System.currentTimeMillis()+4000))).setParallelism(1).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3>forMonotonousTimestamps().withTimestampAssigner((SerializableTimestampAssigner<Tuple3>) (element,recordTimestamp) -> (Long) element._3()))// 4.连接流:与广播流数据进行连接(获取广播变量,变为广播连接流).connect(broadcastStream)// 5.将广播变量中地址内容来替代数据中的ID号【通过ID关联】.process(new BroadcastProcessFunction<Tuple3, Tuple2, Tuple3>() {@Overridepublic void processElement(Tuple3 value, BroadcastProcessFunction<Tuple3, Tuple2, Tuple3>.ReadOnlyContext ctx, Collector<Tuple3> out) throws Exception {Object v = ctx.getBroadcastState(desc1).get(value._1()); // 取out.collect(new Tuple3(v,value._2(),value._3()));}@Overridepublic void processBroadcastElement(Tuple2 value, BroadcastProcessFunction<Tuple3, Tuple2, Tuple3>.Context ctx, Collector<Tuple3> out) throws Exception {ctx.getBroadcastState(desc1).put(value._1,value._2); // 存}})// 6.业务: 平均温度.keyBy(t3->t3._1().toString()).window(Timer.tumbling(5,0,TimeUnit.SECONDS ,WindowStagger.NATURAL)).process(new ProcessWindowFunction<Tuple3, Tuple2, String, TimeWindow>() {@Overridepublic void process(String 
city, ProcessWindowFunction<Tuple3, Tuple2, String, TimeWindow>.Context context, Iterable<Tuple3> elements, Collector<Tuple2> out) throws Exception {float avg = 0.0f;int count = 0;Iterator<Tuple3> it = elements.iterator();while(it.hasNext()){count++;avg += (Integer) it.next()._2();}avg /= count;// 将平均温度往后送out.collect(new Tuple2(city,avg));}})// 相当于print()操作.addSink(new SinkFunction<Tuple2>() {@Overridepublic void invoke(Tuple2 value, Context context) throws Exception {System.out.println(value);}});see.execute("broadcast-connect");}
}

结果展示

(nanjing,34.5)
(suzhou,36.0)
(wuxi,32.0)
(suzhou,33.0)

http://www.ppmy.cn/server/121242.html

相关文章

对条件语言模型(Conditional Language Model)的目标函数的理解

在翻看LoRA这篇论文的时候,忽然对条件语言模型优化的目标函数产生了一些疑问,下面是理解。 这个目标函数描述了条件语言模型(Conditional Language Model)的目标,即通过最大化对数似然估计来学习参数($\Phi$)…

Stable Diffusion 使用详解(13)--- 3D纹理增强

目录 背景 Normal Map 描述 原理 使用心得 例子 描述 原图 参数设置 底模 编辑 正负相关性提示词 其他参数 controlnet 效果 还能做点啥 调整 效果 背景 实际上,在stable diffusion 中,你或许发现很多controlnet 其实功能有点类似…

PHP智慧教育新篇章优校管理系统小程序源码

智慧教育新篇章 —— 优校管理系统 🚀【开篇启航:智慧教育的浪潮已至】 在这个日新月异的时代,教育也在悄然发生着变革。随着科技的飞速发展,智慧教育已成为教育领域的新风尚。而“优校管理系统”,正是这股浪潮中的佼…

JavaSE高级(3)——lombok、juint单元测试、断言

一、lombok的使用 默认jvm不解析第三方注解,需要手动开启 链式调用 二、junit单元测试 下载junit包 public class TestDemo {// 在每一个单元测试方法执行之前执行 @Before public void before() {// 例如可以在before部分创建IO流 System.out.println("befor…

Python爬虫之requests模块(一)

Python爬虫之requests模块(一) 学完urllib之后对爬虫应该有一定的了解了,随后就来学习鼎鼎有名的requests模块吧。 一、requests简介。 1、什么是requests模块? requests其实就是py原生的一个基于网络请求的模块,模拟…

腾讯 IEG 游戏前沿技术 一面复盘

前言 投了个实习内推后台开发,本来要电话先交流的那天直接走流程下午面试了,对面两人,面了有一个小时,游戏本的构思续航忘记插电了最后还掉线了一下,趁着还记得面试内容复盘一下 自我介绍一下 答: 您好…

2024.9.23 数据分析

数据脱敏:由于一些数据涉及商业、安全等,不方便公开,所以对隐私数据进行有策略的修改、隐藏等,创建一个与原始数据相似但不含真正敏感细节的数据副本,再用于后续的数据分析、开发测试等操作(例如用户的姓名…

React基础教程(10):React Hooks

9.1 使用hooks理由 高阶组件为了复用,导致代码层级复杂。生命周期的复杂。写成函数组件,无状态组件,因为需要状态,又写成了class,成本高9.2 useState(保存组件状态) const [state, setState] = useState(initialState);案例:点击按钮修改name