flink判断两个事件之间有没有超时(不使用CEP)

ops/2025/2/13 3:59:46/

1.为啥不使用cep呢,cep的超时时间设置不好配置化,无法满足扩展要求

2.超时怎么界定。A事件发生后,过了N时间,还没有收到B事件,算超时。

代码如下:


import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;@Slf4j
public class AsyncModelTimeoutHandler extends KeyedProcessFunction<String, JSONObject, JSONObject> {private static final long serialVersionUID = -61608451659272532L;private transient ValueState<Long> firstDataTime;private transient ValueState<Long> secondDataTime;private transient ValueState<String> eventType;@Overridepublic void open(Configuration parameters) throws Exception {ValueStateDescriptor<Long> firstDataDescriptor = new ValueStateDescriptor<>("firstDataTime", Long.class);firstDataTime = getRuntimeContext().getState(firstDataDescriptor);ValueStateDescriptor<Long> secondDataDescriptor = new ValueStateDescriptor<>("secondDataTime", Long.class);secondDataTime = getRuntimeContext().getState(secondDataDescriptor);ValueStateDescriptor<String> eventTypeDescriptor = new ValueStateDescriptor<>("eventType", String.class);eventType = getRuntimeContext().getState(eventTypeDescriptor);}@Overridepublic void processElement(JSONObject value, KeyedProcessFunction<String, JSONObject, JSONObject>.Context ctx, Collector<JSONObject> out) throws Exception {Long currentTimestamp = value.getLong("ts");if (value.containsKey("timeout")) {//异步请求消息long timeout = value.getLong("timeout");firstDataTime.update(currentTimestamp + timeout);eventType.update(value.getString("event"));ctx.timerService().registerProcessingTimeTimer(currentTimestamp + timeout);} else {secondDataTime.update(currentTimestamp);}}@Overridepublic void onTimer(long timestamp, KeyedProcessFunction<String, JSONObject, JSONObject>.OnTimerContext ctx, Collector<JSONObject> out) throws Exception {Long firstTime = firstDataTime.value();Long lastTime = secondDataTime.value();if (lastTime == null || (firstTime != null && lastTime >= firstTime)) {//超时了log.info("AsyncModelTimeoutHandler onTimer handle triggerTime={}, firstTime={}, secondTime={},key={}", timestamp, firstTime, lastTime, ctx.getCurrentKey());JSONObject r = new JSONObject();r.put("id", ctx.getCurrentKey());r.put("judgeTime", timestamp);r.put("event", eventType.value());out.collect(r);}firstDataTime.clear();secondDataTime.clear();eventType.clear();}
}

http://www.ppmy.cn/ops/157951.html

相关文章

webstorm 右下角git分支组件不显示如何恢复

1、在上方工具栏点击view 2、选择Appearance 3、选择 Status Bar Widgets 4、勾选Git Branch 目录如下图所示

DeepSeek 关联 Word 使用教程:解锁办公新效率

在当今数字化办公时代&#xff0c;将强大的人工智能模型与常用办公软件相结合&#xff0c;能显著提升工作效率。DeepSeek 作为一款先进的人工智能工具&#xff0c;若能与广泛使用的办公软件 Word 实现关联&#xff0c;可在文档撰写、编辑、内容优化等诸多方面为用户带来极大便利…

《pytorch》——优化器的解析和使用

优化器简介 在 PyTorch 中&#xff0c;优化器&#xff08;Optimizer&#xff09;是用于更新模型参数以最小化损失函数的关键组件。在机器学习和深度学习领域&#xff0c;优化器是一个至关重要的工具&#xff0c;主要用于在模型训练过程中更新模型的参数&#xff0c;其目标是最…

算法跟练第九弹——栈与队列

文章目录 part01 用栈实现队列part02 用队列实现栈part03 有效的括号part04 删除字符串中的所有相邻重复项归纳栈队列 跟着代码随想录刷题的第九天。 代码随想录链接&#xff1a;代码随想录 part01 用栈实现队列 题目链接&#xff1a;232.用栈实现队列 代码&#xff1a; class…

datasets: PyTorch version 2.5.1+cu124 available 这句话是什么意思

这句话的意思是&#xff1a; datasets&#xff1a;可能是 Python datasets 库的日志信息&#xff0c;说明它检测到了 PyTorch 的安装信息。PyTorch version 2.5.1cu124 available&#xff1a; PyTorch version 2.5.1&#xff1a;表示你的 PyTorch 版本是 2.5.1。cu124&#xf…

在大型语言模型(LLM)框架内Transformer架构与混合专家(MoE)策略的概念整合

文章目录 传统的神经网络框架存在的问题一. Transformer架构综述1.1 transformer的输入1.1.1 词向量1.1.2 位置编码&#xff08;Positional Encoding&#xff09;1.1.3 编码器与解码器结构1.1.4 多头自注意力机制 二.Transformer分步详解2.1 传统词向量存在的问题2.2 详解编解码…

Windows逆向工程入门之汇编环境搭建

公开视频 -> 链接点击跳转公开课程博客首页 -> ​​​链接点击跳转博客主页 Visual Studio逆向工程配置 基础环境搭建 Visual Studio 官方下载地址安装配置选项(后期可随时通过VS调整) 使用C的桌面开发 拓展可选选项 MASM汇编框架 配置MASM汇编项目 创建新项目 选择空…

【问题处理】【Mysql】mysqld进程CPU占用高排查思路

一、问题背景 Linux服务器CPU占用极高&#xff0c;经过排查&#xff0c;是mysqld占用了大部分的CPU资源。需要进一步排查是什么原因导致mysqld占用飙升。 因当前并没有大量正在执行的业务&#xff0c;所以初步排除业务量过大导致的Mysql资源飙升。 二、原因 Mysql服务端中可…