Flink(八):DataStream API (五) Join

news/2025/1/18 5:18:26/

1. Window Join

Window join 作用在两个流中有相同 key 且处于相同窗口的元素上。这些窗口可以通过 window assigner 定义,并且两个流中的元素都会被用于计算窗口的结果。两个流中的元素在组合之后,会被传递给用户定义的 JoinFunction 或 FlatJoinFunction,用户可以用它们输出符合 join 要求的结果。常见的用例可以总结为以下代码

stream.join(otherStream).where(<KeySelector>).equalTo(<KeySelector>).window(<WindowAssigner>).apply(<JoinFunction>);

语义上有一些值得注意的地方:

  • 从两个流中创建成对的元素与 inner-join 类似,即一个流中的元素在与另一个流中对应的元素完成 join 之前不会被输出。
  • 完成 join 的元素会将他们的 timestamp 设为对应窗口中允许的最大 timestamp。比如一个边界为 [5, 10) 窗口中的元素在 join 之后的 timestamp 为 9。

1.1 滚动 Window Join

使用滚动 window join 时,所有 key 相同且共享一个滚动窗口的元素会被组合成对,并传递给 JoinFunction 或 FlatJoinFunction。因为这个行为与 inner join 类似,所以一个流中的元素如果没有与另一个流中的元素组合起来,它就不会被输出!

如图所示,定义了一个大小为 2 毫秒的滚动窗口,即形成了边界为 [0,1], [2,3], ... 的窗口。图中展示了如何将每个窗口中的元素组合成对,组合的结果将被传递给 JoinFunction。注意,滚动窗口 [6,7] 将不会输出任何数据,因为绿色流当中没有数据可以与橙色流的 ⑥ 和 ⑦ 配对。

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;...DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;orangeStream.join(greenStream).where(<KeySelector>).equalTo(<KeySelector>).window(TumblingEventTimeWindows.of(Time.milliseconds(2))).apply (new JoinFunction<Integer, Integer, String> (){@Overridepublic String join(Integer first, Integer second) {return first + "," + second;}});

1.2 滑动 Window Join

当使用滑动 window join 时,所有 key 相同且处于同一个滑动窗口的元素将被组合成对,并传递给 JoinFunction 或 FlatJoinFunction。当前滑动窗口内,如果一个流中的元素没有与另一个流中的元素组合起来,它就不会被输出!注意,在某个滑动窗口中被 join 的元素不一定会在其他滑动窗口中被 join。

本例中定义了长度为两毫秒,滑动距离为一毫秒的滑动窗口,生成的窗口实例区间为 [-1, 0],[0,1],[1,2],[2,3], …。 X 轴下方是每个滑动窗口中被 join 后传递给 JoinFunction 的元素。图中可以看到橙色 ② 与绿色 ③ 在窗口 [2,3] 中 join,但没有与窗口 [1,2] 中任何元素 join。

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;...DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;orangeStream.join(greenStream).where(<KeySelector>).equalTo(<KeySelector>).window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */, Time.milliseconds(1) /* slide */)).apply (new JoinFunction<Integer, Integer, String> (){@Overridepublic String join(Integer first, Integer second) {return first + "," + second;}});

1.3 会话 Window Join

使用会话 window join 时,所有 key 相同且组合后符合会话要求的元素将被组合成对,并传递给 JoinFunction 或 FlatJoinFunction。这个操作同样是 inner join,所以如果一个会话窗口中只含有某一个流的元素,这个窗口将不会产生输出!

这里我们定义了一个间隔为至少一毫秒的会话窗口。图中总共有三个会话,前两者中两个流都有元素,它们被 join 并传递给 JoinFunction。而第三个会话中,绿流没有任何元素,所以 ⑧ 和 ⑨ 没有被 join!

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;...DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;orangeStream.join(greenStream).where(<KeySelector>).equalTo(<KeySelector>).window(EventTimeSessionWindows.withGap(Time.milliseconds(1))).apply (new JoinFunction<Integer, Integer, String> (){@Overridepublic String join(Integer first, Integer second) {return first + "," + second;}});

2. Interval Join

Interval join 组合元素的条件为:两个流(我们暂时称为 A 和 B)中 key 相同且 B 中元素的 timestamp 处于 A 中元素 timestamp 的一定范围内。这个条件可以更加正式地表示为 b.timestamp ∈ [a.timestamp + lowerBound; a.timestamp + upperBound] 或 a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound

这里的 a 和 b 为 A 和 B 中共享相同 key 的元素。上界和下界可正可负,只要下界永远小于等于上界即可。 Interval join 目前仅执行 inner join。当一对元素被传递给 ProcessJoinFunction,他们的 timestamp 会从两个元素的 timestamp 中取最大值 (timestamp 可以通过 ProcessJoinFunction.Context 访问)。Interval join 目前仅支持 event time。

上例中,我们 join 了橙色和绿色两个流,join 的条件是:以 -2 毫秒为下界、+1 毫秒为上界。 默认情况下,上下界也被包括在区间内,但 .lowerBoundExclusive() 和 .upperBoundExclusive() 可以将它们排除在外。

图中三角形所表示的条件也可以写成更加正式的表达式:orangeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts + upperBound

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;...DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;orangeStream.keyBy(<KeySelector>).intervalJoin(greenStream.keyBy(<KeySelector>)).between(Time.milliseconds(-2), Time.milliseconds(1)).process (new ProcessJoinFunction<Integer, Integer, String>(){@Overridepublic void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {out.collect(left + "," + right);}});


http://www.ppmy.cn/news/1564059.html

相关文章

医药新零售的下半场,叮当健康找到增长搭子

2025年&#xff0c;医药新零售已成为人们生活中必不可少的行业。 与之相伴的是&#xff0c;医药新零售正发展到新的阶段&#xff0c;整个行业面临着不小的增长瓶颈。 回顾过去一年&#xff0c;港股医药市场下行压力巨大&#xff0c;场跌幅最大的十只个股股价累计跌幅均在六成…

洛谷 P2392 kkksc03考前临时抱佛脚 刷题笔记 dfs

P2392 kkksc03考前临时抱佛脚 - 洛谷 | 计算机科学教育新生态 题目分析 左右脑双核 当我们给左右脑各自分配一道题时 消耗的时间为两者中耗时较长的一道题 我们尝试把每一道题都分配给左右脑试一试 即可遍历所有答案 关键在于答案怎么取保证耗时最短 if(step>a[x])…

“深入浅出”系列之设计模式篇:(0)什么是设计模式

设计模式六大原则 1. 单一职责原则&#xff1a;一个类或者一个方法只负责一项职责&#xff0c;尽量做到类的只有一个行为原因引起变化。 核心思想&#xff1a;控制类的粒度大小&#xff0c;将对象解耦&#xff0c;提高其内聚性。 2. 开闭原则&#xff1a;对扩展开放&#xf…

力扣解题汇总_JAVA

文章目录 数学_简单13_罗马数字转整数66_ 加一9_回文数70_爬楼梯69_x的平方根509_斐波那契数列2235_两整数相加67_二进制求和415_字符串相加2413_最小偶倍数2469_温度转换704_二分查找(重点) 数组_简单1_两数之和88_合并两个有序数组 链表_简单21_合并两个有序链表203_移除链表…

Flink的优化技巧

前言 在大数据处理领域&#xff0c;Apache Flink以其高吞吐量、低延迟和强大的状态管理能力&#xff0c;成为了实时流处理的首选框架。然而&#xff0c;随着数据量的不断增长和业务复杂性的提高&#xff0c;如何在Flink开发中实施有效的优化方案&#xff0c;成为了一个亟待解决…

23- TIME-LLM: TIME SERIES FORECASTING BY REPRO- GRAMMING LARGE LANGUAGE MODELS

解决问题 用LLM来解决时序预测问题&#xff0c;并且能够将时序数据映射&#xff08;reprogramming&#xff09;为NLP token&#xff0c;并且保持backbone的大模型是不变的。解决了时序序列数据用于大模型训练数据稀疏性的问题。 方法 Input Embedding 输入&#xff1a; X …

wow-agent 学习笔记

wow-agent-课程详情 | Datawhale 前两课比较基础&#xff0c;无笔记 第三课 阅卷智能体这一块&#xff0c;曾经做过一点和AI助教相关的内容&#xff0c;也是用了一个prompt去进行CoT&#xff0c;但是风格和课程中的不太相同&#xff0c;在下面附上我的prompt 你是一名资深教…

2025年01月15日Github流行趋势

1. 项目名称&#xff1a;tabby - 项目地址url&#xff1a;https://github.com/TabbyML/tabby - 项目语言&#xff1a;Rust - 历史star数&#xff1a;25764 - 今日star数&#xff1a;1032 - 项目维护者&#xff1a;wsxiaoys, apps/autofix-ci, icycodes, liangfung, boxbeam - 项…